Skip to content

Conversation

joe-redpanda
Copy link
Contributor

@joe-redpanda joe-redpanda commented Aug 27, 2025

find_coordinator v4 is specified in KIP-699. It adds support for batched find_coordinator requests by swapping from a singular request key to a list of request keys.

This PR adds support for v4 find_coordinator alongside unit tests to validate it.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

Features

  • Upgrade find_coordinator api from v3 to v4

@joe-redpanda joe-redpanda requested review from a team, piyushredpanda, lf-rep, bharathv, bashtanov and mmaslankaprv and removed request for a team, piyushredpanda and lf-rep August 27, 2025 21:55
@joe-redpanda joe-redpanda force-pushed the find_coordinator_v4 branch 2 times, most recently from c58a8f1 to 4ea02a6 Compare August 27, 2025 22:29
@joe-redpanda joe-redpanda marked this pull request as ready for review August 27, 2025 22:29
Copy link
Member

@dotnwat dotnwat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm finding it difficult to grok the changes in find_coordinator.cc. is it mostly net-new code and then a removal of old code? if so, consider repositioning the new code in the file so that the diff hunks aren't intermingled. otherwise trying to organize the changes into a series of smaller transformations is useful.

but before that i think the capturing lambda coroutine needs to be fixed.

@vbotbuildovich
Copy link
Collaborator

vbotbuildovich commented Aug 28, 2025

CI test results

test results on build#71470
test_class test_method test_arguments test_kind job_url test_status passed reason
MasterTestSuite test_chunk_dl_with_random_http_errors unit https://buildkite.com/redpanda/redpanda/builds/71470#0198edea-6dec-4879-8b7a-05c0831fe6e9 FAIL 0/1
test results on build#71865
test_class test_method test_arguments test_kind job_url test_status passed reason
PartitionMovementUpgradeTest test_basic_upgrade null integration https://buildkite.com/redpanda/redpanda/builds/71865#01992b6f-0797-4f02-9ba3-9b84fa1a145f FLAKY 19/21 upstream reliability is '99.74937343358395'. current run reliability is '90.47619047619048'. drift is 9.27318 and the allowed drift is set to 50. The test should PASS
test results on build#72110
test_class test_method test_arguments test_kind job_url test_status passed reason
NodesDecommissioningTest test_recommissioning_node null integration https://buildkite.com/redpanda/redpanda/builds/72110#01993a8e-70b2-463c-bfa1-8ebff15c9202 FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS
PartitionBalancerTest test_recovery_mode_rebalance_finish null integration https://buildkite.com/redpanda/redpanda/builds/72110#01993a8e-70b2-47ae-abac-2d7bd2312173 FLAKY 16/21 upstream reliability is '96.5909090909091'. current run reliability is '76.19047619047619'. drift is 20.40043 and the allowed drift is set to 50. The test should PASS
SimpleEndToEndTest test_consumer_interruption null integration https://buildkite.com/redpanda/redpanda/builds/72110#01993a8e-ab25-4240-82b9-0b0af3ff592d FLAKY 18/21 upstream reliability is '99.29328621908127'. current run reliability is '85.71428571428571'. drift is 13.579 and the allowed drift is set to 50. The test should PASS
test results on build#72120
test_class test_method test_arguments test_kind job_url test_status passed reason
AuditLogTestOauth test_kafka_oauth {"authz_match": "acl"} integration https://buildkite.com/redpanda/redpanda/builds/72120#01993b3d-59f6-4192-a95a-6e954ac48bc9 FLAKY 14/21 upstream reliability is '97.87234042553192'. current run reliability is '66.66666666666666'. drift is 31.20567 and the allowed drift is set to 50. The test should PASS

