Skip to content

feat(grpc): add aggregate_states in child_manager #2363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

cjqzhao
Copy link
Contributor

@cjqzhao cjqzhao commented Jul 31, 2025

Aggregate states in child_manager. This PR includes unit tests for aggregate_states and added functionality in test_utils.

@easwars @dfawley

Copy link
Collaborator

@dfawley dfawley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also discussed offline that you would add the unit tests you have for this function?

@cjqzhao cjqzhao force-pushed the aggregationchildmanager branch from 003473b to ea15d3b Compare August 14, 2025 04:26
Copy link
Contributor

@easwars easwars left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, please see if you can handle clippy warnings on lines that you have added/changed as part of this PR. Thanks.

) -> Result<Box<dyn Iterator<Item = ChildUpdate<Endpoint>>>, Box<dyn Error + Send + Sync>>
{
let mut endpoint_to_child_map = HashMap::new();
for endpoint in resolver_update.endpoints.clone().unwrap().iter() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work instead: for endpoint in &resolver_update.endpoints?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did for endpoint in resolver_update.endpoints.unwrap().iter() instead

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah you need to unwrap because it's an Option.. & seems a little cleaner to me, but the compiled result is identical so 🤷.

Comment on lines 292 to 294
fn eq(&self, other: &Self) -> bool {
self.addresses == other.addresses
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an incomplete comparison since it doesn't look at a field. It looks like, for now, Attributes derives PartialEq, so we can compare it. We could even just derive this for Endpoint too I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just derived it for Endpoint


impl StubPolicyBuilder {
pub fn new(name: &'static str, funcs: StubPolicyFuncs) -> Self {
Self { name, funcs }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you don't need to register the builder for the tests that you are currently adding, but I think ideally the stub builder should be registered with the LB policy registry.

Copy link
Contributor Author

@cjqzhao cjqzhao Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds good to me. I will change it to be registered.

Copy link
Contributor

@easwars easwars left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

resolver_update: ResolverUpdate,
) -> Result<Box<dyn Iterator<Item = ChildUpdate<Endpoint>>>, Box<dyn Error + Send + Sync>>
{
let mut sharded_endpoints = HashMap::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, why do you need a HashMap here if you are going to only return the values from the map? Can't you use a Vector instead? The child identifier is anyways part of the ChildUpdate struct which is the item returned by the returned iterator.

created_subchannel: Option<Arc<dyn Subchannel>>,
}

// Defines the functions resolver_update and subchannel_update to test aggregate_states
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like I still see a lot of lines which seem excessively long. But I'm just going to let them go at this point.

Comment on lines 223 to 224
let funcs = self.funcs.clone();
Box::new(StubPolicy { funcs })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit/Optional: this could be single line now without the need for the funcs local variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

endpoints.push(Endpoint {
addresses: addresses[i].clone(),
addresses: addresses.clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No clone here... Please take a pass through the whole PR and remove places where you use clone and then never use the original value after that.

Copy link
Contributor Author

@cjqzhao cjqzhao Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok will do. Will do better on this for future PRs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed another commit that gets rid of cloning of subchannels in the actual tests too. Let me know if that is necessary.

// receives.
resolver_update: Some(move |update: ResolverUpdate, _, controller| {
assert_eq!(update.endpoints.iter().len(), 1);
let endpoint = update.endpoints.as_ref().unwrap()[0].clone();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks way more complicated than it should need to be. Try endpoints.pop()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Just pushed a commit fixing this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants