Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.

Commit 54565e0

Browse files
committed
APEXMALHAR-2497 APEXMALHAR-2162 1) Refactor the Exactly Once output operator. 2) Refactor and fix the issues of unit tests.
1 parent a37869e commit 54565e0

26 files changed

+1821
-2335
lines changed

examples/sql/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,19 @@
9090
</exclusion>
9191
</exclusions>
9292
</dependency>
93+
<dependency>
94+
<groupId>org.apache.apex</groupId>
95+
<artifactId>malhar-kafka-common</artifactId>
96+
<version>${project.parent.version}</version>
97+
<type>test-jar</type>
98+
<scope>test</scope>
99+
<exclusions>
100+
<exclusion>
101+
<groupId>*</groupId>
102+
<artifactId>*</artifactId>
103+
</exclusion>
104+
</exclusions>
105+
</dependency>
93106
<dependency>
94107
<groupId>org.apache.kafka</groupId>
95108
<artifactId>kafka_2.11</artifactId>

kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2828
import org.apache.kafka.common.Metric;
2929
import org.apache.kafka.common.MetricName;
30+
import org.apache.kafka.common.PartitionInfo;
3031
import org.apache.kafka.common.TopicPartition;
3132

3233
/**
@@ -53,7 +54,7 @@ public interface AbstractKafkaConsumer
5354
* @param timeOut time in milliseconds, spent waiting in poll if data is not available in buffer.
5455
* @return records
5556
*/
56-
ConsumerRecords<byte[], byte[]> pollRecords(long timeOut);
57+
ConsumerRecords pollRecords(long timeOut);
5758

5859
/**
5960
* Commit the specified offsets for the specified list of topics and partitions to Kafka.
@@ -124,4 +125,6 @@ public interface AbstractKafkaConsumer
124125
* @param tp partition
125126
*/
126127
long positionPartition(TopicPartition tp);
128+
129+
List<PartitionInfo> partitionsFor(String topic);
127130
}

0 commit comments

Comments
 (0)