Skip to content

Commit 1d8fdc6

Browse files
MikeDomboshaguptashaikh
authored andcommitted
fix: add mqtt change subscribe from publish queue
1 parent 0f611e5 commit 1d8fdc6

File tree

2 files changed

+66
-61
lines changed

2 files changed

+66
-61
lines changed

src/main/java/com/aws/greengrass/mqttclient/MqttClient.java

Lines changed: 65 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -286,76 +286,80 @@ protected MqttClient(DeviceConfiguration deviceConfiguration,
286286
deviceConfiguration.getSpoolerNamespace();
287287
deviceConfiguration.getAWSRegion();
288288

289-
// If anything in the device configuration changes, then we will need to reconnect to the cloud
290-
// using the new settings. We do this by calling reconnect() on all of our connections
291-
this.deviceConfiguration.onAnyChange(new BatchedSubscriber((what, node) -> {
292-
// Skip events that don't change anything
293-
if (WhatHappened.timestampUpdated.equals(what) || WhatHappened.interiorAdded.equals(what) || node == null) {
294-
return true;
295-
}
289+
// Setup change subscriber from the publish queue so that all pending events are cleared before we subscribe
290+
mqttTopics.context.runOnPublishQueue(() -> {
291+
// If anything in the device configuration changes, then we will need to reconnect to the cloud
292+
// using the new settings. We do this by calling reconnect() on all of our connections
293+
this.deviceConfiguration.onAnyChange(new BatchedSubscriber((what, node) -> {
294+
// Skip events that don't change anything
295+
if (WhatHappened.timestampUpdated.equals(what) || WhatHappened.interiorAdded.equals(what)
296+
|| node == null) {
297+
return true;
298+
}
296299

297-
// List of configuration nodes that we need to reconfigure for if they change
298-
if (!(node.childOf(DEVICE_MQTT_NAMESPACE) || node.childOf(DEVICE_PARAM_THING_NAME) || node.childOf(
299-
DEVICE_PARAM_IOT_DATA_ENDPOINT) || node.childOf(DEVICE_PARAM_PRIVATE_KEY_PATH) || node.childOf(
300-
DEVICE_PARAM_CERTIFICATE_FILE_PATH) || node.childOf(DEVICE_PARAM_ROOT_CA_PATH) || node.childOf(
301-
DEVICE_PARAM_AWS_REGION))) {
302-
return true;
303-
}
300+
// List of configuration nodes that we need to reconfigure for if they change
301+
if (!(node.childOf(DEVICE_MQTT_NAMESPACE) || node.childOf(DEVICE_PARAM_THING_NAME) || node.childOf(
302+
DEVICE_PARAM_IOT_DATA_ENDPOINT) || node.childOf(DEVICE_PARAM_PRIVATE_KEY_PATH) || node.childOf(
303+
DEVICE_PARAM_CERTIFICATE_FILE_PATH) || node.childOf(DEVICE_PARAM_ROOT_CA_PATH) || node.childOf(
304+
DEVICE_PARAM_AWS_REGION))) {
305+
return true;
306+
}
304307

305-
// Only reconnect when the region changed if the proxy exists
306-
if (node.childOf(DEVICE_PARAM_AWS_REGION) && !ProxyUtils.isProxyConfigured(deviceConfiguration)) {
307-
return true;
308-
}
308+
// Only reconnect when the region changed if the proxy exists
309+
if (node.childOf(DEVICE_PARAM_AWS_REGION) && !ProxyUtils.isProxyConfigured(deviceConfiguration)) {
310+
return true;
311+
}
309312

310-
logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what)
311-
.log("Reconfiguring MQTT clients");
312-
return false;
313-
}, (what) -> {
314-
validateAndSetMqttPublishConfiguration();
315-
316-
// Reconnect in separate thread to not block publish thread
317-
// Schedule the reconnection for slightly in the future to de-dupe multiple changes
318-
Future<?> oldFuture = reconfigureFuture.getAndSet(ses.schedule(() -> {
319-
// If the rootCa path changed, then we need to update the TLS options
320-
String newRootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath());
321-
synchronized (httpProxyLock) {
322-
if (!Objects.equals(rootCaPath, newRootCaPath)) {
323-
if (proxyTlsOptions != null) {
324-
proxyTlsOptions.close();
325-
}
326-
if (proxyTlsContext != null) {
327-
proxyTlsContext.close();
313+
logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what)
314+
.log("Reconfiguring MQTT clients");
315+
return false;
316+
}, (what) -> {
317+
validateAndSetMqttPublishConfiguration();
318+
319+
// Reconnect in separate thread to not block publish thread
320+
// Schedule the reconnection for slightly in the future to de-dupe multiple changes
321+
Future<?> oldFuture = reconfigureFuture.getAndSet(ses.schedule(() -> {
322+
// If the rootCa path changed, then we need to update the TLS options
323+
String newRootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath());
324+
synchronized (httpProxyLock) {
325+
if (!Objects.equals(rootCaPath, newRootCaPath)) {
326+
if (proxyTlsOptions != null) {
327+
proxyTlsOptions.close();
328+
}
329+
if (proxyTlsContext != null) {
330+
proxyTlsContext.close();
331+
}
332+
rootCaPath = newRootCaPath;
333+
proxyTlsOptions = getTlsContextOptions(rootCaPath);
334+
proxyTlsContext = new ClientTlsContext(proxyTlsOptions);
328335
}
329-
rootCaPath = newRootCaPath;
330-
proxyTlsOptions = getTlsContextOptions(rootCaPath);
331-
proxyTlsContext = new ClientTlsContext(proxyTlsOptions);
332336
}
333-
}
334337

335-
// Continually try to reconnect until all the connections are reconnected
336-
Set<IndividualMqttClient> brokenConnections = new CopyOnWriteArraySet<>(connections);
337-
do {
338-
for (IndividualMqttClient connection : brokenConnections) {
339-
if (Thread.currentThread().isInterrupted()) {
340-
return;
341-
}
338+
// Continually try to reconnect until all the connections are reconnected
339+
Set<IndividualMqttClient> brokenConnections = new CopyOnWriteArraySet<>(connections);
340+
do {
341+
for (IndividualMqttClient connection : brokenConnections) {
342+
if (Thread.currentThread().isInterrupted()) {
343+
return;
344+
}
342345

343-
try {
344-
connection.reconnect(getMqttOperationTimeoutMillis());
345-
brokenConnections.remove(connection);
346-
} catch (InterruptedException | ExecutionException | TimeoutException e) {
347-
logger.atError().setCause(e).kv(CLIENT_ID_KEY, connection.getClientId())
348-
.log("Error while reconnecting MQTT client");
346+
try {
347+
connection.reconnect(getMqttOperationTimeoutMillis());
348+
brokenConnections.remove(connection);
349+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
350+
logger.atError().setCause(e).kv(CLIENT_ID_KEY, connection.getClientId())
351+
.log("Error while reconnecting MQTT client");
352+
}
349353
}
350-
}
351-
} while (!brokenConnections.isEmpty());
352-
}, 1, TimeUnit.SECONDS));
354+
} while (!brokenConnections.isEmpty());
355+
}, 1, TimeUnit.SECONDS));
353356

354-
// If a reconfiguration task already existed, then kill it and create a new one
355-
if (oldFuture != null) {
356-
oldFuture.cancel(true);
357-
}
358-
}));
357+
// If a reconfiguration task already existed, then kill it and create a new one
358+
if (oldFuture != null) {
359+
oldFuture.cancel(true);
360+
}
361+
}));
362+
});
359363
}
360364

361365
/**

src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ void GIVEN_connection_WHEN_settings_change_THEN_reconnects_on_valid_changes()
299299
ArgumentCaptor<ChildChanged> cc = ArgumentCaptor.forClass(ChildChanged.class);
300300
doNothing().when(deviceConfiguration).onAnyChange(cc.capture());
301301
MqttClient client = spy(new MqttClient(deviceConfiguration, (c) -> builder, ses, executorService));
302+
mqttNamespace.context.waitForPublishQueueToClear();
302303

303304
AwsIotMqttClient iClient1 = mock(AwsIotMqttClient.class);
304305
when(iClient1.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null));

0 commit comments

Comments
 (0)