Skip to content

Commit 813116b

Browse files
author
Tianci Shen
committed
feat: add nucleus connectivity validation
1 parent 2f671e3 commit 813116b

File tree

5 files changed

+195
-17
lines changed

5 files changed

+195
-17
lines changed

src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java

Lines changed: 129 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,26 @@
2727
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
2828
import com.aws.greengrass.logging.api.Logger;
2929
import com.aws.greengrass.logging.impl.LogManager;
30+
import com.aws.greengrass.mqttclient.MqttClient;
31+
import com.aws.greengrass.security.SecurityService;
32+
import com.aws.greengrass.security.exceptions.MqttConnectionProviderException;
3033
import com.aws.greengrass.util.Coerce;
34+
import com.aws.greengrass.util.ProxyUtils;
35+
import com.aws.greengrass.util.Utils;
3136
import lombok.AccessLevel;
3237
import lombok.AllArgsConstructor;
3338
import lombok.Getter;
39+
import software.amazon.awssdk.crt.http.HttpProxyOptions;
40+
import software.amazon.awssdk.crt.io.ClientTlsContext;
41+
import software.amazon.awssdk.crt.io.SocketOptions;
42+
import software.amazon.awssdk.crt.io.TlsContextOptions;
43+
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
44+
import software.amazon.awssdk.crt.mqtt.MqttException;
45+
import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder;
3446
import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction;
3547

48+
import java.time.Duration;
49+
import java.util.Arrays;
3650
import java.util.Collection;
3751
import java.util.HashMap;
3852
import java.util.HashSet;
@@ -47,9 +61,17 @@
4761
import javax.inject.Inject;
4862

4963
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
64+
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_MQTT_NAMESPACE;
65+
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_NETWORK_PROXY_NAMESPACE;
5066
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_AWS_REGION;
5167
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_IOT_CRED_ENDPOINT;
5268
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_IOT_DATA_ENDPOINT;
69+
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_NO_PROXY_ADDRESSES;
70+
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_PROXY_URL;
71+
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_PROXY_USERNAME;
72+
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_PROXY_PASSWORD;
73+
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PROXY_NAMESPACE;
74+
import static com.aws.greengrass.deployment.DynamicComponentConfigurationValidator.DEFAULT_TIMEOUT_SECOND;
5375
import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICES_NAMESPACE_TOPIC;
5476
import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICE_NAME_KEY;
5577