chunked_vector<group_id> authorized_keys,
chunked_vector<kafka::coordinator>& out_vector) {
auto loop_body = [&ctx, &out_vector](group_id group_id) {
// if consumer group topic already exists, get the relevant partition
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is an api to check if topic exists, maybe this can be simplified to something like this:

ss::future<> f = ss::now();

if(!topic_exists){
    f= ctx.group_initializer().assure_topic_exists(true);
}

return f.then(....);

This way the logic to collect the out vector will only exists in a single .then

Copy link
Member

@dotnwat dotnwat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its coming together nicely.

}

// multiple key handler
[[maybe_unused]] static ss::future<find_coordinator_response>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need static if these are defined in the anonymous namespace.

std::move(keys), kafka::error_code::unsupported_version);
}

// NOLINTEND(cppcoreguidelines-avoid-reference-coroutine-parameters)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass by pointer instead of reference and clang won't complain

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik the linter is to stop something like

ss::future<> foo_async(ss::sstring const& key);

ss::future<> bar_async() {
    ss::sstring key{"key"};
    std::ignore = foo_async(key);
    return;
} // key dies but is still referenced by foo_async

for which raw pointer doesn't address the underlying issue

I lean toward silencing the linter over raw pointer. Does the project have precedent on this though?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for which raw pointer doesn't address the underlying issue

correct, but the reference doesn't prevent the use-after-free either. in this example, you shoud pass by value. the idea is to prefer pass-by-value except in performance sensitive places.

static ss::future<> handle_authorized_txn_id(
request_context& ctx,
chunked_vector<transactional_id> authorized_keys,
chunked_vector<kafka::coordinator>& out_vector) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about returning the output, rather than using an output parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I did this to avoid zipping all of the vectors together, returned vector would definitely be cleaner though

Comment on lines 367 to 368
default:
// intentional fallthru
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you can remove this--there are only two types, and you covered both.

