Skip to content

Commit 56192a3

Browse files
Refresh nodes when renewSlotCache
This PR solves redis#2504 redis#2550, when renewSlotCache, we also remove dead nodes according to the latest query.
1 parent cfc227f commit 56192a3

File tree

2 files changed

+92
-7
lines changed

2 files changed

+92
-7
lines changed

src/main/java/redis/clients/jedis/JedisClusterInfoCache.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
import java.util.ArrayList;
44
import java.util.Collections;
55
import java.util.HashMap;
6+
import java.util.HashSet;
7+
import java.util.Iterator;
68
import java.util.List;
79
import java.util.Map;
10+
import java.util.Map.Entry;
11+
import java.util.Set;
812
import java.util.concurrent.locks.Lock;
913
import java.util.concurrent.locks.ReentrantLock;
1014
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -173,6 +177,7 @@ private void discoverClusterSlots(Jedis jedis) {
173177
w.lock();
174178
try {
175179
this.slots.clear();
180+
Set<String> hostAndPortKeys = new HashSet<>();
176181

177182
for (Object slotInfoObj : slots) {
178183
List<Object> slotInfo = (List<Object>) slotInfoObj;
@@ -183,15 +188,37 @@ private void discoverClusterSlots(Jedis jedis) {
183188

184189
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
185190

186-
// hostInfos
187-
List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
188-
if (hostInfos.isEmpty()) {
189-
continue;
191+
int size = slotInfo.size();
192+
for (int i = MASTER_NODE_INDEX; i < size; i++) {
193+
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
194+
if (hostInfos.isEmpty()) {
195+
continue;
196+
}
197+
198+
HostAndPort targetNode = generateHostAndPort(hostInfos);
199+
hostAndPortKeys.add(getNodeKey(targetNode));
200+
setupNodeIfNotExist(targetNode);
201+
if (i == MASTER_NODE_INDEX) {
202+
assignSlotsToNode(slotNums, targetNode);
203+
}
190204
}
205+
}
191206

192-
// at this time, we just use master, discard slave information
193-
HostAndPort targetNode = generateHostAndPort(hostInfos);
194-
assignSlotsToNode(slotNums, targetNode);
207+
// Remove dead nodes according to the latest query
208+
Iterator<Entry<String, JedisPool>> entryIt = nodes.entrySet().iterator();
209+
while (entryIt.hasNext()) {
210+
Entry<String, JedisPool> entry = entryIt.next();
211+
if (!hostAndPortKeys.contains(entry.getKey())) {
212+
JedisPool pool = entry.getValue();
213+
try {
214+
if (pool != null) {
215+
pool.destroy();
216+
}
217+
} catch (Exception e) {
218+
// pass, may be this node dead
219+
}
220+
entryIt.remove();
221+
}
195222
}
196223
} finally {
197224
w.unlock();

src/test/java/redis/clients/jedis/tests/JedisClusterTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.junit.Assert.assertFalse;
55
import static org.junit.Assert.assertNotNull;
66
import static org.junit.Assert.assertNull;
7+
import static org.junit.Assert.assertTrue;
78
import static org.junit.Assert.fail;
89
import static redis.clients.jedis.tests.utils.AssertUtil.assertByteArraySetEquals;
910

@@ -805,6 +806,63 @@ public void nullKeys() {
805806
}
806807
}
807808

809+
810+
@Test
811+
public void clusterRefreshNodes() throws Exception {
812+
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
813+
jedisClusterNode.add(nodeInfo1);
814+
jedisClusterNode.add(nodeInfo2);
815+
jedisClusterNode.add(nodeInfo3);
816+
817+
try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT,
818+
DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
819+
assertEquals(3, cluster.getClusterNodes().size());
820+
cleanUp(); // cleanup and add node4
821+
822+
// at first, join node4 to cluster
823+
node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
824+
node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
825+
node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());
826+
// split available slots across the three nodes
827+
int slotsPerNode = JedisCluster.HASHSLOTS / 4;
828+
int[] node1Slots = new int[slotsPerNode];
829+
int[] node2Slots = new int[slotsPerNode];
830+
int[] node3Slots = new int[slotsPerNode];
831+
int[] node4Slots = new int[slotsPerNode];
832+
for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < JedisCluster.HASHSLOTS; i++) {
833+
if (i < slotsPerNode) {
834+
node1Slots[slot1++] = i;
835+
} else if (i >= slotsPerNode && i < slotsPerNode*2) {
836+
node2Slots[slot2++] = i;
837+
} else if (i >= slotsPerNode*2 && i < slotsPerNode*3) {
838+
node3Slots[slot3++] = i;
839+
} else {
840+
node4Slots[slot4++] = i;
841+
}
842+
}
843+
844+
node1.clusterAddSlots(node1Slots);
845+
node2.clusterAddSlots(node2Slots);
846+
node3.clusterAddSlots(node3Slots);
847+
node4.clusterAddSlots(node4Slots);
848+
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);
849+
850+
// cluster.set("key", "value"); will get JedisMovedDataException and renewSlotCache
851+
cluster.set("key", "value");
852+
853+
assertEquals(4, cluster.getClusterNodes().size());
854+
String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort();
855+
assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4));
856+
857+
// make 4 nodes to 3 nodes
858+
cleanUp();
859+
setUp();
860+
// cluster.set("bar", "foo") will get JedisMovedDataException and renewSlotCache
861+
cluster.set("bar", "foo");
862+
assertEquals(3, cluster.getClusterNodes().size());
863+
}
864+
}
865+
808866
@Test
809867
public void georadiusStore() {
810868
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();

0 commit comments

Comments
 (0)