diff --git a/pom.xml b/pom.xml
index 4040662..bca9df2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.boot
spring-boot-starter-parent
- 3.3.6
+ 3.5.10
io.numaproj.pulsar
@@ -25,7 +25,7 @@
io.numaproj.numaflow
numaflow-java
- 0.9.0
+ 0.11.0
org.projectlombok
diff --git a/src/main/java/io/numaproj/pulsar/consumer/NumaHeaderKeys.java b/src/main/java/io/numaproj/pulsar/consumer/NumaHeaderKeys.java
new file mode 100644
index 0000000..175242f
--- /dev/null
+++ b/src/main/java/io/numaproj/pulsar/consumer/NumaHeaderKeys.java
@@ -0,0 +1,17 @@
+package io.numaproj.pulsar.consumer;
+/**
+ * Header keys for Pulsar message metadata passed to Numaflow.
+ */
+class NumaHeaderKeys {
+
+ static final String PULSAR_PRODUCER_NAME = "x-pulsar-producer-name";
+ static final String PULSAR_MESSAGE_ID = "x-pulsar-message-id";
+ static final String PULSAR_TOPIC_NAME = "x-pulsar-topic-name";
+ static final String PULSAR_PUBLISH_TIME = "x-pulsar-publish-time";
+ static final String PULSAR_EVENT_TIME = "x-pulsar-event-time";
+ static final String PULSAR_REDELIVERY_COUNT = "x-pulsar-redelivery-count";
+
+ private NumaHeaderKeys() {
+ // Utility class, prevent instantiation
+ }
+}
diff --git a/src/main/java/io/numaproj/pulsar/consumer/PulsarSource.java b/src/main/java/io/numaproj/pulsar/consumer/PulsarSource.java
index f0872fb..9edffd6 100644
--- a/src/main/java/io/numaproj/pulsar/consumer/PulsarSource.java
+++ b/src/main/java/io/numaproj/pulsar/consumer/PulsarSource.java
@@ -1,6 +1,7 @@
package io.numaproj.pulsar.consumer;
import io.numaproj.numaflow.sourcer.AckRequest;
+import io.numaproj.numaflow.sourcer.NackRequest;
import io.numaproj.numaflow.sourcer.Message;
import io.numaproj.numaflow.sourcer.Offset;
import io.numaproj.numaflow.sourcer.OutputObserver;
@@ -86,7 +87,10 @@ public void read(ReadRequest request, OutputObserver observer) {
byte[] offsetBytes = msgId.getBytes(StandardCharsets.UTF_8);
Offset offset = new Offset(offsetBytes);
- Message message = new Message(pMsg.getValue(), offset, Instant.now());
+
+ Map headers = buildHeaders(pMsg);
+
+ Message message = new Message(pMsg.getValue(), offset, Instant.now(), headers);
observer.send(message);
messagesToAck.put(msgId, pMsg);
@@ -135,6 +139,38 @@ public void ack(AckRequest request) {
}
}
+ /**
+ * Builds headers from Pulsar message metadata
+ */
+ private Map buildHeaders(org.apache.pulsar.client.api.Message pulsarMessage) {
+ Map headers = new HashMap<>();
+
+ headers.put(NumaHeaderKeys.PULSAR_PRODUCER_NAME, pulsarMessage.getProducerName());
+ headers.put(NumaHeaderKeys.PULSAR_MESSAGE_ID, pulsarMessage.getMessageId().toString());
+ headers.put(NumaHeaderKeys.PULSAR_TOPIC_NAME, pulsarMessage.getTopicName());
+ headers.put(NumaHeaderKeys.PULSAR_PUBLISH_TIME, String.valueOf(pulsarMessage.getPublishTime()));
+ headers.put(NumaHeaderKeys.PULSAR_EVENT_TIME, String.valueOf(pulsarMessage.getEventTime()));
+ headers.put(NumaHeaderKeys.PULSAR_REDELIVERY_COUNT, String.valueOf(pulsarMessage.getRedeliveryCount()));
+
+ // Add message properties as headers
+ if (pulsarMessage.getProperties() != null && !pulsarMessage.getProperties().isEmpty()) {
+ pulsarMessage.getProperties().forEach((key, value) -> {
+ if (key != null && value != null) {
+ headers.put(key, value);
+ }
+ });
+ }
+
+ log.trace("Message headers: {}", headers);
+ return headers;
+ }
+
+ @Override
+ public void nack(NackRequest request) {
+ // TODO : implement nack logic
+ throw new UnsupportedOperationException("Unimplemented method 'nack'");
+ }
+
@Override
public long getPending() {
try {
diff --git a/src/test/java/io/numaproj/pulsar/consumer/PulsarSourceTest.java b/src/test/java/io/numaproj/pulsar/consumer/PulsarSourceTest.java
index b76224a..138da88 100644
--- a/src/test/java/io/numaproj/pulsar/consumer/PulsarSourceTest.java
+++ b/src/test/java/io/numaproj/pulsar/consumer/PulsarSourceTest.java
@@ -165,6 +165,27 @@ public void readWhenMessagesReceived() {
when(msg2.getMessageId()).thenReturn(msgId2);
when(msg1.getValue()).thenReturn("Hello".getBytes(StandardCharsets.UTF_8));
when(msg2.getValue()).thenReturn("World".getBytes(StandardCharsets.UTF_8));
+
+ // Stub metadata methods required by buildHeaders()
+ when(msg1.getProducerName()).thenReturn("test-producer");
+ when(msg2.getProducerName()).thenReturn("test-producer");
+ when(msg1.getTopicName()).thenReturn("test-topic");
+ when(msg2.getTopicName()).thenReturn("test-topic");
+ when(msg1.getPublishTime()).thenReturn(1000L);
+ when(msg2.getPublishTime()).thenReturn(2000L);
+ when(msg1.getEventTime()).thenReturn(1000L);
+ when(msg2.getEventTime()).thenReturn(2000L);
+ when(msg1.getRedeliveryCount()).thenReturn(0);
+ when(msg2.getRedeliveryCount()).thenReturn(0);
+ when(msg1.getProperties()).thenReturn(Collections.emptyMap());
+ when(msg2.getProperties()).thenReturn(Collections.emptyMap());
+
+ // Add custom properties to msg2
+ Map customProps = new HashMap<>();
+ customProps.put("custom-key-1", "custom-value-1");
+ customProps.put("custom-key-2", "custom-value-2");
+ customProps.put("app-version", "1.2.3");
+ when(msg2.getProperties()).thenReturn(customProps);
// Create a fake Messages object
Messages messages = mock(Messages.class);
@@ -189,10 +210,36 @@ public void readWhenMessagesReceived() {
verify(observer, times(2)).send(messageCaptor.capture());
java.util.List sentMessages = messageCaptor.getAllValues();
assertEquals(2, sentMessages.size());
+
// Validate contents of messages using getValue().
assertEquals("Hello", new String(sentMessages.get(0).getValue(), StandardCharsets.UTF_8));
assertEquals("World", new String(sentMessages.get(1).getValue(), StandardCharsets.UTF_8));
+ // Verify headers are correctly populated for first message
+ Message firstMessage = sentMessages.get(0);
+ assertNotNull("Headers should not be null", firstMessage.getHeaders());
+ assertEquals("test-producer", firstMessage.getHeaders().get("x-pulsar-producer-name"));
+ assertEquals("msg1", firstMessage.getHeaders().get("x-pulsar-message-id"));
+ assertEquals("test-topic", firstMessage.getHeaders().get("x-pulsar-topic-name"));
+ assertEquals("1000", firstMessage.getHeaders().get("x-pulsar-publish-time"));
+ assertEquals("1000", firstMessage.getHeaders().get("x-pulsar-event-time"));
+ assertEquals("0", firstMessage.getHeaders().get("x-pulsar-redelivery-count"));
+
+ // Verify headers are correctly populated for second message
+ Message secondMessage = sentMessages.get(1);
+ assertNotNull("Headers should not be null", secondMessage.getHeaders());
+ assertEquals("test-producer", secondMessage.getHeaders().get("x-pulsar-producer-name"));
+ assertEquals("msg2", secondMessage.getHeaders().get("x-pulsar-message-id"));
+ assertEquals("test-topic", secondMessage.getHeaders().get("x-pulsar-topic-name"));
+ assertEquals("2000", secondMessage.getHeaders().get("x-pulsar-publish-time"));
+ assertEquals("2000", secondMessage.getHeaders().get("x-pulsar-event-time"));
+ assertEquals("0", secondMessage.getHeaders().get("x-pulsar-redelivery-count"));
+
+ // Verify custom properties are included in headers
+ assertEquals("custom-value-1", secondMessage.getHeaders().get("custom-key-1"));
+ assertEquals("custom-value-2", secondMessage.getHeaders().get("custom-key-2"));
+ assertEquals("1.2.3", secondMessage.getHeaders().get("app-version"));
+
// Confirm messages are tracked for ack.
// The keys should be "msg1" and "msg2"
java.util.Map ackMap = (java.util.Map) ReflectionTestUtils.getField(pulsarSource,