@@ -66,6 +88,9 @@ public class DeploymentConfigMerger {
6688
private Kernel kernel;
6789
private DeviceConfiguration deviceConfiguration;
6890
private DynamicComponentConfigurationValidator validator;
91+
private MqttClient mqttClient;
92+
private ThingGroupHelper thingGroupHelper;
93+
private SecurityService securityService;
6994

7095
/**
7196
* Merge in new configuration values and new services.
@@ -143,7 +168,8 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
143168
}
144169

145170
// Validate the AWS region, IoT credentials endpoint as well as the IoT data endpoint.
146-
if (!validateNucleusConfig(totallyCompleteFuture, nucleusConfig)) {
171+
if (!validateNucleusConfig(totallyCompleteFuture, nucleusConfig,
172+
deployment.getDeploymentDocumentObj().getConfigurationValidationPolicy().timeoutInSeconds())) {
147173
return;
148174
}
149175

@@ -153,7 +179,7 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
153179
}
154180

155181
private boolean validateNucleusConfig(CompletableFuture<DeploymentResult> totallyCompleteFuture,
156-
Map<String, Object> nucleusConfig) {
182+
Map<String, Object> nucleusConfig, Integer timeoutSec) {
157183
if (nucleusConfig != null) {
158184
String awsRegion = tryGetAwsRegionFromNewConfig(nucleusConfig);
159185
String iotCredEndpoint = tryGetIoTCredEndpointFromNewConfig(nucleusConfig);
@@ -166,10 +192,111 @@ private boolean validateNucleusConfig(CompletableFuture<DeploymentResult> totall
166192
.complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e));
167193
return false;
168194
}
195+
196+
long configTimeout = Duration.ofSeconds(DEFAULT_TIMEOUT_SECOND).toMillis();
197+
if (timeoutSec != null) {
198+
configTimeout = Duration.ofSeconds(timeoutSec).toMillis();
199+
}
200+
201+
if (configTimeout == 0
202+
|| !deviceConfiguration.isDeviceConfiguredToTalkToCloud()) {
203+
logger.atDebug().log("Skipping connectivity validation");
204+
return true;
205+
}
206+
try {
207+
// Check that MQTT client has reconnected
208+
logger.atDebug().log("Checking MQTT Reconnected");
209+
// mqttClient.waitForReconnect(configTimeout);
210+
MqttClientConnection connection = createMqttConnection(nucleusConfig);
211+
// TODO - pass connection into ScheduledExecuterService for wait and retry
212+
connection.connect().get();
213+
214+
// Check that HTTP client works
215+
logger.atDebug().log("Checking HTTP Reconnected");
216+
// TODO - create new http client
217+
// thingGroupHelper.waitForReconnect(configTimeout);
218+
} catch (Exception e) {
219+
logger.atError().cause(e).log("Nucleus connectivity validation failed");
220+
totallyCompleteFuture
221+
.complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e));
222+
return false;
223+
}
169224
}
170225
return true;
171226
}
172227

228+
private MqttClientConnection createMqttConnection(Map<String, Object> nucleusConfig) {
229+
AwsIotMqttConnectionBuilder builder;
230+
try {
231+
builder = securityService.getDeviceIdentityMqttConnectionBuilder();
232+
} catch (MqttConnectionProviderException e) {
233+
throw new MqttException(e.getMessage());
234+
}
235+
236+
// get mqtt values from nucleus config to construct a MQTT client
237+
Map<String, Object> mqtt = (Map<String, Object>) nucleusConfig.get(DEVICE_MQTT_NAMESPACE);
238+
int pingTimeoutMs = Coerce.toInt(mqtt.getOrDefault(MqttClient.MQTT_PING_TIMEOUT_KEY, MqttClient.DEFAULT_MQTT_PING_TIMEOUT));
239+
int keepAliveMs = Coerce.toInt(mqtt.getOrDefault(MqttClient.MQTT_KEEP_ALIVE_TIMEOUT_KEY, MqttClient.DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT));
240+
if (keepAliveMs != 0 && keepAliveMs <= pingTimeoutMs) {
241+
throw new MqttException(String.format("%s must be greater than %s",
242+
MqttClient.MQTT_KEEP_ALIVE_TIMEOUT_KEY, MqttClient.MQTT_PING_TIMEOUT_KEY));
243+
}
244+
String rootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath());
245+
String endpoint = Coerce.toString(nucleusConfig.get(DEVICE_PARAM_IOT_DATA_ENDPOINT));
246+
short port = (short) Coerce.toInt(mqtt.getOrDefault(MqttClient.MQTT_PORT_KEY, MqttClient.DEFAULT_MQTT_PORT));
247+
int operationTimeout = Coerce.toInt(mqtt.getOrDefault(MqttClient.MQTT_OPERATION_TIMEOUT_KEY, MqttClient.DEFAULT_MQTT_OPERATION_TIMEOUT));
248+
int socketTimeout = Coerce.toInt(mqtt.getOrDefault(MqttClient.MQTT_SOCKET_TIMEOUT_KEY, MqttClient.DEFAULT_MQTT_SOCKET_TIMEOUT));
249+
250+
builder.withCertificateAuthorityFromPath(null, rootCaPath)
251+
.withClientId("test-id")
252+
.withEndpoint(endpoint)
253+
.withPort(port)
254+
.withCleanSession(true)
255+
.withKeepAliveMs(keepAliveMs)
256+
.withProtocolOperationTimeoutMs(operationTimeout)
257+
.withPingTimeoutMs(pingTimeoutMs)
258+
.withSocketOptions(new SocketOptions()).withTimeoutMs(socketTimeout);
259+
260+
// add proxy settings if needed
261+
Map<String, Object> networkProxy = (Map<String, Object>) nucleusConfig.get(DEVICE_NETWORK_PROXY_NAMESPACE);
262+
Map<String, Object> proxy = (Map<String, Object>) networkProxy.get(DEVICE_PROXY_NAMESPACE);
263+
String proxyUrl = Coerce.toString(proxy.get(DEVICE_PARAM_PROXY_URL));
264+
if (!Utils.isEmpty(proxyUrl)) {
265+
HttpProxyOptions httpProxyOptions = new HttpProxyOptions();
266+
httpProxyOptions.setHost(ProxyUtils.getHostFromProxyUrl(proxyUrl));
267+
httpProxyOptions.setPort(ProxyUtils.getPortFromProxyUrl(proxyUrl));
268+
httpProxyOptions.setConnectionType(HttpProxyOptions.HttpProxyConnectionType.Tunneling);
269+
270+
if ("https".equalsIgnoreCase(ProxyUtils.getSchemeFromProxyUrl(proxyUrl))) {
271+
TlsContextOptions proxyTlsOptions = MqttClient.getTlsContextOptions(rootCaPath);
272+
ClientTlsContext tlsContext = new ClientTlsContext(proxyTlsOptions);
273+
httpProxyOptions.setTlsContext(tlsContext);
274+
}
275+
276+
String username = Coerce.toString(proxy.getOrDefault(DEVICE_PARAM_PROXY_USERNAME, ""));
277+
String password = Coerce.toString(proxy.getOrDefault(DEVICE_PARAM_PROXY_PASSWORD, ""));
278+
String proxyUsername = ProxyUtils.getProxyUsername(proxyUrl, username);
279+
if (Utils.isNotEmpty(proxyUsername)) {
280+
httpProxyOptions.setAuthorizationType(HttpProxyOptions.HttpProxyAuthorizationType.Basic);
281+
httpProxyOptions.setAuthorizationUsername(proxyUsername);
282+
httpProxyOptions
283+
.setAuthorizationPassword(ProxyUtils.getProxyPassword(proxyUrl, password));
284+
}
285+
286+
String noProxy = Coerce.toString(proxy.getOrDefault(DEVICE_PARAM_NO_PROXY_ADDRESSES, ""));
287+
boolean useProxy = true;
288+
// Only use the proxy when the endpoint we're connecting to is not in the NoProxyAddress list
289+
if (Utils.isNotEmpty(noProxy) && Utils.isNotEmpty(endpoint)) {
290+
useProxy = Arrays.stream(noProxy.split(",")).noneMatch(endpoint::matches);
291+
}
292+
if (useProxy) {
293+
builder.withHttpProxyOptions(httpProxyOptions);
294+
}
295+
}
296+
297+
return builder.build();
298+
}
299+
173300
/**
174301
* Completes the provided future when all of the listed services are running.
175302
*
@@ -307,7 +434,6 @@ public AggregateServicesChangeManager createRollbackManager() {
307434

308435
/**
309436
* Start the new services the merge intends to add.
310-
*
311437
*/
312438
public void startNewServices() {
313439
for (String serviceName : servicesToAdd) {

src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package com.aws.greengrass.deployment;
77

8+
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
9+
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
810
import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException;
911
import com.aws.greengrass.deployment.exceptions.RetryableServerErrorException;
1012
import com.aws.greengrass.logging.api.Logger;
@@ -102,4 +104,41 @@ public Optional<Set<String>> listThingGroupsForDevice(int maxAttemptCount) throw
102104
return Optional.of(thingGroupNames);
103105
}, "get-thing-group-hierarchy", logger);
104106
}
107+
108+
public void waitForReconnect(long timeoutMillis) throws Exception {
109+
if (!deviceConfiguration.isDeviceConfiguredToTalkToCloud()) {
110+
return;
111+
}
112+
Duration initialInterval = Duration.ofMillis(timeoutMillis / 8);
113+
Duration maxRetryInterval = Duration.ofMillis(timeoutMillis / 4);
114+
115+
try {
116+
RetryUtils.runWithRetry(clientExceptionRetryConfig.toBuilder()
117+
.maxAttempt(3)
118+
.initialRetryInterval(initialInterval)
119+
.maxRetryInterval(maxRetryInterval)
120+
.build(),
121+
() -> {
122+
ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder()
123+
.coreDeviceThingName(Coerce.toString(deviceConfiguration.getThingName()))
124+
.build();
125+
126+
ListThingGroupsForCoreDeviceResponse response;
127+
try {
128+
response =
129+
clientFactory.fetchGreengrassV2DataClient().listThingGroupsForCoreDevice(request);
130+
} catch (GreengrassV2DataException e) {
131+
if (RetryUtils.retryErrorCodes(e.statusCode())) {
132+
throw new RetryableServerErrorException("Failed with retryable error " + e.statusCode()
133+
+ " when calling listThingGroupsForCoreDevice", e);
134+
}
135+
throw e;
136+
}
137+
return response;
138+
}, "get-thing-group-hierarchy", logger);
139+
} catch (Exception e) {
140+
throw new ComponentConfigurationValidationException("HTTP client failed to reconnect with new configuration: " + e,
141+
DeploymentErrorCode.FAILED_TO_RECONNECT);
142+
}
143+
}
105144
}

