Skip to content

Commit 293b98a

Browse files
szymon-miezaljacek-lewandowski
authored andcommitted
STAR-1903: ConsistencyLevel.THREE means ALL_BUT_ONE (#943)
Add a system property (dse.consistency_level.three_means_all_but_one) that changes the behavior of ConsistencyLevel.THREE to mean ALL_BUT_ONE, i.e. that all replicas except for at most one in the cluster (across all DCs) must accept the write for it to be successful. (cherry picked from commit 9875081)
1 parent f367338 commit 293b98a

File tree

3 files changed

+206
-0
lines changed

3 files changed

+206
-0
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,9 @@ public enum CassandraRelevantProperties
658658
TEST_UTIL_ALLOW_TOOL_REINIT_FOR_TEST("org.apache.cassandra.tools.UtilALLOW_TOOL_REINIT_FOR_TEST"),
659659
/** Activate write survey mode. The node not becoming an active ring member, and you must use JMX StorageService->joinRing() to finalize the ring joining. */
660660
TEST_WRITE_SURVEY("cassandra.write_survey"),
661+
// Changes the semantic of the "THREE" consistency level to mean "all but one"
662+
// i.e. that all replicas except for at most one in the cluster (across all DCs) must accept the write for it to be successful.
663+
THREE_MEANS_ALL_BUT_ONE("dse.consistency_level.three_means_all_but_one", "false"),
661664
TOLERATE_SSTABLE_SIZE("cassandra.tolerate_sstable_size"),
662665
TRIGGERS_DIR("cassandra.triggers_dir"),
663666
TRUNCATE_BALLOT_METADATA("cassandra.truncate_ballot_metadata"),

src/java/org/apache/cassandra/db/ConsistencyLevel.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import javax.annotation.Nullable;
2323

2424
import com.carrotsearch.hppc.ObjectIntHashMap;
25+
import org.apache.cassandra.config.CassandraRelevantProperties;
2526
import org.apache.cassandra.locator.Endpoints;
2627
import org.apache.cassandra.locator.InOurDc;
2728
import org.apache.cassandra.schema.TableMetadata;
@@ -51,6 +52,8 @@ public enum ConsistencyLevel
5152
LOCAL_ONE (10, true),
5253
NODE_LOCAL (11, true);
5354

55+
public static final boolean THREE_MEANS_ALL_BUT_ONE = CassandraRelevantProperties.THREE_MEANS_ALL_BUT_ONE.getBoolean();
56+
5457
// Used by the binary protocol
5558
public final int code;
5659
private final boolean isDCLocal;
@@ -87,6 +90,16 @@ public static ConsistencyLevel fromCode(int code)
8790
return codeIdx[code];
8891
}
8992

93+
@Override
94+
public String toString()
95+
{
96+
if (this == THREE && THREE_MEANS_ALL_BUT_ONE)
97+
{
98+
return "THREE (ALL_BUT_ONE)";
99+
}
100+
return super.toString();
101+
}
102+
90103
public static ConsistencyLevel fromString(String str)
91104
{
92105
return valueOf(str.toUpperCase(Locale.US));
@@ -97,6 +110,12 @@ public static int quorumFor(AbstractReplicationStrategy replicationStrategy)
97110
return (replicationStrategy.getReplicationFactor().allReplicas / 2) + 1;
98111
}
99112

113+
static int allButOneFor(AbstractReplicationStrategy replicationStrategy)
114+
{
115+
int rf = replicationStrategy.getReplicationFactor().fullReplicas;
116+
return rf <= 1 ? rf : rf - 1;
117+
}
118+
100119
public static int localQuorumFor(AbstractReplicationStrategy replicationStrategy, String dc)
101120
{
102121
return (replicationStrategy instanceof NetworkTopologyStrategy)
@@ -146,6 +165,10 @@ public int blockFor(AbstractReplicationStrategy replicationStrategy)
146165
case TWO:
147166
return 2;
148167
case THREE:
168+
if (THREE_MEANS_ALL_BUT_ONE)
169+
{
170+
return allButOneFor(replicationStrategy);
171+
}
149172
return 3;
150173
case QUORUM:
151174
case SERIAL:
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db;
20+
21+
import java.net.UnknownHostException;
22+
import java.util.Collections;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import com.google.common.collect.HashMultimap;
27+
import com.google.common.collect.Multimap;
28+
import org.junit.After;
29+
import org.junit.AfterClass;
30+
import org.junit.BeforeClass;
31+
import org.junit.Test;
32+
33+
import org.apache.cassandra.config.DatabaseDescriptor;
34+
import org.apache.cassandra.dht.OrderPreservingPartitioner;
35+
import org.apache.cassandra.dht.Token;
36+
import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
37+
import org.apache.cassandra.locator.AbstractReplicationStrategy;
38+
import org.apache.cassandra.locator.EverywhereStrategy;
39+
import org.apache.cassandra.locator.IEndpointSnitch;
40+
import org.apache.cassandra.locator.InetAddressAndPort;
41+
import org.apache.cassandra.locator.LocalStrategy;
42+
import org.apache.cassandra.locator.NetworkTopologyStrategy;
43+
import org.apache.cassandra.locator.SimpleStrategy;
44+
import org.apache.cassandra.locator.TokenMetadata;
45+
import org.apache.cassandra.utils.Pair;
46+
47+
import static org.junit.Assert.*;
48+
49+
public class ConsistencyLevelTest
50+
{
51+
private static final String KS = "test";
52+
private static final Map<String, String> RACK = new HashMap<>(), DATACENTER = new HashMap<>();
53+
private static final IEndpointSnitch SNITCH = new AbstractNetworkTopologySnitch()
54+
{
55+
@Override
56+
public String getRack(InetAddressAndPort endpoint)
57+
{
58+
return RACK.getOrDefault(endpoint.getHostAddress(false), "RC1");
59+
}
60+
61+
@Override
62+
public String getDatacenter(InetAddressAndPort endpoint)
63+
{
64+
return DATACENTER.getOrDefault(endpoint.getHostAddress(false), "DC1");
65+
}
66+
};
67+
68+
@BeforeClass
69+
public static void setSnitch()
70+
{
71+
DatabaseDescriptor.daemonInitialization();
72+
DatabaseDescriptor.setEndpointSnitch(SNITCH);
73+
}
74+
75+
@AfterClass
76+
public static void resetSnitch()
77+
{
78+
DatabaseDescriptor.setEndpointSnitch(null);
79+
}
80+
81+
@After
82+
public void resetSnitchState()
83+
{
84+
RACK.clear();
85+
DATACENTER.clear();
86+
}
87+
88+
@Test
89+
public void allButOne_shouldBe_2_forReplicationFactor_3()
90+
{
91+
testAllButOne(simpleStrategy(3), 2);
92+
}
93+
94+
@Test
95+
public void allButOne_shouldBe_1_forReplicationFactor_2()
96+
{
97+
testAllButOne(simpleStrategy(2), 1);
98+
}
99+
100+
@Test
101+
public void allButOne_shouldBe_1_forReplicationFactor_1()
102+
{
103+
testAllButOne(simpleStrategy(1), 1);
104+
}
105+
106+
@Test
107+
public void allButOne_shouldBe_1_forLocalStrategy()
108+
{
109+
testAllButOne(localStrategy(), 1);
110+
}
111+
112+
@Test
113+
public void allButOne_shouldBe_8_forReplicationFactor_3_3_3()
114+
{
115+
testAllButOne(networkTopologyStrategy(3, 3, 3), 8);
116+
}
117+
118+
@Test
119+
public void allButOne_shouldBe_11_forEverywhereStrategyOnClusterOf_12() throws Exception
120+
{
121+
testAllButOne(everywhereStrategy(
122+
dc(1, Pair.create("192.168.0.1", "A"), Pair.create("192.168.0.2", "E"), Pair.create("192.168.0.3", "H"),
123+
Pair.create("192.168.0.4", "C"), Pair.create("192.168.0.5", "I"), Pair.create("192.168.0.6", "J")),
124+
dc(2, Pair.create("192.168.1.1", "B"), Pair.create("192.168.1.2", "G"), Pair.create("192.168.1.3", "L"),
125+
Pair.create("192.168.1.4", "D"), Pair.create("192.168.1.5", "F"), Pair.create("192.168.1.6", "K"))),
126+
11);
127+
}
128+
129+
private void testAllButOne(AbstractReplicationStrategy replicationStrategy, int expected)
130+
{
131+
// when
132+
int blockFor = ConsistencyLevel.allButOneFor(replicationStrategy);
133+
134+
// then
135+
assertEquals("number of nodes to block for", expected, blockFor);
136+
}
137+
138+
private static NetworkTopologyStrategy networkTopologyStrategy(int... dc)
139+
{
140+
Map<String, String> config = new HashMap<>();
141+
for (int i = 0; i < dc.length; i++)
142+
{
143+
config.put("DC" + i, Integer.toString(dc[i]));
144+
}
145+
return new NetworkTopologyStrategy(KS, new TokenMetadata(), SNITCH, config);
146+
}
147+
148+
private static AbstractReplicationStrategy simpleStrategy(int replicationFactory)
149+
{
150+
Map<String, String> config = Collections.singletonMap("replication_factor", Integer.toString(replicationFactory));
151+
return new SimpleStrategy(KS, new TokenMetadata(), SNITCH, config);
152+
}
153+
154+
@SafeVarargs
155+
private static AbstractReplicationStrategy everywhereStrategy(Multimap<InetAddressAndPort, Token>... dcs)
156+
{
157+
TokenMetadata metadata = new TokenMetadata();
158+
for (Multimap<InetAddressAndPort, Token> dc : dcs)
159+
{
160+
metadata.updateNormalTokens(dc);
161+
}
162+
return new EverywhereStrategy(KS, metadata, SNITCH, Collections.emptyMap());
163+
}
164+
165+
private static AbstractReplicationStrategy localStrategy()
166+
{
167+
return new LocalStrategy(KS, new TokenMetadata(), SNITCH, Collections.emptyMap());
168+
}
169+
170+
private static Multimap<InetAddressAndPort, Token> dc(int id, Pair<String, String>... addressToken) throws UnknownHostException
171+
{
172+
Multimap<InetAddressAndPort, Token> dc = HashMultimap.create();
173+
for (Pair<String, String> pair : addressToken)
174+
{
175+
DATACENTER.put(pair.left, "DC" + id);
176+
dc.put(InetAddressAndPort.getByName(pair.left), new OrderPreservingPartitioner.StringToken(pair.right));
177+
}
178+
return dc;
179+
}
180+
}

0 commit comments

Comments
 (0)