Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.6</version>
<version>3.5.10</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>io.numaproj.pulsar</groupId>
Expand All @@ -25,7 +25,7 @@
<dependency>
<groupId>io.numaproj.numaflow</groupId>
<artifactId>numaflow-java</artifactId>
<version>0.9.0</version>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/numaproj/pulsar/consumer/NumaHeaderKeys.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.numaproj.pulsar.consumer;
/**
* Header keys for Pulsar message metadata passed to Numaflow.
*/
class NumaHeaderKeys {

static final String PULSAR_PRODUCER_NAME = "x-pulsar-producer-name";
static final String PULSAR_MESSAGE_ID = "x-pulsar-message-id";
static final String PULSAR_TOPIC_NAME = "x-pulsar-topic-name";
static final String PULSAR_PUBLISH_TIME = "x-pulsar-publish-time";
static final String PULSAR_EVENT_TIME = "x-pulsar-event-time";
static final String PULSAR_REDELIVERY_COUNT = "x-pulsar-redelivery-count";

private NumaHeaderKeys() {
// Utility class, prevent instantiation
}
}
38 changes: 37 additions & 1 deletion src/main/java/io/numaproj/pulsar/consumer/PulsarSource.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.pulsar.consumer;

import io.numaproj.numaflow.sourcer.AckRequest;
import io.numaproj.numaflow.sourcer.NackRequest;
import io.numaproj.numaflow.sourcer.Message;
import io.numaproj.numaflow.sourcer.Offset;
import io.numaproj.numaflow.sourcer.OutputObserver;
Expand Down Expand Up @@ -86,7 +87,10 @@ public void read(ReadRequest request, OutputObserver observer) {
byte[] offsetBytes = msgId.getBytes(StandardCharsets.UTF_8);
Offset offset = new Offset(offsetBytes);

Message message = new Message(pMsg.getValue(), offset, Instant.now());

Map<String, String> headers = buildHeaders(pMsg);

Message message = new Message(pMsg.getValue(), offset, Instant.now(), headers);
observer.send(message);

messagesToAck.put(msgId, pMsg);
Expand Down Expand Up @@ -135,6 +139,38 @@ public void ack(AckRequest request) {
}
}

/**
* Builds headers from Pulsar message metadata
*/
private Map<String, String> buildHeaders(org.apache.pulsar.client.api.Message<byte[]> pulsarMessage) {
Map<String, String> headers = new HashMap<>();

headers.put(NumaHeaderKeys.PULSAR_PRODUCER_NAME, pulsarMessage.getProducerName());
headers.put(NumaHeaderKeys.PULSAR_MESSAGE_ID, pulsarMessage.getMessageId().toString());
headers.put(NumaHeaderKeys.PULSAR_TOPIC_NAME, pulsarMessage.getTopicName());
headers.put(NumaHeaderKeys.PULSAR_PUBLISH_TIME, String.valueOf(pulsarMessage.getPublishTime()));
headers.put(NumaHeaderKeys.PULSAR_EVENT_TIME, String.valueOf(pulsarMessage.getEventTime()));
headers.put(NumaHeaderKeys.PULSAR_REDELIVERY_COUNT, String.valueOf(pulsarMessage.getRedeliveryCount()));

// Add message properties as headers
if (pulsarMessage.getProperties() != null && !pulsarMessage.getProperties().isEmpty()) {
pulsarMessage.getProperties().forEach((key, value) -> {
if (key != null && value != null) {
headers.put(key, value);
}
});
}

log.trace("Message headers: {}", headers);
return headers;
}

@Override
public void nack(NackRequest request) {
// TODO : implement nack logic
throw new UnsupportedOperationException("Unimplemented method 'nack'");
}

