Skip to content

Commit cd92fea

Browse files
committed
Reset stored offset on assign() and prevent offsets_store() for unassigned partitions
1 parent 8817338 commit cd92fea

File tree

11 files changed

+280
-25
lines changed

11 files changed

+280
-25
lines changed

CHANGELOG.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,26 @@ librdkafka v1.9.0 is a feature release:
66
* Added KIP-140 Admin API ACL support (by @emasab, #2676)
77

88

9+
## Upgrade considerations
10+
11+
* Consumer:
12+
`rd_kafka_offsets_store()` (et al.) will now return an error for any
13+
partition that is not currently assigned (through `rd_kafka_*assign()`).
14+
This prevents a race condition where an application would store offsets
15+
after the assigned partitions had been revoked (which resets the stored
16+
offset), that could cause these old stored offsets to be committed later
17+
when the same partitions were assigned to this consumer again - effectively
18+
overwriting any committed offsets by any consumers that were assigned the
19+
same partitions previously. This would typically result in the offsets
20+
rewinding and messages being reprocessed.
21+
As an extra effort to avoid this situation the stored offset is now
22+
also reset when partitions are assigned (through `rd_kafka_*assign()`).
23+
Applications that explicitly call `..offset*_store()` will now need
24+
to handle the case where `RD_KAFKA_RESP_ERR__STATE` is returned
25+
in the per-partition `.err` field - meaning the partition is no longer
26+
assigned to this consumer and the offset could not be stored for commit.
27+
28+
929
## Enhancements
1030

1131
* Windows: Added native Win32 IO/Queue scheduling. This removes the
@@ -43,6 +63,11 @@ librdkafka v1.9.0 is a feature release:
4363

4464
### Consumer fixes
4565

66+
* `rd_kafka_offsets_store()` (et al.) will now return an error for any
67+
partition that is not currently assigned (through `rd_kafka_*assign()`).
68+
See **Upgrade considerations** above for more information.
69+
* `rd_kafka_*assign()` will now reset/clear the stored offset.
70+
See **Upgrade considerations** above for more information.
4671
* An `ERR_MSG_SIZE_TOO_LARGE` consumer error would previously be raised
4772
if the consumer received a maximum sized FetchResponse only containing
4873
(transaction) aborted messages with no control messages. The fetching did

src/rdkafka.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3824,6 +3824,11 @@ int rd_kafka_consume_callback_queue(
38243824
* The \c offset + 1 will be committed (written) to broker (or file) according
38253825
* to \c `auto.commit.interval.ms` or manual offset-less commit()
38263826
*
3827+
* @warning This method may only be called for partitions that are currently
3828+
* assigned.
3829+
* Non-assigned partitions will fail with RD_KAFKA_RESP_ERR__STATE.
3830+
* Since v1.9.0.
3831+
*
38273832
* @remark \c `enable.auto.offset.store` must be set to "false" when using
38283833
* this API.
38293834
*
@@ -3841,18 +3846,23 @@ rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset);
38413846
* to \c `auto.commit.interval.ms` or manual offset-less commit().
38423847
*
38433848
* Per-partition success/error status propagated through each partition's
3844-
* \c .err field.
3849+
* \c .err for all return values (even NO_ERROR) except INVALID_ARG.
3850+
*
3851+
* @warning This method may only be called for partitions that are currently
3852+
* assigned.
3853+
* Non-assigned partitions will fail with RD_KAFKA_RESP_ERR__STATE.
3854+
* Since v1.9.0.
38453855
*
38463856
* @remark The \c .offset field is stored as is, it will NOT be + 1.
38473857
*
38483858
* @remark \c `enable.auto.offset.store` must be set to "false" when using
38493859
* this API.
38503860
*
3851-
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or
3852-
* RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if none of the
3853-
* offsets could be stored, or
3861+
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on (partial) success, or
38543862
* RD_KAFKA_RESP_ERR__INVALID_ARG if \c enable.auto.offset.store
3855-
* is true.
3863+
* is true, or
3864+
* RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or RD_KAFKA_RESP_ERR__STATE
3865+
* if none of the offsets could be stored.
38563866
*/
38573867
RD_EXPORT rd_kafka_resp_err_t
38583868
rd_kafka_offsets_store(rd_kafka_t *rk,

src/rdkafka_assignment.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,15 @@ static int rd_kafka_assignment_serve_removals(rd_kafka_t *rk) {
342342
* a manual offset-less commit() or the auto-committer
343343
* will not commit a stored offset from a previous
344344
* assignment (issue #2782). */
345-
rd_kafka_offset_store0(rktp, RD_KAFKA_OFFSET_INVALID,
345+
rd_kafka_offset_store0(rktp, RD_KAFKA_OFFSET_INVALID, rd_true,
346346
RD_DONT_LOCK);
347347

348348
/* Partition is no longer desired */
349349
rd_kafka_toppar_desired_del(rktp);
350+
351+
rd_assert((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED));
352+
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ASSIGNED;
353+
350354
rd_kafka_toppar_unlock(rktp);
351355

352356
rd_kafka_dbg(rk, CGRP, "REMOVE",
@@ -713,6 +717,28 @@ rd_kafka_assignment_add(rd_kafka_t *rk,
713717
rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true);
714718
}
715719

720+
/* Mark all partition objects as assigned and reset the stored
721+
* offsets back to invalid in case it was explicitly stored during
722+
* the time the partition was not assigned. */
723+
for (i = 0; i < partitions->cnt; i++) {
724+
rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
725+
rd_kafka_toppar_t *rktp =
726+
rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true);
727+
728+
rd_kafka_toppar_lock(rktp);
729+
730+
rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED));
731+
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ASSIGNED;
732+
733+
/* Reset the stored offset to INVALID to avoid the race
734+
* condition described in rdkafka_offset.h */
735+
rd_kafka_offset_store0(rktp, RD_KAFKA_OFFSET_INVALID,
736+
rd_true /* force */, RD_DONT_LOCK);
737+
738+
rd_kafka_toppar_unlock(rktp);
739+
}
740+
741+
716742
/* Add the new list of partitions to the current assignment.
717743
* Only need to sort the final assignment if it was non-empty
718744
* to begin with since \p partitions is sorted above. */

src/rdkafka_offset.c

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *app_rkt,
636636
int64_t offset) {
637637
rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
638638
rd_kafka_toppar_t *rktp;
639+
rd_kafka_resp_err_t err;
639640

640641
/* Find toppar */
641642
rd_kafka_topic_rdlock(rkt);
@@ -645,19 +646,21 @@ rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *app_rkt,
645646
}
646647
rd_kafka_topic_rdunlock(rkt);
647648

648-
rd_kafka_offset_store0(rktp, offset + 1, 1 /*lock*/);
649+
err = rd_kafka_offset_store0(rktp, offset + 1,
650+
rd_false /* Don't force */, RD_DO_LOCK);
649651

650652
rd_kafka_toppar_destroy(rktp);
651653

652-
return RD_KAFKA_RESP_ERR_NO_ERROR;
654+
return err;
653655
}
654656

