Skip to content

Commit 4de2fb2

Browse files
author
Tianci Shen
committed
feat: add nucleus connectivity validation
1 parent 2f671e3 commit 4de2fb2

File tree

6 files changed

+406
-21
lines changed

6 files changed

+406
-21
lines changed

src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java

Lines changed: 337 additions & 4 deletions
Large diffs are not rendered by default.

src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ public class DeviceConfiguration {
136136
public static final long COMPONENT_STORE_MAX_SIZE_DEFAULT_BYTES = 10_000_000_000L;
137137
public static final long DEPLOYMENT_POLLING_FREQUENCY_DEFAULT_SECONDS = 15L;
138138
public static final String DEVICE_PARAM_GG_DATA_PLANE_PORT = "greengrassDataPlanePort";
139-
private static final int GG_DATA_PLANE_PORT_DEFAULT = 8443;
139+
public static final int GG_DATA_PLANE_PORT_DEFAULT = 8443;
140140

141-
private static final String DEVICE_PARAM_ENV_STAGE = "envStage";
142-
private static final String DEFAULT_ENV_STAGE = "prod";
141+
public static final String DEVICE_PARAM_ENV_STAGE = "envStage";
142+
public static final String DEFAULT_ENV_STAGE = "prod";
143143
private static final String CANNOT_BE_EMPTY = " cannot be empty";
144144
private static final Logger logger = LogManager.getLogger(DeviceConfiguration.class);
145145
public static final String AWS_IOT_THING_NAME_ENV = "AWS_IOT_THING_NAME";

src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package com.aws.greengrass.deployment;
77

8+
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
9+
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
810
import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException;
911
import com.aws.greengrass.deployment.exceptions.RetryableServerErrorException;
1012
import com.aws.greengrass.logging.api.Logger;
@@ -16,6 +18,7 @@
1618
import lombok.Getter;
1719
import lombok.Setter;
1820
import software.amazon.awssdk.core.exception.SdkClientException;
21+
import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient;
1922
import software.amazon.awssdk.services.greengrassv2data.model.GreengrassV2DataException;
2023
import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest;
2124
import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceResponse;
@@ -102,4 +105,40 @@ public Optional<Set<String>> listThingGroupsForDevice(int maxAttemptCount) throw
102105
return Optional.of(thingGroupNames);
103106
}, "get-thing-group-hierarchy", logger);
104107
}
108+
109+
public void waitForReconnect(long timeoutMillis, GreengrassV2DataClient greengrassV2DataClient) throws ComponentConfigurationValidationException {
110+
if (!deviceConfiguration.isDeviceConfiguredToTalkToCloud()) {
111+
return;
112+
}
113+
Duration initialInterval = Duration.ofMillis(timeoutMillis / 8);
114+
Duration maxRetryInterval = Duration.ofMillis(timeoutMillis / 4);
115+
116+
try {
117+
RetryUtils.runWithRetry(clientExceptionRetryConfig.toBuilder()
118+
.maxAttempt(3)
119+
.initialRetryInterval(initialInterval)
120+
.maxRetryInterval(maxRetryInterval)
121+
.build(),
122+
() -> {
123+
ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder()
124+
.coreDeviceThingName(Coerce.toString(deviceConfiguration.getThingName()))
125+
.build();
126+
127+
ListThingGroupsForCoreDeviceResponse response;
128+
try {
129+
response = greengrassV2DataClient.listThingGroupsForCoreDevice(request);
130+
} catch (GreengrassV2DataException e) {
131+
if (RetryUtils.retryErrorCodes(e.statusCode())) {
132+
throw new RetryableServerErrorException("Failed with retryable error " + e.statusCode()
133+
+ " when calling listThingGroupsForCoreDevice", e);
134+
}
135+
throw e;
136+
}
137+
return response;
138+
}, "get-thing-group-hierarchy", logger);
139+
} catch (Exception e) {
140+
throw new ComponentConfigurationValidationException("HTTP client failed to reconnect with new configuration: " + e,
141+
DeploymentErrorCode.FAILED_TO_RECONNECT);
142+
}
143+
}
105144
}