@Override
public long getPending() {
try {
Expand Down
47 changes: 47 additions & 0 deletions src/test/java/io/numaproj/pulsar/consumer/PulsarSourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,27 @@ public void readWhenMessagesReceived() {
when(msg2.getMessageId()).thenReturn(msgId2);
when(msg1.getValue()).thenReturn("Hello".getBytes(StandardCharsets.UTF_8));
when(msg2.getValue()).thenReturn("World".getBytes(StandardCharsets.UTF_8));

// Stub metadata methods required by buildHeaders()
when(msg1.getProducerName()).thenReturn("test-producer");
when(msg2.getProducerName()).thenReturn("test-producer");
when(msg1.getTopicName()).thenReturn("test-topic");
when(msg2.getTopicName()).thenReturn("test-topic");
when(msg1.getPublishTime()).thenReturn(1000L);
when(msg2.getPublishTime()).thenReturn(2000L);
when(msg1.getEventTime()).thenReturn(1000L);
when(msg2.getEventTime()).thenReturn(2000L);
when(msg1.getRedeliveryCount()).thenReturn(0);
when(msg2.getRedeliveryCount()).thenReturn(0);
when(msg1.getProperties()).thenReturn(Collections.emptyMap());
when(msg2.getProperties()).thenReturn(Collections.emptyMap());

// Add custom properties to msg2
Map<String, String> customProps = new HashMap<>();
customProps.put("custom-key-1", "custom-value-1");
customProps.put("custom-key-2", "custom-value-2");
customProps.put("app-version", "1.2.3");
when(msg2.getProperties()).thenReturn(customProps);

// Create a fake Messages<byte[]> object
Messages<byte[]> messages = mock(Messages.class);
Expand All @@ -189,10 +210,36 @@ public void readWhenMessagesReceived() {
verify(observer, times(2)).send(messageCaptor.capture());
java.util.List<Message> sentMessages = messageCaptor.getAllValues();
assertEquals(2, sentMessages.size());

// Validate contents of messages using getValue().
assertEquals("Hello", new String(sentMessages.get(0).getValue(), StandardCharsets.UTF_8));
assertEquals("World", new String(sentMessages.get(1).getValue(), StandardCharsets.UTF_8));

// Verify headers are correctly populated for first message
Message firstMessage = sentMessages.get(0);
assertNotNull("Headers should not be null", firstMessage.getHeaders());
assertEquals("test-producer", firstMessage.getHeaders().get("x-pulsar-producer-name"));
assertEquals("msg1", firstMessage.getHeaders().get("x-pulsar-message-id"));
assertEquals("test-topic", firstMessage.getHeaders().get("x-pulsar-topic-name"));
assertEquals("1000", firstMessage.getHeaders().get("x-pulsar-publish-time"));
assertEquals("1000", firstMessage.getHeaders().get("x-pulsar-event-time"));
assertEquals("0", firstMessage.getHeaders().get("x-pulsar-redelivery-count"));

// Verify headers are correctly populated for second message
Message secondMessage = sentMessages.get(1);
assertNotNull("Headers should not be null", secondMessage.getHeaders());
assertEquals("test-producer", secondMessage.getHeaders().get("x-pulsar-producer-name"));
assertEquals("msg2", secondMessage.getHeaders().get("x-pulsar-message-id"));
assertEquals("test-topic", secondMessage.getHeaders().get("x-pulsar-topic-name"));
assertEquals("2000", secondMessage.getHeaders().get("x-pulsar-publish-time"));
assertEquals("2000", secondMessage.getHeaders().get("x-pulsar-event-time"));
assertEquals("0", secondMessage.getHeaders().get("x-pulsar-redelivery-count"));

// Verify custom properties are included in headers
assertEquals("custom-value-1", secondMessage.getHeaders().get("custom-key-1"));
assertEquals("custom-value-2", secondMessage.getHeaders().get("custom-key-2"));
assertEquals("1.2.3", secondMessage.getHeaders().get("app-version"));

// Confirm messages are tracked for ack.
// The keys should be "msg1" and "msg2"
java.util.Map<String, ?> ackMap = (java.util.Map<String, ?>) ReflectionTestUtils.getField(pulsarSource,
Expand Down