Skip to content

Commit 8f98910

Browse files
authored
rebalance connections (#27)
1 parent 66c860d commit 8f98910

File tree

6 files changed, with 116 additions (+116) and 36 deletions (-36).

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -156,6 +156,12 @@
156156
<artifactId>slf4j-api</artifactId>
157157
<scope>provided</scope>
158158
</dependency>
159+
<dependency>
160+
<groupId>net.jcip</groupId>
161+
<artifactId>jcip-annotations</artifactId>
162+
<version>1.0</version>
163+
<scope>provided</scope>
164+
</dependency>
159165
<dependency>
160166
<groupId>org.apache.kafka</groupId>
161167
<artifactId>connect-runtime</artifactId>

src/main/java/com/arangodb/kafka/ArangoSinkConnector.java

Lines changed: 5 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -34,8 +34,6 @@
3434
public class ArangoSinkConnector extends SinkConnector {
3535
private static final Logger LOG = LoggerFactory.getLogger(SinkConnector.class);
3636
private Map<String, String> config;
37-
private boolean acquireHostList;
38-
private List<HostDescription> initialEndpoints;
3937
private HostListMonitor hostListMonitor;
4038

4139
@Override
@@ -56,13 +54,8 @@ public void start(Map<String, String> props) {
5654
throw new ConnectException(e);
5755
}
5856

59-
acquireHostList = sinkConfig.isAcquireHostListEnabled();
60-
initialEndpoints = sinkConfig.getEndpoints();
61-
62-
if (acquireHostList) {
63-
hostListMonitor = new HostListMonitor(sinkConfig, context);
64-
hostListMonitor.start();
65-
}
57+
hostListMonitor = new HostListMonitor(sinkConfig, context);
58+
hostListMonitor.start();
6659
}
6760

6861
@Override
@@ -72,7 +65,7 @@ public Class<? extends Task> taskClass() {
7265

7366
@Override
7467
public List<Map<String, String>> taskConfigs(int maxTasks) {
75-
List<HostDescription> endpoints = new ArrayList<>(acquireHostList ? hostListMonitor.getEndpoints() : initialEndpoints);
68+
List<HostDescription> endpoints = hostListMonitor.getEndpoints();
7669
int rotationDistance = endpoints.size() / maxTasks;
7770
if (rotationDistance == 0) {
7871
rotationDistance = 1;
@@ -94,10 +87,8 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
9487

9588
@Override
9689
public void stop() {
97-
if (acquireHostList) {
98-
LOG.info("stopping ArangoSinkConnector");
99-
hostListMonitor.stop();
100-
}
90+
LOG.info("stopping ArangoSinkConnector");
91+
hostListMonitor.stop();
10192
}
10293

10394
@Override

src/main/java/com/arangodb/kafka/HostListMonitor.java

Lines changed: 43 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -30,43 +30,53 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33-
import java.util.Collections;
34-
import java.util.HashSet;
35-
import java.util.Set;
33+
import java.util.*;
3634
import java.util.concurrent.Executors;
3735
import java.util.concurrent.ScheduledExecutorService;
3836
import java.util.concurrent.TimeUnit;
3937

38+
import net.jcip.annotations.GuardedBy;
39+
4040
public class HostListMonitor {
4141
private static final Logger LOG = LoggerFactory.getLogger(HostListMonitor.class);
4242

43-
private final ArangoDB adb;
4443
private final ScheduledExecutorService es;
4544
private final ConnectorContext context;
4645
private final int acquireHostIntervalMs;
47-
private volatile Set<HostDescription> endpoints;
46+
private final int rebalanceIntervalMs;
47+
private final ArangoDB adb;
48+
@GuardedBy("this")
49+
private List<HostDescription> endpoints;
4850

4951
public HostListMonitor(ArangoSinkConfig sinkConfig, ConnectorContext context) {
5052
acquireHostIntervalMs = sinkConfig.getAcquireHostIntervalMs();
51-
endpoints = new HashSet<>(sinkConfig.getEndpoints());
52-
adb = sinkConfig.createMonitorClient();
53+
rebalanceIntervalMs = sinkConfig.getRebalanceIntervalMs();
5354
this.context = context;
5455
es = Executors.newSingleThreadScheduledExecutor();
56+
adb = sinkConfig.isAcquireHostListEnabled() ? sinkConfig.createMonitorClient() : null;
57+
endpoints = sinkConfig.getEndpoints();
5558
}
5659

5760
void start() {
5861
LOG.info("starting host list monitor background task");
59-
updateHostList();
60-
es.scheduleAtFixedRate(this::monitorHosts, acquireHostIntervalMs, acquireHostIntervalMs, TimeUnit.MILLISECONDS);
62+
if (adb != null) {
63+
updateHostList();
64+
es.scheduleAtFixedRate(this::monitorHosts, acquireHostIntervalMs, acquireHostIntervalMs, TimeUnit.MILLISECONDS);
65+
}
66+
es.scheduleAtFixedRate(this::rebalance, rebalanceIntervalMs, rebalanceIntervalMs, TimeUnit.MILLISECONDS);
6167
}
6268

63-
public Set<HostDescription> getEndpoints() {
64-
return endpoints;
69+
public List<HostDescription> getEndpoints() {
70+
synchronized (this) {
71+
return endpoints;
72+
}
6573
}
6674

6775
public void stop() {
6876
LOG.info("stopping host list monitor background task");
69-
adb.shutdown();
77+
if (adb != null) {
78+
adb.shutdown();
79+
}
7080
es.shutdown();
7181
try {
7282
if (!es.awaitTermination(ArangoSinkConfig.MONITOR_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
@@ -105,19 +115,32 @@ private Set<HostDescription> parseAcquireHostListResponse(ObjectNode node) {
105115
private boolean updateHostList() {
106116
LOG.debug("Fetching host list.");
107117
Set<HostDescription> hosts = acquireHostList();
108-
if (!hosts.isEmpty() && !endpoints.equals(hosts)) {
109-
LOG.info("Detected change in the acquired host list: \n\t old: {} \n\t new: {}", endpoints, hosts);
110-
endpoints = hosts;
111-
return true;
112-
} else {
113-
return false;
118+
synchronized (this) {
119+
if (!hosts.isEmpty() && !hosts.equals(new HashSet<>(endpoints))) {
120+
LOG.info("Detected change in the acquired host list: \n\t old: {} \n\t new: {}", endpoints, hosts);
121+
endpoints = new ArrayList<>(hosts);
122+
return true;
123+
} else {
124+
return false;
125+
}
114126
}
115127
}
116128

117129
private void monitorHosts() {
118130
if (updateHostList()) {
119-
LOG.info("Requesting tasks reconfiguration.");
120-
context.requestTaskReconfiguration();
131+
reconfigureTasks();
132+
}
133+
}
134+
135+
private void rebalance() {
136+
synchronized (this) {
137+
Collections.shuffle(endpoints);
121138
}
139+
reconfigureTasks();
140+
}
141+
142+
private void reconfigureTasks() {
143+
LOG.info("Requesting tasks reconfiguration.");
144+
context.requestTaskReconfiguration();
122145
}
123146
}

src/main/java/com/arangodb/kafka/config/ArangoSinkConfig.java

Lines changed: 23 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -105,6 +105,12 @@ public enum DataErrorsTolerance {
105105
private static final String CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DOC = "Interval for acquiring the host list.";
106106
private static final String CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DISPLAY = "Acquire Host List Interval";
107107

108+
public static final String CONNECTION_REBALANCE_INTERVAL_MS = CONNECTION_PREFIX + "rebalance.interval.ms";
109+
private static final int CONNECTION_REBALANCE_INTERVAL_MS_DEFAULT = 30 * 60 * 1_000; // 30 min
110+
private static final String CONNECTION_REBALANCE_INTERVAL_MS_DOC = "Interval for re-balancing the connections " +
111+
"across the endpoints.";
112+
private static final String CONNECTION_REBALANCE_INTERVAL_MS_DISPLAY = "Connections re-balancing interval";
113+
108114
public static final String CONNECTION_PROTOCOL = CONNECTION_PREFIX + "protocol";
109115
private static final String CONNECTION_PROTOCOL_DEFAULT = Protocol.HTTP2.toString();
110116
private static final String CONNECTION_PROTOCOL_DOC = "Communication protocol.";
@@ -334,6 +340,17 @@ public enum DataErrorsTolerance {
334340
ConfigDef.Width.SHORT,
335341
CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DISPLAY
336342
)
343+
.define(
344+
CONNECTION_REBALANCE_INTERVAL_MS,
345+
ConfigDef.Type.INT,
346+
CONNECTION_REBALANCE_INTERVAL_MS_DEFAULT,
347+
ConfigDef.Importance.LOW,
348+
CONNECTION_REBALANCE_INTERVAL_MS_DOC,
349+
CONNECTION_GROUP,
350+
8,
351+
ConfigDef.Width.SHORT,
352+
CONNECTION_REBALANCE_INTERVAL_MS_DISPLAY
353+
)
337354
.define(
338355
CONNECTION_PROTOCOL,
339356
ConfigDef.Type.STRING,
@@ -342,7 +359,7 @@ public enum DataErrorsTolerance {
342359
ConfigDef.Importance.MEDIUM,
343360
CONNECTION_PROTOCOL_DOC,
344361
CONNECTION_GROUP,
345-
8,
362+
9,
346363
ConfigDef.Width.SHORT,
347364
CONNECTION_PROTOCOL_DISPLAY,
348365
new EnumRecommender(Protocol.class)
@@ -355,7 +372,7 @@ public enum DataErrorsTolerance {
355372
ConfigDef.Importance.LOW,
356373
CONNECTION_CONTENT_TYPE_DOC,
357374
CONNECTION_GROUP,
358-
9,
375+
10,
359376
ConfigDef.Width.SHORT,
360377
CONNECTION_CONTENT_TYPE_DISPLAY,
361378
new EnumRecommender(ContentType.class)
@@ -767,6 +784,10 @@ public int getAcquireHostIntervalMs() {
767784
return getInt(CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS);
768785
}
769786

787+
public int getRebalanceIntervalMs() {
788+
return getInt(CONNECTION_REBALANCE_INTERVAL_MS);
789+
}
790+
770791
public List<HostDescription> getEndpoints() {
771792
return getList(CONNECTION_ENDPOINTS).stream()
772793
.map(HostDescription::parse)

src/test/java/com/arangodb/kafka/HostListMonitorTest.java

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,9 @@
1414
import org.mockito.Spy;
1515

1616
import java.util.Collections;
17+
import java.util.List;
18+
import java.util.stream.Collectors;
19+
import java.util.stream.IntStream;
1720

1821
import static com.arangodb.kafka.utils.Utils.map;
1922
import static org.assertj.core.api.Assertions.assertThat;
@@ -90,5 +93,40 @@ void acquireHostList() throws InterruptedException {
9093
verify(adb, times(1)).shutdown();
9194
}
9295

96+
@Test
97+
void acquireHostListFalse() {
98+
ArangoSinkConfig cfg = new ArangoSinkConfig(config()
99+
.add(ArangoSinkConfig.CONNECTION_ACQUIRE_HOST_LIST_ENABLED, "false"));
100+
101+
HostListMonitor monitor = new HostListMonitor(cfg, context);
102+
monitor.start();
103+
assertThat(monitor.getEndpoints()).containsExactly(new HostDescription("a", 1));
104+
monitor.stop();
105+
}
106+
107+
@Test
108+
void rebalance() throws InterruptedException {
109+
String epList = IntStream.range(0, 10)
110+
.mapToObj(i -> "host:" + i)
111+
.collect(Collectors.joining(","));
112+
ArangoSinkConfig cfg = new ArangoSinkConfig(config()
113+
.add(ArangoSinkConfig.CONNECTION_ACQUIRE_HOST_LIST_ENABLED, "false")
114+
.add(ArangoSinkConfig.CONNECTION_ENDPOINTS, epList)
115+
.add(ArangoSinkConfig.CONNECTION_REBALANCE_INTERVAL_MS, "200")
116+
);
117+
118+
HostListMonitor monitor = new HostListMonitor(cfg, context);
119+
monitor.start();
120+
Thread.sleep(250);
121+
monitor.stop();
122+
verify(context, times(1)).requestTaskReconfiguration();
123+
124+
List<HostDescription> endpoints = IntStream.range(0, 10)
125+
.mapToObj(i -> new HostDescription("host", i))
126+
.collect(Collectors.toList());
127+
assertThat(monitor.getEndpoints())
128+
.containsExactlyInAnyOrderElementsOf(endpoints)
129+
.isNotEqualTo(endpoints);
130+
}
93131

94132
}

src/test/java/com/arangodb/kafka/config/ArangoSinkConfigTest.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ void defaults() {
3838
assertThat(config.getRetryBackoffMs()).isEqualTo(3000);
3939
assertThat(config.isAcquireHostListEnabled()).isFalse();
4040
assertThat(config.getAcquireHostIntervalMs()).isEqualTo(60_000);
41+
assertThat(config.getRebalanceIntervalMs()).isEqualTo(30 * 60 * 1_000);
4142
assertThat(config.getTolerateDataErrors()).isFalse();
4243
assertThat(config.getLogDataErrors()).isFalse();
4344
}

Comments (0): this commit has no comments.