Skip to content

Commit 88b6134

Browse files
committed
XCOMMONS-3451: Realtime editing doesn't support clustering
1 parent f330069 commit 88b6134

File tree

20 files changed

+1121
-75
lines changed

20 files changed

+1121
-75
lines changed

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
<properties>
3535
<!-- Name to display by the Extension Manager -->
3636
<xwiki.extension.name>Netflux API</xwiki.extension.name>
37-
<xwiki.jacoco.instructionRatio>0.91</xwiki.jacoco.instructionRatio>
37+
<xwiki.jacoco.instructionRatio>0.90</xwiki.jacoco.instructionRatio>
3838
<!-- Replaces the old realtime backend from XWiki Contrib -->
3939
<xwiki.extension.features>
4040
org.xwiki.contrib:xwiki-contrib-rtbackend,
@@ -89,5 +89,17 @@
8989
<artifactId>jakarta.websocket-api</artifactId>
9090
<scope>test</scope>
9191
</dependency>
92+
<dependency>
93+
<groupId>org.xwiki.commons</groupId>
94+
<artifactId>xwiki-commons-observation-local</artifactId>
95+
<version>${commons.version}</version>
96+
<scope>test</scope>
97+
</dependency>
98+
<dependency>
99+
<groupId>org.xwiki.platform</groupId>
100+
<artifactId>xwiki-platform-user-default</artifactId>
101+
<version>${project.version}</version>
102+
<scope>test</scope>
103+
</dependency>
92104
</dependencies>
93105
</project>

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/main/java/org/xwiki/netflux/internal/DefaultEntityChannelStore.java

Lines changed: 4 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,8 @@
1919
*/
2020
package org.xwiki.netflux.internal;
2121

22-
import java.util.Collections;
2322
import java.util.List;
24-
import java.util.Map;
25-
import java.util.Objects;
2623
import java.util.Optional;
27-
import java.util.concurrent.ConcurrentHashMap;
28-
import java.util.concurrent.CopyOnWriteArrayList;
29-
import java.util.stream.Collectors;
3024

3125
import jakarta.inject.Inject;
3226
import jakarta.inject.Singleton;
@@ -47,64 +41,23 @@
4741
public class DefaultEntityChannelStore implements EntityChannelStore
4842
{
4943
@Inject
50-
private ChannelStore channelStore;
51-
52-
private final Map<EntityReference, List<EntityChannel>> entityChannels = new ConcurrentHashMap<>();
44+
private InternalEntityChannelStore store;
5345

5446
@Override
5547
public List<EntityChannel> getChannels(EntityReference entityReference)
5648
{
57-
List<EntityChannel> channels = this.entityChannels.get(entityReference);
58-
if (channels != null) {
59-
this.channelStore.prune();
60-
List<EntityChannel> availableChannels = channels.stream().filter(this::hasRawChannel)
61-
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
62-
if (availableChannels.isEmpty()) {
63-
this.entityChannels.remove(entityReference);
64-
} else {
65-
this.entityChannels.put(entityReference, availableChannels);
66-
return Collections.unmodifiableList(availableChannels);
67-
}
68-
}
69-
return Collections.emptyList();
49+
return store.getChannels(entityReference);
7050
}
7151

7252
@Override
7353
public synchronized EntityChannel createChannel(EntityReference entityReference, List<String> path)
7454
{
75-
Optional<EntityChannel> existingChannel = getChannel(entityReference, path);
76-
if (existingChannel.isPresent()) {
77-
// Return existing channel.
78-
return existingChannel.get();
79-
}
80-
81-
// Create new channel.
82-
EntityChannel channel = new EntityChannel(entityReference, path, this.channelStore.create().getKey());
83-
List<EntityChannel> channels =
84-
this.entityChannels.computeIfAbsent(entityReference, key -> new CopyOnWriteArrayList<>());
85-
channels.add(channel);
86-
87-
// Ask again the bots to join the channel now that we have an entity channel.
88-
this.channelStore.askBotsToJoin(this.channelStore.get(channel.getKey()));
89-
90-
return channel;
55+
return store.createChannel(entityReference, path);
9156
}
9257

9358
@Override
9459
public Optional<EntityChannel> getChannel(String key)
9560
{
96-
return this.entityChannels.values().stream().flatMap(List::stream)
97-
.filter(channel -> Objects.equals(channel.getKey(), key)).filter(this::hasRawChannel).findFirst();
98-
}
99-
100-
private boolean hasRawChannel(EntityChannel channel)
101-
{
102-
Channel rawChannel = this.channelStore.get(channel.getKey());
103-
if (rawChannel != null) {
104-
channel.setUserCount(rawChannel.getConnectedUsers().size());
105-
return true;
106-
} else {
107-
return false;
108-
}
61+
return store.getChannel(key);
10962
}
11063
}

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/main/java/org/xwiki/netflux/internal/EntityChange.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public enum ScriptLevel
7171
/**
7272
* The timestamp of the change.
7373
*/
74-
private final long timestamp = new Date().getTime();
74+
private final long timestamp;
7575

7676
/**
7777
* Creates a new entity change instance.
@@ -81,10 +81,26 @@ public enum ScriptLevel
8181
* @param scriptLevel the script level of the author of the change relative to the changed entity
8282
*/
8383
public EntityChange(EntityReference entityReference, UserReference author, ScriptLevel scriptLevel)
84+
{
85+
this(entityReference, author, scriptLevel, new Date().getTime());
86+
}
87+
88+
/**
89+
* Creates a new entity change instance.
90+
*
91+
* @param entityReference the changed entity
92+
* @param author the author of the change
93+
* @param scriptLevel the script level of the author of the change relative to the changed entity
94+
* @param timestamp the timestamp of the change
95+
* @since 17.10.1
96+
* @since 18.0.0RC1
97+
*/
98+
public EntityChange(EntityReference entityReference, UserReference author, ScriptLevel scriptLevel, long timestamp)
8499
{
85100
this.entityReference = entityReference;
86101
this.author = author;
87102
this.scriptLevel = scriptLevel;
103+
this.timestamp = timestamp;
88104
}
89105

90106
/**
@@ -111,6 +127,16 @@ public ScriptLevel getScriptLevel()
111127
return scriptLevel;
112128
}
113129

130+
/**
131+
* @return the timestamp of the change
132+
* @since 17.10.1
133+
* @since 18.0.0RC1
134+
*/
135+
public long getTimestamp()
136+
{
137+
return timestamp;
138+
}
139+
114140
@Override
115141
public int hashCode()
116142
{

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/main/java/org/xwiki/netflux/internal/EntityChannelScriptAuthorBot.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ public void onChannelMessage(Channel channel, User sender, String messageType, S
8484
{
8585
// We're interested only in messages that have content (we want to ignore for instance join, leave or ping
8686
// messages).
87-
if (MessageBuilder.COMMAND_MSG.equals(messageType)) {
87+
if (MessageBuilder.COMMAND_MSG.equals(messageType) && sender instanceof LocalUser localSender) {
8888
this.entityChannels.getChannel(channel.getKey())
89-
.ifPresent(entityChannel -> this.webSocketContext.run(sender.getSession(), () -> {
89+
.ifPresent(entityChannel -> this.webSocketContext.run(localSender.getSession(), () -> {
9090
UserReference senderUserReference = this.currentUserResolver.resolve(CurrentUserReference.INSTANCE);
9191
this.scriptAuthorTracker.maybeUpdateScriptAuthor(entityChannel, senderUserReference);
9292
}));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import javax.inject.Named;
23+
import javax.inject.Singleton;
24+
25+
import jakarta.inject.Inject;
26+
27+
import org.xwiki.component.annotation.Component;
28+
import org.xwiki.netflux.EntityChannel;
29+
import org.xwiki.netflux.internal.event.EntityChannelScriptAuthorChangeEvent;
30+
import org.xwiki.observation.AbstractEventListener;
31+
import org.xwiki.observation.event.Event;
32+
33+
/**
34+
* Update the script author for a specific {@link EntityChannel}.
35+
*
36+
* @version $Id$
37+
* @since 17.10.0RC1
38+
*/
39+
@Component
40+
@Named(EntityChannelScriptAuthorListener.NAME)
41+
@Singleton
42+
public class EntityChannelScriptAuthorListener extends AbstractEventListener
43+
{
44+
/**
45+
* The name of this event listener (and its component hint at the same time).
46+
*/
47+
public static final String NAME = "org.xwiki.netflux.internal.EntityChannelScriptAuthorListener";
48+
49+
@Inject
50+
private EntityChannelScriptAuthorTracker tracker;
51+
52+
/**
53+
* Setup the listener.
54+
*/
55+
public EntityChannelScriptAuthorListener()
56+
{
57+
super(NAME, new EntityChannelScriptAuthorChangeEvent());
58+
}
59+
60+
@Override
61+
public void onEvent(Event event, Object source, Object data)
62+
{
63+
if (event instanceof EntityChannelScriptAuthorChangeEvent changedEvent) {
64+
EntityChange change = (EntityChange) source;
65+
66+
this.tracker.setScriptAuthor(changedEvent.getChannel(), change);
67+
}
68+
}
69+
}

xwiki-platform-core/xwiki-platform-netflux/xwiki-platform-netflux-api/src/main/java/org/xwiki/netflux/internal/EntityChannelScriptAuthorTracker.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.xwiki.netflux.EntityChannel;
3737
import org.xwiki.netflux.EntityChannelStore;
3838
import org.xwiki.netflux.internal.EntityChange.ScriptLevel;
39+
import org.xwiki.netflux.internal.event.EntityChannelScriptAuthorChangeEvent;
40+
import org.xwiki.observation.ObservationManager;
3941
import org.xwiki.security.authorization.DocumentAuthorizationManager;
4042
import org.xwiki.security.authorization.Right;
4143
import org.xwiki.user.CurrentUserReference;
@@ -76,6 +78,9 @@ public class EntityChannelScriptAuthorTracker
7678
@Named("explicit")
7779
private EntityReferenceResolver<String> explicitEntityReferenceResolver;
7880

81+
@Inject
82+
private ObservationManager observation;
83+
7984
private final Map<String, EntityChange> scriptAuthors = new ConcurrentHashMap<>();
8085

8186
/**
@@ -111,6 +116,17 @@ public Optional<EntityChange> getScriptAuthor(String channelId)
111116
return returnValue;
112117
}
113118

119+
/**
120+
* @since 17.10.0
121+
*/
122+
void setScriptAuthor(String entityChannel, EntityChange scriptAuthorChange)
123+
{
124+
this.scriptAuthors.put(entityChannel, scriptAuthorChange);
125+
126+
this.logger.debug("Updated the script author associated with the entity channel [{}] to [{}].", entityChannel,
127+
scriptAuthorChange);
128+
}
129+
114130
void maybeUpdateScriptAuthor(EntityChannel entityChannel, UserReference scriptAuthor)
115131
{
116132
EntityChange channelScriptAuthor = this.scriptAuthors.get(entityChannel.getKey());
@@ -120,9 +136,10 @@ void maybeUpdateScriptAuthor(EntityChannel entityChannel, UserReference scriptAu
120136
// need to update the channel script level in order to prevent privilege escalation.
121137
EntityChange scriptAuthorChange =
122138
new EntityChange(entityChannel.getEntityReference(), scriptAuthor, userScriptLevel);
123-
this.scriptAuthors.put(entityChannel.getKey(), scriptAuthorChange);
124-
this.logger.debug("Updated the script author associated with the entity channel [{}] to [{}].",
125-
entityChannel, scriptAuthorChange);
139+
140+
// Modify the script author mapping through a listener to better cover clustering use case
141+
this.observation.notify(new EntityChannelScriptAuthorChangeEvent(entityChannel.getKey()),
142+
scriptAuthorChange);
126143
}
127144
}
128145

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.CopyOnWriteArrayList;
26+
27+
import jakarta.inject.Singleton;
28+
29+
import org.xwiki.component.annotation.Component;
30+
import org.xwiki.model.reference.EntityReference;
31+
import org.xwiki.netflux.EntityChannel;
32+
33+
/**
34+
* The channels.
35+
*
36+
* @version $Id$
37+
* @since 17.10.1
38+
*/
39+
@Component(roles = EntityChannels.class)
40+
@Singleton
41+
public class EntityChannels
42+
{
43+
private final Map<EntityReference, List<EntityChannel>> channels = new ConcurrentHashMap<>();
44+
45+
/**
46+
* @param entityReference the reference of the entity
47+
* @return the channels associated with the entity
48+
*/
49+
public List<EntityChannel> get(EntityReference entityReference)
50+
{
51+
return this.channels.get(entityReference);
52+
}
53+
54+
/**
55+
* @param entityReference the reference of the entity
56+
* @return the previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}
57+
*/
58+
public List<EntityChannel> remove(EntityReference entityReference)
59+
{
60+
return this.channels.remove(entityReference);
61+
}
62+
63+
/**
64+
* @param entityReference the reference of the entity
65+
* @param availableChannels the channels associated with the entity
66+
* @return the previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
67+
* (A {@code null} return can also indicate that the map previously associated {@code null} with
68+
* {@code key}, if the implementation supports {@code null} values.)
69+
*/
70+
public List<EntityChannel> put(EntityReference entityReference, List<EntityChannel> availableChannels)
71+
{
72+
return this.channels.put(entityReference, availableChannels);
73+
}
74+
75+
/**
76+
* Associate a new channel to the specified entity, having the given path.
77+
*
78+
* @param channel the channel to add
79+
*/
80+
public void addChannel(EntityChannel channel)
81+
{
82+
List<EntityChannel> entityChannels =
83+
this.channels.computeIfAbsent(channel.getEntityReference(), key -> new CopyOnWriteArrayList<>());
84+
entityChannels.add(channel);
85+
}
86+
87+
/**
88+
* @return the channels
89+
*/
90+
public Map<EntityReference, List<EntityChannel>> getChannels()
91+
{
92+
return this.channels;
93+
}
94+
}

0 commit comments

Comments
 (0)