Skip to content

Commit 875fa60

Browse files
author
Tianci Shen
committed
feat: add nucleus connectivity validation
1 parent cdf1ce9 commit 875fa60

File tree

17 files changed

+933
-79
lines changed

17 files changed

+933
-79
lines changed

src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/DeploymentConfigMergingTest.java

Lines changed: 225 additions & 2 deletions
Large diffs are not rendered by default.

src/integrationtests/java/com/aws/greengrass/integrationtests/lifecyclemanager/ServiceDependencyLifecycleTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy;
3131

3232
import java.net.URL;
33+
import java.util.ArrayList;
3334
import java.util.Arrays;
3435
import java.util.Collections;
3536
import java.util.HashMap;
@@ -181,8 +182,8 @@ void GIVEN_hard_dependency_WHEN_dependency_goes_through_lifecycle_events_THEN_cu
181182
Map<String, Object> configRemoveDep = new HashMap<String, Object>() {{
182183
put(SERVICES_NAMESPACE_TOPIC, new HashMap<String, Object>() {{
183184
put("main", new HashMap<String, Object>() {{
184-
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList(CustomerApp,
185-
DEFAULT_NUCLEUS_COMPONENT_NAME));
185+
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList(CustomerApp,
186+
DEFAULT_NUCLEUS_COMPONENT_NAME)));
186187
}});
187188
put(CustomerApp, new HashMap<String, Object>() {{
188189
putAll(kernel.findServiceTopic(CustomerApp).toPOJO());
@@ -294,8 +295,8 @@ void GIVEN_soft_dependency_WHEN_dependency_goes_through_lifecycle_events_THEN_cu
294295
Map<String, Object> configRemoveDep = new HashMap<String, Object>() {{
295296
put(SERVICES_NAMESPACE_TOPIC, new HashMap<String, Object>() {{
296297
put("main", new HashMap<String, Object>() {{
297-
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList(CustomerApp,
298-
DEFAULT_NUCLEUS_COMPONENT_NAME));
298+
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList(CustomerApp,
299+
DEFAULT_NUCLEUS_COMPONENT_NAME)));
299300
}});
300301
put(CustomerApp, new HashMap<String, Object>() {{
301302
putAll(kernel.findServiceTopic(CustomerApp).toPOJO());

src/integrationtests/java/com/aws/greengrass/integrationtests/status/EventFleetStatusServiceTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.aws.greengrass.componentmanager.exceptions.PackageLoadingException;
1111
import com.aws.greengrass.dependency.ComponentStatusCode;
1212
import com.aws.greengrass.dependency.State;
13+
import com.aws.greengrass.deployment.ConnectivityValidator;
1314
import com.aws.greengrass.deployment.DeploymentQueue;
1415
import com.aws.greengrass.deployment.DeploymentService;
1516
import com.aws.greengrass.deployment.DeviceConfiguration;
@@ -126,6 +127,8 @@ class EventFleetStatusServiceTest extends BaseITCase {
126127
private IotJobsClientWrapper mockIotJobsClientWrapper;
127128
@Mock
128129
private ThingGroupHelper thingGroupHelper;
130+
@Mock
131+
private ConnectivityValidator connectivityValidator;
129132

130133
private AtomicReference<List<FleetStatusDetails>> fleetStatusDetailsList;
131134
private final CountDownLatch mainFinished = new CountDownLatch(1);
@@ -177,6 +180,7 @@ void setupKernel(ExtensionContext context) throws Exception {
177180
EventFleetStatusServiceTest.class.getResource("onlyMain.yaml"));
178181
kernel.getContext().put(MqttClient.class, mqttClient);
179182
kernel.getContext().put(ThingGroupHelper.class, thingGroupHelper);
183+
kernel.getContext().put(ConnectivityValidator.class, connectivityValidator);
180184

181185
// Mock out cloud communication
182186
GreengrassServiceClientFactory mgscf = mock(GreengrassServiceClientFactory.class);
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
services:
2+
main:
3+
dependencies:
4+
- aws.greengrass.Nucleus
5+
aws.greengrass.Nucleus:
6+
componentType: NUCLEUS
7+
configuration:
8+
awsRegion: us-east-1
9+
greengrassDataPlanePort: 443
10+
iotCredEndpoint: xxxxxx.credentials.iot.us-east-1.amazonaws.com
11+
iotDataEndpoint: xxxxxx-ats.iot.us-east-1.amazonaws.com
12+
iotRoleAlias: roleAliasName
13+
mqtt:
14+
port: 8080
15+
runWithDefault:
16+
posixUser: nobody
17+
windowsUser: integ-tester
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.aws.greengrass.deployment;
7+
8+
import com.aws.greengrass.config.Configuration;
9+
import com.aws.greengrass.config.Topic;
10+
import com.aws.greengrass.config.Topics;
11+
import com.aws.greengrass.dependency.Context;
12+
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
13+
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
14+
import com.aws.greengrass.deployment.model.DeploymentResult;
15+
import com.aws.greengrass.lifecyclemanager.Kernel;
16+
import com.aws.greengrass.logging.api.Logger;
17+
import com.aws.greengrass.logging.impl.LogManager;
18+
import com.aws.greengrass.mqttclient.MqttClient;
19+
import com.aws.greengrass.security.SecurityService;
20+
import com.aws.greengrass.util.Coerce;
21+
import com.aws.greengrass.util.GreengrassServiceClientFactory;
22+
import com.aws.greengrass.util.Pair;
23+
import com.aws.greengrass.util.Utils;
24+
import lombok.AccessLevel;
25+
import lombok.Setter;
26+
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
27+
import software.amazon.awssdk.crt.mqtt.MqttException;
28+
import software.amazon.awssdk.http.SdkHttpClient;
29+
import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder;
30+
import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient;
31+
import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest;
32+
33+
import java.util.Map;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
38+
39+
public class ConnectivityValidator {
40+
private static final Logger logger = LogManager.getLogger(ConnectivityValidator.class);
41+
public static final String CLIENT_ID_SUFFIX = "#validation";
42+
private static final String WITH_NEW_CONFIG_MESSAGE_SUFFIX = " with the new configuration";
43+
private final MqttClient mqttClient;
44+
private final GreengrassServiceClientFactory factory;
45+
private final SecurityService securityService;
46+
@Setter(AccessLevel.PACKAGE) // for unit tests
47+
private DeviceConfiguration deploymentConfiguration;
48+
49+
/**
50+
* Constructor.
51+
*
52+
* @param kernel kernel to get current config and context from
53+
* @param context temporary context used for deployment configuration
54+
* @param mqttClient Used to create AWS Mqtt Connection Client
55+
* @param factory Used to create Greengrass data client
56+
* @param securityService Used for Mqtt connection builder
57+
* @param deploymentConfig Map of the configs to be merged in and validated against
58+
* @param timestamp time stamp of the deployment
59+
*/
60+
public ConnectivityValidator(Kernel kernel, Context context, MqttClient mqttClient,
61+
GreengrassServiceClientFactory factory, SecurityService securityService,
62+
Map<String, Object> deploymentConfig, long timestamp) {
63+
this.mqttClient = mqttClient;
64+
this.factory = factory;
65+
this.securityService = securityService;
66+
67+
Configuration config = new Configuration(context);
68+
// Copy the current device configuration Map to preserve time stamps
69+
config.copyFrom(kernel.getConfig());
70+
// Attempt to merge the deployment configs using the deployment time stamp
71+
config.mergeMap(timestamp, deploymentConfig);
72+
try {
73+
config.waitConfigUpdateComplete();
74+
} catch (InterruptedException e) {
75+
logger.atInfo().log("Interrupted while waiting for deployment config update to complete");
76+
Thread.currentThread().interrupt();
77+
}
78+
deploymentConfiguration = new DeviceConfiguration(config, kernel.getKernelCommandLine());
79+
/*
80+
* Need to set security service due to plugin dependency workaround
81+
* after removing Kernel from DeviceConfiguration
82+
*/
83+
deploymentConfiguration.setSecurityService(securityService);
84+
}
85+
86+
private boolean mqttClientNeedsValidation(DeviceConfiguration currentDeviceConfiguration) {
87+
return networkPoxyHasChanged(currentDeviceConfiguration) || awsRegionHasChanged(currentDeviceConfiguration)
88+
|| mqttHasChanged(currentDeviceConfiguration) || iotDataEndpointHasChanged(currentDeviceConfiguration);
89+
}
90+
91+
private boolean httpClientNeedsValidation(DeviceConfiguration currentDeviceConfiguration) {
92+
return networkPoxyHasChanged(currentDeviceConfiguration) || awsRegionHasChanged(currentDeviceConfiguration)
93+
|| greengrassDataPlaneEndpointHasChanged(currentDeviceConfiguration)
94+
|| greengrassDataPlanePortHasChanged(currentDeviceConfiguration);
95+
}
96+
97+
private boolean networkPoxyHasChanged(DeviceConfiguration currentdeviceConfiguration) {
98+
Topics currentNetworkProxy = currentdeviceConfiguration.getNetworkProxyNamespace();
99+
Topics newNetworkProxy = deploymentConfiguration.getNetworkProxyNamespace();
100+
return !Topics.compareChildren(currentNetworkProxy, newNetworkProxy);
101+
}
102+
103+
private boolean awsRegionHasChanged(DeviceConfiguration currentdeviceConfiguration) {
104+
// Defaults to empty string topic
105+
Topic currentAwsRegion = currentdeviceConfiguration.getAWSRegion();
106+
Topic newAwsRegion = deploymentConfiguration.getAWSRegion();
107+
return !Topic.compareValue(currentAwsRegion, newAwsRegion);
108+
}
109+
110+
private boolean mqttHasChanged(DeviceConfiguration currentdeviceConfiguration) {
111+
Topics currentMqtt = currentdeviceConfiguration.getMQTTNamespace();
112+
Topics newMqtt = deploymentConfiguration.getMQTTNamespace();
113+
return !Topics.compareChildren(currentMqtt, newMqtt);
114+
}
115+
116+
private boolean iotDataEndpointHasChanged(DeviceConfiguration currentdeviceConfiguration) {
117+
// Defaults to empty string topic
118+
Topic currentIotDataEndpoint = currentdeviceConfiguration.getIotDataEndpoint();
119+
Topic newIotDataEndpoint = deploymentConfiguration.getIotDataEndpoint();
120+
return !Topic.compareValue(currentIotDataEndpoint, newIotDataEndpoint);
121+
}
122+
123+
private boolean greengrassDataPlaneEndpointHasChanged(DeviceConfiguration currentdeviceConfiguration) {
124+
// Defaults to empty string topic
125+
Topic currentGreengrassDataPlaneEndpoint = currentdeviceConfiguration.getGGDataEndpoint();
126+
Topic newGreengrassDataPlaneEndpoint = deploymentConfiguration.getGGDataEndpoint();
127+
return !Topic.compareValue(currentGreengrassDataPlaneEndpoint, newGreengrassDataPlaneEndpoint);
128+
}
129+
130+
private boolean greengrassDataPlanePortHasChanged(DeviceConfiguration currentdeviceConfiguration) {
131+
Topic currentGreengrassDataPlanePort = currentdeviceConfiguration.getGreengrassDataPlanePort();
132+
Topic newGreengrassDataPlanePort = deploymentConfiguration.getGreengrassDataPlanePort();
133+
return !Topic.compareValue(currentGreengrassDataPlanePort, newGreengrassDataPlanePort);
134+
}
135+
136+
/**
137+
* Creates an MQTT client and checks if the device can connect to AWS.
138+
*
139+
* @param totallyCompleteFuture Future will be updated if validation fails
140+
*/
141+
public boolean mqttClientCanConnect(CompletableFuture<DeploymentResult> totallyCompleteFuture) {
142+
logger.atDebug().log("Checking MQTT client can connect");
143+
144+
MqttClientConnection connection = null;
145+
String message;
146+
try (AwsIotMqttConnectionBuilder builder = mqttClient.createMqttConnectionBuilder(deploymentConfiguration,
147+
securityService, null)) {
148+
String clientId = Coerce.toString(deploymentConfiguration.getThingName()) + CLIENT_ID_SUFFIX;
149+
connection = builder.withClientId(clientId).build();
150+
int operationTimeoutMillis = MqttClient.getMqttOperationTimeoutMillis(deploymentConfiguration);
151+
connection.connect().get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
152+
return true;
153+
} catch (MqttException e) {
154+
message = "Mqtt client failed to connect: " + Utils.generateFailureMessage(e);
155+
logger.atError().cause(e).log(message + WITH_NEW_CONFIG_MESSAGE_SUFFIX);
156+
} catch (TimeoutException e) {
157+
message = "Mqtt client validation timed out"; // exception has no message
158+
logger.atError().cause(e).log(message + WITH_NEW_CONFIG_MESSAGE_SUFFIX);
159+
} catch (ExecutionException e) {
160+
message = "Mqtt client validation completed exceptionally: " + Utils.generateFailureMessage(e);
161+
logger.atError().cause(e).log(message + WITH_NEW_CONFIG_MESSAGE_SUFFIX);
162+
} catch (InterruptedException e) {
163+
message = "Mqtt client connection was interrupted: " + Utils.generateFailureMessage(e);
164+
logger.atInfo().cause(e).log(message);
165+
Thread.currentThread().interrupt();
166+
} finally {
167+
if (connection != null) {
168+
connection.disconnect();
169+
connection.close();
170+
}
171+
}
172+
173+
totallyCompleteFuture
174+
.complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE,
175+
new ComponentConfigurationValidationException(message,
176+
DeploymentErrorCode.NUCLEUS_CONNECTIVITY_CONFIG_NOT_VALID)));
177+
return false;
178+
}
179+
180+
/**
181+
* Creates an HTTP client and checks if the device can connect to AWS.
182+
*
183+
* @param totallyCompleteFuture Future will be updated if validation fails
184+
*/
185+
@SuppressWarnings("PMD.AvoidCatchingGenericException")
186+
public boolean httpClientCanConnect(CompletableFuture<DeploymentResult> totallyCompleteFuture) {
187+
logger.atDebug().log("Checking HTTP client can connect");
188+
189+
String message;
190+
Pair<SdkHttpClient, GreengrassV2DataClient> pair = factory.createClientFromConfig(deploymentConfiguration);
191+
try (SdkHttpClient httpClient = pair.getLeft();
192+
GreengrassV2DataClient greengrassV2DataClient = pair.getRight()) {
193+
ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder()
194+
.coreDeviceThingName(Coerce.toString(deploymentConfiguration.getThingName())).build();
195+
greengrassV2DataClient.listThingGroupsForCoreDevice(request);
196+
return true;
197+
} catch (Exception e) {
198+
message = "HTTP client validation failed due to: " + Utils.generateFailureMessage(e);
199+
logger.atError().cause(e).log(message + WITH_NEW_CONFIG_MESSAGE_SUFFIX);
200+
}
201+
202+
totallyCompleteFuture
203+
.complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE,
204+
new ComponentConfigurationValidationException(message,
205+
DeploymentErrorCode.NUCLEUS_CONNECTIVITY_CONFIG_NOT_VALID)));
206+
return false;
207+
}
208+
209+
/**
210+
* Perform connectivity validation if configs have changed meaningfully.
211+
* Checks if validation is enabled
212+
* If MQTT related configurations have been changed validate MQTT connectivity
213+
* Then if HTTP related configurations have been changed validate HTTP connectivity
214+
*
215+
* @param totallyCompleteFuture Future will be updated if validation fails
216+
* @param currentDeviceConfiguration current configs the device is using
217+
*/
218+
@SuppressWarnings("PMD.SimplifyBooleanReturns")
219+
public boolean validate(CompletableFuture<DeploymentResult> totallyCompleteFuture,
220+
DeviceConfiguration currentDeviceConfiguration) {
221+
boolean validationEnabled = deploymentConfiguration.isConnectivityValidationEnabled();
222+
boolean configuredToTalkToCloud = deploymentConfiguration.isDeviceConfiguredToTalkToCloud();
223+
if (!validationEnabled || !configuredToTalkToCloud) {
224+
logger.atDebug().log("Skipping connectivity validation");
225+
return true;
226+
}
227+
boolean needsValidation;
228+
needsValidation = mqttClientNeedsValidation(currentDeviceConfiguration);
229+
if (needsValidation && !mqttClientCanConnect(totallyCompleteFuture)) {
230+
return false;
231+
}
232+
needsValidation = httpClientNeedsValidation(currentDeviceConfiguration);
233+
if (needsValidation && !httpClientCanConnect(totallyCompleteFuture)) {
234+
return false;
235+
}
236+
return true;
237+
}
238+
}

src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88

99
import com.aws.greengrass.config.Topics;
10+
import com.aws.greengrass.dependency.Context;
1011
import com.aws.greengrass.dependency.Context.Value;
1112
import com.aws.greengrass.dependency.State;
1213
import com.aws.greengrass.deployment.activator.DeploymentActivator;
@@ -27,11 +28,16 @@
2728
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
2829
import com.aws.greengrass.logging.api.Logger;
2930
import com.aws.greengrass.logging.impl.LogManager;
31+
import com.aws.greengrass.mqttclient.MqttClient;
32+
import com.aws.greengrass.security.SecurityService;
3033
import com.aws.greengrass.util.Coerce;
34+
import com.aws.greengrass.util.GreengrassServiceClientFactory;
35+
import com.aws.greengrass.util.Utils;
3136
import lombok.AccessLevel;
3237
import lombok.AllArgsConstructor;
3338
import lombok.Getter;
3439
import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction;
40+
import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy;
3541

3642
import java.util.Collection;
3743
import java.util.HashMap;
@@ -66,6 +72,9 @@ public class DeploymentConfigMerger {
6672
private Kernel kernel;
6773
private DeviceConfiguration deviceConfiguration;
6874
private DynamicComponentConfigurationValidator validator;
75+
private MqttClient mqttClient;
76+
private GreengrassServiceClientFactory factory;
77+
private SecurityService securityService;
6978

7079
/**
7180
* Merge in new configuration values and new services.
@@ -112,6 +121,7 @@ public Future<DeploymentResult> mergeInNewConfig(Deployment deployment,
112121
return totallyCompleteFuture;
113122
}
114123

124+
@SuppressWarnings({"PMD.AvoidCatchingGenericException", "PMD.AvoidCatchingThrowable", "PMD.PreserveStackTrace"})
115125
private void updateActionForDeployment(Map<String, Object> newConfig, Deployment deployment,
116126
DeploymentActivator activator,
117127
CompletableFuture<DeploymentResult> totallyCompleteFuture) {
@@ -147,6 +157,28 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
147157
return;
148158
}
149159

160+
// As long as the timeout is not 0, we will try to run the connectivity validation
161+
DeploymentConfigurationValidationPolicy policy = deployment.getDeploymentDocumentObj()
162+
.getConfigurationValidationPolicy();
163+
if (policy != null && Coerce.toInt(policy.timeoutInSeconds()) != 0) {
164+
try (Context context = new Context()) {
165+
ConnectivityValidator connectivityValidator = new ConnectivityValidator(kernel, context, mqttClient,
166+
factory, securityService, newConfig, deployment.getDeploymentDocumentObj().getTimestamp());
167+
if (!connectivityValidator.validate(totallyCompleteFuture, deviceConfiguration)) {
168+
return;
169+
}
170+
} catch (Throwable e) {
171+
String message = "Unexpected exception during connectivity validation: "
172+
+ Utils.generateFailureMessage(e);
173+
logger.atInfo().cause(e).log(message);
174+
totallyCompleteFuture
175+
.complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE,
176+
new ComponentConfigurationValidationException(message,
177+
DeploymentErrorCode.NUCLEUS_CONNECTIVITY_CONFIG_NOT_VALID)));
178+
return;
179+
}
180+
}
181+
150182
logger.atInfo(MERGE_CONFIG_EVENT_KEY).kv("deployment", deploymentId)
151183
.log("Applying deployment changes, deployment cannot be cancelled now");
152184
activator.activate(newConfig, deployment, totallyCompleteFuture);

0 commit comments

Comments
 (0)