Skip to content
This repository was archived by the owner on Nov 2, 2021. It is now read-only.

switch to kafkaAdminClient api, deprecate zookeeper api #9

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,32 +75,37 @@ you'd like to provide a callback to be invoked when the send has been acknowledg

## Topic Management

Create an admin client

```clojure
(def admin-client (admin "localhost:9092"))
```

Create a topic:

```clojure
(create-topic {:connection-string "localhost:2181"} "some-topic" {})
(create-topic admin-client "some-topic" {})
```
That empty map can be used to specify configuration for number of topic partitions, replication factor,

Delete a topic:

``` clojure
(delete-topic {:connection-string "localhost:2181"} "some-topic")
(delete-topic admin-client "some-topic")
```

Query about a topic's existence:

``` clojure
(topic-exists? {:connection-string "localhost:2181"} "some-topic")
(topic-exists? admin-client "some-topic")
```

List existing topics:

``` clojure
(topics {:connection-string "localhost:2181"})
(topics admin-client)
```


## License

Distributed under the Eclipse Public License either version 1.0 or (at your option) any
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.10.0"]
[org.apache.kafka/kafka_2.12 "2.1.1"]]
[org.apache.kafka/kafka_2.12 "2.4.1"]]
:plugins [[lein-eftest "0.5.6"]]
:deploy-repositories {"clojars" {:url "https://clojars.org/repo"
:sign-releases false
Expand Down
158 changes: 91 additions & 67 deletions src/gregor/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
ConsumerRebalanceListener]
[org.apache.kafka.clients.producer Producer KafkaProducer Callback
ProducerRecord RecordMetadata]
[kafka.admin AdminUtils]
[kafka.utils ZkUtils]
[org.apache.kafka.clients.admin
KafkaAdminClient
DescribeConfigsOptions
DescribeTopicsOptions
NewTopic]
[org.apache.kafka.common.config ConfigResource ConfigResource$Type]
[java.util.concurrent TimeUnit]
[scala.collection JavaConversions])
(:require [clojure.set :as set]
Expand Down Expand Up @@ -449,6 +453,18 @@
;;;;;;;;;;;;;;;;;;;;;;


(defn admin
"Return a `KafkaAdminClient`

The kafkaAdminClient is used to manage topics and brokers.
Args:
- `servers`: comma-separated host:port strs or list of strs as bootstrap servers"
^KafkaAdminClient
[servers]
(KafkaAdminClient/create (as-properties {"bootstrap.servers" servers})))



(defn- validate-zookeeper-config
"A helper for validating a Zookeeper configuration map and applying default values. Any
invalid item in the provided config will result in an assertion failure.
Expand Down Expand Up @@ -478,99 +494,107 @@
merged))