src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public enum DeploymentErrorCode {
8787
UNSUPPORTED_REGION(DeploymentErrorType.REQUEST_ERROR),
8888
IOT_CRED_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
8989
IOT_DATA_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
90+
FAILED_TO_RECONNECT(DeploymentErrorType.REQUEST_ERROR),
9091

9192
/* Docker issues */
9293
DOCKER_ERROR(DeploymentErrorType.DEPENDENCY_ERROR),

src/main/java/com/aws/greengrass/mqttclient/MqttClient.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import com.aws.greengrass.config.Topics;
99
import com.aws.greengrass.config.WhatHappened;
1010
import com.aws.greengrass.deployment.DeviceConfiguration;
11+
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
12+
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
1113
import com.aws.greengrass.lifecyclemanager.Kernel;
1214
import com.aws.greengrass.logging.api.LogEventBuilder;
1315
import com.aws.greengrass.logging.api.Logger;
@@ -94,18 +96,18 @@
9496
@SuppressWarnings({"PMD.AvoidDuplicateLiterals"})
9597
public class MqttClient implements Closeable {
9698
private static final Logger logger = LogManager.getLogger(MqttClient.class);
97-
static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs";
98-
static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis();
99-
static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs";
100-
private static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
99+
public static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs";
100+
public static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis();
101+
public static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs";
102+
public static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
101103
private static final String MQTT_THREAD_POOL_SIZE_KEY = "threadPoolSize";
102104
public static final int DEFAULT_MQTT_PORT = 8883;
103105
public static final String MQTT_PORT_KEY = "port";
104-
private static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs";
106+
public static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs";
105107
// Default taken from AWS SDK
106-
private static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis();
107-
static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs";
108-
static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
108+
public static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis();
109+
public static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs";
110+
public static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
109111
static final int DEFAULT_MQTT_CLOSE_TIMEOUT = (int) Duration.ofSeconds(2).toMillis();
110112
static final String MQTT_MAX_IN_FLIGHT_PUBLISHES_KEY = "maxInFlightPublishes";
111113
static final int DEFAULT_MAX_IN_FLIGHT_PUBLISHES = 5;
@@ -159,6 +161,7 @@ public class MqttClient implements Closeable {
159161
private int maxPublishRetryCount;
160162
private int maxPublishMessageSize;
161163
private final AtomicBoolean isClosed = new AtomicBoolean(false);
164+
private boolean isReconnected = true;
162165

163166
@Getter(AccessLevel.PROTECTED)
164167
private final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() {
@@ -312,6 +315,7 @@ protected MqttClient(DeviceConfiguration deviceConfiguration,
312315

313316
logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what)
314317
.log("Reconfiguring MQTT clients");
318+
isReconnected = false;
315319
return false;
316320
}, (what) -> {
317321
validateAndSetMqttPublishConfiguration();
@@ -351,6 +355,9 @@ protected MqttClient(DeviceConfiguration deviceConfiguration,
351355
.log("Error while reconnecting MQTT client");
352356
}
353357
}
358+
if (brokenConnections.isEmpty()){
359+
isReconnected = true;
360+
}
354361
} while (!brokenConnections.isEmpty());
355362
}, 1, TimeUnit.SECONDS));
356363

@@ -390,7 +397,7 @@ public MqttClient(DeviceConfiguration deviceConfiguration, Spool spool, boolean
390397
validateAndSetMqttPublishConfiguration();
391398
}
392399

393-
private TlsContextOptions getTlsContextOptions(String rootCaPath) {
400+
public static TlsContextOptions getTlsContextOptions(String rootCaPath) {
394401
return Utils.isNotEmpty(rootCaPath)
395402
? TlsContextOptions.createDefaultClient().withCertificateAuthorityFromPath(null, rootCaPath)
396403
: TlsContextOptions.createDefaultClient();

src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
2727
import com.aws.greengrass.logging.api.Logger;
2828
import com.aws.greengrass.logging.impl.LogManager;
29+
import com.aws.greengrass.mqttclient.MqttClient;
2930
import com.aws.greengrass.testcommons.testutilities.GGExtension;
3031
import org.junit.jupiter.api.AfterEach;
3132
import org.junit.jupiter.api.BeforeEach;
@@ -90,6 +91,10 @@ class DeploymentConfigMergerTest {
9091
@Mock
9192
private DynamicComponentConfigurationValidator validator;
9293
@Mock
94+
private MqttClient mqttClient;
95+
@Mock
96+
private ThingGroupHelper thingGroupHelper;
97+
@Mock
9398
private Context context;
9499

95100
@BeforeEach
@@ -307,7 +312,7 @@ void GIVEN_deployment_WHEN_check_safety_selected_THEN_check_safety_before_update
307312
when(deploymentActivatorFactory.getDeploymentActivator(any())).thenReturn(deploymentActivator);
308313
when(context.get(DeploymentActivatorFactory.class)).thenReturn(deploymentActivatorFactory);
309314

310-
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
315+
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
311316

312317
DeploymentDocument doc = new DeploymentDocument();
313318
doc.setConfigurationArn("NoSafetyCheckDeploy");
@@ -345,7 +350,7 @@ void GIVEN_deployment_WHEN_task_cancelled_THEN_update_is_cancelled() throws Thro
345350
});
346351

347352
// GIVEN
348-
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
353+
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
349354
DeploymentDocument doc = mock(DeploymentDocument.class);
350355
when(doc.getDeploymentId()).thenReturn("DeploymentId");
351356
when(doc.getComponentUpdatePolicy()).thenReturn(
@@ -381,7 +386,7 @@ void GIVEN_deployment_WHEN_task_not_cancelled_THEN_update_is_continued() throws
381386
when(context.get(DefaultActivator.class)).thenReturn(defaultActivator);
382387

383388
// GIVEN
384-
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
389+
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
385390
DeploymentDocument doc = mock(DeploymentDocument.class);
386391
when(doc.getDeploymentId()).thenReturn("DeploymentId");
387392
when(doc.getComponentUpdatePolicy()).thenReturn(
@@ -437,7 +442,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_new_config_THEN_new_config_is
437442
newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3);
438443
newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2);
439444
// GIVEN
440-
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
445+
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
441446
DeploymentDocument doc = mock(DeploymentDocument.class);
442447
when(doc.getDeploymentId()).thenReturn("DeploymentId");
443448
when(doc.getComponentUpdatePolicy()).thenReturn(
@@ -498,7 +503,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_some_new_config_THEN_old_conf
498503
newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3);
499504
newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2);
500505
// GIVEN
501-
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
506+
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
502507
DeploymentDocument doc = mock(DeploymentDocument.class);
503508
when(doc.getDeploymentId()).thenReturn("DeploymentId");
504509
when(doc.getComponentUpdatePolicy()).thenReturn(

0 commit comments

Comments
 (0)