Skip to content

Commit b50d72e

Browse files
committed
Consumer: metadata update+rejoin group when autocreate topics don't exist
1 parent b5d8ba1 commit b50d72e

File tree

1 file changed

+104
-95
lines changed

1 file changed

+104
-95
lines changed

aiokafka/consumer/group_coordinator.py

Lines changed: 104 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
UNKNOWN_OFFSET = -1
2525

2626

27+
class MissingAutoCreateTopic(Exception):
28+
...
29+
30+
2731
class BaseCoordinator(object):
2832

2933
def __init__(self, client, subscription, *, loop,
@@ -213,18 +217,8 @@ def __init__(self, client, subscription, *, loop,
213217
self._rejoin_needed_fut = create_future(loop=loop)
214218
self._coordinator_dead_fut = create_future(loop=loop)
215219

216-
self._coordination_task = ensure_future(
217-
self._coordination_routine(), loop=loop)
220+
self._coordination_task = None
218221

219-
def _on_coordination_done(fut):
220-
try:
221-
fut.result()
222-
except asyncio.CancelledError:
223-
pass
224-
except Exception: # pragma: no cover
225-
log.error(
226-
"Unexpected error in coordinator routine", exc_info=True)
227-
self._coordination_task.add_done_callback(_on_coordination_done)
228222

229223
# Will be started/stopped by coordination task
230224
self._heartbeat_task = None
@@ -241,6 +235,9 @@ def _on_coordination_done(fut):
241235
# Will be set on close
242236
self._closing = create_future(loop=loop)
243237

238+
self._coordination_task = ensure_future(
239+
self._coordination_routine(), loop=loop)
240+
244241
def _on_metadata_change(self):
245242
self.request_rejoin()
246243

@@ -505,92 +502,99 @@ def _coordination_routine(self):
505502
assignment = None
506503
performed_join_prepare = False
507504
while not self._closing.done():
508-
# Check if there was a change to subscription
509-
if subscription is not None and not subscription.active:
510-
# The subscription can change few times, so we can not rely on
511-
# flags or topic lists. For example if user changes
512-
# subscription from X to Y and back to X we still need to
513-
# rejoin group.
514-
self.request_rejoin()
515-
subscription = self._subscription.subscription
516-
if subscription is None:
517-
yield from asyncio.wait(
518-
[self._subscription.wait_for_subscription(),
519-
self._closing],
520-
return_when=asyncio.FIRST_COMPLETED, loop=self._loop)
521-
if self._closing.done():
522-
break
523-
subscription = self._subscription.subscription
524-
assert subscription is not None and subscription.active
525-
auto_assigned = self._subscription.partitions_auto_assigned()
526-
527-
# Ensure active group
528-
yield from self.ensure_coordinator_known()
529-
if auto_assigned and self.need_rejoin(subscription):
530-
# due to a race condition between the initial metadata
531-
# fetch and the initial rebalance, we need to ensure that
532-
# the metadata is fresh before joining initially. This
533-
# ensures that we have matched the pattern against the
534-
# cluster's topics at least once before joining.
535-
# Also the rebalance can be issued by another node, that
536-
# discovered a new topic, which is still unknown to this
537-
# one.
538-
if self._subscription.subscribed_pattern:
539-
yield from self._client.force_metadata_update()
540-
if not subscription.active:
505+
try:
506+
# Check if there was a change to subscription
507+
if subscription is not None and not subscription.active:
508+
# The subscription can change few times, so we can not rely on
509+
# flags or topic lists. For example if user changes
510+
# subscription from X to Y and back to X we still need to
511+
# rejoin group.
512+
self.request_rejoin()
513+
subscription = self._subscription.subscription
514+
if subscription is None:
515+
yield from asyncio.wait(
516+
[self._subscription.wait_for_subscription(),
517+
self._closing],
518+
return_when=asyncio.FIRST_COMPLETED, loop=self._loop)
519+
if self._closing.done():
520+
break
521+
subscription = self._subscription.subscription
522+
assert subscription is not None and subscription.active
523+
auto_assigned = self._subscription.partitions_auto_assigned()
524+
525+
# Ensure active group
526+
yield from self.ensure_coordinator_known()
527+
if auto_assigned and self.need_rejoin(subscription):
528+
# due to a race condition between the initial metadata
529+
# fetch and the initial rebalance, we need to ensure that
530+
# the metadata is fresh before joining initially. This
531+
# ensures that we have matched the pattern against the
532+
# cluster's topics at least once before joining.
533+
# Also the rebalance can be issued by another node, that
534+
# discovered a new topic, which is still unknown to this
535+
# one.
536+
if self._subscription.subscribed_pattern:
537+
yield from self._client.force_metadata_update()
538+
if not subscription.active:
539+
continue
540+
541+
if not performed_join_prepare:
542+
# NOTE: We pass the previously used assignment here.
543+
yield from self._on_join_prepare(assignment)
544+
performed_join_prepare = True
545+
546+
# NOTE: we did not stop heartbeat task before to keep the
547+
# member alive during the callback, as it can commit offsets.
548+
# See the ``RebalanceInProgressError`` case in heartbeat
549+
# handling.
550+
yield from self._stop_heartbeat_task()
551+
552+
# We will only try to perform the rejoin once. If it fails,
553+
# we will spin this loop another time, checking for coordinator
554+
# and subscription changes.
555+
# NOTE: We do re-join in sync. The group rebalance will fail on
556+
# subscription change and coordinator failure by itself and
557+
# this way we don't need to worry about racing or cancellation
558+
# issues that could occur if re-join were to be a task.
559+
success = yield from self._do_rejoin_group(subscription)
560+
if success:
561+
performed_join_prepare = False
562+
assignment = subscription.assignment
563+
self._start_heartbeat_task()
564+
else:
565+
# Backoff is done in group rejoin
541566
continue
542-
543-
if not performed_join_prepare:
544-
# NOTE: We pass the previously used assignment here.
545-
yield from self._on_join_prepare(assignment)
546-
performed_join_prepare = True
547-
548-
# NOTE: we did not stop heartbeat task before to keep the
549-
# member alive during the callback, as it can commit offsets.
550-
# See the ``RebalanceInProgressError`` case in heartbeat
551-
# handling.
552-
yield from self._stop_heartbeat_task()
553-
554-
# We will only try to perform the rejoin once. If it fails,
555-
# we will spin this loop another time, checking for coordinator
556-
# and subscription changes.
557-
# NOTE: We do re-join in sync. The group rebalance will fail on
558-
# subscription change and coordinator failure by itself and
559-
# this way we don't need to worry about racing or cancellation
560-
# issues that could occur if re-join were to be a task.
561-
success = yield from self._do_rejoin_group(subscription)
562-
if success:
563-
performed_join_prepare = False
564-
assignment = subscription.assignment
565-
self._start_heartbeat_task()
566567
else:
567-
# Backoff is done in group rejoin
568-
continue
569-
else:
570-
assignment = subscription.assignment
571-
572-
assert assignment is not None and assignment.active
573-
574-
# We will only try to commit offsets once here. In error case the
575-
# returned wait_timeout will be ``retry_backoff``. In success case
576-
# time to next autocommit deadline. If autocommit is disabled
577-
# timeout will be ``None``, ie. no timeout.
578-
wait_timeout = yield from self._maybe_do_autocommit(assignment)
579-
580-
futures = [
581-
self._closing, # Will exit fast if close() called
582-
self._coordinator_dead_fut,
583-
subscription.unsubscribe_future]
584-
# In case of manual assignment this future will be always set and
585-
# we don't want a heavy loop here.
586-
# NOTE: metadata changes are for partition count and pattern
587-
# subscription, which is irrelevant in case of user assignment.
588-
if auto_assigned:
589-
futures.append(self._rejoin_needed_fut)
590-
591-
done, _ = yield from asyncio.wait(
592-
futures, timeout=wait_timeout, loop=self._loop,
593-
return_when=asyncio.FIRST_COMPLETED)
568+
assignment = subscription.assignment
569+
570+
assert assignment is not None and assignment.active
571+
572+
# We will only try to commit offsets once here. In error case the
573+
# returned wait_timeout will be ``retry_backoff``. In success case
574+
# time to next autocommit deadline. If autocommit is disabled
575+
# timeout will be ``None``, ie. no timeout.
576+
wait_timeout = yield from self._maybe_do_autocommit(assignment)
577+
578+
futures = [
579+
self._closing, # Will exit fast if close() called
580+
self._coordinator_dead_fut,
581+
subscription.unsubscribe_future]
582+
# In case of manual assignment this future will be always set and
583+
# we don't want a heavy loop here.
584+
# NOTE: metadata changes are for partition count and pattern
585+
# subscription, which is irrelevant in case of user assignment.
586+
if auto_assigned:
587+
futures.append(self._rejoin_needed_fut)
588+
589+
done, _ = yield from asyncio.wait(
590+
futures, timeout=wait_timeout, loop=self._loop,
591+
return_when=asyncio.FIRST_COMPLETED)
592+
except asyncio.CancelledError:
593+
pass
594+
except MissingAutoCreateTopic as exc:
595+
log.error("Rejoining group -- %s", exc)
596+
yield from self._client.force_metadata_update()
597+
self.request_rejoin()
594598

595599
# Closing finallization
596600
if assignment is not None:
@@ -752,6 +756,10 @@ def _do_rejoin_group(self, subscription):
752756
subscription, self._assignors, self._session_timeout_ms,
753757
self._retry_backoff_ms, loop=self._loop)
754758
assignment = yield from rebalance.perform_group_join()
759+
if self._client.cluster.missing_autocreate_topics:
760+
raise MissingAutoCreateTopic(
761+
"Need to rejoin! -- Topics not yet created: {0!r}".format(
762+
self._client.cluster.missing_autocreate_topics))
755763

756764
if not subscription.active:
757765
log.debug("Subscription changed during rebalance from %s to %s. "
@@ -1154,6 +1162,7 @@ def perform_group_join(self):
11541162
"Unexpected error in join group '%s' response: %s",
11551163
self.group_id, err)
11561164
raise Errors.KafkaError(repr(err))
1165+
11571166
return None
11581167

11591168
@asyncio.coroutine

0 commit comments

Comments
 (0)