Skip to content

Commit 8fa31ce

Browse files
committed
XCOMMONS-3451: Realtime editing doesn't support clustering
1 parent 889ef18 commit 8fa31ce

File tree

13 files changed

+369
-21
lines changed

13 files changed

+369
-21
lines changed

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/main/java/org/xwiki/netflux/internal/DefaultEntityChannelStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private boolean hasRawChannel(EntityChannel channel)
101101
{
102102
Channel rawChannel = this.channelStore.get(channel.getKey());
103103
if (rawChannel != null) {
104-
channel.setUserCount(rawChannel.getConnectedUsers().size());
104+
channel.setUserCount(rawChannel.getUsers().size());
105105
return true;
106106
} else {
107107
return false;

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/main/java/org/xwiki/netflux/internal/EntityChannelScriptAuthorBot.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,19 @@ public void onChannelMessage(Channel channel, User sender, String messageType, S
8686
// messages).
8787
if (MessageBuilder.COMMAND_MSG.equals(messageType)) {
8888
this.entityChannels.getChannel(channel.getKey())
89-
.ifPresent(entityChannel -> this.webSocketContext.run(sender.getSession(), () -> {
90-
UserReference senderUserReference = this.currentUserResolver.resolve(CurrentUserReference.INSTANCE);
91-
this.scriptAuthorTracker.maybeUpdateScriptAuthor(entityChannel, senderUserReference);
92-
}));
89+
.ifPresent(entityChannel -> maybeUpdateScriptAuthor(sender, entityChannel));
90+
}
91+
}
92+
93+
private void maybeUpdateScriptAuthor(User sender, EntityChannel entityChannel)
94+
{
95+
if (sender instanceof LocalUser localUser) {
96+
this.webSocketContext.run(localUser.getSession(), () -> {
97+
UserReference senderUserReference = this.currentUserResolver.resolve(CurrentUserReference.INSTANCE);
98+
this.scriptAuthorTracker.maybeUpdateScriptAuthor(entityChannel, senderUserReference);
99+
});
100+
} else if (sender instanceof RemoteUser remoteUser) {
101+
this.scriptAuthorTracker.maybeUpdateScriptAuthor(entityChannel, remoteUser.getUserReference());
93102
}
94103
}
95104

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import jakarta.inject.Inject;
23+
import jakarta.inject.Singleton;
24+
import jakarta.websocket.Session;
25+
26+
import org.xwiki.component.annotation.Component;
27+
import org.xwiki.observation.remote.RemoteObservationManagerConfiguration;
28+
29+
/**
30+
* An implementation of {@link LocalUserFactory} which avoid collision in a cluster use case.
31+
*
32+
* @version $Id$
33+
* @since 17.10.0RC1
34+
*/
35+
@Component
36+
@Singleton
37+
public class RemoteLocalUserFactory extends DefaultLocalUserFactory
38+
{
39+
@Inject
40+
private RemoteObservationManagerConfiguration remoteConfiguation;
41+
42+
@Override
43+
protected String getId(Session session)
44+
{
45+
return this.remoteConfiguation.getId() + '-' + super.getId(session);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import org.xwiki.user.UserReference;
23+
24+
/**
25+
* A user accessing the current instance.
26+
*
27+
* @version $Id$
28+
* @since 17.10.0RC1
29+
*/
30+
public class RemoteUser extends User
31+
{
32+
private final String instance;
33+
34+
private final UserReference userReference;
35+
36+
/**
37+
* Creates a new user with the specified name, using the given WebSocket session.
38+
*
39+
* @param name the identifier of the user
40+
* @param instance the identifier of the instance on which the user is connected
41+
* @param userReference the reference of the user
42+
*/
43+
public RemoteUser(String name, String instance, UserReference userReference)
44+
{
45+
super(name);
46+
47+
this.instance = instance;
48+
this.userReference = userReference;
49+
}
50+
51+
/**
52+
* @return the identifier of the instance on which the user is connected
53+
*/
54+
public String getInstance()
55+
{
56+
return this.instance;
57+
}
58+
59+
/**
60+
* @return the userReference
61+
*/
62+
public UserReference getUserReference()
63+
{
64+
return this.userReference;
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal.event;
21+
22+
import java.io.Serializable;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import jakarta.inject.Inject;
27+
28+
import org.xwiki.netflux.internal.RemoteUser;
29+
import org.xwiki.observation.event.Event;
30+
import org.xwiki.observation.remote.LocalEventData;
31+
import org.xwiki.observation.remote.RemoteEventData;
32+
import org.xwiki.observation.remote.RemoteObservationManagerConfiguration;
33+
import org.xwiki.observation.remote.converter.AbstractEventConverter;
34+
import org.xwiki.user.UserReference;
35+
import org.xwiki.user.UserReferenceResolver;
36+
import org.xwiki.user.UserReferenceSerializer;
37+
38+
/**
39+
* Component in charge of adapting local Netflux event to remote instances.
40+
*
41+
* @version $Id$
42+
* @since 17.10.0RC1
43+
*/
44+
public class RemoteCommandEventConverter extends AbstractEventConverter
45+
{
46+
private static final String PROP_INSTANCE = "instance";
47+
48+
private static final String PROP_ID = "id";
49+
50+
private static final String PROP_USERREFERENCE = "userreference";
51+
52+
@Inject
53+
private RemoteObservationManagerConfiguration configuration;
54+
55+
@Inject
56+
private UserReferenceSerializer<String> userSerializer;
57+
58+
@Inject
59+
private UserReferenceResolver<String> userResolver;
60+
61+
@Override
62+
public boolean toRemote(LocalEventData localEvent, RemoteEventData remoteEvent)
63+
{
64+
if (localEvent.getEvent() instanceof NetfluxUserJoinedEvent joinEvent) {
65+
Map<String, String> user = new HashMap<>();
66+
user.put(PROP_ID, joinEvent.getUser());
67+
user.put(PROP_INSTANCE, this.configuration.getId());
68+
user.put(PROP_USERREFERENCE, serializeUserReference(sessionUser));
69+
70+
remoteEvent.setEvent(joinEvent);
71+
remoteEvent.setSource((Serializable) user);
72+
73+
return true;
74+
}
75+
76+
return false;
77+
}
78+
79+
@Override
80+
public boolean fromRemote(RemoteEventData remoteEvent, LocalEventData localEvent)
81+
{
82+
Map<String, String> user = (Map<String, String>) remoteEvent.getSource();
83+
84+
localEvent.setEvent((Event) remoteEvent.getEvent());
85+
localEvent.setSource(new RemoteUser(user.get(PROP_ID), user.get(PROP_INSTANCE),
86+
unzerializeUserReference(user.get(PROP_USERREFERENCE))));
87+
88+
return false;
89+
}
90+
91+
private UserReference unzerializeUserReference(String reference)
92+
{
93+
return this.userResolver.resolve(reference);
94+
}
95+
96+
private String serializeUserReference(UserReference userReference)
97+
{
98+
return this.userSerializer.serialize(userReference);
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal.event;
21+
22+
import jakarta.inject.Named;
23+
import jakarta.inject.Singleton;
24+
25+
import org.xwiki.component.annotation.Component;
26+
27+
/**
28+
* Extend {@link CommandListener} to add clustering related improvements.
29+
*
30+
* @version $Id$
31+
* @since 17.10.0RC1
32+
*/
33+
@Component
34+
@Named(CommandListener.NAME)
35+
@Singleton
36+
public class RemoteCommandListener extends CommandListener
37+
{
38+
@Override
39+
protected void userMessage(NetfluxMessageUserEvent event)
40+
{
41+
// TODO: To limit conflict, make all NetfluxMessageUserEvent events go through the cluster leader
42+
}
43+
}

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/main/resources/META-INF/components.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ org.xwiki.netflux.internal.DefaultEntityChannelStore
22
org.xwiki.netflux.internal.EffectiveAuthorSetterListener
33
org.xwiki.netflux.internal.EntityChannelScriptAuthorBot
44
org.xwiki.netflux.internal.EntityChannelScriptAuthorTracker
5+
org.xwiki.netflux.internal.XWikiLocalUserFactory
56
org.xwiki.netflux.script.NetfluxScriptService

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/test/java/org/xwiki/netflux/internal/DefaultEntityChannelStoreTest.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import jakarta.websocket.Session;
2525

2626
import org.junit.jupiter.api.Test;
27-
import org.mockito.Mock;
2827
import org.xwiki.model.reference.WikiReference;
2928
import org.xwiki.netflux.EntityChannel;
3029
import org.xwiki.test.junit5.mockito.ComponentTest;
@@ -34,6 +33,7 @@
3433
import static org.junit.jupiter.api.Assertions.assertEquals;
3534
import static org.junit.jupiter.api.Assertions.assertFalse;
3635
import static org.junit.jupiter.api.Assertions.assertSame;
36+
import static org.mockito.Mockito.mock;
3737
import static org.mockito.Mockito.when;
3838

3939
/**
@@ -50,9 +50,6 @@ class DefaultEntityChannelStoreTest
5050
@MockComponent
5151
private ChannelStore channelStore;
5252

53-
@Mock
54-
private Session session;
55-
5653
private final WikiReference entityReference = new WikiReference("test");
5754

5855
@Test
@@ -77,18 +74,14 @@ void createAndGetChannel()
7774
assertSame(entityChannel, this.entityChannelStore.createChannel(this.entityReference, path));
7875

7976
// Add an user to the channel.
80-
User me = new User(this.session, "mflorea");
77+
User me = new LocalUser(mock(Session.class), "mflorea");
8178
channel.getUsers().put(me.getName(), me);
8279

8380
// Get should return the existing channel.
8481
assertSame(entityChannel, this.entityChannelStore.getChannel(this.entityReference, path).get());
8582
assertSame(entityChannel, this.entityChannelStore.getChannel(channel.getKey()).get());
8683
assertEquals(1, entityChannel.getUserCount());
8784

88-
// Disconnect the user and check again the user count.
89-
me.setConnected(false);
90-
assertEquals(0, this.entityChannelStore.getChannel(this.entityReference, path).get().getUserCount());
91-
9285
// Disconnect the raw channel and check the entity channel.
9386
when(this.channelStore.get(channel.getKey())).thenReturn(null);
9487
assertFalse(this.entityChannelStore.getChannel(this.entityReference, path).isPresent());

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/test/java/org/xwiki/netflux/internal/EntityChannelScriptAuthorBotTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ void onJoinChannel()
105105
@Test
106106
void onChannelMessage()
107107
{
108-
User sender = mock(User.class);
108+
LocalUser sender = mock(LocalUser.class);
109109
Session session = mock(Session.class);
110110
when(sender.getSession()).thenReturn(session);
111111

0 commit comments

Comments
 (0)