655657

656658
rd_kafka_resp_err_t
657659
rd_kafka_offsets_store(rd_kafka_t *rk,
658660
rd_kafka_topic_partition_list_t *offsets) {
659661
int i;
660-
int ok_cnt = 0;
662+
int ok_cnt = 0;
663+
rd_kafka_resp_err_t last_err = RD_KAFKA_RESP_ERR_NO_ERROR;
661664

662665
if (rk->rk_conf.enable_auto_offset_store)
663666
return RD_KAFKA_RESP_ERR__INVALID_ARG;
@@ -670,19 +673,23 @@ rd_kafka_offsets_store(rd_kafka_t *rk,
670673
rd_kafka_topic_partition_get_toppar(rk, rktpar, rd_false);
671674
if (!rktp) {
672675
rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
676+
last_err = rktpar->err;
673677
continue;
674678
}
675679

676-
rd_kafka_offset_store0(rktp, rktpar->offset, 1 /*lock*/);
680+
rktpar->err = rd_kafka_offset_store0(rktp, rktpar->offset,
681+
rd_false /* don't force */,
682+
RD_DO_LOCK);
677683
rd_kafka_toppar_destroy(rktp);
678684

679-
rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
680-
ok_cnt++;
685+
if (rktpar->err)
686+
last_err = rktpar->err;
687+
else
688+
ok_cnt++;
681689
}
682690

683-
return offsets->cnt > 0 && ok_cnt == 0
684-
? RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
685-
: RD_KAFKA_RESP_ERR_NO_ERROR;
691+
return offsets->cnt > 0 && ok_cnt == 0 ? last_err
692+
: RD_KAFKA_RESP_ERR_NO_ERROR;
686693
}
687694

688695

@@ -1044,7 +1051,7 @@ rd_kafka_resp_err_t rd_kafka_offset_store_stop(rd_kafka_toppar_t *rktp) {
10441051
rktp->rktp_stored_offset == RD_KAFKA_OFFSET_INVALID &&
10451052
rktp->rktp_offsets_fin.eof_offset > 0)
10461053
rd_kafka_offset_store0(rktp, rktp->rktp_offsets_fin.eof_offset,
1047-
0 /*no lock*/);
1054+
rd_true /* force */, RD_DONT_LOCK);
10481055

10491056
/* Commit offset to backing store.
10501057
* This might be an async operation. */

src/rdkafka_offset.h

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,71 @@ const char *rd_kafka_offset2str(int64_t offset);
3636

