Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class AggregateDefinition extends OutputDefinition<AggregateDefinition>
@Metadata(javaType = "java.lang.Boolean")
private String optimisticLocking;
@XmlAttribute
@Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "false")
private String syncOptimisticRetry;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit / naming: consider optimisticLockingSyncRetry (or optimisticLockRetrySync) to group it visually with the existing optimisticLocking field in docs and configuration. syncOptimisticRetry reads a bit ambiguously ("synchronize" vs "synchronous"). Not a blocker — just a readability suggestion.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the suggested rename of the option

@XmlAttribute
@Metadata(label = "advanced", javaType = "java.util.concurrent.ExecutorService")
private String executorService;
@XmlAttribute
Expand Down Expand Up @@ -173,6 +176,7 @@ protected AggregateDefinition(AggregateDefinition source) {
? source.optimisticLockRetryPolicyDefinition.copyDefinition() : null;
this.parallelProcessing = source.parallelProcessing;
this.optimisticLocking = source.optimisticLocking;
this.syncOptimisticRetry = source.syncOptimisticRetry;
this.executorService = source.executorService;
this.timeoutCheckerExecutorService = source.timeoutCheckerExecutorService;
this.aggregateController = source.aggregateController;
Expand Down Expand Up @@ -501,6 +505,14 @@ public void setOptimisticLocking(String optimisticLocking) {
this.optimisticLocking = optimisticLocking;
}

public String getSyncOptimisticRetry() {
return syncOptimisticRetry;
}

public void setSyncOptimisticRetry(String syncOptimisticRetry) {
this.syncOptimisticRetry = syncOptimisticRetry;
}

public String getParallelProcessing() {
return parallelProcessing;
}
Expand Down Expand Up @@ -1004,6 +1016,16 @@ public AggregateDefinition optimisticLocking() {
return this;
}

/**
* When optimistic locking is enabled, retries happen synchronously in the same thread instead of being scheduled on
* a background thread. This preserves transaction context for repositories that require single-thread transactional
* guarantees.
*/
Comment on lines +1019 to +1023
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice Javadoc. Consider adding a note that this option only takes effect when optimisticLocking is also enabled:

Suggested change
/**
* When optimistic locking is enabled, retries happen synchronously in the same thread instead of being scheduled on
* a background thread. This preserves transaction context for repositories that require single-thread transactional
* guarantees.
*/
/**
* When optimistic locking is enabled, retries happen synchronously in the same thread instead of being scheduled on
* a background thread. This preserves transaction context for repositories that require single-thread transactional
* guarantees. Only takes effect when {@link #optimisticLocking()} is also enabled.
*/

public AggregateDefinition syncOptimisticRetry() {
setSyncOptimisticRetry(Boolean.toString(true));
return this;
}

/**
* Allows to configure retry settings when using optimistic locking.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public void setStatisticsEnabled(boolean statisticsEnabled) {
private Integer closeCorrelationKeyOnCompletion;
private boolean parallelProcessing;
private boolean optimisticLocking;
private boolean syncOptimisticRetry;

// different ways to have completion triggered
private boolean eagerCheckCompletion;
Expand Down Expand Up @@ -374,6 +375,20 @@ protected boolean doInOptimisticLock(Exchange exchange, String key, AsyncCallbac
"On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to aggregate exchange: {}",
attempt, aggregationRepository, exchange, e);
if (optimisticLockRetryPolicy.shouldRetry(attempt)) {
if (syncOptimisticRetry) {
// Synchronous retry: delay in the same thread instead of
// scheduling on a background thread. This ensures aggregation
// stays within a single thread (e.g. for transactional processing).
try {
optimisticLockRetryPolicy.doDelay(attempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
exchange.setException(ie);
callback.done(sync);
return sync;
}
continue;
}
long delay = optimisticLockRetryPolicy.getDelay(attempt);
if (delay > 0) {
int nextAttempt = attempt;
Expand Down Expand Up @@ -1126,6 +1141,14 @@ public void setOptimisticLocking(boolean optimisticLocking) {
this.optimisticLocking = optimisticLocking;
}

public boolean isSyncOptimisticRetry() {
return syncOptimisticRetry;
}

public void setSyncOptimisticRetry(boolean syncOptimisticRetry) {
this.syncOptimisticRetry = syncOptimisticRetry;
}

public AggregationRepository getAggregationRepository() {
return aggregationRepository;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ && parseBoolean(definition.getDiscardOnAggregationFailure(), false)) {
if (optimisticLocking != null) {
answer.setOptimisticLocking(optimisticLocking);
}
Boolean syncOptimisticRetry = parseBoolean(definition.getSyncOptimisticRetry());
if (syncOptimisticRetry != null) {
answer.setSyncOptimisticRetry(syncOptimisticRetry);
}
if (definition.getCompletionPredicate() != null) {
Predicate predicate = createPredicate(definition.getCompletionPredicate());
answer.setCompletionPredicate(predicate);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor.aggregator;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.BodyInAggregatingStrategy;
import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests that optimistic locking retries happen synchronously in the same thread when syncOptimisticRetry is enabled.
*/
public class AggregateOptimisticLockSyncRetryTest extends ContextTestSupport {

private static final int FAIL_FIRST_N_ATTEMPTS = 3;

private final AtomicInteger addCounter = new AtomicInteger();
private volatile String aggregateThreadName;

/**
* Repository that throws OptimisticLockingException for the first N attempts, then succeeds.
*/
private final MemoryAggregationRepository repository = new MemoryAggregationRepository(true) {
@Override
public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
int count = addCounter.incrementAndGet();
// Record the thread name on every attempt
aggregateThreadName = Thread.currentThread().getName();
if (count <= FAIL_FIRST_N_ATTEMPTS) {
throw new OptimisticLockingException();
}
return super.add(camelContext, key, oldExchange, newExchange);
}
};

@Test
public void testSyncRetryHappensInSameThread() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);

