entry = keyValueStoreIter.next();
LOG.info(entry.key + entry.value);
}
}
diff --git a/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java b/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java
index bd75071..0a583f1 100644
--- a/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java
+++ b/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java
@@ -86,8 +86,6 @@ public DatabusProducerRecordAdapter(final Serializer messageSerializer) {
sourceProducerRecord.getRoutingData().getTenantGroup());
final List kafkaHeaders = produceKafkaHeaders
? generateKafkaHeaders(databusMessage.getHeaders()) : null;
- System.out.println("produceKafkaHeaders: " + produceKafkaHeaders);
- System.out.println("kafkaHeaders: " + kafkaHeaders);
final org.apache.kafka.clients.producer.ProducerRecord targetProducerRecord =
new org.apache.kafka.clients.producer.ProducerRecord<>(targetTopic,
sourceProducerRecord.getRoutingData().getPartition(),
diff --git a/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java b/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java
new file mode 100644
index 0000000..401aa04
--- /dev/null
+++ b/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java
@@ -0,0 +1,31 @@
+package com.opendxl.databus.common.internal.util;
+
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalUnit;
+import java.util.concurrent.TimeUnit;
+
+public final class TimeUnitUtil {
+ private TimeUnitUtil() {
+ //not called
+ }
+ public static TemporalUnit convert(final TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case NANOSECONDS:
+ return ChronoUnit.NANOS;
+ case MICROSECONDS:
+ return ChronoUnit.MICROS;
+ case MILLISECONDS:
+ return ChronoUnit.MILLIS;
+ case SECONDS:
+ return ChronoUnit.SECONDS;
+ case MINUTES:
+ return ChronoUnit.MINUTES;
+ case HOURS:
+ return ChronoUnit.HOURS;
+ case DAYS:
+ return ChronoUnit.DAYS;
+ default:
+ return ChronoUnit.SECONDS;
+ }
+ }
+}
diff --git a/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java b/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java
index b50b3b4..145c6fa 100644
--- a/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java
+++ b/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java
@@ -11,7 +11,7 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Collections;
diff --git a/src/main/java/com/opendxl/databus/producer/Producer.java b/src/main/java/com/opendxl/databus/producer/Producer.java
index 0678ddc..2fa5b1c 100644
--- a/src/main/java/com/opendxl/databus/producer/Producer.java
+++ b/src/main/java/com/opendxl/databus/producer/Producer.java
@@ -13,6 +13,7 @@
import com.opendxl.databus.common.internal.adapter.DatabusProducerRecordAdapter;
import com.opendxl.databus.common.internal.adapter.MetricNameMapAdapter;
import com.opendxl.databus.common.internal.adapter.PartitionInfoListAdapter;
+import com.opendxl.databus.common.internal.util.TimeUnitUtil;
import com.opendxl.databus.consumer.OffsetAndMetadata;
import com.opendxl.databus.consumer.OffsetCommitCallback;
import com.opendxl.databus.entities.internal.DatabusMessage;
@@ -28,6 +29,8 @@
import java.util.List;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.time.temporal.TemporalUnit;
/**
* A abstract producer, responsible for handling Databus outgoing messages.
@@ -305,11 +308,37 @@ public void close() {
*/
public void close(final long timeout, final TimeUnit timeUnit) {
try {
- producer.close(timeout, timeUnit);
+ TemporalUnit convertedUnit = TimeUnitUtil.convert(timeUnit);
+ producer.close(Duration.of(timeout, convertedUnit));
} catch (Exception e) {
throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class);
}
+ }
+ /**
+ * This method waits up to timeout for the producer to complete the sending of all incomplete requests.
+ *
+ * If the producer is unable to complete all requests before the timeout expires, this method will fail
+ * any unsent and unacknowledged records immediately.
+ *
+ * If invoked from within a {@link Callback} this method will not block and will be equivalent to
+ * close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while
+ * blocking the I/O thread of the producer.
+ *
+ * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
+ * non-negative. Specifying a timeout of zero means do not wait for pending send
+ * requests to complete.
+ * @param timeUnit The time unit for the timeoutl
+ * @throws DatabusClientRuntimeException If close method fails. The original cause could be any of these exceptions:
+ *
InterruptException If the thread is interrupted while blocked
+ *
IllegalArgumentException If the timeout is negative.
+ */
+ public void close(final long timeout, final TemporalUnit timeUnit) {
+ try {
+ producer.close(Duration.of(timeout, timeUnit));
+ } catch (Exception e) {
+ throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class);
+ }
}
/**
diff --git a/src/test/java/com/opendxl/databus/util/AdminClientHelper.java b/src/test/java/com/opendxl/databus/util/AdminClientHelper.java
new file mode 100644
index 0000000..1bc266e
--- /dev/null
+++ b/src/test/java/com/opendxl/databus/util/AdminClientHelper.java
@@ -0,0 +1,20 @@
+package com.opendxl.databus.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.AdminClient;
+
+import com.opendxl.databus.producer.ProducerConfig;
+
+public class AdminClientHelper {
+ private static final String BROKER_HOST = Constants.KAFKA_HOST;
+ private static final String BROKER_PORT = Constants.KAFKA_PORT;
+
+ public static AdminClient createAdminClient() {
+ final Map props = new HashMap<>();
+ final String bootstrapServer = BROKER_HOST.concat(":").concat(BROKER_PORT);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+ return AdminClient.create(props);
+ }
+}
diff --git a/src/test/java/com/opendxl/databus/util/Topic.java b/src/test/java/com/opendxl/databus/util/Topic.java
index 14907c0..74bf8be 100644
--- a/src/test/java/com/opendxl/databus/util/Topic.java
+++ b/src/test/java/com/opendxl/databus/util/Topic.java
@@ -5,11 +5,12 @@
package com.opendxl.databus.util;
import kafka.admin.TopicCommand;
+import kafka.admin.TopicCommand.TopicService;
import kafka.zk.KafkaZkClient;
+
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.utils.SystemTime;
-import scala.runtime.AbstractFunction0;
-
import java.util.UUID;
public class Topic {
@@ -21,21 +22,21 @@ public static class Builder {
private String topicName = getRandomTopicName();
private int partitions = 1;
private int replicationFactor = 1;
- private String zkHost = Constants.ZOOKEEPER_HOST;
- private String zkPort = Constants.ZOOKEEPER_PORT;
+ private String kafkaHost = Constants.KAFKA_HOST;
+ private String KafkaPort = Constants.KAFKA_PORT;
public Builder partitions(final int partitions) {
this.partitions = partitions;
return this;
}
- public Builder zkHost(final String zkHost) {
- this.zkHost = zkHost;
+ public Builder kafkaHost(final String kafkaHost) {
+ this.kafkaHost = kafkaHost;
return this;
}
- public Builder zkPort(final String zkPort) {
- this.zkPort = zkPort;
+ public Builder kafkaPort(final String KafkaPort) {
+ this.KafkaPort = KafkaPort;
return this;
}
@@ -51,8 +52,8 @@ public Builder replicationFactor(final int replicationFactor) {
public String go() {
String[] arguments = {"--create",
- "--zookeeper",
- zkHost.concat(":").concat(zkPort),
+ "--bootstrap-server",
+ kafkaHost.concat(":").concat(KafkaPort),
"--replication-factor",
String.valueOf(replicationFactor),
"--partitions",
@@ -61,29 +62,33 @@ public String go() {
topicName};
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments);
- try (KafkaZkClient zkUtils = getZkClient(opts)) {
- new TopicCommand.ZookeeperTopicService(zkUtils).createTopic(opts);
+ TopicService topicService=null;
+ try {
+ final AdminClient adminClient = AdminClientHelper.createAdminClient();
+ topicService = new TopicCommand.TopicService(adminClient);
+ topicService.createTopic(opts);
+ }
+ finally{
+ if (topicService!=null) {
+ topicService.close();
+ }
}
return topicName;
-
-
}
private KafkaZkClient getZkClient(TopicCommand.TopicCommandOptions opts) {
- final String connectString = opts.zkConnect().getOrElse(new AbstractFunction0() {
- @Override
- public String apply() {
- return "";
- } });
+ final String connectString = "";
return KafkaZkClient.apply(connectString,
- JaasUtils.isZkSecurityEnabled(),
+ JaasUtils.isZkSaslEnabled(),
30000,
30000,
1000,
new SystemTime(),
+ "", null,
"kafka.server",
- "SessionExpireListener", null);
+ "SessionExpireListener", false);
}
}
+
}