Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
48bb204
AMQ-9809: org.apache.activemq.perf.InactiveDurableTopicTest hanging
jeanouii Nov 18, 2025
a09e5b7
AMQ-8525: Improve execution time for CI
jeanouii Nov 18, 2025
f58b250
AMQ-8525: improve mqtt module performances
jeanouii Nov 18, 2025
03f1174
AMQ-8525: fix parallel execution of MQTT tests
jeanouii Nov 19, 2025
2589eed
AMQ-8525: amqp module parallelism
jeanouii Nov 20, 2025
ccd412f
AMQ-8525: kahadb-store module parallelism
jeanouii Nov 20, 2025
5cb10dd
AMQ-8525: Now that it worked, remove the skipTests flag so command li…
jeanouii Nov 21, 2025
dafee55
AMQ-8525: Now that it worked, remove the skipTests flag so command li…
jeanouii Nov 21, 2025
e692216
AMQ-8525: Now that it worked, remove the skipTests flag so command li…
jeanouii Nov 21, 2025
4e3108a
AMQ-8525: improve stomp module performances
jeanouii Nov 18, 2025
6b95992
AMQ-8525: Now that it worked, remove the skipTests flag so command li…
jeanouii Nov 21, 2025
e6e4f41
AMQ-8525: Final review of eligible tests for parallel execution.
jeanouii Nov 24, 2025
dca934e
AMQ-8525: Add better reporting for parallel execution
jeanouii Nov 24, 2025
5c0a6ec
AMQ-8525: Add better reporting and additional tests to parallel execu…
jeanouii Nov 25, 2025
187db01
AMQ-8525: All stable and green, merging back into default configurati…
jeanouii Nov 25, 2025
3b8c011
AMQ-8525: All stable and green, merging back into default configurati…
jeanouii Nov 25, 2025
88d1d58
AMQ-8525: Clean up configuration and move remaining tests to parallel…
jeanouii Nov 25, 2025
c0d34e4
AMQ-8525: Clean up configuration and exclude load tests
jeanouii Nov 25, 2025
b05d74b
AMQ-8525: Clean up configuration and add a test to parallel execution
jeanouii Nov 25, 2025
c3fb77d
AMQ-8525: IO are slow on CI. Fix random failures with tests and add p…
jeanouii Nov 19, 2025
b4058b1
AMQ-XX: Fix long running test on CI
jeanouii Dec 2, 2025
f14bdbe
AMQ-8525: Try to harden the JMS Client tests
jeanouii Dec 4, 2025
f249920
AMQ-8525: Try to harden the random failures
jeanouii Dec 4, 2025
0ca7291
AMQ-8525: bigger timeout
jeanouii Dec 4, 2025
f938bf9
AMQ-8525: See if we can avoid tests to hang forever or too long at least
jeanouii Dec 4, 2025
932bab1
AMQ-8525: Fix the test until we can add a guard on the record size so…
jeanouii Dec 5, 2025
54f0dd0
AMQ-8525: Harden the 2 tests failing randomly
jeanouii Dec 5, 2025
9674ab3
AMQ-8525: Test hanging - if we can reconnect, let's at least fail.
jeanouii Dec 5, 2025
093fb21
AMQ-8525: Extract plugin definition in the parent pom so it benefits …
jeanouii Dec 7, 2025
a146406
AMQ-8525: Give it a bit more time to drain to avoid random failures
jeanouii Dec 8, 2025
79623a7
AMQ-8525: Avoid OOM and moe time to drain the queue
jeanouii Dec 9, 2025
0649c37
Merge branch 'fix/AMQ-8525_mqtt-module' into fix/AMQ-8525_AllTogether…
jeanouii Dec 9, 2025
60996c7
Merge branch 'fix/AMQ-8525_stomp-module' into fix/AMQ-8525_AllTogethe…
jeanouii Dec 9, 2025
0dd7d70
Merge branch 'fix/InactiveDurableTopicTest-hanging' into fix/AMQ-8525…
jeanouii Dec 9, 2025
aa5f89f
Merge branch 'fix/TransactedStoreUsageSuspendResumeTest' into fix/AMQ…
jeanouii Dec 9, 2025
51d3616
Merge branch 'fix/KahaDBOffsetRecoveryListenerTest' into fix/AMQ-8525…
jeanouii Dec 9, 2025
a6d3c64
Merge branch 'fix/AMQ-8525_unit-tests-module' into fix/AMQ-8525_AllTo…
jeanouii Dec 9, 2025
6c5447d
Merge branch 'AMQ-8525_amqp-module' into fix/AMQ-8525_AllTogetherTest1
jeanouii Dec 9, 2025
238a27f
[AMQ-8525] Fix random failures and run an eligible subset in parallel
jeanouii Nov 19, 2025
194c00d
Merge branch 'fix/AMQ-8525_unit-tests-module' into fix/AMQ-8525_AllTo…
jeanouii Dec 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
74 changes: 74 additions & 0 deletions activemq-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,80 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire.version}</version>
<configuration>
<argLine>${surefire.argLine}</argLine>
<runOrder>alphabetical</runOrder>
<reportFormat>plain</reportFormat>
<failIfNoTests>false</failIfNoTests>
<excludedGroups>org.apache.activemq.transport.amqp.ParallelTest</excludedGroups>
<systemPropertyVariables>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/</org.apache.activemq.default.directory.prefix>
</systemPropertyVariables>
<consoleOutputReporter>
<disable>true</disable>
</consoleOutputReporter>
<statelessTestsetInfoReporter
implementation="org.apache.maven.plugin.surefire.extensions.junit5.JUnit5StatelessTestsetInfoTreeReporter">
<printStacktraceOnError>true</printStacktraceOnError>
<printStacktraceOnFailure>true</printStacktraceOnFailure>
<printStdoutOnError>true</printStdoutOnError>
<printStdoutOnFailure>true</printStdoutOnFailure>
<printStderrOnError>true</printStderrOnError>
<printStderrOnFailure>true</printStderrOnFailure>
</statelessTestsetInfoReporter>
<includes>
<include>**/*Test.*</include>
</includes>
<excludes>
<exclude>**/*LoadTest.java</exclude>
<exclude>**/*StressTest.java</exclude>
</excludes>
</configuration>
<executions>
<execution>
<id>parallel</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<!-- drop the default excludedGroups -->
<excludedGroups combine.self="override"/>
<groups>org.apache.activemq.transport.amqp.ParallelTest</groups>
<forkCount>2C</forkCount>
<reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<failIfNoTests>false</failIfNoTests>
<systemPropertyVariables>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/parallel-tests-${surefire.forkNumber}/</org.apache.activemq.default.directory.prefix>
<!-- when running tests in parallel in the CI (quite slow) we need to bump the wireformat negotiation timeout (5s by default) -->
<org.apache.activemq.transport.wireFormatNegotiationTimeout>30000</org.apache.activemq.transport.wireFormatNegotiationTimeout>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${surefire.version}</version>
</dependency>
<dependency>
<groupId>me.fabriciorby</groupId>
<artifactId>maven-surefire-junit5-tree-reporter</artifactId>
<version>1.5.1</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<!-- profile which is activated is the swiftmq-client-home prop is defined.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(ParallelTest.class)
public class AmqpAndMqttTest {

protected BrokerService broker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(ParallelTest.class)
public class AmqpAndStompInteropTest {

private static final Logger LOG = LoggerFactory.getLogger(AmqpAndStompInteropTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -61,7 +62,6 @@
public class AmqpTestSupport {

public static final String MESSAGE_NUMBER = "MessageNumber";
public static final String KAHADB_DIRECTORY = "target/activemq-data/";

@Rule public TestName name = new TestName();

Expand Down Expand Up @@ -120,7 +120,7 @@ protected void createBroker(boolean deleteAllMessages) throws Exception {
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
if (isPersistent()) {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
kaha.setDirectory(new File(IOHelper.getDefaultDataDirectory() + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setSchedulerSupport(isSchedulerEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(ParallelTest.class)
public class AmqpTransformerTest {

private static final Logger LOG = LoggerFactory.getLogger(AmqpTransformerTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(ParallelTest.class)
public class JMSClientSimpleAuthTest {

@Rule public TestName name = new TestName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
import org.apache.activemq.util.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(ParallelTest.class)
public class JMSClientTest extends JMSClientTestSupport {

protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
Expand Down Expand Up @@ -631,7 +633,7 @@ public void testBrokerRestartWontHangConnectionClose() throws Exception {
@Test(timeout=30 * 1000)
public void testProduceAndConsumeLargeNumbersOfMessages() throws Exception {
int count = 1000;
connection = createConnection();
connection = createConnectionWithRetry(name.toString(), false);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
connection.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,37 @@ protected Connection createConnection(String clientId, boolean syncPublish) thro
connection.start();
return connection;
}

/**
* Some SSL/NIO test combinations can occasionally fail the initial frame negotiation
* when brokers are started in parallel forks. Retry once on the specific framing error
* to smooth out that transient race without hiding other issues.
*/
protected Connection createConnectionWithRetry(String clientId, boolean syncPublish) throws JMSException, InterruptedException {
JMSException lastException = null;
for (int attempt = 0; attempt < 2; attempt++) {
try {
return createConnection(clientId, syncPublish);
} catch (JMSException ex) {
lastException = ex;
if (!containsAmqpHeaderMismatch(ex)) {
throw ex;
}
LOG.warn("AMQP header mismatch on connection attempt {} for {}, retrying once", attempt + 1, getBrokerURI());
Thread.sleep(500);
}
}
throw lastException;
}