(defmacro with-zookeeper
"A utility macro for interacting with Zookeeper.

Args:
- `zk-config`: a map with Zookeeper connection details. This will be validated using
`validate-zookeeper-config` before use.
- `zookeeper`: this will be bound to an instance of `ZkUtils` while the body is executed.
The connection to Zookeeper will be cleaned up when the body exits."
[zk-config zookeeper & body]
`(let [zk-config# (validate-zookeeper-config ~zk-config)
client-and-conn# (ZkUtils/createZkClientAndConnection
(:connection-string zk-config#)
(:session-timeout zk-config#)
(:connect-timeout zk-config#))]
(with-open [client# (._1 client-and-conn#)
connection# (._2 client-and-conn#)]
(let [~zookeeper (ZkUtils. client# connection# false)]
~@body))))


(def rack-aware-modes
{:disabled (kafka.admin.RackAwareMode$Disabled$.)
:enforced (kafka.admin.RackAwareMode$Enforced$.)
:safe (kafka.admin.RackAwareMode$Safe$.)})


(defn- rack-aware-mode-constant
"Convert a keyword name for a `RackAwareMode` into the appropriate constant from the
underlying Kafka library.

Args:
- `mode`: a keyword of the same name as one of the constants in `kafka.admin.RackAwareMode`."
[mode]
(when-not (contains? rack-aware-modes mode)
(throw (IllegalArgumentException. (format "Bad RackAwareMode: %s" mode))))
(get rack-aware-modes mode))


(defn create-topic
"Create a topic.

Args:
- `zk-config`: a map with Zookeeper connection details as expected by `with-zookeeper`.
- `admin`: a kafkaAdminClient.
- `topic`: the name of the topic to create.
- an unnamed configuration map. Valid keys are as follows:

`:partitions` (optional) The number of ways to partition the topic. Defaults to 1.
`:replication-factor` (optional) The replication factor for the topic. Defaults to 1.
`:config` (optional) A map of configuration options for the topic.
`:rack-aware-mode` (optional) Control how rack aware replica assignment is done.
Valid values are `:disabled`, `:enforced`, `:safe`.
Default is `:safe`."
[zk-config topic {:keys [partitions replication-factor config rack-aware-mode]
`:config` (optional) A map of configuration options for the topic."
[^KafkaAdminClient admin topic {:keys [partitions replication-factor config]
:or {partitions 1
replication-factor 1
config nil
rack-aware-mode :safe}}]
(with-zookeeper zk-config zookeeper
(AdminUtils/createTopic zookeeper
topic
(int partitions)
(int replication-factor)
(as-properties config)
(rack-aware-mode-constant rack-aware-mode))))
config nil}}]
(let [^NewTopic new-topic (NewTopic. topic (int partitions) (short replication-factor))
^NewTopic new-topic (if (some? config) (.configs new-topic config) new-topic)]
(.get (.all (.createTopics admin [new-topic])))))


(defn topics
"Query existing topics.

Args:
- `zk-config`: a map with Zookeeper connection details as expected by `with-zookeeper`."
[zk-config]
(with-zookeeper zk-config zookeeper
(-> zookeeper .getAllTopics JavaConversions/seqAsJavaList seq)))
- `admin`: a kafkaAdminClient"
[^KafkaAdminClient admin]
(seq
(.get
(.names
(.listTopics admin)))))

(defn topics-partitions-summary
"Return a summary (partitions and replication factor) of a list of topics as a clojure map.

Args
- `admin`: a KafkaAdminClient
- `topics`: a list of topics"
[^KafkaAdminClient admin topics]
(let [topics-description (.get
(.all
(.describeTopics
admin
topics)))]
(reduce
(fn [acc desc]
(assoc acc (.name desc)
{:partitions (count (.partitions desc))
:replication-factor (count (.replicas (first (.partitions desc))))}))
{}
(vals topics-description))))

(defn- topic-config-resource
[topic]
(ConfigResource. ConfigResource$Type/TOPIC topic))

(defn- entries->map
[config include-default?]
(reduce (fn [acc entry]
(if include-default?
(assoc acc (.name entry) (.value entry))
(if-not (= "DEFAULT_CONFIG" (str (.source entry)))
(assoc acc (.name entry) (.value entry))
acc))) {} config))

(defn topics-config
"Return topics configuration.

Args:
- `admin`: a kafkaAdminClient
- `topics`: A collection of topics to retrieve configuration
- `include-default?`: A boolean to indicate whether return default setting or not (default to false)"
([^KafkaAdminClient admin topics]
(topics-config admin topics false))
([^KafkaAdminClient admin topics include-default?]
(let [topics-config-res (map topic-config-resource topics)
raw-topics-config (.get
(.all
(.describeConfigs
admin
topics-config-res)))]
(into {} (map
(fn [[k v]]
[(.name k) (entries->map (.entries v) include-default?)])
raw-topics-config)))))


(defn topic-exists?
"Query whether or not a topic exists.

Args:
- `zk-config`: a map with Zookeeper connection details as expected `with-zookeeper`.
- `admin`: a kafkaAdminClient
- `topic`: The name of the topic to check for."
[zk-config topic]
(with-zookeeper zk-config zookeeper
(AdminUtils/topicExists zookeeper topic)))
[^KafkaAdminClient admin topic]
(.contains (topics admin) topic))


(defn delete-topic
"Delete a topic.

Args:
- `zk-config`: A map with Zookeeper connection details as expected by `with-zookeeper`.
- `admin`: a kafkaAdminClient
- `topic`: The name of the topic to delete."
[zk-config topic]
(with-zookeeper zk-config zookeeper
(AdminUtils/deleteTopic zookeeper topic)))
[^KafkaAdminClient admin topic]
(.get (.all (.deleteTopics admin [topic]))))