Skip to content

Commit e9320c5

Browse files
authored
GEODE-9674: fix durable client message loss issue in tests. (#6947)
* for caching_proxy region, count the events received instread of the region size * do not try to dispatch residual messages when exception occurred. * add durable client flag when register interest
1 parent b6f8cc8 commit e9320c5

File tree

8 files changed

+501
-292
lines changed

8 files changed

+501
-292
lines changed
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
4+
* agreements. See the NOTICE file distributed with this work for additional information regarding
5+
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License. You may obtain a
7+
* copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software distributed under the License
12+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
13+
* or implied. See the License for the specific language governing permissions and limitations under
14+
* the License.
15+
*
16+
*/
17+
18+
package org.apache.geode.security;
19+
20+
import static org.apache.geode.cache.query.dunit.SecurityTestUtils.createAndExecuteCQ;
21+
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
22+
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
23+
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.CopyOnWriteArrayList;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.stream.IntStream;
31+
32+
import org.junit.After;
33+
import org.junit.Rule;
34+
import org.junit.Test;
35+
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
36+
import org.junit.experimental.categories.Category;
37+
38+
import org.apache.geode.cache.EntryEvent;
39+
import org.apache.geode.cache.InterestResultPolicy;
40+
import org.apache.geode.cache.Region;
41+
import org.apache.geode.cache.RegionShortcut;
42+
import org.apache.geode.cache.client.ClientCache;
43+
import org.apache.geode.cache.client.ClientRegionShortcut;
44+
import org.apache.geode.cache.query.dunit.SecurityTestUtils.EventsCqListner;
45+
import org.apache.geode.cache.util.CacheListenerAdapter;
46+
import org.apache.geode.test.dunit.AsyncInvocation;
47+
import org.apache.geode.test.dunit.rules.ClientVM;
48+
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
49+
import org.apache.geode.test.junit.categories.SecurityTest;
50+
import org.apache.geode.test.junit.rules.ServerStarterRule;
51+
52+
@Category({SecurityTest.class})
53+
public class AuthExpirationDUnitTest {
54+
@Rule
55+
public ClusterStartupRule cluster = new ClusterStartupRule();
56+
57+
@Rule
58+
public RestoreSystemProperties restore = new RestoreSystemProperties();
59+
60+
@Rule
61+
public ServerStarterRule server = new ServerStarterRule()
62+
.withSecurityManager(ExpirableSecurityManager.class)
63+
.withRegion(RegionShortcut.REPLICATE, "region");
64+
65+
66+
private ClientVM clientVM;
67+
68+
@After
69+
public void after() {
70+
if (clientVM != null) {
71+
clientVM.invoke(UpdatableUserAuthInitialize::reset);
72+
}
73+
getSecurityManager().close();
74+
}
75+
76+
private static EventsCqListner CQLISTENER0;
77+
78+
@Test
79+
public void cqClientWillReAuthenticateAutomatically() throws Exception {
80+
startClientWithCQ();
81+
Region<Object, Object> region = server.getCache().getRegion("/region");
82+
region.put("1", "value1");
83+
clientVM.invoke(() -> {
84+
await().untilAsserted(
85+
() -> assertThat(CQLISTENER0.getKeys())
86+
.asList()
87+
.containsExactly("1"));
88+
});
89+
90+
// expire the current user
91+
getSecurityManager().addExpiredUser("user1");
92+
93+
// update the user to be used before we try to send the 2nd event
94+
clientVM.invoke(() -> {
95+
UpdatableUserAuthInitialize.setUser("user2");
96+
});
97+
98+
// do a second put, the event should be queued until client re-authenticate
99+
region.put("2", "value2");
100+
101+
clientVM.invoke(() -> {
102+
// the client will eventually get the 2nd event
103+
await().untilAsserted(
104+
() -> assertThat(CQLISTENER0.getKeys())
105+
.asList()
106+
.containsExactly("1", "2"));
107+
});
108+
109+
Map<String, List<String>> authorizedOps = getSecurityManager().getAuthorizedOps();
110+
assertThat(authorizedOps.keySet().size()).isEqualTo(2);
111+
assertThat(authorizedOps.get("user1")).asList().containsExactly("DATA:READ:region",
112+
"DATA:READ:region:1");
113+
assertThat(authorizedOps.get("user2")).asList().containsExactly("DATA:READ:region:2");
114+
115+
Map<String, List<String>> unAuthorizedOps = getSecurityManager().getUnAuthorizedOps();
116+
assertThat(unAuthorizedOps.keySet().size()).isEqualTo(1);
117+
assertThat(unAuthorizedOps.get("user1")).asList().containsExactly("DATA:READ:region:2");
118+
}
119+
120+
@Test
121+
public void registeredInterest_slowReAuth_policyDefault() throws Exception {
122+
int serverPort = server.getPort();
123+
clientVM = cluster.startClientVM(0,
124+
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
125+
.withPoolSubscription(true)
126+
.withServerConnection(serverPort));
127+
128+
ClientVM client2 = cluster.startClientVM(1,
129+
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
130+
.withPoolSubscription(true)
131+
.withServerConnection(serverPort));
132+
133+
clientVM.invoke(() -> {
134+
UpdatableUserAuthInitialize.setUser("user1");
135+
Region<Object, Object> region = ClusterStartupRule.getClientCache()
136+
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
137+
138+
// this test will succeed because when clients re-connects, it will re-register inteest
139+
// a new queue will be created with all the data. Old queue is destroyed.
140+
region.registerInterestForAllKeys();
141+
UpdatableUserAuthInitialize.setUser("user11");
142+
// wait for time longer than server's max time to wait to ree-authenticate
143+
UpdatableUserAuthInitialize.setWaitTime(6000);
144+
});
145+
146+
AsyncInvocation<Void> invokePut = client2.invokeAsync(() -> {
147+
UpdatableUserAuthInitialize.setUser("user2");
148+
Region<Object, Object> region = ClusterStartupRule.getClientCache()
149+
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
150+
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
151+
});
152+
153+
getSecurityManager().addExpiredUser("user1");
154+
invokePut.await();
155+
156+
// make sure this client recovers and get all the events and will be able to do client operation
157+
clientVM.invoke(() -> {
158+
Region<Object, Object> region = ClusterStartupRule.getClientCache().getRegion("region");
159+
await().untilAsserted(
160+
() -> assertThat(region.keySet()).hasSize(100));
161+
region.put("key100", "value100");
162+
});
163+
164+
// user1 should not be used to put any keys to the region
165+
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
166+
.containsExactly("DATA:READ:region");
167+
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
168+
.containsExactly("DATA:READ:region:key0");
169+
}
170+
171+
@Test
172+
public void registeredInterest_slowReAuth_policyKeys_durableClient() throws Exception {
173+
int serverPort = server.getPort();
174+
clientVM = cluster.startClientVM(0,
175+
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
176+
.withProperty(DURABLE_CLIENT_ID, "123456")
177+
.withPoolSubscription(true)
178+
.withServerConnection(serverPort));
179+
180+
181+
clientVM.invoke(() -> {
182+
UpdatableUserAuthInitialize.setUser("user1");
183+
ClientCache clientCache = ClusterStartupRule.getClientCache();
184+
Region<Object, Object> region = clientCache
185+
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
186+
187+
region.registerInterestForAllKeys(InterestResultPolicy.KEYS, true);
188+
clientCache.readyForEvents();
189+
UpdatableUserAuthInitialize.setUser("user11");
190+
// wait for time longer than server's max time to wait to re-authenticate
191+
UpdatableUserAuthInitialize.setWaitTime(6000);
192+
});
193+
194+
getSecurityManager().addExpiredUser("user1");
195+
Region<Object, Object> region = server.getCache().getRegion("/region");
196+
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
197+
198+
// make sure this client recovers and get all the events and will be able to do client operation
199+
clientVM.invoke(() -> {
200+
Region<Object, Object> clientRegion = ClusterStartupRule.getClientCache().getRegion("region");
201+
await().untilAsserted(() -> assertThat(clientRegion).hasSize(100));
202+
clientRegion.put("key100", "value100");
203+
});
204+
205+
// user1 should not be used to put any keys to the region
206+
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
207+
.containsExactly("DATA:READ:region");
208+
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
209+
.containsExactly("DATA:READ:region:key0");
210+
}
211+
212+
private static class MyCacheListener extends CacheListenerAdapter<Object, Object> {
213+
public List<String> keys = new CopyOnWriteArrayList<>();
214+
215+
@Override
216+
public void afterCreate(EntryEvent event) {
217+
keys.add((String) event.getKey());
218+
}
219+
}
220+
221+
private static MyCacheListener myListener = new MyCacheListener();
222+
223+
@Test
224+
public void registeredInterest_slowReAuth_policyNone_durableClient() throws Exception {
225+
int serverPort = server.getPort();
226+
clientVM = cluster.startClientVM(0,
227+
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
228+
.withProperty(DURABLE_CLIENT_ID, "123456")
229+
.withPoolSubscription(true)
230+
.withServerConnection(serverPort));
231+
232+
233+
clientVM.invoke(() -> {
234+
UpdatableUserAuthInitialize.setUser("user1");
235+
myListener = new MyCacheListener();
236+
ClientCache clientCache = ClusterStartupRule.getClientCache();
237+
Region<Object, Object> region = clientCache
238+
.createClientRegionFactory(ClientRegionShortcut.PROXY)
239+
.addCacheListener(myListener).create("region");
240+
241+
// use NONE policy to make sure the old messages still sticks around
242+
region.registerInterestForAllKeys(InterestResultPolicy.NONE, true);
243+
clientCache.readyForEvents();
244+
UpdatableUserAuthInitialize.setUser("user11");
245+
// wait for time longer than server's max time to wait to re-authenticate
246+
UpdatableUserAuthInitialize.setWaitTime(6000);
247+
});
248+
249+
getSecurityManager().addExpiredUser("user1");
250+
Region<Object, Object> region = server.getCache().getRegion("/region");
251+
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
252+
253+
// make sure this client recovers and get all the events and will be able to do client operation
254+
clientVM.invoke(() -> {
255+
Region<Object, Object> clientRegion = ClusterStartupRule.getClientCache().getRegion("region");
256+
await().untilAsserted(() -> assertThat(myListener.keys).hasSize(100));
257+
clientRegion.put("key100", "value100");
258+
});
259+
260+
// user1 should not be used to put any keys to the region
261+
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
262+
.containsExactly("DATA:READ:region");
263+
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
264+
.containsExactly("DATA:READ:region:key0");
265+
}
266+
267+
268+
@Test
269+
public void registeredInterest_slowReAuth_policyNone_nonDurableClient()
270+
throws Exception {
271+
int serverPort = server.getPort();
272+
clientVM = cluster.startClientVM(0,
273+
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
274+
.withPoolSubscription(true)
275+
.withServerConnection(serverPort));
276+
277+
278+
clientVM.invoke(() -> {
279+
UpdatableUserAuthInitialize.setUser("user1");
280+
ClientCache clientCache = ClusterStartupRule.getClientCache();
281+
Region<Object, Object> region = clientCache
282+
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
283+
284+
// use InterestResultPolicy.NONE to make sure the old queue is still around
285+
region.registerInterestForAllKeys(InterestResultPolicy.NONE);
286+
UpdatableUserAuthInitialize.setUser("user11");
287+
// wait for time longer than server's max time to wait to ree-authenticate
288+
UpdatableUserAuthInitialize.setWaitTime(6000);
289+
});
290+
291+
getSecurityManager().addExpiredUser("user1");
292+
Region<Object, Object> region = server.getCache().getRegion("/region");
293+
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
294+
295+
// client will recover but there will be message loss
296+
clientVM.invoke(() -> {
297+
Region<Object, Object> clientRegion = ClusterStartupRule.getClientCache().getRegion("region");
298+
await().during(10, TimeUnit.SECONDS).untilAsserted(
299+
() -> assertThat(clientRegion.keySet()).hasSizeLessThan(100));
300+
clientRegion.put("key100", "value100");
301+
});
302+
303+
// user1 should not be used to put any keys to the region
304+
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
305+
.containsExactly("DATA:READ:region");
306+
assertThat(getSecurityManager().getAuthorizedOps().get("user11"))
307+
.contains("DATA:WRITE:region:key100");
308+
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
309+
.containsExactly("DATA:READ:region:key0");
310+
}
311+
312+
private void startClientWithCQ() throws Exception {
313+
int serverPort = server.getPort();
314+
clientVM = cluster.startClientVM(0,
315+
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
316+
.withCacheSetup(
317+
ccf -> ccf.setPoolSubscriptionRedundancy(2).setPoolSubscriptionEnabled(true))
318+
.withServerConnection(serverPort));
319+
320+
clientVM.invoke(() -> {
321+
UpdatableUserAuthInitialize.setUser("user1");
322+
CQLISTENER0 = createAndExecuteCQ(ClusterStartupRule.getClientCache().getQueryService(), "CQ1",
323+
"select * from /region");
324+
});
325+
}
326+
327+
private ExpirableSecurityManager getSecurityManager() {
328+
return (ExpirableSecurityManager) server.getCache().getSecurityService().getSecurityManager();
329+
}
330+
331+
332+
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/*
2+
*
23
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
34
* agreements. See the NOTICE file distributed with this work for additional information regarding
45
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
@@ -11,17 +12,18 @@
1112
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
1213
* or implied. See the License for the specific language governing permissions and limitations under
1314
* the License.
15+
*
1416
*/
1517
package org.apache.geode.security;
1618

1719
import static org.apache.geode.cache.execute.FunctionService.onRegion;
1820
import static org.apache.geode.cache.execute.FunctionService.onServer;
1921
import static org.apache.geode.cache.execute.FunctionService.onServers;
22+
import static org.apache.geode.cache.query.dunit.SecurityTestUtils.collectSecurityManagers;
23+
import static org.apache.geode.cache.query.dunit.SecurityTestUtils.getSecurityManager;
2024
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
2125
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_MANAGER;
2226
import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
23-
import static org.apache.geode.security.ClientAuthenticationTestUtils.collectSecurityManagers;
24-
import static org.apache.geode.security.ClientAuthenticationTestUtils.getSecurityManager;
2527
import static org.apache.geode.security.SecurityManager.PASSWORD;
2628
import static org.apache.geode.security.SecurityManager.USER_NAME;
2729
import static org.apache.geode.test.version.VersionManager.CURRENT_VERSION;

0 commit comments

Comments
 (0)