diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/AggregateDefinition.java index 163e57368d53d..88276c4923959 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -90,6 +90,9 @@ public class AggregateDefinition extends OutputDefinition @Metadata(javaType = "java.lang.Boolean") private String optimisticLocking; @XmlAttribute + @Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "false") + private String syncOptimisticRetry; + @XmlAttribute @Metadata(label = "advanced", javaType = "java.util.concurrent.ExecutorService") private String executorService; @XmlAttribute @@ -173,6 +176,7 @@ protected AggregateDefinition(AggregateDefinition source) { ? source.optimisticLockRetryPolicyDefinition.copyDefinition() : null; this.parallelProcessing = source.parallelProcessing; this.optimisticLocking = source.optimisticLocking; + this.syncOptimisticRetry = source.syncOptimisticRetry; this.executorService = source.executorService; this.timeoutCheckerExecutorService = source.timeoutCheckerExecutorService; this.aggregateController = source.aggregateController; @@ -501,6 +505,14 @@ public void setOptimisticLocking(String optimisticLocking) { this.optimisticLocking = optimisticLocking; } + public String getSyncOptimisticRetry() { + return syncOptimisticRetry; + } + + public void setSyncOptimisticRetry(String syncOptimisticRetry) { + this.syncOptimisticRetry = syncOptimisticRetry; + } + public String getParallelProcessing() { return parallelProcessing; } @@ -1004,6 +1016,16 @@ public AggregateDefinition optimisticLocking() { return this; } + /** + * When optimistic locking is enabled, retries happen synchronously in the same thread instead of being scheduled on + * a background thread. This preserves transaction context for repositories that require single-thread transactional + * guarantees. + */ + public AggregateDefinition syncOptimisticRetry() { + setSyncOptimisticRetry(Boolean.toString(true)); + return this; + } + /** * Allows to configure retry settings when using optimistic locking. */ diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 4b5298b6c9bfb..91a9a71f5732b 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -231,6 +231,7 @@ public void setStatisticsEnabled(boolean statisticsEnabled) { private Integer closeCorrelationKeyOnCompletion; private boolean parallelProcessing; private boolean optimisticLocking; + private boolean syncOptimisticRetry; // different ways to have completion triggered private boolean eagerCheckCompletion; @@ -374,6 +375,20 @@ protected boolean doInOptimisticLock(Exchange exchange, String key, AsyncCallbac "On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to aggregate exchange: {}", attempt, aggregationRepository, exchange, e); if (optimisticLockRetryPolicy.shouldRetry(attempt)) { + if (syncOptimisticRetry) { + // Synchronous retry: delay in the same thread instead of + // scheduling on a background thread. This ensures aggregation + // stays within a single thread (e.g. for transactional processing). + try { + optimisticLockRetryPolicy.doDelay(attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + exchange.setException(ie); + callback.done(sync); + return sync; + } + continue; + } long delay = optimisticLockRetryPolicy.getDelay(attempt); if (delay > 0) { int nextAttempt = attempt; @@ -1126,6 +1141,14 @@ public void setOptimisticLocking(boolean optimisticLocking) { this.optimisticLocking = optimisticLocking; } + public boolean isSyncOptimisticRetry() { + return syncOptimisticRetry; + } + + public void setSyncOptimisticRetry(boolean syncOptimisticRetry) { + this.syncOptimisticRetry = syncOptimisticRetry; + } + public AggregationRepository getAggregationRepository() { return aggregationRepository; } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java index d3f2f4744e675..b11e8e0fc9e80 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java @@ -137,6 +137,10 @@ && parseBoolean(definition.getDiscardOnAggregationFailure(), false)) { if (optimisticLocking != null) { answer.setOptimisticLocking(optimisticLocking); } + Boolean syncOptimisticRetry = parseBoolean(definition.getSyncOptimisticRetry()); + if (syncOptimisticRetry != null) { + answer.setSyncOptimisticRetry(syncOptimisticRetry); + } if (definition.getCompletionPredicate() != null) { Predicate predicate = createPredicate(definition.getCompletionPredicate()); answer.setCompletionPredicate(predicate); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateOptimisticLockSyncRetryTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateOptimisticLockSyncRetryTest.java new file mode 100644 index 0000000000000..c704bf23e0b9b --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateOptimisticLockSyncRetryTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.BodyInAggregatingStrategy; +import org.apache.camel.processor.aggregate.MemoryAggregationRepository; +import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests that optimistic locking retries happen synchronously in the same thread when syncOptimisticRetry is enabled. + */ +public class AggregateOptimisticLockSyncRetryTest extends ContextTestSupport { + + private static final int FAIL_FIRST_N_ATTEMPTS = 3; + + private final AtomicInteger addCounter = new AtomicInteger(); + private volatile String aggregateThreadName; + + /** + * Repository that throws OptimisticLockingException for the first N attempts, then succeeds. + */ + private final MemoryAggregationRepository repository = new MemoryAggregationRepository(true) { + @Override + public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) { + int count = addCounter.incrementAndGet(); + // Record the thread name on every attempt + aggregateThreadName = Thread.currentThread().getName(); + if (count <= FAIL_FIRST_N_ATTEMPTS) { + throw new OptimisticLockingException(); + } + return super.add(camelContext, key, oldExchange, newExchange); + } + }; + + @Test + public void testSyncRetryHappensInSameThread() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + String callerThread = Thread.currentThread().getName(); + + template.sendBodyAndHeader("direct:start", "A", "id", 1); + template.sendBodyAndHeader("direct:start", "B", "id", 1); + + mock.assertIsSatisfied(); + + // The repository should have been called more than FAIL_FIRST_N_ATTEMPTS times + // (the first N fail, then succeed) + assertTrue(addCounter.get() > FAIL_FIRST_N_ATTEMPTS, + "Expected more than " + FAIL_FIRST_N_ATTEMPTS + " attempts, got " + addCounter.get()); + + // Since syncOptimisticRetry is enabled, the retry should happen in a Camel thread + // (the route's thread), NOT in the AggregateOptimisticLockingExecutor thread pool. + // The key assertion is that the thread name does NOT contain the async executor name. + if (aggregateThreadName != null) { + assertFalse(aggregateThreadName.contains("AggregateOptimisticLockingExecutor"), + "Expected synchronous retry but found async executor thread: " + aggregateThreadName); + } + } + + @Test + public void testSyncRetryCompletes() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A+B"); + + template.sendBodyAndHeader("direct:start", "A", "id", 1); + + // Reset counter so next message triggers failures and retries + addCounter.set(0); + template.sendBodyAndHeader("direct:start", "B", "id", 1); + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .aggregate(header("id"), new BodyInAggregatingStrategy()) + .aggregationRepository(repository) + .optimisticLocking() + .syncOptimisticRetry() + .optimisticLockRetryPolicy( + new OptimisticLockRetryPolicy().maximumRetries(10).retryDelay(0)) + .completionSize(2) + .to("mock:result"); + } + }; + } +}