src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public enum DeploymentErrorCode {
8787
UNSUPPORTED_REGION(DeploymentErrorType.REQUEST_ERROR),
8888
IOT_CRED_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
8989
IOT_DATA_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
90+
FAILED_TO_RECONNECT(DeploymentErrorType.REQUEST_ERROR),
9091

9192
/* Docker issues */
9293
DOCKER_ERROR(DeploymentErrorType.DEPENDENCY_ERROR),

src/main/java/com/aws/greengrass/mqttclient/MqttClient.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import com.aws.greengrass.config.Topics;
99
import com.aws.greengrass.config.WhatHappened;
1010
import com.aws.greengrass.deployment.DeviceConfiguration;
11+
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
12+
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
1113
import com.aws.greengrass.lifecyclemanager.Kernel;
1214
import com.aws.greengrass.logging.api.LogEventBuilder;
1315
import com.aws.greengrass.logging.api.Logger;
@@ -94,18 +96,18 @@
9496
@SuppressWarnings({"PMD.AvoidDuplicateLiterals"})
9597
public class MqttClient implements Closeable {
9698
private static final Logger logger = LogManager.getLogger(MqttClient.class);
97-
static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs";
98-
static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis();
99-
static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs";
100-
private static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
99+
public static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs";
100+
public static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis();
101+
public static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs";
102+
public static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
101103
private static final String MQTT_THREAD_POOL_SIZE_KEY = "threadPoolSize";
102104
public static final int DEFAULT_MQTT_PORT = 8883;
103105
public static final String MQTT_PORT_KEY = "port";
104-
private static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs";
106+
public static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs";
105107
// Default taken from AWS SDK
106-
private static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis();
107-
static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs";
108-
static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
108+
public static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis();
109+
public static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs";
110+
public static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
109111
static final int DEFAULT_MQTT_CLOSE_TIMEOUT = (int) Duration.ofSeconds(2).toMillis();
110112
static final String MQTT_MAX_IN_FLIGHT_PUBLISHES_KEY = "maxInFlightPublishes";
111113
static final int DEFAULT_MAX_IN_FLIGHT_PUBLISHES = 5;
@@ -159,6 +161,7 @@ public class MqttClient implements Closeable {
159161
private int maxPublishRetryCount;
160162
private int maxPublishMessageSize;
161163
private final AtomicBoolean isClosed = new AtomicBoolean(false);
164+
private boolean isReconnected = true;
162165

163166
@Getter(AccessLevel.PROTECTED)
164167
private final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() {
@@ -312,6 +315,7 @@ protected MqttClient(DeviceConfiguration deviceConfiguration,
312315

313316
logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what)
314317
.log("Reconfiguring MQTT clients");
318+
isReconnected = false;
315319
return false;
316320
}, (what) -> {
317321
validateAndSetMqttPublishConfiguration();
@@ -351,6 +355,9 @@ protected MqttClient(DeviceConfiguration deviceConfiguration,
351355
.log("Error while reconnecting MQTT client");
352356
}
353357
}
358+
if (brokenConnections.isEmpty()){
359+
isReconnected = true;
360+
}
354361
} while (!brokenConnections.isEmpty());
355362
}, 1, TimeUnit.SECONDS));
356363

@@ -390,7 +397,7 @@ public MqttClient(DeviceConfiguration deviceConfiguration, Spool spool, boolean
390397
validateAndSetMqttPublishConfiguration();
391398
}
392399

393-
private TlsContextOptions getTlsContextOptions(String rootCaPath) {
400+
public static TlsContextOptions getTlsContextOptions(String rootCaPath) {
394401
return Utils.isNotEmpty(rootCaPath)
395402
? TlsContextOptions.createDefaultClient().withCertificateAuthorityFromPath(null, rootCaPath)
396403
: TlsContextOptions.createDefaultClient();

0 commit comments

Comments
 (0)