String callerThread = Thread.currentThread().getName();

template.sendBodyAndHeader("direct:start", "A", "id", 1);
template.sendBodyAndHeader("direct:start", "B", "id", 1);

mock.assertIsSatisfied();

// The repository should have been called more than FAIL_FIRST_N_ATTEMPTS times
// (the first N fail, then succeed)
assertTrue(addCounter.get() > FAIL_FIRST_N_ATTEMPTS,
"Expected more than " + FAIL_FIRST_N_ATTEMPTS + " attempts, got " + addCounter.get());

// Since syncOptimisticRetry is enabled, the retry should happen in a Camel thread
// (the route's thread), NOT in the AggregateOptimisticLockingExecutor thread pool.
// The key assertion is that the thread name does NOT contain the async executor name.
if (aggregateThreadName != null) {
assertFalse(aggregateThreadName.contains("AggregateOptimisticLockingExecutor"),
"Expected synchronous retry but found async executor thread: " + aggregateThreadName);
}
}

@Test
public void testSyncRetryCompletes() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("A+B");

template.sendBodyAndHeader("direct:start", "A", "id", 1);

// Reset counter so next message triggers failures and retries
addCounter.set(0);
template.sendBodyAndHeader("direct:start", "B", "id", 1);

mock.assertIsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("direct:start")
.aggregate(header("id"), new BodyInAggregatingStrategy())
.aggregationRepository(repository)
.optimisticLocking()
.syncOptimisticRetry()
.optimisticLockRetryPolicy(
new OptimisticLockRetryPolicy().maximumRetries(10).retryDelay(0))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good test coverage overall. One observation: retryDelay(0) means doDelay() returns immediately without sleeping, so the Thread.sleep + InterruptedException code path isn't exercised.

Consider adding a small test with a nonzero delay (e.g., retryDelay(10)) to verify the synchronous sleep actually happens on the calling thread under realistic conditions. A test that interrupts the thread during retry would further strengthen coverage of the InterruptedException branch. Not a blocker, but would increase confidence.

.completionSize(2)
.to("mock:result");
}
};
}
}
Loading