Comment on lines 336 to 331
// pack singular request into list form
if (is_single_element_request) {
if (!keys.empty()) {
co_return co_await ctx.respond(
find_coordinator_response(kafka::error_code::invalid_request));
}
keys.emplace_back(std::move(request.data.key));
request.data.key = "";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a v4 request arrives and it contains a single key, then even though it is using the multiple-key request type, does it have identical semantics to a v3 request where single-key handling is the only option?

If so, can we instead of handling the single-key and multiple-key cases separately, handle only the multiple-key case and then the first thing we do upon entering the request handler is that we normalize the request so that a v3 request is handled as if it were a v4 request? that way, we have virtually all code shared and only a small adapter that converts between the cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// pack singular request into list form
Thats what I did here, the if / else in the outermost handle function simply repacks the v3- request into a list, and then sets a flag 'is_single_element_request' to remind the handler to unpack the list into the v3 style request again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, good. i must have been thrown off by

auto response = co_await handle_multiple_keys(std::move(request), &ctx, g);

which looks specific rather than general, but i didn't read things closely. SGTM.

@joe-redpanda
Copy link
Contributor Author

Exception receiving message: <class 'ducktape.errors.TimeoutError'>: runner client unresponsive, active_tests:

[TestKey(test_id='rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_migrated_topic_data_integrity.transfer_leadership=True.include_groups=True.params=.cancellation.None.use_alias.True', test_index=113)]

Looks like known flakey test, retrying

@dotnwat
Copy link
Member

dotnwat commented Sep 9, 2025

Added enterprise team as they are generally owning the kafka layer.

Copy link
Contributor

@pgellert pgellert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mainly just had code structuring suggestions, the core logic looks good

Comment on lines 29 to 31
namespace {

static ss::future<response_ptr>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: these functions no longer need to be static now

find_coordinator_request request,
request_context* ctx,
[[maybe_unused]] ss::smp_service_group g) {
auto keys = std::move(request.data.coordinator_keys);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This move is unnecessary. You can either make keys a reference or "inline it" and just move from request.data.coordinator_keys in each case.

Suggested change
auto keys = std::move(request.data.coordinator_keys);
auto& keys = request.data.coordinator_keys;

Comment on lines 294 to 296
auto key_type = request.data.key_type;

switch (key_type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just inline this to simplify

Suggested change
auto key_type = request.data.key_type;
switch (key_type) {
switch (request.data.key_type) {

Comment on lines 191 to 189
// if the consumer groups topic doesn't exist, attempt to create it
auto initialization_future = ss::make_ready_future<bool>(true);
if (!ctx->coordinator_mapper().topic_exists()) {
initialization_future
= ctx->group_initializer().assure_topic_exists();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be outside of the max_concurrent_for_each, because we only need to create the topic once.

(Moving it out of the lambda also allows you to use co_await)

Comment on lines 91 to 100
template<typename KeyType>
struct auth_check_success {
chunked_vector<KeyType> authorized_keys;
chunked_vector<KeyType> unauthorized_keys;
};

template<typename KeyType>
struct auth_check_failure {
chunked_vector<KeyType> all_keys;
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the way we act on these structs, I think these should be called audit_success and audit_failure.

Comment on lines 99 to 100
auto found_key_it = std::find(
keys.begin(), keys.end(), coordinator_response.key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use ranges algos for any new code (std::ranges::find > std::find)

Comment on lines 82 to 90
std::vector<ss::sstring> keys = {"key1", "key2", "key3"};

chunked_vector<ss::sstring> request_keys{};
for (const auto& key : keys) {
request_keys.emplace_back(key);
}

kafka::find_coordinator_request request{
std::move(request_keys), kafka::coordinator_type::group};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could just create the keys once as a chunked_vector<ss::sstring>:

auto keys = chunked_vector<ss::sstring>{"key1", "key2", "key3"};

kafka::find_coordinator_request request{
  keys.copy(), kafka::coordinator_type::group};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

swapped to initializer list. chunked_vector doesn't have a single element erase
I'm checking all keys were found by erasing from the vector and asserting empty vector at the end

Comment on lines 135 to 86
auto& coordinator_responses = resp.data.coordinators;
for (const auto& coordinator_response : coordinator_responses) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: inline coordinator_responses to simplify

for (const auto& coordinator_response : resp.data.coordinators) {

BOOST_TEST(coordinator_response.node_id == model::node_id(1));
BOOST_TEST(coordinator_response.host == "127.0.0.1");
BOOST_TEST(coordinator_response.port == 9092);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check here that all the keys got a response. Either by asserting that keys.empty() or by earlier checking the that size of resp.data.coordinators is the same as the original size of keys (before the deletions).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good callout

}
}

FIXTURE_TEST(find_coordinator_invalid_version, redpanda_thread_fixture) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the name of this test makes me think that you're testing with a kafka request that sends a request with an API version that is invalid. I'd call this find_coordinator_invalid_key_type to better reflect what it tests.

Comment on lines 83 to 53
template<typename KeyType>
kafka::coordinator_response leader_to_coordinator(
request_context* ctx, const KeyType& key, model::node_id leader) {
auto broker = ctx->metadata_cache().get_node_metadata(leader);
if (broker) {
auto& b = *broker;
for (const auto& listener : b.broker.kafka_advertised_listeners()) {
if (listener.name == ctx->listener()) {
return kafka::coordinator_response{
key,
b.broker.id(),
listener.address.host(),
listener.address.port()};
}
}
}
return kafka::coordinator_response{
key, kafka::error_code::coordinator_not_available};
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be easier to review as a refactor of the existing code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following, can you elaborate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think ben may be saying fundamentally we are generalizing the existing code, so a series of small transformations which preserve / generalize existing semantics would be easier to review in that it would be easier to "see" that the same semantics are preserved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth chopping up?

@joe-redpanda joe-redpanda force-pushed the find_coordinator_v4 branch 2 times, most recently from 22f0a76 to 20bd2b4 Compare September 11, 2025 20:13
Adds the v4 handler logic to find_coordinator.
This logic is not yet enabled.

Handling will now be generic on key type with the following steps:
1. transmute sstring keys into the target key type
2. check auth
   a. fail all if auth system failure
   b. split keys into authed and unathed
3. error unathed keys
4. handle authed keys
5. glue together authed and unauthed into response
Swaps find_coordinator::handle for a v4 enabled version.
add transaction api tests

Add tests for v4 batch apis for groups, transactions, and invalid
request
@mmaslankaprv
Copy link
Member

LGTM, let's have somebody from enterprise team to approve

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants