|
16 | 16 | package org.thingsboard.mqtt.broker.dao;
|
17 | 17 |
|
18 | 18 | import lombok.extern.slf4j.Slf4j;
|
| 19 | +import org.assertj.core.api.Assertions; |
19 | 20 | import org.junit.ClassRule;
|
20 | 21 | import org.junit.rules.ExternalResource;
|
21 | 22 | import org.testcontainers.containers.GenericContainer;
|
22 |
| -import org.testcontainers.containers.Network; |
| 23 | +import org.testcontainers.containers.output.OutputFrame; |
23 | 24 | import org.testcontainers.containers.wait.strategy.Wait;
|
| 25 | +import org.testcontainers.lifecycle.Startables; |
24 | 26 |
|
25 | 27 | import java.time.Duration;
|
26 | 28 | import java.util.List;
|
27 | 29 | import java.util.concurrent.TimeUnit;
|
| 30 | +import java.util.stream.Stream; |
28 | 31 |
|
29 | 32 | @Slf4j
|
30 | 33 | public class AbstractRedisClusterContainer {
|
31 | 34 |
|
32 |
| - static final String nodes = "127.0.0.1:6371,127.0.0.1:6372,127.0.0.1:6373,127.0.0.1:6374,127.0.0.1:6375,127.0.0.1:6376"; |
| 35 | + static final String NODES = "127.0.0.1:6371,127.0.0.1:6372,127.0.0.1:6373,127.0.0.1:6374,127.0.0.1:6375,127.0.0.1:6376"; |
| 36 | + static final String IMAGE = "valkey/valkey:8.0"; |
| 37 | + static final String PASSWORD = "your-strong-password"; |
33 | 38 |
|
34 |
| - private static GenericContainer<?> redis(String port) { |
35 |
| - return new GenericContainer<>("bitnamilegacy/redis-cluster:7.2.5") |
36 |
| - .withEnv("REDIS_PORT_NUMBER", port) |
| 39 | + private static GenericContainer<?> valkey(String port) { |
| 40 | + return new GenericContainer<>(IMAGE) |
37 | 41 | .withNetworkMode("host")
|
38 |
| - .withLogConsumer(x -> log.warn("{}", x.getUtf8StringWithoutLineEnding())) |
39 |
| - .withEnv("REDIS_PASSWORD", "password") |
40 |
| - .withEnv("REDIS_NODES", nodes) |
41 |
| - .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(30))); |
| 42 | + .withLogConsumer(AbstractRedisClusterContainer::consumeLog) |
| 43 | + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(30))) |
| 44 | + .withCommand("valkey-server", |
| 45 | + "--port " + port, |
| 46 | + "--requirepass " + PASSWORD, // Password for clients connecting to this node |
| 47 | + "--masterauth " + PASSWORD, // Password for replicas to authenticate with masters |
| 48 | + "--cluster-enabled yes", |
| 49 | + "--cluster-config-file nodes.conf", |
| 50 | + "--cluster-node-timeout 5000", |
| 51 | + "--appendonly no" |
| 52 | + ); |
42 | 53 | }
|
43 | 54 |
|
44 |
| - @ClassRule |
45 |
| - public static Network network = Network.newNetwork(); |
46 |
| - @ClassRule |
47 |
| - public static GenericContainer<?> redis1 = redis("6371"); |
48 |
| - @ClassRule |
49 |
| - public static GenericContainer<?> redis2 = redis("6372"); |
50 |
| - @ClassRule |
51 |
| - public static GenericContainer<?> redis3 = redis("6373"); |
52 |
| - @ClassRule |
53 |
| - public static GenericContainer<?> redis4 = redis("6374"); |
54 |
| - @ClassRule |
55 |
| - public static GenericContainer<?> redis5 = redis("6375"); |
56 |
| - @ClassRule |
57 |
| - public static GenericContainer<?> redis6 = redis("6376"); |
| 55 | + @ClassRule(order = 1) |
| 56 | + public static GenericContainer<?> valkey1 = valkey("6371"); |
| 57 | + @ClassRule(order = 2) |
| 58 | + public static GenericContainer<?> valkey2 = valkey("6372"); |
| 59 | + @ClassRule(order = 3) |
| 60 | + public static GenericContainer<?> valkey3 = valkey("6373"); |
| 61 | + @ClassRule(order = 4) |
| 62 | + public static GenericContainer<?> valkey4 = valkey("6374"); |
| 63 | + @ClassRule(order = 5) |
| 64 | + public static GenericContainer<?> valkey5 = valkey("6375"); |
| 65 | + @ClassRule(order = 6) |
| 66 | + public static GenericContainer<?> valkey6 = valkey("6376"); |
58 | 67 |
|
59 |
| - @ClassRule |
| 68 | + @ClassRule(order = 100) |
60 | 69 | public static ExternalResource resource = new ExternalResource() {
|
61 | 70 | @Override
|
62 | 71 | protected void before() throws Throwable {
|
63 |
| - redis1.start(); |
64 |
| - redis2.start(); |
65 |
| - redis3.start(); |
66 |
| - redis4.start(); |
67 |
| - redis5.start(); |
68 |
| - redis6.start(); |
| 72 | + Startables.deepStart(Stream.of(valkey1, valkey2, valkey3, valkey4, valkey5, valkey6)).join(); |
69 | 73 |
|
70 | 74 | Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // otherwise not all containers have time to start
|
71 | 75 |
|
72 |
| - String clusterCreateCommand = "echo yes | redis-cli --cluster create " + |
73 |
| - nodes.replace(",", " ") + |
74 |
| - " --cluster-replicas 1"; |
75 |
| - log.warn("Command to init Redis Cluster: {}", clusterCreateCommand); |
76 |
| - var result = redis6.execInContainer("/bin/sh", "-c", "export REDISCLI_AUTH=password && " + clusterCreateCommand); |
| 76 | + String clusterCreateCommand = "valkey-cli -a " + PASSWORD + " --cluster create " + NODES.replace(",", " ") + " --cluster-replicas 1 --cluster-yes"; |
| 77 | + log.warn("Command to init ValKey Cluster: {}", clusterCreateCommand); |
| 78 | + var result = valkey6.execInContainer("/bin/sh", "-c", clusterCreateCommand); |
77 | 79 | log.warn("Init cluster result: {}", result);
|
| 80 | + Assertions.assertThat(result.getExitCode()).isEqualTo(0); |
78 | 81 |
|
79 | 82 | Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // otherwise cluster not always ready
|
80 | 83 |
|
81 |
| - log.warn("Connect to nodes: {}", nodes); |
82 |
| - System.setProperty("redis.password", "password"); |
| 84 | + log.warn("Connect to nodes: {}", NODES); |
| 85 | + System.setProperty("redis.password", PASSWORD); |
83 | 86 | System.setProperty("redis.connection.type", "cluster");
|
84 |
| - System.setProperty("redis.cluster.nodes", nodes); |
| 87 | + System.setProperty("redis.cluster.nodes", NODES); |
85 | 88 | System.setProperty("redis.cluster.useDefaultPoolConfig", "false");
|
86 | 89 | }
|
87 | 90 |
|
88 | 91 | @Override
|
89 | 92 | protected void after() {
|
90 |
| - redis1.stop(); |
91 |
| - redis2.stop(); |
92 |
| - redis3.stop(); |
93 |
| - redis4.stop(); |
94 |
| - redis5.stop(); |
95 |
| - redis6.stop(); |
| 93 | + Stream.of(valkey1, valkey2, valkey3, valkey4, valkey5, valkey6).parallel().forEach(GenericContainer::stop); |
96 | 94 | List.of("redis.password", "redis.connection.type", "redis.cluster.nodes", "redis.cluster.useDefaultPoolConfig")
|
97 | 95 | .forEach(System.getProperties()::remove);
|
98 | 96 | }
|
99 | 97 | };
|
100 | 98 |
|
| 99 | + private static void consumeLog(Object x) { |
| 100 | + log.warn("{}", ((OutputFrame) x).getUtf8StringWithoutLineEnding()); |
| 101 | + } |
101 | 102 | }
|
0 commit comments