Skip to content

Commit 1236db9

Browse files
committed
XCOMMONS-3451: Realtime editing doesn't support clustering
* Remove leftovers from a previous implementation which did not really made sense anymore (mainly around the concept of "connected user") * Add the concept of Remote and Local users * Reusing the websocket session id as local user id * Make modifications go through events (even locally) so that other cluster members know about them
1 parent 1a68ec1 commit 1236db9

21 files changed

+1297
-373
lines changed

xwiki-commons-core/xwiki-commons-netflux/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
<properties>
3535
<!-- Name to display by the Extension Manager -->
3636
<xwiki.extension.name>Netflux Server</xwiki.extension.name>
37-
<xwiki.jacoco.instructionRatio>0.92</xwiki.jacoco.instructionRatio>
37+
<xwiki.jacoco.instructionRatio>0.91</xwiki.jacoco.instructionRatio>
3838
</properties>
3939
<dependencies>
4040
<dependency>
@@ -59,12 +59,19 @@
5959
<groupId>com.fasterxml.jackson.core</groupId>
6060
<artifactId>jackson-databind</artifactId>
6161
</dependency>
62+
6263
<!-- Test dependencies -->
6364
<dependency>
6465
<groupId>${project.groupId}</groupId>
6566
<artifactId>xwiki-commons-tool-test-component</artifactId>
6667
<version>${project.version}</version>
6768
<scope>test</scope>
6869
</dependency>
70+
<dependency>
71+
<groupId>${project.groupId}</groupId>
72+
<artifactId>xwiki-commons-observation-local</artifactId>
73+
<version>${project.version}</version>
74+
<scope>test</scope>
75+
</dependency>
6976
</dependencies>
7077
</project>

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Bot.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,10 @@ default boolean onJoinChannel(Channel channel)
5656
*
5757
* @param sender the user who sent the message
5858
* @param message the message that was sent
59+
* @since 17.10.1
60+
* @since 18.0.0RC1
5961
*/
60-
default void onUserMessage(User sender, List<Object> message)
62+
default void onUserMessage(LocalUser sender, List<Object> message)
6163
{
6264
// Do nothing by default
6365
}

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Channel.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Deque;
2323
import java.util.LinkedHashMap;
2424
import java.util.LinkedList;
25-
import java.util.List;
2625
import java.util.Map;
2726

2827
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -88,14 +87,6 @@ public Map<String, Bot> getBots()
8887
return this.bots;
8988
}
9089

91-
/**
92-
* @return the list of users that are currently connected to this channel
93-
*/
94-
public List<User> getConnectedUsers()
95-
{
96-
return this.users.values().stream().filter(user -> user.getSession() != null && user.isConnected()).toList();
97-
}
98-
9990
/**
10091
* @return the channel messages
10192
*/

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/ChannelStore.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,23 @@ public class ChannelStore
5454
*/
5555
public Channel create()
5656
{
57-
Channel channel = new Channel(this.idGenerator.generateChannelId());
57+
return create(this.idGenerator.generateChannelId());
58+
}
59+
60+
/**
61+
* Creates a new channel with a passed key.
62+
*
63+
* @param channelKey the identifier of the new channel
64+
* @return the new channel
65+
* @since 17.10.1
66+
* @since 18.0.0RC1
67+
*/
68+
public Channel create(String channelKey)
69+
{
70+
Channel channel = new Channel(channelKey);
5871
askBotsToJoin(channel);
5972
this.channelByKey.put(channel.getKey(), channel);
73+
6074
return channel;
6175
}
6276

@@ -85,6 +99,26 @@ public Channel get(String key)
8599
return this.channelByKey.get(key);
86100
}
87101

102+
/**
103+
* Access an existing channel by its key.
104+
*
105+
* @param key the channel key
106+
* @param create if true, create the channel when it does not exist
107+
* @return the corresponding channel
108+
* @since 17.10.1
109+
* @since 18.0.0RC1
110+
*/
111+
public Channel get(String key, boolean create)
112+
{
113+
Channel channel = get(key);
114+
115+
if (channel == null) {
116+
channel = create(key);
117+
}
118+
119+
return channel;
120+
}
121+
88122
/**
89123
* Remove a channel from memory.
90124
*
@@ -106,8 +140,7 @@ public void prune()
106140
try {
107141
long currentTime = System.currentTimeMillis();
108142
for (Channel channel : this.channelByKey.values()) {
109-
if (channel.getConnectedUsers().isEmpty()
110-
&& (currentTime - channel.getCreationDate()) > (1000 * 60 * 60 * 2)) {
143+
if (channel.getUsers().isEmpty() && (currentTime - channel.getCreationDate()) > (1000 * 60 * 60 * 2)) {
111144
remove(channel);
112145
}
113146
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import jakarta.inject.Singleton;
23+
import jakarta.websocket.Session;
24+
25+
import org.xwiki.component.annotation.Component;
26+
27+
/**
28+
* Default implementation of {@link LocalUserFactory}.
29+
* <p>
30+
* Use the session id as user id.
31+
*
32+
* @version $Id$
33+
* @since 17.10.1
34+
* @since 18.0.0RC1
35+
*/
36+
@Component
37+
@Singleton
38+
public class DefaultLocalUserFactory implements LocalUserFactory
39+
{
40+
@Override
41+
public LocalUser createLocalUser(Session session)
42+
{
43+
return new LocalUser(session, getId(session));
44+
}
45+
46+
protected String getId(Session session)
47+
{
48+
return session.getId();
49+
}
50+
}

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/HistoryKeeper.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import org.slf4j.Logger;
3232
import org.xwiki.component.annotation.Component;
33+
import org.xwiki.netflux.internal.event.NetfluxMessageUserEvent;
34+
import org.xwiki.observation.ObservationManager;
3335

3436
/**
3537
* Holds the key of the history keeper fake user that is added to all Netflux channels.
@@ -51,6 +53,9 @@ public class HistoryKeeper extends AbstractBot
5153
@Inject
5254
private MessageBuilder messageBuilder;
5355

56+
@Inject
57+
private ObservationManager observation;
58+
5459
@Override
5560
public String getId()
5661
{
@@ -60,7 +65,7 @@ public String getId()
6065
}
6166

6267
@Override
63-
public void onUserMessage(User sender, List<Object> message)
68+
public void onUserMessage(LocalUser sender, List<Object> message)
6469
{
6570
// The history keeper responds only to GET_HISTORY messages.
6671

@@ -99,7 +104,7 @@ private void sendChannelHistory(User user, String channelKey)
99104

100105
try {
101106
for (String msg : (Iterable<String>) messages::iterator) {
102-
user.getSession().getBasicRemote().sendText(msg);
107+
this.observation.notify(new NetfluxMessageUserEvent(user.getName(), msg), null);
103108
}
104109
} catch (Exception e) {
105110
this.logger.debug("Failed to send channel history.", e);
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import jakarta.websocket.Session;
23+
24+
/**
25+
* A user accessing the current instance.
26+
*
27+
* @version $Id$
28+
* @since 17.10.1
29+
* @since 18.0.0RC1
30+
*/
31+
public class LocalUser extends User
32+
{
33+
private final Session session;
34+
35+
/**
36+
* Creates a new user with the specified name, using the given WebSocket session.
37+
*
38+
* @param session the WebSocket session used to communicate with the user
39+
* @param name the identifier of the user
40+
*/
41+
public LocalUser(Session session, String name)
42+
{
43+
super(name);
44+
45+
this.session = session;
46+
}
47+
48+
/**
49+
* @return the WebSocket session
50+
*/
51+
public Session getSession()
52+
{
53+
return this.session;
54+
}
55+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import jakarta.websocket.Session;
23+
24+
import org.xwiki.component.annotation.Role;
25+
26+
/**
27+
* Component in charge of generating a new local user for a given session.
28+
*
29+
* @version $Id$
30+
* @since 17.10.1
31+
* @since 18.0.0RC1
32+
*/
33+
@Role
34+
public interface LocalUserFactory
35+
{
36+
/**
37+
* @param session the WebSocket session
38+
* @return the new instance of {@link LocalUser}
39+
*/
40+
LocalUser createLocalUser(Session session);
41+
}

0 commit comments

Comments
 (0)