Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=3.7.2
version=3.7.3
org.gradle.caching=true
org.gradle.parallel=true
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public void testReinit() throws Exception {
*
* @throws Exception if any error occurs
*/
@Timeout(5)
@Timeout(20)
@Test
public void testParallelMethodCall() throws Exception {
final String scope = "/test/parallel";
Expand Down
1 change: 1 addition & 0 deletions module/communication/mqtt/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
api(project(":jul.extension.type.processing"))
api("com.hivemq:hivemq-mqtt-client:_")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:_")
testImplementation(project(":jul.communication.mqtt.test"))
testImplementation("org.testcontainers:junit-jupiter:_") {
exclude(group = "junit", module = "junit")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ object SharedMqttClient : Shutdownable {
sharedClients.values
.filter { (it as Mqtt5ClientWrapper).isConnected() }
.map { it.disconnect() }
.map { it.get() }
.map {
runCatching { it.get() }
.getOrElse { t -> check(t.message == "com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.") {
"Could not disconnect client connection: ${t.message}" }
null
}
}
.run { sharedClients.clear() }

@Synchronized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,31 @@ import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.Timeout
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.GenericContainer
import org.openbase.jul.communication.config.CommunicatorConfig
import org.openbase.jul.communication.mqtt.test.MqttBrokerManager
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Duration
import kotlin.io.path.absolute
import kotlin.io.path.deleteIfExists
import kotlin.io.path.writeLines

@Testcontainers
abstract class AbstractIntegrationTest {
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
open class AbstractIntegrationTest {

// the companion object makes sure that the container is started once before all tests instead of restarting for every test
companion object {

private const val port: Int = 1883

private val mosquittoConfig = kotlin.io.path.createTempFile(prefix = "mosquitto_", suffix = ".conf")
private lateinit var broker: MqttBrokerContainer

private var usageCounter = 0
private val lock = Any()

@JvmStatic
@BeforeAll
@Timeout(30)
fun setup() {
synchronized(lock) {
if (usageCounter == 0) {
mosquittoConfig.writeLines(
listOf(
"allow_anonymous true",
"listener 1883"
)
)

broker = MqttBrokerContainer()
.withExposedPorts(port)
.withFileSystemBind(
mosquittoConfig.absolute().toString(),
"/mosquitto/config/mosquitto.conf",
BindMode.READ_ONLY
)
broker.withStartupTimeout(Duration.ofSeconds(30)).start()
}
usageCounter++
}
}

@JvmStatic
@AfterAll
@Timeout(30)
fun cleanup() {
synchronized(lock) {
usageCounter--
if (usageCounter == 0) {
SharedMqttClient.waitForShutdown()
broker.stop()
mosquittoConfig.deleteIfExists()
}
}
}
@BeforeAll
@Timeout(30)
fun setupMqtt() {
MqttBrokerManager.setupMqtt(this::class.java.simpleName)
}

protected val brokerHost: String get() = broker.host
protected val brokerHost: String? get() = MqttBrokerManager.broker?.host

protected val brokerPort: Int get() = broker.firstMappedPort
}
protected val brokerPort: Int? get() = MqttBrokerManager.broker?.firstMappedPort

class MqttBrokerContainer : GenericContainer<MqttBrokerContainer>(DockerImageName.parse("eclipse-mosquitto"))
protected val config
get() = CommunicatorConfig(
hostname = brokerHost ?: error("Host not defined!"),
port = brokerPort ?: error("Port not defined!"),
)
}

fun Mqtt5Publish.clearTimestamp() = let {
this.extend().userProperties(Mqtt5UserProperties.of()).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import kotlin.concurrent.withLock
class IntegrationTest : AbstractIntegrationTest() {

private val scope = ScopeProcessor.generateScope("/test/integration")
private val config = CommunicatorConfig(brokerHost, brokerPort)

internal class MethodMock {
val lock = ReentrantLock()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.openbase.jul.communication.mqtt

import com.hivemq.client.internal.mqtt.util.MqttChecks.unsubscribe
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe
import org.junit.jupiter.api.Test
Expand All @@ -15,7 +13,6 @@ internal class SharedMqttClientTest : AbstractIntegrationTest() {
@Test
@Timeout(value = 30)
fun `shutdown should be work as expected`() {
val config = CommunicatorConfig(brokerHost, brokerPort)
val client = SharedMqttClient
client.get(config).apply {
disconnect()
Expand All @@ -27,7 +24,6 @@ internal class SharedMqttClientTest : AbstractIntegrationTest() {
@Test
@Timeout(value = 30)
fun `subscription should be work as expected`() {
val config = CommunicatorConfig(brokerHost, brokerPort)
val topic = "/a/b/c"
val client = SharedMqttClient

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.openbase.jul.communication.mqtt.test

import org.openbase.jps.core.JPService
import org.openbase.jps.exception.JPServiceException
import org.openbase.jul.communication.jp.JPComHost
import org.openbase.jul.communication.jp.JPComPort
import org.openbase.jul.communication.mqtt.SharedMqttClient
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import org.testcontainers.utility.MountableFile
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
import kotlin.concurrent.Volatile
import kotlin.io.path.deleteIfExists

object MqttBrokerManager {

val STARTUP_TIMEOUT: Duration = Duration.ofSeconds(30)

var broker: GenericContainer<*>? = null
val lock = Any()

fun setupMqtt(testClassName: String, port: Int = 1883) {
synchronized(lock) {
if (broker == null) {
val mosquittoConfig: Path = Files.createTempFile("${testClassName}_mosquitto_", ".conf")
Files.write(
mosquittoConfig, listOf(
"allow_anonymous true",
"listener $port"
)
)
MqttBrokerContainer()
.withExposedPorts(port)
.withCopyFileToContainer(
MountableFile.forHostPath(mosquittoConfig.toString()),
"/mosquitto/config/mosquitto.conf"
)
.apply { withStartupTimeout(STARTUP_TIMEOUT).start() }
.also { broker = it }
.also { setupProperties() }
.also {
// Add shutdown hook to stop broker at JVM exit
Runtime.getRuntime().addShutdownHook(Thread {
synchronized(lock) {
broker?.stop()
}
})
}
mosquittoConfig.deleteIfExists()
}
}
}

@Throws(JPServiceException::class)
private fun setupProperties() {
JPService.reset()
JPService.registerProperty(JPComPort::class.java, broker!!.firstMappedPort)
JPService.registerProperty(JPComHost::class.java, broker!!.host)
JPService.setupJUnitTestMode()
}

class MqttBrokerContainer : GenericContainer<MqttBrokerContainer>(DockerImageName.parse("eclipse-mosquitto"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,7 @@ import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import org.openbase.jps.core.JPService
import org.openbase.jps.exception.JPServiceException
import org.openbase.jul.communication.jp.JPComHost
import org.openbase.jul.communication.jp.JPComPort
import org.openbase.jul.communication.mqtt.SharedMqttClient.waitForShutdown
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
import java.util.*
import org.testcontainers.junit.jupiter.Testcontainers

/*-
* #%L
Expand All @@ -42,60 +31,14 @@ import java.util.*
* */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(OpenbaseDeadlockChecker::class)
@Testcontainers
open class MqttIntegrationTest {

companion object {
const val port = 1884
var mosquittoConfig: Path? = null
var broker: GenericContainer<*>? = null
val configLock = Any()
}

@BeforeAll
@Timeout(30)
fun setupMqtt() {
synchronized(configLock) {
mosquittoConfig = Files.createTempFile("mosquitto_", ".conf")
Files.write(
mosquittoConfig, listOf(
"allow_anonymous true",
"listener " + port
)
)
GenericContainer(DockerImageName.parse("eclipse-mosquitto"))
.withExposedPorts(port)
.withFileSystemBind(
mosquittoConfig.toString(),
"/mosquitto/config/mosquitto.conf",
BindMode.READ_ONLY
)
.apply { withStartupTimeout(Duration.ofSeconds(30)).start() }
.also {
if (broker?.takeIf { it.containerId != null } != null)
error("broker was already initialized!")
}
.also { broker = it }
.also { setupProperties() }
}
}

@AfterAll
@Timeout(30)
fun tearDownMQTT() {
synchronized(configLock) {
waitForShutdown()
broker?.stop()
Files.delete(mosquittoConfig)
}
}

@Throws(JPServiceException::class)
private fun setupProperties() {
JPService.reset()
JPService.registerProperty(JPComPort::class.java, broker!!.firstMappedPort)
JPService.registerProperty(JPComHost::class.java, broker!!.host)
MqttBrokerManager.setupMqtt(this::class.java.simpleName, 1884)
setupCustomProperties()
JPService.setupJUnitTestMode()
}

open fun setupCustomProperties() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,12 @@ public ENTRY remove(final ENTRY entry) throws CouldNotPerformException {
fileSynchronizerMapLock.writeLock().unlock();
}

filePluginPool.beforeRemove(entry, fileSynchronizer);
fileSynchronizer.delete();
if (fileSynchronizer != null) {
filePluginPool.beforeRemove(entry, fileSynchronizer);
fileSynchronizer.delete();
filePluginPool.afterRemove(entry, fileSynchronizer);
}
fileSynchronizerMap.remove(entry.getId());
filePluginPool.afterRemove(entry, fileSynchronizer);
return removedValue;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
Expand Down
Loading
Loading