From 19bc75ef50a54e41b6b1f57498949901018a461c Mon Sep 17 00:00:00 2001 From: appsadm Date: Wed, 15 Jun 2022 06:15:10 -0700 Subject: [PATCH 1/8] Upgrade jdk 8 to 11, Upgrade kafka client to 3.1.1, upgrade gradle to 5.6 --- .gitignore | 2 ++ broker/src/broker/ClusterHelper.java | 2 +- build.gradle | 4 +-- gradle/wrapper/gradle-wrapper.properties | 2 +- .../common/internal/util/TimeUnitUtil.java | 28 +++++++++++++++++ .../consumer/ConsumerConfiguration.java | 2 +- .../opendxl/databus/producer/Producer.java | 31 ++++++++++++++++++- .../java/com/opendxl/databus/util/Topic.java | 2 +- 8 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java diff --git a/.gitignore b/.gitignore index 94c3e12..79ead6f 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ out # Version /VERSION +*.class +/bin diff --git a/broker/src/broker/ClusterHelper.java b/broker/src/broker/ClusterHelper.java index 8080cbf..2bf603c 100644 --- a/broker/src/broker/ClusterHelper.java +++ b/broker/src/broker/ClusterHelper.java @@ -173,7 +173,7 @@ public String apply() { } }); return KafkaZkClient.apply(connectString, - JaasUtils.isZkSecurityEnabled(), + JaasUtils.isZkSaslEnabled(), SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, MAX_IN_FLIGHT_REQUESTS, diff --git a/build.gradle b/build.gradle index 8718d81..ea7970d 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ plugins { group 'com.opendxl' -sourceCompatibility = 1.8 +sourceCompatibility = 11 apply plugin: 'base' apply plugin: 'java' @@ -57,7 +57,7 @@ configurations { } dependencies { - implementation ('org.apache.kafka:kafka-clients:2.3.1') { + implementation ('org.apache.kafka:kafka-clients:3.1.1') { exclude group: 'org.scala-lang', module: 'scala-reflect' exclude group: 'org.lz4', module: 'lz4-java' } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index cfaab3d..fb1f468 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.6-all.zip diff --git a/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java b/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java new file mode 100644 index 0000000..559e653 --- /dev/null +++ b/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java @@ -0,0 +1,28 @@ +package com.opendxl.databus.common.internal.util; + +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; +import java.util.concurrent.TimeUnit; + +public class TimeUnitUtil { + public static TemporalUnit convert(final TimeUnit timeUnit) { + switch (timeUnit) { + case NANOSECONDS: + return ChronoUnit.NANOS; + case MICROSECONDS: + return ChronoUnit.MICROS; + case MILLISECONDS: + return ChronoUnit.MILLIS; + case SECONDS: + return ChronoUnit.SECONDS; + case MINUTES: + return ChronoUnit.MINUTES; + case HOURS: + return ChronoUnit.HOURS; + case DAYS: + return ChronoUnit.DAYS; + default: + return ChronoUnit.SECONDS; + } + } +} diff --git a/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java b/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java index b50b3b4..145c6fa 100644 --- a/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java +++ b/src/main/java/com/opendxl/databus/consumer/ConsumerConfiguration.java @@ -11,7 +11,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.serialization.Deserializer; import java.util.Collections; diff --git a/src/main/java/com/opendxl/databus/producer/Producer.java b/src/main/java/com/opendxl/databus/producer/Producer.java index 8e7288e..2ea880b 100644 --- a/src/main/java/com/opendxl/databus/producer/Producer.java +++ b/src/main/java/com/opendxl/databus/producer/Producer.java @@ -13,6 +13,7 @@ import com.opendxl.databus.common.internal.adapter.DatabusProducerRecordAdapter; import com.opendxl.databus.common.internal.adapter.MetricNameMapAdapter; import com.opendxl.databus.common.internal.adapter.PartitionInfoListAdapter; +import com.opendxl.databus.common.internal.util.TimeUnitUtil; import com.opendxl.databus.consumer.OffsetAndMetadata; import com.opendxl.databus.consumer.OffsetCommitCallback; import com.opendxl.databus.entities.internal.DatabusMessage; @@ -28,6 +29,8 @@ import java.util.List; import java.util.HashMap; import java.util.concurrent.TimeUnit; +import java.time.Duration; +import java.time.temporal.TemporalUnit; /** * A abstract producer, responsible for handling Databus outgoing messages. @@ -289,11 +292,37 @@ public void close() { */ public void close(final long timeout, final TimeUnit timeUnit) { try { - producer.close(timeout, timeUnit); + TemporalUnit convertedUnit = TimeUnitUtil.convert(timeUnit); + producer.close(Duration.of(timeout, convertedUnit)); } catch (Exception e) { throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class); } + } + /** + * This method waits up to timeout for the producer to complete the sending of all incomplete requests. + *

