Skip to content

Commit 9f1c33f

Browse files
committed
Fix registry subscription issue.
1 parent df689b5 commit 9f1c33f

File tree

2 files changed

+63
-33
lines changed

2 files changed

+63
-33
lines changed

joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RegistryConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,9 @@ public class RegistryConfig {
4848
public boolean isRegisterAppServiceEnabled() {
4949
return registerAppServiceEnabled && enabled;
5050
}
51+
52+
public boolean isEmpty() {
53+
return clusters == null || clusters.isEmpty();
54+
}
5155
}
5256

joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/registry/LiveRegistry.java

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,13 @@ public class LiveRegistry extends AbstractService
112112
// fix for eureka
113113
private final Map<String, Subscription> subscriptions = new CaseInsensitiveConcurrentMap<>();
114114

115+
private final Map<ServiceId, Optional<Consumer<RegistryEvent>>> pendings = new ConcurrentHashMap<>();
116+
115117
// add listener to warmup for spring cloud
116118
private final Set<Consumer<List<ServiceId>>> listeners = new CopyOnWriteArraySet<>();
117119

120+
private final AtomicBoolean prepare = new AtomicBoolean(false);
121+
118122
private final AtomicBoolean ready = new AtomicBoolean(false);
119123

120124
@Override
@@ -396,10 +400,14 @@ private ServiceId customize(ServiceId serviceId, ServiceRole role) {
396400
* @see RegistryEvent
397401
*/
398402
private void doSubscribe(ServiceId serviceId, Consumer<RegistryEvent> consumer) {
399-
// CaseInsensitiveConcurrentHashMap
400-
Subscription subscription = subscriptions.computeIfAbsent(serviceId.getUniqueName(), s -> createSubscription(serviceId));
401-
subscription.addConsumer(consumer);
402-
subscription.subscribe();
403+
if (!prepare.get()) {
404+
pendings.put(serviceId, Optional.ofNullable(consumer));
405+
} else {
406+
// CaseInsensitiveConcurrentHashMap
407+
Subscription subscription = subscriptions.computeIfAbsent(serviceId.getUniqueName(), s -> createSubscription(serviceId));
408+
subscription.addConsumer(consumer);
409+
subscription.subscribe();
410+
}
403411
}
404412

405413
/**
@@ -426,39 +434,57 @@ private void startCluster(RegistryService registry) throws Exception {
426434
*/
427435
private void onAppEnvironmentPrepared(AppEnvironment environment) {
428436
// start registries in environment prepared event
437+
try {
438+
registries = createRegistry(environment);
439+
prepare.set(true);
440+
pendings.forEach((id, optional) -> {
441+
doSubscribe(id, optional.orElse(null));
442+
});
443+
pendings.clear();
444+
} catch (Throwable e) {
445+
registries = new ArrayList<>();
446+
publisher.offer(AgentEvent.onAgentFailure("Failed to start registry cluster.", e));
447+
}
448+
}
449+
450+
/**
451+
* Creates registry service instances based on configuration.
452+
*
453+
* @param environment The application environment containing configuration properties
454+
* @return A list of initialized registry service instances
455+
* @throws Exception If any error occurs during registry creation or initialization
456+
*/
457+
private List<RegistryService> createRegistry(AppEnvironment environment) throws Exception {
458+
List<RegistryService> result = new ArrayList<>();
459+
if (!registryConfig.isEnabled() || registryConfig.isEmpty()) {
460+
return result;
461+
}
462+
// source config
463+
List<Map<String, Object>> maps = option.getObject(GovernanceConfig.CONFIG_REGISTRY_CLUSTERS);
464+
AppEnvironmentOption env = new AppEnvironmentOption(environment);
429465
List<RegistryClusterConfig> clusters = registryConfig.getClusters();
430-
List<RegistryService> registries = new ArrayList<>();
431-
if (registryConfig.isEnabled() && clusters != null) {
432-
try {
433-
// source config
434-
List<Map<String, Object>> maps = option.getObject(GovernanceConfig.CONFIG_REGISTRY_CLUSTERS);
435-
AppEnvironmentOption env = new AppEnvironmentOption(environment);
436-
for (int i = 0; i < clusters.size(); i++) {
437-
// reinject config by app envrionment
438-
RegistryClusterConfig cluster = clusters.get(i);
439-
cluster.supplement(MapOption.of(maps.get(i)), env);
440-
// validate
441-
if (cluster.validate()) {
442-
// create registry service
443-
RegistryFactory factory = factories.get(cluster.getType());
444-
if (factory == null) {
445-
throw new RegistryException("registry type " + cluster.getType() + " is not supported");
446-
}
447-
registries.add(factory.create(cluster));
448-
}
449-
}
450-
if (!registries.isEmpty()) {
451-
for (RegistryService registry : registries) {
452-
startCluster(registry);
453-
}
454-
} else {
455-
logger.warn("No registry cluster is configured.");
466+
for (int i = 0; i < clusters.size(); i++) {
467+
// reinject config by app envrionment
468+
RegistryClusterConfig cluster = clusters.get(i);
469+
cluster.supplement(MapOption.of(maps.get(i)), env);
470+
// validate
471+
if (cluster.validate()) {
472+
// create registry service
473+
RegistryFactory factory = factories.get(cluster.getType());
474+
if (factory == null) {
475+
throw new RegistryException("registry type " + cluster.getType() + " is not supported");
456476
}
457-
} catch (Throwable e) {
458-
publisher.offer(AgentEvent.onAgentFailure("Failed to start registry cluster.", e));
477+
result.add(factory.create(cluster));
459478
}
460479
}
461-
this.registries = registries;
480+
if (!result.isEmpty()) {
481+
for (RegistryService registry : result) {
482+
startCluster(registry);
483+
}
484+
} else {
485+
logger.warn("No registry cluster is configured.");
486+
}
487+
return result;
462488
}
463489

464490
/**

0 commit comments

Comments
 (0)