Skip to content

Commit e1238c9

Browse files
api to create or delete all kafka topics for app shared subs entities
1 parent 7984476 commit e1238c9

File tree

3 files changed

+46
-24
lines changed

3 files changed

+46
-24
lines changed

application/src/main/java/org/thingsboard/mqtt/broker/controller/AppSharedSubscriptionController.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.web.bind.annotation.RequestParam;
2929
import org.springframework.web.bind.annotation.RestController;
3030
import org.thingsboard.mqtt.broker.common.data.ApplicationSharedSubscription;
31+
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
3132
import org.thingsboard.mqtt.broker.common.data.exception.ThingsboardErrorCode;
3233
import org.thingsboard.mqtt.broker.common.data.exception.ThingsboardException;
3334
import org.thingsboard.mqtt.broker.common.data.page.PageData;
@@ -36,6 +37,8 @@
3637
import org.thingsboard.mqtt.broker.dao.topic.TopicValidationService;
3738
import org.thingsboard.mqtt.broker.service.mqtt.persistence.application.topic.ApplicationTopicService;
3839

40+
import java.util.function.Consumer;
41+
3942
@RestController
4043
@RequiredArgsConstructor
4144
@RequestMapping("/api/app/shared/subs")
@@ -106,4 +109,39 @@ public void deleteSharedSubscription(@PathVariable String id) throws Thingsboard
106109
applicationTopicService.deleteSharedTopic(sharedSubscription);
107110
}
108111
}
112+
113+
@PreAuthorize("hasAuthority('SYS_ADMIN')")
114+
@PostMapping(value = "/createTopics")
115+
public void createKafkaTopicsForAppSharedSubscriptions() {
116+
processAppSharedSubsRequest(applicationTopicService::createSharedTopic, "create");
117+
}
118+
119+
@PreAuthorize("hasAuthority('SYS_ADMIN')")
120+
@DeleteMapping(value = "/deleteTopics")
121+
public void deleteKafkaTopicsForAppSharedSubscriptions() {
122+
if (!enableTopicDeletion) {
123+
log.debug("Cannot delete topics due to TB_KAFKA_ENABLE_TOPIC_DELETION is set to false");
124+
return;
125+
}
126+
processAppSharedSubsRequest(applicationTopicService::deleteSharedTopic, "delete");
127+
}
128+
129+
private void processAppSharedSubsRequest(Consumer<ApplicationSharedSubscription> consumer, String request) {
130+
PageLink pageLink = new PageLink(BrokerConstants.DEFAULT_PAGE_SIZE);
131+
PageData<ApplicationSharedSubscription> pageData;
132+
do {
133+
pageData = applicationSharedSubscriptionService.getSharedSubscriptions(pageLink);
134+
pageData.getData().forEach(sharedSubscription -> {
135+
try {
136+
consumer.accept(sharedSubscription);
137+
} catch (Exception e) {
138+
log.error("Failed to {} kafka topic for shared subscription {}", request, sharedSubscription, e);
139+
}
140+
});
141+
if (pageData.hasNext()) {
142+
pageLink = pageLink.nextPageLink();
143+
}
144+
} while (pageData.hasNext());
145+
}
146+
109147
}

application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/client/cleanup/ClientSessionCleanUpServiceImpl.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,27 +51,21 @@ public class ClientSessionCleanUpServiceImpl implements ClientSessionCleanUpServ
5151

5252
@Override
5353
public void removeClientSession(String clientId, UUID sessionId) throws ThingsboardException {
54-
if (log.isTraceEnabled()) {
55-
log.trace("[{}] Removing ClientSession.", clientId);
56-
}
54+
log.trace("[{}] Removing ClientSession.", clientId);
5755
ClientSessionInfo clientSessionInfo = clientSessionCache.getClientSessionInfo(clientId);
5856
if (clientSessionInfo == null || differentSession(sessionId, clientSessionInfo)) {
5957
throw new ThingsboardException("No such client session", ThingsboardErrorCode.ITEM_NOT_FOUND);
6058
}
6159
if (clientSessionInfo.isConnected()) {
6260
throw new ThingsboardException("Client is currently connected", ThingsboardErrorCode.GENERAL);
6361
}
64-
if (log.isDebugEnabled()) {
65-
log.debug("[{}] Cleaning up client session.", clientId);
66-
}
62+
log.debug("[{}] Cleaning up client session.", clientId);
6763
clientSessionEventService.requestSessionCleanup(ClientSessionInfoFactory.clientSessionInfoToSessionInfo(clientSessionInfo));
6864
}
6965

