diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 05b555b5e9..8ff8fb1697 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -465,11 +465,21 @@ public Mono> publishEvents(BulkPublishRequest requ */ @Override public Subscription subscribeToEvents( - String pubsubName, String topic, SubscriptionListener listener, TypeRef type) { + String pubsubName, String topic, SubscriptionListener listener, TypeRef type) { + return subscribeToEvents(pubsubName, topic, listener, null, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Subscription subscribeToEvents( + String pubsubName, String topic, SubscriptionListener listener, String deadLetterTopic, TypeRef type) { DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() .setTopic(topic) .setPubsubName(pubsubName) + .setDeadLetterTopic(deadLetterTopic) .build(); DaprProtos.SubscribeTopicEventsRequestAlpha1 request = DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() @@ -483,10 +493,20 @@ public Subscription subscribeToEvents( */ @Override public Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type) { + return subscribeToEvents(pubsubName, topic, (String) null, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Flux> subscribeToEvents( + String pubsubName, String topic, String deadLetterTopic, TypeRef type) { DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() .setTopic(topic) .setPubsubName(pubsubName) + .setDeadLetterTopic(deadLetterTopic) .build(); DaprProtos.SubscribeTopicEventsRequestAlpha1 request = DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 17ca136492..0dfcd3b1b3 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -283,6 +283,21 @@ Mono> publishEvents(String pubsubName, String topicNa Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + /** + * Subscribe to pubsub via streaming. + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param listener Callback methods to process events. + * @param deadLetterTopic Topic to send dead letter messages to. + * @param type Type for object deserialization. + * @param Type of object deserialization. + * @return An active subscription. + * @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach. + */ + @Deprecated + Subscription subscribeToEvents( + String pubsubName, String topic, SubscriptionListener listener, String deadLetterTopic, TypeRef type); + /** * Subscribe to pubsub events via streaming using Project Reactor Flux. * @param pubsubName Name of the pubsub component. @@ -293,6 +308,17 @@ Subscription subscribeToEvents( */ Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type); + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux. + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param deadLetterTopic Topic to send dead letter messages to. + * @param type Type for object deserialization. + * @return A Flux of CloudEvents containing deserialized event payloads and metadata. + * @param Type of the event payload. + */ + Flux> subscribeToEvents(String pubsubName, String topic, String deadLetterTopic, TypeRef type); + /* * Converse with an LLM. * diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java index 56131882b8..652ee67b7b 100644 --- a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java +++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java @@ -179,6 +179,7 @@ private void handleProcessingError(String eventId, Exception cause) { try { // Try to send RETRY acknowledgment + // TODO: push the event to the dead letter topic in case of processing errors requestStream.onNext(buildRetryAck(eventId)); } catch (Exception ackException) { // Failed to send ack - this is critical