Skip to content

Conversation

cjlee38
Copy link

@cjlee38 cjlee38 commented Jan 17, 2024

KafkaReceiver currently is not possible to close safely. An example is below:

val disposable = kafkaReceiver.receive()
    .flatMapSequential { record -> process(record).then(record) }
    .delayUntil { 
        it.receiverOffset().acknowledge() 
        it.receiverOffset().commit()
    }
    .subscribe()

dispose() method would be a way, but it might lead to unexpected result because of cancellation leading processed records not to be committed. I think This PR is discussed at #247, and also helps to address #378 indirectly.(People can customize KafkaReceiver's lifecycle after stopping followed by disposable.isDisposed()

I've considered several situations related to thread-safety or something, but might miss I didn't expect. Any further suggestions would be appreciated.

@cjlee38
Copy link
Author

cjlee38 commented Jun 5, 2024

Hello,

It has been quite some time since I submitted this PR, and I haven't received any response yet. I was wondering if support for reactor-kafka has ended, or if there might be another reason for the delay? I don't mean to rush, but I'm leaving this message because it has been a considerable amount of time without any reply.

Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant