Skip to content

Commit ffbf162

Browse files
jacumsteveniemitz
authored andcommitted
updated to kafka 1.1.0 (#321)
* updated to kafka 1.1.0 * fixed for kafka 1.1.0 * fixed for kafka 1.1.0 * fixed for kafka 1.1.0 - empty commit * Revert "fixed for kafka 1.1.0 - empty commit" This reverts commit e5fc7fe
1 parent 4830c0f commit ffbf162

File tree

7 files changed

+140
-31
lines changed

7 files changed

+140
-31
lines changed

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ jdk:
2020
- oraclejdk8
2121
- oraclejdk7
2222
env:
23+
- KAFKA_VER=1.1.0
2324
- KAFKA_VER=0.11.0.0
2425
- KAFKA_VER=0.10.2.0
2526
- KAFKA_VER=0.9.0.1
2627
- KAFKA_VER=0.8.2.2
2728
matrix:
2829
exclude:
2930
- jdk: oraclejdk7
30-
env: KAFKA_VER=0.11.0.0
31+
env: KAFKA_VER=1.1.0
3132
- jdk: oraclejdk7
3233
env: KAFKA_VER=0.9.0.1
3334
- jdk: oraclejdk7

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ plugins {
88

99
def scalaVersion = "2.11.8"
1010
def mesosVersion = "0.28.0"
11-
ext.kafkaVersion = project.hasProperty("kafkaVersion") ? project.getProperty("kafkaVersion") : "0.11.0.0"
11+
ext.kafkaVersion = project.hasProperty("kafkaVersion") ? project.getProperty("kafkaVersion") : "1.1.0"
1212
def kafkaMajorVersion="${kafkaVersion.split("\\.").take(2).join("_")}"
1313
def kafkaScalaVersion=scalaVersion.split("\\.").take(2).join(".")
1414
def jettyVersion = "9.0.4.v20130625"
1515
def jerseyVersion = "2.24"
16-
def jacksonVersion = "2.8.2"
16+
def jacksonVersion = "2.9.5"
1717

1818
version = new File('src/scala/main/ly/stealth/mesos/kafka/Config.scala')
1919
.readLines()

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-4.7-bin.zip

src/docker/Dockerfile

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,4 @@
1-
# Licensed to the Apache Software Foundation (ASF) under one or more
2-
# contributor license agreements. See the NOTICE file distributed with
3-
# this work for additional information regarding copyright ownership.
4-
# The ASF licenses this file to You under the Apache License, Version 2.0
5-
# (the "License"); you may not use this file except in compliance with
6-
# the License. You may obtain a copy of the License at
7-
#
8-
# http://www.apache.org/licenses/LICENSE-2.0
9-
#
10-
# Unless required by applicable law or agreed to in writing, software
11-
# distributed under the License is distributed on an "AS IS" BASIS,
12-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
# See the License for the specific language governing permissions and
14-
# limitations under the License.
15-
16-
FROM ubuntu:14.04
17-
18-
RUN \
19-
if [ -f "/tmp/apt-proxy" ]; then \
20-
APT_PROXY=$(cat "/tmp/apt-proxy"); \
21-
echo "Using apt-proxy: $APT_PROXY"; \
22-
echo "Acquire::http::Proxy \"$APT_PROXY\";" > /etc/apt/apt.conf.d/90-apt-proxy.conf ; \
23-
fi
1+
FROM ubuntu:16.04
242

253
# Add mesos repo
264
RUN \
@@ -31,8 +9,8 @@ RUN \
319
RUN \
3210
apt-get update && \
3311
apt-get install -qy \
34-
git vim zip mc curl \
35-
openjdk-7-jdk mesos libc6
12+
git zip mc curl \
13+
openjdk-8-jdk mesos libc6
3614

3715
# Add kafka-mesos & kafka
3816
COPY .docker/kafka* /opt/kafka-mesos/

src/docker/build-image.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ base_dir=../..
55
tmp_dir=.docker
66
mkdir -p $tmp_dir
77

8-
kafka_version=0.8.2.2
9-
scala_version=2.10
8+
kafka_version=1.1.0
9+
scala_version=2.11
1010
docker_tag=`whoami`/kafka-mesos
1111

1212
print_help() {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package ly.stealth.mesos.kafka.interface.impl
18+
19+
import kafka.utils.{ZkUtils => KafkaZkUtils}
20+
import kafka.admin.{BrokerMetadata, AdminUtils => KafkaAdminUtils}
21+
import kafka.zk.AdminZkClient
22+
import java.util.Properties
23+
import ly.stealth.mesos.kafka.interface.{AdminUtilsProxy, FeatureSupport}
24+
import scala.collection.Map
25+
26+
27+
class AdminUtils(zkUrl: String) extends AdminUtilsProxy {
28+
private val DEFAULT_TIMEOUT_MS = 30000
29+
private val zkUtils = KafkaZkUtils(zkUrl, DEFAULT_TIMEOUT_MS, DEFAULT_TIMEOUT_MS, isZkSecurityEnabled = false)
30+
31+
override def fetchAllTopicConfigs(): Map[String, Properties] = KafkaAdminUtils.fetchAllTopicConfigs(zkUtils)
32+
33+
override def createOrUpdateTopicPartitionAssignmentPathInZK(
34+
topic: String,
35+
partitionReplicaAssignment: Map[Int, Seq[Int]],
36+
config: Properties,
37+
update: Boolean
38+
): Unit = KafkaAdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaAssignment, config, update)
39+
40+
override def changeTopicConfig(
41+
topic: String,
42+
configs: Properties
43+
): Unit = KafkaAdminUtils.changeTopicConfig(zkUtils, topic, configs)
44+
45+
override def deleteTopic(topicToDelete: String): Unit =
46+
KafkaAdminUtils.deleteTopic(zkUtils, topicToDelete)
47+
48+
override def fetchEntityConfig(
49+
entityType: String,
50+
entity: String
51+
): Properties = KafkaAdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
52+
53+
override def changeClientIdConfig(
54+
clientId: String,
55+
configs: Properties
56+
): Unit = KafkaAdminUtils.changeClientIdConfig(zkUtils, clientId, configs)
57+
58+
override def fetchAllEntityConfigs(entityType: String): Map[String, Properties]
59+
= KafkaAdminUtils.fetchAllEntityConfigs(zkUtils, entityType)
60+
61+
override def assignReplicasToBrokers(
62+
ids: Seq[Int],
63+
nPartitions: Int,
64+
replicationFactor: Int,
65+
fixedStartIndex: Int,
66+
startPartitionId: Int
67+
): Map[Int, Seq[Int]] = {
68+
val md = ids.map(BrokerMetadata(_, None))
69+
KafkaAdminUtils.assignReplicasToBrokers(md, nPartitions, replicationFactor, fixedStartIndex, startPartitionId)
70+
}
71+
72+
override val features: FeatureSupport = FeatureSupport(quotas = true, genericEntityConfigs = true)
73+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package ly.stealth.mesos.kafka.interface.impl
18+
19+
import kafka.utils.{ZkUtils => KafkaZkUtils}
20+
import kafka.common.TopicAndPartition
21+
import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
22+
import ly.stealth.mesos.kafka.interface.ZkUtilsProxy
23+
import scala.collection.{Map, Set, mutable}
24+
25+
class ZkUtils(zkUrl: String) extends ZkUtilsProxy {
26+
private val DEFAULT_TIMEOUT_MS = 30000
27+
private val zkUtils = KafkaZkUtils(zkUrl, DEFAULT_TIMEOUT_MS, DEFAULT_TIMEOUT_MS, isZkSecurityEnabled = false)
28+
29+
override def getAllTopics(): Seq[String] = zkUtils.getAllTopics()
30+
31+
override def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]]
32+
= zkUtils.getReplicaAssignmentForTopics(topics)
33+
34+
override def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext]
35+
= zkUtils.getPartitionsBeingReassigned()
36+
37+
override def getReplicasForPartition(
38+
topic: String,
39+
partition: Int
40+
): Seq[Int] = zkUtils.getReplicasForPartition(topic, partition)
41+
42+
override def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): Unit
43+
= zkUtils.updatePartitionReassignmentData(partitionsToBeReassigned)
44+
45+
override def createPersistentPath(
46+
path: String,
47+
data: String
48+
): Unit = zkUtils.createPersistentPath(path, data)
49+
50+
override def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, Map[Int, Seq[Int]]]
51+
= zkUtils.getPartitionAssignmentForTopics(topics)
52+
53+
override def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch]
54+
= zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions)
55+
56+
override def getSortedBrokerList(): Seq[Int] = zkUtils.getSortedBrokerList()
57+
}

0 commit comments

Comments
 (0)