fix(offset_manager): fetch current leader epoch before offset commit #3418
Conversation
Signed-off-by: jhampson-dbre <[email protected]>
Force-pushed from 1bf38c0 to 6e79aaf
puellanivis left a comment:
While I’ve proposed some improvement changes, I’m not sure this is at all an advisable course of action.
If a consumer group has been disconnected for such a long period, then it's highly likely that it has missed numerous messages, and we don't at all want to accidentally "fast-forward" our leader epoch to whatever the partition leader says is the latest epoch?
I mean, we would want to restart consumption from the committed offsets and catch up to whatever the current leader epoch is by processing messages?
```go
_, currentLeaderEpoch, err := om.client.LeaderAndEpoch(pom.topic, pom.partition)
if err == nil && currentLeaderEpoch >= 0 {
	leaderEpoch = currentLeaderEpoch
	pom.leaderEpoch = currentLeaderEpoch
}
```
I’m not sure why we need to keep a local-variable copy of leaderEpoch when we’re just going to assign the value into the struct anyways. With this change, we don’t even use the leaderEpoch until we pass it into AddBlockWithLeaderEpoch.
Removing lines 359 and 366, and reverting line 370 to what was previously line 359, accomplishes exactly the same behavior as the complete set of changes currently proposed.
```go
type mockClientForLeaderEpoch struct {
	Client
	conf             *Config
	leaderAndEpochFn func(topic string, partitionID int32) (*Broker, int32, error)
}
```
I’m not clear about why we would need to provide a full arbitrary function here. For the purposes used, we could just have fields for `epoch` and `err`, and then `LeaderAndEpoch` can just be `return nil, m.epoch, m.err`.
```go
partition:   0,
offset:      100,
leaderEpoch: 44, // stale
metadata:    "meta",
```
I’m unclear why all these fields are being set. Are they actually necessary, or are they being filled in with unused dummy values? Nothing actually tests this code, and nothing should fail to work properly if we didn’t set these values, so we should avoid setting them; setting them suggests the values are necessary for operation when they are not.
That’s a good point; I was misunderstanding the intent. The problem arises when MarkOffset or ResetOffset is called for an offset that is likely relatively far in the future or past compared to the currently committed offset (i.e. a "fast-forward" or "rewind" scenario). In this case, the partition offset manager makes no attempt to validate or otherwise check what the leader epoch of that offset was, and continues to use pom.leaderEpoch from the currently committed offset. If you have any advice on a workable approach to solve this problem, I’ll try to implement it another way. Or I can convert this into an issue if it would be helpful to discuss further?
An issue would probably be a good idea. Myself, I’m not sure I fully understand what problem you’re facing, but I won’t really be able to think on the topic until probably tomorrow. (Happy New Year.)
Problem
When committing consumer group offsets using protocol version 6+ (Kafka 2.1.0+), Sarama can commit a stale `committed_leader_epoch` that doesn't match the current partition leader epoch. This causes Kafka brokers to log "Truncation detected" errors and can lead to consumers receiving truncation errors when they resume consumption. The most likely trigger is using a Sarama client to reset consumer group offsets for a consumer group that has been disconnected for an extended period, after the partition's current leader epoch has changed since the group's last committed offset.

Root Cause
- When `ManagePartition()` is called, a `partitionOffsetManager` is created via `newPartitionOffsetManager()`.
- The `partitionOffsetManager` stores the `leaderEpoch` it receives during initialization from the `OffsetFetchResponse` returned by `fetchInitialOffset()`, which performs an `OffsetFetchRequest`.

If a consumer group is disconnected for an extended period (e.g., weeks) and the partition leader changes multiple times during that period, the stored `leaderEpoch` becomes stale. When `constructRequest()` builds an `OffsetCommitRequest`, it uses the stale `leaderEpoch` from the `partitionOffsetManager` without checking whether the partition leader has changed. For protocol version 6+, this stale epoch is included in the commit request, causing the broker to detect a mismatch when the consumer group connects and attempts to start consuming from the committed offsets.

Example Scenario
1. The consumer group last committed offsets with `leaderEpoch = 44`.
2. While the group is disconnected, the partition leader changes and the current epoch becomes `50`.
3. `kafkactl` (or a similar tool), which uses Sarama's `markOffset`/`resetOffset`, resets the consumer group offsets to the latest.
4. An `OffsetCommitRequest` is sent with the stale `committed_leader_epoch = 44`.
5. `auto.offset.reset` behavior is triggered, causing the consumer to re-read the entire partition if set to `earliest`.

Solution
This PR modifies `constructRequest()` in `offset_manager.go` to fetch the current partition leader epoch from metadata using `client.LeaderAndEpoch()` before constructing the `OffsetCommitRequest` for protocol version 6+ (Kafka 2.1.0+).

Changes
- `offset_manager.go`: In `constructRequest()`, when building the commit request for protocol version 6+, we now:
  - call `om.client.LeaderAndEpoch(pom.topic, pom.partition)` to get the current leader epoch from metadata
  - use the current epoch instead of the stale `pom.leaderEpoch`
  - update `pom.leaderEpoch` to the current value for consistency
- `offset_manager_test.go`: Added a `TestConstructRequestUsesCurrentLeaderEpochUnit` test case that:
  - creates a `partitionOffsetManager` with a stale leader epoch (44)
  - verifies that `constructRequest()` uses the current epoch (50) in the commit request

Implementation Details
The fix only applies to protocol version 6+ (Kafka 2.1.0+) because earlier versions don't include `committed_leader_epoch` in the `OffsetCommitRequest`. If `LeaderAndEpoch()` fails or returns an invalid epoch, we fall back to using the stored epoch, ensuring backward compatibility.

Testing
- `TestConstructRequestUsesCurrentLeaderEpochUnit` passes

Impact