Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,21 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
*/
@Override
public <T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type) {
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type) {
return subscribeToEvents(pubsubName, topic, listener, null, type);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, String deadLetterTopic, TypeRef<T> type) {
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.setDeadLetterTopic(deadLetterTopic)
.build();
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
Expand All @@ -483,10 +493,20 @@ public <T> Subscription subscribeToEvents(
*/
@Override
public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
return subscribeToEvents(pubsubName, topic, (String) null, type);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Flux<CloudEvent<T>> subscribeToEvents(
String pubsubName, String topic, String deadLetterTopic, TypeRef<T> type) {
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.setDeadLetterTopic(deadLetterTopic)
.build();
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
Expand Down
26 changes: 26 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,21 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
<T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);

/**
* Subscribe to pubsub via streaming.
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param listener Callback methods to process events.
* @param deadLetterTopic Topic to send dead letter messages to.
* @param type Type for object deserialization.
* @param <T> Type of object deserialization.
* @return An active subscription.
* @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
*/
@Deprecated
<T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, String deadLetterTopic, TypeRef<T> type);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux.
* @param pubsubName Name of the pubsub component.
Expand All @@ -293,6 +308,17 @@ <T> Subscription subscribeToEvents(
*/
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux.
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param deadLetterTopic Topic to send dead letter messages to.
* @param type Type for object deserialization.
* @return A Flux of CloudEvents containing deserialized event payloads and metadata.
* @param <T> Type of the event payload.
*/
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, String deadLetterTopic, TypeRef<T> type);

/*
* Converse with an LLM.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ private void handleProcessingError(String eventId, Exception cause) {

try {
// Try to send RETRY acknowledgment
// TODO: push the event to the dead letter topic in case of processing errors
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to publish the events to the deadLetter topic on the sdk level?

or is it already implemented on the dapr daemon level once we build Drop Ack in the request stream ?

requestStream.onNext(buildRetryAck(eventId));
} catch (Exception ackException) {
// Failed to send ack - this is critical
Expand Down