diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index cfdd259e1e667..ab77195d963ba 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -23,6 +23,7 @@ import java.net.URI; import java.time.Duration; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -31,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import org.apache.camel.CamelException; @@ -108,6 +111,7 @@ public class SubscriptionHelper extends ServiceSupport { = new ConcurrentHashMap<>(); private final Set channelsToSubscribe = ConcurrentHashMap.newKeySet(); + private final Lock channelsLock = new ReentrantLock(); private final ClientSessionChannel.MessageListener handshakeListener = createHandshakeListener(); @@ -133,7 +137,7 @@ private MessageListener createHandshakeListener() { if (handshakeError != null) { if (handshakeError.startsWith("403::")) { String failureReason = getFailureReason(message); - if (failureReason.equals(AUTHENTICATION_INVALID)) { + if (AUTHENTICATION_INVALID.equals(failureReason)) { LOG.debug( "attempting login due to handshake error: 403 -> 401::Authentication invalid"); session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); @@ -144,8 +148,13 @@ private MessageListener createHandshakeListener() { LOG.debug("Handshake failed, so try again."); client.handshake(); } else if (!channelToConsumers.isEmpty()) { - channelsToSubscribe.clear(); - channelsToSubscribe.addAll(channelToConsumers.keySet()); + channelsLock.lock(); + try { + channelsToSubscribe.clear(); + channelsToSubscribe.addAll(channelToConsumers.keySet()); + } finally { + channelsLock.unlock(); + } LOG.info("Handshake successful. Channels to subscribe: {}", channelsToSubscribe); } }); @@ -154,6 +163,9 @@ private MessageListener createHandshakeListener() { private MessageListener createConnectionListener() { return (channel, message) -> component.getHttpClient().getWorkerPool().execute(() -> { LOG.debug("[CHANNEL:META_CONNECT]: {}", message); + String reconnectAdvice = message.getAdvice() != null + ? (String) message.getAdvice().get("reconnect") + : null; if (!message.isSuccessful()) { LOG.warn("Connect failure: {}", message); @@ -165,15 +177,35 @@ private MessageListener createConnectionListener() { LOG.debug("Attempting login..."); session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); } - if (message.getAdvice() == null || "none".equals(message.getAdvice().get("reconnect"))) { - LOG.debug("Advice == none, so handshaking"); + // Per Bayeux spec: handshake on null advice, "none", "handshake", or any non-"retry" value. + // When advice is "retry", the CometD client handles reconnection automatically. + if (reconnectAdvice == null || !"retry".equals(reconnectAdvice)) { + LOG.debug("Reconnect advice [{}] on failed connect, initiating handshake", reconnectAdvice); + client.handshake(); + } else if (isTemporaryError(message)) { + LOG.debug("Initiating handshake after temporary error: {}", message); client.handshake(); } - } else if (!channelsToSubscribe.isEmpty()) { - LOG.info("Subscribing to channels: {}", channelsToSubscribe); - for (var channelName : channelsToSubscribe) { - for (var consumer : channelToConsumers.get(channelName)) { - subscribe(consumer); + } else if (reconnectAdvice != null && !"retry".equals(reconnectAdvice)) { + LOG.warn("Reconnect advice [{}] on successful connect, initiating handshake", reconnectAdvice); + client.handshake(); + } else { + Set toSubscribe = null; + channelsLock.lock(); + try { + if (!channelsToSubscribe.isEmpty()) { + toSubscribe = new HashSet<>(channelsToSubscribe); + channelsToSubscribe.clear(); + } + } finally { + channelsLock.unlock(); + } + if (toSubscribe != null) { + LOG.info("Subscribing to channels: {}", toSubscribe); + for (var channelName : toSubscribe) { + for (var consumer : channelToConsumers.get(channelName)) { + subscribe(consumer); + } } } } @@ -448,7 +480,6 @@ public void subscribe(StreamingApiConsumer consumer) { // create subscription for consumer final String channelName = getChannelName(consumer.getTopicName()); channelToConsumers.computeIfAbsent(channelName, key -> ConcurrentHashMap.newKeySet()).add(consumer); - channelsToSubscribe.add(channelName); setReplayIdIfAbsent(consumer.getEndpoint());