3737

3838
/**
39-
* Stores the offset for the toppar 'rktp'.
40-
* The actual commit of the offset to backing store is usually
41-
* performed at a later time (time or threshold based).
39+
* @brief Stores the offset for the toppar 'rktp'.
40+
* The actual commit of the offset to backing store is usually
41+
* performed at a later time (time or threshold based).
42+
*
43+
* For the high-level consumer (assign()), this function will reject absolute
44+
* offsets if the partition is not currently assigned, unless \p force is set.
45+
* This check was added to avoid a race condition where an application
46+
* would call offsets_store() after the partitions had been revoked, forcing
47+
* a future auto-committer on the next assignment to commit this old offset and
48+
* overwriting whatever newer offset was committed by another consumer.
49+
*
50+
* The \p force flag is useful for internal calls to offset_store0() which
51+
* do not need the protection described above.
52+
*
53+
*
54+
* There is one situation where the \p force flag is troublesome:
55+
* If the application is using any of the consumer batching APIs,
56+
* e.g., consume_batch() or the event-based consumption, then it's possible
57+
* that while the batch is being accumulated or the application is picking off
58+
* messages from the event a rebalance occurs (in the background) which revokes
59+
* the current assignment. This revocation will remove all queued messages, but
60+
* not the ones the application already has accumulated in the event object.
61+
* Enforcing assignment for store in this state is tricky with a bunch of
62+
* corner cases, so instead we let those places forcibly store the offset, but
63+
* then in assign() we reset the stored offset to .._INVALID, just like we do
64+
* on revoke.
65+
* Illustrated (with fix):
66+
* 1. ev = rd_kafka_queue_poll();
67+
* 2. background rebalance revoke unassigns the partition and sets the
68+
* stored offset to _INVALID.
69+
* 3. application calls message_next(ev) which forcibly sets the
70+
* stored offset.
71+
* 4. background rebalance assigns the partition again, but forcibly sets
72+
* the stored offset to .._INVALID to provide a clean state.
73+
*
74+
* @param offset Offset to set, may be an absolute offset or .._INVALID.
75+
* @param force Forcibly set \p offset regardless of assignment state.
76+
* @param do_lock Whether to lock the \p rktp or not (already locked by caller).
4277
*
4378
* See head of rdkafka_offset.c for more information.
79+
*
80+
* @returns RD_KAFKA_RESP_ERR__STATE if the partition is not currently assigned,
81+
* unless \p force is set.
4482
*/
45-
static RD_INLINE RD_UNUSED void
46-
rd_kafka_offset_store0(rd_kafka_toppar_t *rktp, int64_t offset, int lock) {
47-
if (lock)
83+
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
84+
rd_kafka_offset_store0(rd_kafka_toppar_t *rktp,
85+
int64_t offset,
86+
rd_bool_t force,
87+
rd_dolock_t do_lock) {
88+
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
89+
90+
if (do_lock)
4891
rd_kafka_toppar_lock(rktp);
49-
rktp->rktp_stored_offset = offset;
50-
if (lock)
92+
93+
if (unlikely(!force && !RD_KAFKA_OFFSET_IS_LOGICAL(offset) &&
94+
!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED) &&
95+
!rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk)))
96+
err = RD_KAFKA_RESP_ERR__STATE;
97+
else
98+
rktp->rktp_stored_offset = offset;
99+
100+
if (do_lock)
51101
rd_kafka_toppar_unlock(rktp);
102+
103+
return err;
52104
}
53105

54106
rd_kafka_resp_err_t

src/rdkafka_op.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,8 @@ void rd_kafka_op_offset_store(rd_kafka_t *rk, rd_kafka_op_t *rko) {
897897
rd_kafka_toppar_lock(rktp);
898898
rktp->rktp_app_offset = offset;
899899
if (rk->rk_conf.enable_auto_offset_store)
900-
rd_kafka_offset_store0(rktp, offset, 0 /*no lock*/);
900+
rd_kafka_offset_store0(rktp, offset,
901+
/* force: ignore assignment state */
902+
rd_true, RD_DONT_LOCK);
901903
rd_kafka_toppar_unlock(rktp);
902904
}

src/rdkafka_partition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,9 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
344344
#define RD_KAFKA_TOPPAR_F_ON_DESP 0x400 /**< On rkt_desp list */
345345
#define RD_KAFKA_TOPPAR_F_ON_CGRP 0x800 /**< On rkcg_toppars list */
346346
#define RD_KAFKA_TOPPAR_F_ON_RKB 0x1000 /**< On rkb_toppars list */
347+
#define RD_KAFKA_TOPPAR_F_ASSIGNED \
348+
0x2000 /**< Toppar is part of the consumer \
349+
* assignment. */
347350

348351
/*
349352
* Timers

0 commit comments

Comments
 (0)