+ * If the producer is unable to complete all requests before the timeout expires, this method will fail + * any unsent and unacknowledged records immediately. + *

+ * If invoked from within a {@link Callback} this method will not block and will be equivalent to + * close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while + * blocking the I/O thread of the producer. + * + * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be + * non-negative. Specifying a timeout of zero means do not wait for pending send + * requests to complete. + * @param timeUnit The time unit for the timeoutl + * @throws DatabusClientRuntimeException If close method fails. The original cause could be any of these exceptions: + *

InterruptException If the thread is interrupted while blocked + *

IllegalArgumentException If the timeout is negative. + */ + public void close(final long timeout, final TemporalUnit timeUnit) { + try { + producer.close(Duration.of(timeout, timeUnit)); + } catch (Exception e) { + throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class); + } } /** diff --git a/src/test/java/com/opendxl/databus/util/Topic.java b/src/test/java/com/opendxl/databus/util/Topic.java index 14907c0..55f5d58 100644 --- a/src/test/java/com/opendxl/databus/util/Topic.java +++ b/src/test/java/com/opendxl/databus/util/Topic.java @@ -77,7 +77,7 @@ public String apply() { } }); return KafkaZkClient.apply(connectString, - JaasUtils.isZkSecurityEnabled(), + JaasUtils.isZkSaslEnabled(), 30000, 30000, 1000, From 410d7d951d6b3c9f89fb45aa99dc3a15545c915f Mon Sep 17 00:00:00 2001 From: appsadm Date: Thu, 16 Jun 2022 05:32:48 -0700 Subject: [PATCH 2/8] Updated configuration in build.gradle to get away with VSCode problems of compilation --- build.gradle | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/build.gradle b/build.gradle index ea7970d..0f402f4 100644 --- a/build.gradle +++ b/build.gradle @@ -54,6 +54,9 @@ jacocoTestReport { configurations { kafkaInMemory sampleJars.extendsFrom testImplementation + // Added below to get away of VSCode complaints - This might need only on local + // Comment it out before generating final artifacts + implementation.extendsFrom kafkaInMemory } dependencies { @@ -220,6 +223,16 @@ sourceSets { compileClasspath += main.output + test.output + configurations.compileClasspath + configurations.kafkaInMemory runtimeClasspath += main.output + test.output + configurations.runtimeClasspath + configurations.kafkaInMemory } + broker { + java { + srcDir 'broker/src' + } + resources { + srcDir 'broker/src' + } + compileClasspath += main.output + test.output + configurations.compileClasspath + configurations.kafkaInMemory + runtimeClasspath += main.output + test.output + configurations.runtimeClasspath + configurations.kafkaInMemory + } test { java { srcDir 'src/test/java' From 33644e22f411769452c99592ee1bef2a3c912bdc Mon Sep 17 00:00:00 2001 From: umanekar-mcafee Date: Thu, 16 Jun 2022 05:38:33 -0700 Subject: [PATCH 3/8] clean up build.gradle --- build.gradle | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/build.gradle b/build.gradle index 0f402f4..341bc6d 100644 --- a/build.gradle +++ b/build.gradle @@ -223,16 +223,6 @@ sourceSets { compileClasspath += main.output + test.output + configurations.compileClasspath + configurations.kafkaInMemory runtimeClasspath += main.output + test.output + configurations.runtimeClasspath + configurations.kafkaInMemory } - broker { - java { - srcDir 'broker/src' - } - resources { - srcDir 'broker/src' - } - compileClasspath += main.output + test.output + configurations.compileClasspath + configurations.kafkaInMemory - runtimeClasspath += main.output + test.output + configurations.runtimeClasspath + configurations.kafkaInMemory - } test { java { srcDir 'src/test/java' From 30dbeb0ccad856ce3e55ab3cb31184646021f7f3 Mon Sep 17 00:00:00 2001 From: appsadm Date: Thu, 23 Jun 2022 00:16:52 -0700 Subject: [PATCH 4/8] changed kafka streams to 3.1.1 --- build.gradle | 4 ++-- sample/src/sample/BasicStreamingExample.java | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index 341bc6d..45eda50 100644 --- a/build.gradle +++ b/build.gradle @@ -64,13 +64,13 @@ dependencies { exclude group: 'org.scala-lang', module: 'scala-reflect' exclude group: 'org.lz4', module: 'lz4-java' } - implementation ('org.apache.kafka:kafka-streams:2.3.1') { + implementation ('org.apache.kafka:kafka-streams:3.1.1') { exclude group: 'org.scala-lang', module: 'scala-reflect' } implementation 'org.scala-lang:scala-reflect:2.12.11' implementation 'org.lz4:lz4-java:1.7.1' implementation 'org.apache.avro:avro:1.11.0' - implementation 'com.google.code.gson:gson:2.8.5' + implementation 'com.google.code.gson:gson:2.9.0' implementation 'org.apache.commons:commons-configuration2:2.7' implementation 'commons-lang:commons-lang:2.6' implementation 'org.slf4j:slf4j-api:1.7.30' diff --git a/sample/src/sample/BasicStreamingExample.java b/sample/src/sample/BasicStreamingExample.java index 293f569..714d649 100644 --- a/sample/src/sample/BasicStreamingExample.java +++ b/sample/src/sample/BasicStreamingExample.java @@ -253,12 +253,13 @@ private Runnable stateQuery() { return () -> { while(true) { - ReadOnlyKeyValueStore keyValueStore = - this.stream.store("keyvaluestore", QueryableStoreTypes.keyValueStore()); + StoreQueryParameters> storeQryParam = StoreQueryParameters + .fromNameAndType("keyvaluestore", QueryableStoreTypes.keyValueStore()); + ReadOnlyKeyValueStore keyValueStore = this.stream.store(storeQryParam); - KeyValueIterator iter = keyValueStore.all(); - while (iter.hasNext()) { - KeyValue entry = iter.next(); + KeyValueIterator keyValueStoreIter = keyValueStore.all(); + while(keyValueStoreIter.hasNext()){ + KeyValue entry = keyValueStoreIter.next(); LOG.info(entry.key + entry.value); } } From 3ad61d963de626c5ef6bfa0ebade3e476d1aa5b5 Mon Sep 17 00:00:00 2001 From: Ulhas Manekar Date: Thu, 30 Jun 2022 05:41:55 -0700 Subject: [PATCH 5/8] Fix sdk for jdk 11 --- broker/src/broker/ClusterHelper.java | 38 +++++++++++----- broker/src/broker/KafkaBroker.java | 37 +++++++++------ build.gradle | 4 +- .../databus/util/AdminClientHelper.java | 20 +++++++++ .../java/com/opendxl/databus/util/Topic.java | 45 ++++++++++--------- 5 files changed, 97 insertions(+), 47 deletions(-) create mode 100644 src/test/java/com/opendxl/databus/util/AdminClientHelper.java diff --git a/broker/src/broker/ClusterHelper.java b/broker/src/broker/ClusterHelper.java index 2bf603c..1a646b9 100644 --- a/broker/src/broker/ClusterHelper.java +++ b/broker/src/broker/ClusterHelper.java @@ -5,6 +5,7 @@ package broker; import kafka.admin.TopicCommand; +import kafka.admin.TopicCommand.TopicService; import kafka.zk.KafkaZkClient; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeClusterResult; @@ -12,7 +13,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.SystemTime; -import scala.runtime.AbstractFunction0; import java.io.File; import java.nio.file.Files; @@ -33,7 +33,8 @@ public class ClusterHelper { private static int zookeeperPort = 2181; private static Zookeeper zkNode; private static List brokers = new ArrayList<>(); - private static final String ZKHOST = "localhost"; + private static final String KAFKAHOST = "localhost"; + private static int kafkaPort = 2181; private static final int SESSION_TIMEOUT_MS = 30000; private static final int CONNECTION_TIMEOUT_MS = 30000; private static final int MAX_IN_FLIGHT_REQUESTS = 1000; @@ -51,21 +52,38 @@ public void addNewKafkaTopic(final String topicName, final int replicationFactor int partitions) throws Exception { String[] arguments = { "--create", - "--zookeeper", ZKHOST.concat(":").concat(String.valueOf(zookeeperPort)), + "--bootstrap-server", KAFKAHOST.concat(":").concat(String.valueOf(kafkaPort)), "--replication-factor", String.valueOf(replicationFactor), "--partitions", String.valueOf(partitions), "--topic", topicName }; TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments); - try (KafkaZkClient zkUtils = getZkClient(opts)) { - new TopicCommand.ZookeeperTopicService(zkUtils).createTopic(opts); + TopicService topicService=null; + try { + final AdminClient adminClient = createAdminClient(); + topicService = new TopicCommand.TopicService(adminClient); + topicService.createTopic(opts); } catch (Exception e) { // In case of exceptions, abort topic creation. throw new Exception("Error creating a new Kafka topic"); + }finally{ + if (topicService!=null) { + topicService.close(); + } } } + public AdminClient createAdminClient() { + final Map props = new HashMap<>(); + final Properties brokerConfig = brokers.get(0).getBrokerConfig(); + final String bootstrapServer = brokerConfig.getProperty("host.name") + .concat(":") + .concat(brokerConfig.getProperty("port")); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + return AdminClient.create(props); + } + public ClusterHelper addBroker(final int port) { checkCluster(); Properties config = getConfig(port); @@ -166,11 +184,7 @@ private Collection describe() { } private KafkaZkClient getZkClient(TopicCommand.TopicCommandOptions opts) { - final String connectString = opts.zkConnect().getOrElse(new AbstractFunction0() { - @Override - public String apply() { - return ""; - } }); + final String connectString = ""; return KafkaZkClient.apply(connectString, JaasUtils.isZkSaslEnabled(), @@ -178,7 +192,9 @@ public String apply() { CONNECTION_TIMEOUT_MS, MAX_IN_FLIGHT_REQUESTS, new SystemTime(), + "", null, METRIC_GROUP, - METRIC_TYPE, null); + METRIC_TYPE, false); } + } diff --git a/broker/src/broker/KafkaBroker.java b/broker/src/broker/KafkaBroker.java index 6c52657..6d7bebc 100644 --- a/broker/src/broker/KafkaBroker.java +++ b/broker/src/broker/KafkaBroker.java @@ -5,8 +5,11 @@ package broker; import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; +import kafka.server.KafkaServer; +import scala.Option; + import org.apache.commons.io.FileUtils; +import org.apache.kafka.common.utils.SystemTime; import org.slf4j.LoggerFactory; import org.slf4j.Logger; @@ -17,8 +20,7 @@ public class KafkaBroker { private Properties brokerConfig; - private Zookeeper zookeeper; - private KafkaServerStartable broker; + private KafkaServer kafkaServer; private static final Logger LOG = LoggerFactory.getLogger(KafkaBroker.class); public KafkaBroker(final Properties brokerConfig) { @@ -35,12 +37,19 @@ public void run() { } })); + try { + final KafkaConfig kafkaConfig = new KafkaConfig(brokerConfig); + kafkaServer = new KafkaServer(kafkaConfig, new SystemTime(), + Option.apply(this.getClass().getName()), true); + kafkaServer.startup(); + + LOG.info("Kafka broker started: " + brokerConfig.getProperty("host.name") + .concat(":") + .concat(brokerConfig.getProperty("port"))); - broker = new KafkaServerStartable(new KafkaConfig(brokerConfig)); - broker.startup(); - LOG.info("Kafka broker started: " + brokerConfig.getProperty("host.name") - .concat(":") - .concat(brokerConfig.getProperty("port"))); + } catch (Exception e) { + System.out.println(e.getMessage()); + } } private Runnable getDeleteLogDirectoryAction() { @@ -63,13 +72,13 @@ public void run() { } public synchronized void shutdown() { - if (broker != null) { - broker.shutdown(); - broker.awaitShutdown(); + if(kafkaServer != null){ + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); LOG.info("Kafka broker stopped: " + brokerConfig.getProperty("host.name") - .concat(":") - .concat(brokerConfig.getProperty("port"))); - broker = null; + .concat(":") + .concat(brokerConfig.getProperty("port"))); + kafkaServer = null; } } diff --git a/build.gradle b/build.gradle index 341bc6d..5e0ea03 100644 --- a/build.gradle +++ b/build.gradle @@ -75,7 +75,7 @@ dependencies { implementation 'commons-lang:commons-lang:2.6' implementation 'org.slf4j:slf4j-api:1.7.30' implementation 'net.sf.jopt-simple:jopt-simple:5.0.4' - testImplementation('org.apache.kafka:kafka_2.12:2.3.1') { + testImplementation('org.apache.kafka:kafka_2.12:3.1.1') { exclude group: 'org.scala-lang', module: 'scala-reflect' } testImplementation 'org.scala-lang:scala-reflect:2.12.11' @@ -88,7 +88,7 @@ dependencies { testImplementation 'com.github.stefanbirkner:system-rules:1.19.0' testImplementation 'com.e-movimento.tinytools:privilegedaccessor:1.2.2' - kafkaInMemory ('org.apache.kafka:kafka_2.12:2.3.1') { + kafkaInMemory ('org.apache.kafka:kafka_2.12:3.1.1') { exclude group: 'org.scala-lang', module: 'scala-reflect' exclude group: 'org.lz4', module: 'lz4-java' } diff --git a/src/test/java/com/opendxl/databus/util/AdminClientHelper.java b/src/test/java/com/opendxl/databus/util/AdminClientHelper.java new file mode 100644 index 0000000..1bc266e --- /dev/null +++ b/src/test/java/com/opendxl/databus/util/AdminClientHelper.java @@ -0,0 +1,20 @@ +package com.opendxl.databus.util; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClient; + +import com.opendxl.databus.producer.ProducerConfig; + +public class AdminClientHelper { + private static final String BROKER_HOST = Constants.KAFKA_HOST; + private static final String BROKER_PORT = Constants.KAFKA_PORT; + + public static AdminClient createAdminClient() { + final Map props = new HashMap<>(); + final String bootstrapServer = BROKER_HOST.concat(":").concat(BROKER_PORT); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + return AdminClient.create(props); + } +} diff --git a/src/test/java/com/opendxl/databus/util/Topic.java b/src/test/java/com/opendxl/databus/util/Topic.java index 55f5d58..74bf8be 100644 --- a/src/test/java/com/opendxl/databus/util/Topic.java +++ b/src/test/java/com/opendxl/databus/util/Topic.java @@ -5,11 +5,12 @@ package com.opendxl.databus.util; import kafka.admin.TopicCommand; +import kafka.admin.TopicCommand.TopicService; import kafka.zk.KafkaZkClient; + +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.SystemTime; -import scala.runtime.AbstractFunction0; - import java.util.UUID; public class Topic { @@ -21,21 +22,21 @@ public static class Builder { private String topicName = getRandomTopicName(); private int partitions = 1; private int replicationFactor = 1; - private String zkHost = Constants.ZOOKEEPER_HOST; - private String zkPort = Constants.ZOOKEEPER_PORT; + private String kafkaHost = Constants.KAFKA_HOST; + private String KafkaPort = Constants.KAFKA_PORT; public Builder partitions(final int partitions) { this.partitions = partitions; return this; } - public Builder zkHost(final String zkHost) { - this.zkHost = zkHost; + public Builder kafkaHost(final String kafkaHost) { + this.kafkaHost = kafkaHost; return this; } - public Builder zkPort(final String zkPort) { - this.zkPort = zkPort; + public Builder kafkaPort(final String KafkaPort) { + this.KafkaPort = KafkaPort; return this; } @@ -51,8 +52,8 @@ public Builder replicationFactor(final int replicationFactor) { public String go() { String[] arguments = {"--create", - "--zookeeper", - zkHost.concat(":").concat(zkPort), + "--bootstrap-server", + kafkaHost.concat(":").concat(KafkaPort), "--replication-factor", String.valueOf(replicationFactor), "--partitions", @@ -61,20 +62,22 @@ public String go() { topicName}; TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments); - try (KafkaZkClient zkUtils = getZkClient(opts)) { - new TopicCommand.ZookeeperTopicService(zkUtils).createTopic(opts); + TopicService topicService=null; + try { + final AdminClient adminClient = AdminClientHelper.createAdminClient(); + topicService = new TopicCommand.TopicService(adminClient); + topicService.createTopic(opts); + } + finally{ + if (topicService!=null) { + topicService.close(); + } } return topicName; - - } private KafkaZkClient getZkClient(TopicCommand.TopicCommandOptions opts) { - final String connectString = opts.zkConnect().getOrElse(new AbstractFunction0() { - @Override - public String apply() { - return ""; - } }); + final String connectString = ""; return KafkaZkClient.apply(connectString, JaasUtils.isZkSaslEnabled(), @@ -82,8 +85,10 @@ public String apply() { 30000, 1000, new SystemTime(), + "", null, "kafka.server", - "SessionExpireListener", null); + "SessionExpireListener", false); } } + } From 6deb1d132385aed2342179d901ebd7084a4d710b Mon Sep 17 00:00:00 2001 From: appsadm Date: Wed, 6 Jul 2022 23:36:47 -0700 Subject: [PATCH 6/8] upgraded packages --- build.gradle | 25 ++++++++++--------- .../common/internal/util/TimeUnitUtil.java | 5 +++- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index 8585bb3..a6022b9 100644 --- a/build.gradle +++ b/build.gradle @@ -67,34 +67,35 @@ dependencies { implementation ('org.apache.kafka:kafka-streams:3.1.1') { exclude group: 'org.scala-lang', module: 'scala-reflect' } - implementation 'org.scala-lang:scala-reflect:2.12.11' - implementation 'org.lz4:lz4-java:1.7.1' + implementation 'org.scala-lang:scala-reflect:2.13.8' + implementation 'org.lz4:lz4-java:1.8.0' implementation 'org.apache.avro:avro:1.11.0' implementation 'com.google.code.gson:gson:2.9.0' - implementation 'org.apache.commons:commons-configuration2:2.7' + implementation 'org.apache.commons:commons-configuration2:2.8.0' implementation 'commons-lang:commons-lang:2.6' - implementation 'org.slf4j:slf4j-api:1.7.30' + implementation 'org.slf4j:slf4j-api:1.7.36' implementation 'net.sf.jopt-simple:jopt-simple:5.0.4' - testImplementation('org.apache.kafka:kafka_2.12:3.1.1') { + testImplementation('org.apache.kafka:kafka_2.13:3.1.1') { exclude group: 'org.scala-lang', module: 'scala-reflect' } - testImplementation 'org.scala-lang:scala-reflect:2.12.11' + testImplementation 'org.scala-lang:scala-reflect:2.13.8' testImplementation 'org.apache.zookeeper:zookeeper:3.5.7' - testImplementation('io.netty:netty-all:4.1.43.Final') { + testImplementation('io.netty:netty-all:4.1.78.Final') { force = true } - testImplementation 'commons-io:commons-io:2.7' + testImplementation 'commons-io:commons-io:2.11.0' testImplementation 'junit:junit:4.12' testImplementation 'com.github.stefanbirkner:system-rules:1.19.0' testImplementation 'com.e-movimento.tinytools:privilegedaccessor:1.2.2' - kafkaInMemory ('org.apache.kafka:kafka_2.12:3.1.1') { + kafkaInMemory ('org.apache.kafka:kafka_2.13:3.1.1') { exclude group: 'org.scala-lang', module: 'scala-reflect' exclude group: 'org.lz4', module: 'lz4-java' + exclude group: 'io.netty' } - kafkaInMemory 'org.scala-lang:scala-reflect:2.12.11' - kafkaInMemory 'org.lz4:lz4-java:1.7.1' - kafkaInMemory 'commons-io:commons-io:2.7' + kafkaInMemory 'org.scala-lang:scala-reflect:2.13.8' + kafkaInMemory 'org.lz4:lz4-java:1.8.0' + kafkaInMemory 'commons-io:commons-io:2.11.0' // This following section mitigates OWASP vulnerabilities report. // It enforces to use specific transitive dependency versions diff --git a/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java b/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java index 559e653..401aa04 100644 --- a/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java +++ b/src/main/java/com/opendxl/databus/common/internal/util/TimeUnitUtil.java @@ -4,7 +4,10 @@ import java.time.temporal.TemporalUnit; import java.util.concurrent.TimeUnit; -public class TimeUnitUtil { +public final class TimeUnitUtil { + private TimeUnitUtil() { + //not called + } public static TemporalUnit convert(final TimeUnit timeUnit) { switch (timeUnit) { case NANOSECONDS: From 73df2d618d5537e22ec818640ae191516fbb8a3d Mon Sep 17 00:00:00 2001 From: sshastr1 Date: Tue, 11 Apr 2023 09:12:06 -0700 Subject: [PATCH 7/8] fixed exceptions --- README.md | 4 ++-- build.gradle | 15 ++++++++------- docs/CLI-Example.rst | 8 ++++---- docs/index.rst | 4 ++-- gradle.properties | 2 +- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 6b3ee4b..f9933fc 100644 --- a/README.md +++ b/README.md @@ -36,12 +36,12 @@ Maven: com.opendxl dxldatabusclient - 2.4.8 + 2.5.0 ``` or Gradle: ```groovy -compile 'com.opendxl:dxldatabusclient:2.4.8' +compile 'com.opendxl:dxldatabusclient:2.5.0' ``` ## Bugs and Feedback diff --git a/build.gradle b/build.gradle index a6022b9..23ca849 100644 --- a/build.gradle +++ b/build.gradle @@ -60,11 +60,11 @@ configurations { } dependencies { - implementation ('org.apache.kafka:kafka-clients:3.1.1') { + implementation ('org.apache.kafka:kafka-clients:3.1.2') { exclude group: 'org.scala-lang', module: 'scala-reflect' exclude group: 'org.lz4', module: 'lz4-java' } - implementation ('org.apache.kafka:kafka-streams:3.1.1') { + implementation ('org.apache.kafka:kafka-streams:3.1.2') { exclude group: 'org.scala-lang', module: 'scala-reflect' } implementation 'org.scala-lang:scala-reflect:2.13.8' @@ -75,12 +75,12 @@ dependencies { implementation 'commons-lang:commons-lang:2.6' implementation 'org.slf4j:slf4j-api:1.7.36' implementation 'net.sf.jopt-simple:jopt-simple:5.0.4' - testImplementation('org.apache.kafka:kafka_2.13:3.1.1') { + testImplementation('org.apache.kafka:kafka_2.13:3.1.2') { exclude group: 'org.scala-lang', module: 'scala-reflect' } testImplementation 'org.scala-lang:scala-reflect:2.13.8' testImplementation 'org.apache.zookeeper:zookeeper:3.5.7' - testImplementation('io.netty:netty-all:4.1.78.Final') { + testImplementation('io.netty:netty-all:4.1.91.Final') { force = true } testImplementation 'commons-io:commons-io:2.11.0' @@ -88,7 +88,7 @@ dependencies { testImplementation 'com.github.stefanbirkner:system-rules:1.19.0' testImplementation 'com.e-movimento.tinytools:privilegedaccessor:1.2.2' - kafkaInMemory ('org.apache.kafka:kafka_2.13:3.1.1') { + kafkaInMemory ('org.apache.kafka:kafka_2.13:3.1.2') { exclude group: 'org.scala-lang', module: 'scala-reflect' exclude group: 'org.lz4', module: 'lz4-java' exclude group: 'io.netty' @@ -112,10 +112,11 @@ dependencies { // http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2019-16942 // http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2019-16943 // CVE-2020-25649 - compile ('com.fasterxml.jackson.core:jackson-databind:2.13.2.2') { + compile ('com.fasterxml.jackson.core:jackson-databind:2.14.2') { force = true } - kafkaInMemory ('com.fasterxml.jackson.core:jackson-databind:2.13.2.2') { + compile 'com.fasterxml.jackson.module:jackson-module-scala_3:2.14.2' + kafkaInMemory ('com.fasterxml.jackson.core:jackson-databind:2.14.2') { force = true } diff --git a/docs/CLI-Example.rst b/docs/CLI-Example.rst index eece77c..788060f 100644 --- a/docs/CLI-Example.rst +++ b/docs/CLI-Example.rst @@ -7,7 +7,7 @@ library with no arguments displays help information: :: - $ java -jar dxldatabusclient-2.4.8.jar + $ java -jar dxldatabusclient-2.5.0.jar ERROR: There are not options Option (* = required) Description @@ -50,7 +50,7 @@ few CLI operations. Operations arguments are placed after :: - $ java -jar dxldatabusclient-2.4.8.jar --operation ... + $ java -jar dxldatabusclient-2.5.0.jar --operation ... Operation Arguments ^^^^^^^^^^^^^^^^^^^ @@ -123,7 +123,7 @@ example :: - $ java -jar dxldatabusclient-2.4.8.jar \ + $ java -jar dxldatabusclient-2.5.0.jar \ --operation produce \ --to-topic \ --brokers <0.0.0.0>: \ @@ -230,7 +230,7 @@ example :: - java -jar dxldatabusclient-2.4.8.jar \ + java -jar dxldatabusclient-2.5.0.jar \ --operation consume \ --from-topic \ --brokers \ diff --git a/docs/index.rst b/docs/index.rst index e7f812f..de821ee 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,14 +40,14 @@ Maven: com.opendxl dxldatabusclient - 2.4.8 + 2.5.0 or Gradle: .. code:: groovy - compile 'com.opendxl:dxldatabusclient:2.4.8' + compile 'com.opendxl:dxldatabusclient:2.5.0' API Documentation ----------------- diff --git a/gradle.properties b/gradle.properties index 35a2ec6..152a88e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=2.4.8 +version=2.5.0 From d5831b28132225e61cd395f129d817a7932cc543 Mon Sep 17 00:00:00 2001 From: Ulhas Manekar Date: Thu, 13 Apr 2023 10:42:21 -0700 Subject: [PATCH 8/8] Comment system out logs --- .../common/internal/adapter/DatabusProducerRecordAdapter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java b/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java index bd75071..0a583f1 100644 --- a/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java +++ b/src/main/java/com/opendxl/databus/common/internal/adapter/DatabusProducerRecordAdapter.java @@ -86,8 +86,6 @@ public DatabusProducerRecordAdapter(final Serializer

messageSerializer) { sourceProducerRecord.getRoutingData().getTenantGroup()); final List

kafkaHeaders = produceKafkaHeaders ? generateKafkaHeaders(databusMessage.getHeaders()) : null; - System.out.println("produceKafkaHeaders: " + produceKafkaHeaders); - System.out.println("kafkaHeaders: " + kafkaHeaders); final org.apache.kafka.clients.producer.ProducerRecord targetProducerRecord = new org.apache.kafka.clients.producer.ProducerRecord<>(targetTopic, sourceProducerRecord.getRoutingData().getPartition(),