Skip to content
Closed

GED #38

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions consumer-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: consumer-config
data:
application.yml: |
spring:
pulsar:
client: # see here for all configurations: https://pulsar.apache.org/reference/#/4.0.x/client/client-configuration-client
clientConfig:
serviceUrl: "pulsar://host.docker.internal:6650"
consumer:
enabled: true
consumerConfig: # see here for all configurations: https://pulsar.apache.org/reference/#/4.0.x/client/client-configuration-producer
topicNames: "anotha"
subscriptionName: "sub"
admin:
adminConfig: # Accepts the same key-value pair configurations as pulsar client: https://pulsar.apache.org/reference/#/4.0.x/client/client-configuration-client
serviceUrl: "http://host.docker.internal:8080"


43 changes: 43 additions & 0 deletions consumer-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: consumer-pipeline
spec:
limits:
readBatchSize: 1 # Change if you want a different batch size
vertices:
- name: in
scale:
min: 1
volumes:
- name: pulsar-config-volume
configMap:
name: consumer-config
items:
- key: application.yml
path: application.yml
source:
udsource:
container:
image: apache-pulsar-java:v0.3.0
args: [ "--spring.config.location=file:/conf/application.yml" ]
imagePullPolicy: Never
volumeMounts:
- name: pulsar-config-volume
mountPath: /conf
- name: p1
scale:
min: 1
udf:
builtin:
name: cat
- name: out
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: p1
- from: p1
to: out
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
version: '3.5'

services:
pulsar:
image: "apachepulsar/pulsar:4.0.2"
command: bin/pulsar standalone
environment:
PULSAR_MEM: "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
volumes:
- ./schema.avsc:/pulsar/schemas/schema.avsc


ports:
- "6650:6650"
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
</dependencies>

<build>
Expand Down
18 changes: 18 additions & 0 deletions producer-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: producer-config
data:
application.yml: |
spring:
pulsar:
client: # see here for all configurations: https://pulsar.apache.org/reference/#/4.0.x/client/client-configuration-client
clientConfig:
serviceUrl: "pulsar://host.docker.internal:6650"
producer:
enabled: true
producerConfig: # see here for all configurations: https://pulsar.apache.org/reference/#/4.0.x/client/client-configuration-producer
topicName: "anotha"
sendTimeoutMs: 2000


44 changes: 44 additions & 0 deletions producer-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: producer-pipeline
spec:
vertices:
- name: in
scale:
min: 1
source:
generator:
rpu: 1
duration: 1s
msgSize: 01
- name: p1
scale:
min: 1
udf:
builtin:
name: cat
- name: out
scale:
min: 1
volumes: # Shared between containers that are part of the same pod, useful for sharing configurations
- name: pulsar-config-volume
configMap:
name: producer-config
items:
- key: application.yml
path: application.yml
sink:
udsink:
container:
image: apache-pulsar-java:v0.3.0 # TO DO: Replace with quay.io link
args: [ "--spring.config.location=file:/conf/application.yml" ] # Use external configuration file
imagePullPolicy: Never
volumeMounts:
- name: pulsar-config-volume
mountPath: /conf
edges:
- from: in
to: p1
- from: p1
to: out
5 changes: 5 additions & 0 deletions schema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"numagen\",\"fields\":[{\"name\":\"Createdts\",\"type\":\"long\"},{\"name\":\"Data\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"DataRecord\",\"fields\":[{\"name\":\"padding\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"value\",\"type\":\"long\"}]}],\"default\":null}],\"aliases\":[\"numagen\"]}",
"properties": {}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package io.numaproj.pulsar.config.producer;

import io.numaproj.pulsar.producer.NumagenMessage;

import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.Resource;

import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

Expand All @@ -23,7 +29,8 @@ public class PulsarProducerConfig {

@Bean
@ConditionalOnProperty(prefix = "spring.pulsar.producer", name = "enabled", havingValue = "true", matchIfMissing = false)
public Producer<byte[]> pulsarProducer(PulsarClient pulsarClient, PulsarProducerProperties pulsarProducerProperties)
public Producer<GenericRecord> pulsarProducer(PulsarClient pulsarClient,
PulsarProducerProperties pulsarProducerProperties)
throws Exception {
String podName = env.getProperty("NUMAFLOW_POD", "pod-" + UUID.randomUUID());
String producerName = "producerName";
Expand All @@ -35,7 +42,7 @@ public Producer<byte[]> pulsarProducer(PulsarClient pulsarClient, PulsarProducer
}
producerConfig.put(producerName, podName);

return pulsarClient.newProducer(Schema.BYTES)
return pulsarClient.newProducer(Schema.AVRO(GenericRecord.class))
.loadConf(producerConfig)
.create();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import io.numaproj.pulsar.producer.NumagenMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
Expand All @@ -33,10 +34,10 @@ public class PulsarConsumerManager {
private PulsarClient pulsarClient;

// The current consumer instance.
private Consumer<byte[]> currentConsumer;
private Consumer<NumagenMessage> currentConsumer;

// Returns the current consumer if it exists. If not, creates a new one.
public Consumer<byte[]> getOrCreateConsumer(long count, long timeoutMillis)
public Consumer<NumagenMessage> getOrCreateConsumer(long count, long timeoutMillis)
throws PulsarClientException {
if (currentConsumer != null) {
return currentConsumer;
Expand All @@ -48,7 +49,7 @@ public Consumer<byte[]> getOrCreateConsumer(long count, long timeoutMillis)
// than 2^63 - 1 which will cause an overflow
.build();

currentConsumer = pulsarClient.newConsumer(Schema.BYTES)
currentConsumer = pulsarClient.newConsumer(Schema.AVRO(NumagenMessage.class))
.loadConf(pulsarConsumerProperties.getConsumerConfig())
.batchReceivePolicy(batchPolicy)
.subscriptionType(SubscriptionType.Shared) // Must be shared to support multiple pods
Expand Down
Loading