7066
@Override
7167
public void disconnectClientSession(String clientId, UUID sessionId) throws ThingsboardException {
72-
if (log.isTraceEnabled()) {
73-
log.trace("[{}][{}] Disconnecting ClientSession.", clientId, sessionId);
74-
}
68+
log.trace("[{}][{}] Disconnecting ClientSession.", clientId, sessionId);
7569
ClientSessionInfo clientSessionInfo = clientSessionCache.getClientSessionInfo(clientId);
7670
if (clientSessionInfo == null || differentSession(sessionId, clientSessionInfo)) {
7771
throw new ThingsboardException("No such client session", ThingsboardErrorCode.ITEM_NOT_FOUND);
@@ -85,9 +79,7 @@ public void disconnectClientSession(String clientId, UUID sessionId) throws Thin
8579

8680
@Override
8781
public void disconnectClientSession(String clientId) {
88-
if (log.isTraceEnabled()) {
89-
log.trace("[{}] Disconnecting ClientSession", clientId);
90-
}
82+
log.trace("[{}] Disconnecting ClientSession", clientId);
9183
ClientSessionInfo clientSessionInfo = clientSessionCache.getClientSessionInfo(clientId);
9284
if (clientSessionInfo == null || !clientSessionInfo.isConnected()) {
9385
return;

application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/topic/ApplicationTopicServiceImpl.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ public class ApplicationTopicServiceImpl implements ApplicationTopicService {
4646

4747
@Override
4848
public String createTopic(String clientId) {
49-
if (log.isDebugEnabled()) {
50-
log.debug("[{}] Creating APPLICATION topic", clientId);
51-
}
49+
log.debug("[{}] Creating APPLICATION topic", clientId);
5250
String clientTopic = appClientHelperService.getAppTopic(clientId, validateClientId);
5351
queueAdmin.createTopic(clientTopic, applicationPersistenceMsgQueueFactory.getTopicConfigs());
5452
return clientTopic;
@@ -57,9 +55,7 @@ public String createTopic(String clientId) {
5755
@Override
5856
public void createSharedTopic(ApplicationSharedSubscription subscription) {
5957
String topic = subscription.getTopicFilter();
60-
if (log.isDebugEnabled()) {
61-
log.debug("[{}] Creating shared APPLICATION topic", topic);
62-
}
58+
log.debug("[{}] Creating shared APPLICATION topic", topic);
6359

6460
final var topicToCreate = appClientHelperService.getSharedAppTopic(topic, validateSharedTopicFilter);
6561

@@ -70,9 +66,7 @@ public void createSharedTopic(ApplicationSharedSubscription subscription) {
7066

7167
@Override
7268
public void deleteTopic(String clientId, BasicCallback callback) {
73-
if (log.isDebugEnabled()) {
74-
log.debug("[{}] Deleting APPLICATION topic", clientId);
75-
}
69+
log.debug("[{}] Deleting APPLICATION topic", clientId);
7670
String clientTopic = appClientHelperService.getAppTopic(clientId, validateClientId);
7771
queueAdmin.deleteTopic(clientTopic, callback);
7872
String consumerGroup = appClientHelperService.getAppConsumerGroup(clientId);
@@ -82,9 +76,7 @@ public void deleteTopic(String clientId, BasicCallback callback) {
8276
@Override
8377
public void deleteSharedTopic(ApplicationSharedSubscription subscription) {
8478
String topic = subscription.getTopicFilter();
85-
if (log.isDebugEnabled()) {
86-
log.debug("[{}] Deleting shared APPLICATION topic", topic);
87-
}
79+
log.debug("[{}] Deleting shared APPLICATION topic", topic);
8880

8981
final var topicToDelete = appClientHelperService.getSharedAppTopic(topic, validateSharedTopicFilter);
9082

0 commit comments

Comments
 (0)