private boolean containsAmqpHeaderMismatch(Throwable throwable) {
Throwable current = throwable;
while (current != null) {
if (current.getMessage() != null && current.getMessage().contains("AMQP header mismatch")) {
return true;
}
current = current.getCause();
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;


/**
* Marker interface used with {@code @Category(ParallelTest.class)} to opt a
* test class or method into the {@code all-parallel} Maven profile. Only tests
* explicitly tagged with this category execute when the profile is enabled,
* which allows a gradual migration toward full parallelism.
*/
public interface ParallelTest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
* Test for support of Anonymous sender links.
*/
@Category(ParallelTest.class)
public class AmqpAnonymousSenderTest extends AmqpClientTestSupport {

@Test(timeout = 60000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,20 @@
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;

/**
* Test handling of heartbeats requested by the broker.
*/
@RunWith(Parameterized.class)
@Category(ParallelTest.class)
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {

private final int TEST_IDLE_TIMEOUT = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,20 @@
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;

/**
* Tests that cover broker behavior when the client requests heartbeats
*/
@RunWith(Parameterized.class)
@Category(ParallelTest.class)
public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {

private final int TEST_IDLE_TIMEOUT = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;

/**
* Test for the transportConnector maximumConnections URI option.
*/
@RunWith(Parameterized.class)
@Category(ParallelTest.class)
public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {

private static final int MAX_CONNECTIONS = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
Expand All @@ -48,11 +49,13 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;

/**
* Test broker handling of AMQP connections with various configurations.
*/
@RunWith(Parameterized.class)
@Category(ParallelTest.class)
public class AmqpConnectionsTest extends AmqpClientTestSupport {

private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests that the AMQP CorrelationId value and type are preserved.
*/
@RunWith(Parameterized.class)
@Category(ParallelTest.class)
public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport {

protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
* Test that broker closes connection and allows a new one when the transport
* receives a bad chunk of data after a successful connect.
*/
@Category(ParallelTest.class)
public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.ParallelTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;

/**
* Test around the handling of Deliver Annotations in messages sent and received.
*/
@RunWith(Parameterized.class)
@Category(ParallelTest.class)
public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {

private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
Expand Down
Loading