Skip to content

Commit f172b38

Browse files
committed
implement persisting DeviceToClientMapRepresentation for [#381](#381)
1 parent f81d0b7 commit f172b38

File tree

9 files changed

+33
-10
lines changed

9 files changed

+33
-10
lines changed

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/core/client/AConnectorClient.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ public CompletableFuture<Void> submitDisconnect() {
321321
}
322322

323323
public void submitHousekeeping() {
324-
log.debug("{} - Starting housekeeping...", tenant);
324+
log.debug("{} - Starting housekeeping for {} ...", tenant, connectorName);
325325
housekeepingExecutor.scheduleAtFixedRate(
326326
this::runHousekeeping,
327327
0,
@@ -750,9 +750,7 @@ private void performHousekeepingTasks() throws Exception {
750750
monitorSubscriptions();
751751
}
752752

753-
protected void connectorSpecificHousekeeping(String tenant) {
754-
// Implement in specific connector if needed
755-
}
753+
protected abstract void connectorSpecificHousekeeping(String tenant);
756754

757755
private void logHousekeepingStatus(Instant now) {
758756
if (Duration.between(start, now).getSeconds() < 1800) {

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/http/HttpClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,10 @@ public void onMessage(ConnectorMessage message) {
231231
dispatcher.onMessage(message);
232232
}
233233

234+
@Override
235+
public void connectorSpecificHousekeeping(String tenant) {
236+
}
237+
234238
@Override
235239
public List<Direction> supportedDirections() {
236240
return new ArrayList<>(Arrays.asList(Direction.INBOUND));

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/kafka/KafkaClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,10 @@ public boolean isConfigValid(ConnectorConfiguration configuration) {
392392
return true;
393393
}
394394

395+
@Override
396+
public void connectorSpecificHousekeeping(String tenant) {
397+
}
398+
395399
@Override
396400
public void publishMEAO(ProcessingContext<?> context) {
397401
C8YRequest currentRequest = context.getCurrentRequest();

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/mqtt/MQTT3Client.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ public void monitorSubscriptions() {
563563
// nothing to do
564564
}
565565

566+
@Override
567+
public void connectorSpecificHousekeeping(String tenant) {
568+
}
569+
566570
@Override
567571
public List<Direction> supportedDirections() {
568572
return new ArrayList<>(Arrays.asList(Direction.INBOUND, Direction.OUTBOUND));

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/mqtt/MQTT5Client.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ public void monitorSubscriptions() {
563563
// nothing to do
564564
}
565565

566+
@Override
567+
public void connectorSpecificHousekeeping(String tenant) {
568+
}
569+
566570
@Override
567571
public List<Direction> supportedDirections() {
568572
return new ArrayList<>(Arrays.asList(Direction.INBOUND, Direction.OUTBOUND));

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/pulsar/MQTTServicePulsarClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,7 @@ public void unsubscribe(String topic) throws Exception {
476476
log.info("{} - Unsubscription registered for topic: [{}]", tenant, topic);
477477
}
478478

479+
479480
@Override
480481
public void connectorSpecificHousekeeping(String tenant) {
481482
mappingService.sendDeviceToClientMap(tenant);

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/pulsar/PulsarConnectorClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,10 @@ private void sendMessageWithQos(Producer<byte[]> producer, String payload, Qos q
810810
}
811811
}
812812

813+
@Override
814+
public void connectorSpecificHousekeeping(String tenant) {
815+
}
816+
813817
/**
814818
* Handles publish errors with connection recovery
815819
*/

dynamic-mapper-service/src/main/java/dynamic/mapper/connector/webhook/WebHook.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,10 @@ public JsonNode mergeNodes(JsonNode existingNode, JsonNode updateNode) {
615615
return result;
616616
}
617617

618+
@Override
619+
public void connectorSpecificHousekeeping(String tenant) {
620+
}
621+
618622
// Helper method for deep merging of objects - unlike mergeNodes, this preserves
619623
// all fields
620624
private JsonNode deepMergeObjects(JsonNode existingObj, JsonNode updateObj) {

dynamic-mapper-service/src/main/java/dynamic/mapper/service/MappingService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,17 +228,17 @@ public void sendMappingStatus(String tenant) {
228228
}
229229

230230
public void sendDeviceToClientMap(String tenant) {
231-
if (configurationRegistry.getAllClientMappings(tenant) == null
232-
& configurationRegistry.getAllClientMappings(tenant).size() >= 0) {
231+
Map<String, String> clientToDeviceMap = configurationRegistry.getAllClientMappings(tenant);
232+
if (clientToDeviceMap != null
233+
&& clientToDeviceMap.size() >= 0) {
233234
subscriptionsService.runForTenant(tenant, () -> {
234235
String deviceToClientMapId = configurationRegistry
235236
.getDeviceToClientMapId(tenant);
236-
Map<String, String> clientToDeviceMap = configurationRegistry.getAllClientMappings(tenant);
237237

238-
log.debug("{} - Sending Device To Client Map: {}", tenant, clientToDeviceMap.values().size());
239-
238+
log.debug("{} - Sending Device To Client Map: {}", tenant, clientToDeviceMap.size());
239+
240240
Map<String, Object> fragment = new ConcurrentHashMap<String, Object>();
241-
fragment.put(DeviceToClientMapRepresentation.DEVICE_TO_CLIENT_MAP_FRAGMENT, fragment);
241+
fragment.put(DeviceToClientMapRepresentation.DEVICE_TO_CLIENT_MAP_FRAGMENT, clientToDeviceMap);
242242
ManagedObjectRepresentation updateMor = new ManagedObjectRepresentation();
243243
updateMor.setId(GId.asGId(deviceToClientMapId));
244244
updateMor.setAttrs(fragment);

0 commit comments

Comments
 (0)