[watermill-kafka] - cannot process events concurrently without ACKing the event #374

@nachogiljaldo

When using watermill-kafka's subscriber, only one event can be processed at a time unless you ACK it, regardless of the number of partitions you have.
If you ACK the event, its offset is marked as done, and the event is therefore considered processed.

The consequence is that, if you want to ensure at-least-once processing, you can process only one event at a time.

I might be missing something, but I don't think I am, else I would expect this test to pass:

Publish 20 events across 8 partitions and expect to receive at least 2 of them within 15s without ACKing:

// Besides the imports already used by the test file, this needs "sync".
func TestConcurrentProcessingDifferentPartitions(t *testing.T) {
	pub, sub := createPartitionedPubSub(t)
	topicName := "topic_" + watermill.NewUUID()

	var messagesToPublish []*message.Message

	for i := 0; i < 20; i++ {
		id := watermill.NewUUID()
		messagesToPublish = append(messagesToPublish, message.NewMessage(id, nil))
	}
	err := pub.Publish(topicName, messagesToPublish...)
	require.NoError(t, err, "cannot publish message")

	messages, err := sub.Subscribe(context.Background(), topicName)
	require.NoError(t, err)

	// Collect messages without ACKing them. The mutex guards the slice
	// against the concurrent read in the assertion below.
	var mu sync.Mutex
	var receivedMessages []*message.Message
	go func() {
		// range exits when the channel is closed instead of spinning.
		for msg := range messages {
			mu.Lock()
			receivedMessages = append(receivedMessages, msg)
			mu.Unlock()
		}
	}()

	time.Sleep(15 * time.Second)

	mu.Lock()
	defer mu.Unlock()
	require.GreaterOrEqual(t, len(receivedMessages), 2)
}

My expectation would be one of these (and ideally both configurable):

  • I can process N messages concurrently, where N is the number of partitions assigned to the consumer
  • I can get as many messages as I ask for (up to a maximum buffer size) and I need to ACK / NACK them explicitly

Am I missing something? Is this currently possible?
