diff --git a/xwiki-commons-core/xwiki-commons-netflux/pom.xml b/xwiki-commons-core/xwiki-commons-netflux/pom.xml index 8b87bd81cc..fb9f60e676 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/pom.xml +++ b/xwiki-commons-core/xwiki-commons-netflux/pom.xml @@ -34,7 +34,7 @@ Netflux Server - 0.92 + 0.91 @@ -59,6 +59,7 @@ com.fasterxml.jackson.core jackson-databind + ${project.groupId} @@ -66,5 +67,11 @@ ${project.version} test + + ${project.groupId} + xwiki-commons-observation-local + ${project.version} + test + diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Bot.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Bot.java index 0d26c07e2f..3dcd0b66dd 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Bot.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Bot.java @@ -57,7 +57,7 @@ default boolean onJoinChannel(Channel channel) * @param sender the user who sent the message * @param message the message that was sent */ - default void onUserMessage(User sender, List message) + default void onUserMessage(LocalUser sender, List message) { // Do nothing by default } diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Channel.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Channel.java index 4fc92d72e1..254993a9f1 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Channel.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Channel.java @@ -22,7 +22,6 @@ import java.util.Deque; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.List; import java.util.Map; import org.apache.commons.lang3.builder.EqualsBuilder; @@ -88,14 +87,6 @@ public Map getBots() return this.bots; } - /** - * @return the list of users that are currently connected to this channel - */ - public List getConnectedUsers() - { - return this.users.values().stream().filter(user -> user.getSession() != null && user.isConnected()).toList(); - } - /** * @return the channel messages */ diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/ChannelStore.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/ChannelStore.java index 4c5ba0027f..66e0c2d5f2 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/ChannelStore.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/ChannelStore.java @@ -54,9 +54,22 @@ public class ChannelStore */ public Channel create() { - Channel channel = new Channel(this.idGenerator.generateChannelId()); + return create(this.idGenerator.generateChannelId()); + } + + /** + * Creates a new channel with a passed key. + * + * @param channelKey the identifier of the new channel + * @return the new channel + * @since 17.10.0RC1 + */ + public Channel create(String channelKey) + { + Channel channel = new Channel(channelKey); askBotsToJoin(channel); this.channelByKey.put(channel.getKey(), channel); + return channel; } @@ -85,6 +98,25 @@ public Channel get(String key) return this.channelByKey.get(key); } + /** + * Access an existing channel by its key. + * + * @param key the channel key + * @param create if true, create the channel when it does not exist + * @return the corresponding channel + * @since 17.10.0RC1 + */ + public Channel get(String key, boolean create) + { + Channel channel = get(key); + + if (channel == null) { + channel = create(key); + } + + return channel; + } + /** * Remove a channel from memory. * @@ -106,8 +138,7 @@ public void prune() try { long currentTime = System.currentTimeMillis(); for (Channel channel : this.channelByKey.values()) { - if (channel.getConnectedUsers().isEmpty() - && (currentTime - channel.getCreationDate()) > (1000 * 60 * 60 * 2)) { + if (channel.getUsers().isEmpty() && (currentTime - channel.getCreationDate()) > (1000 * 60 * 60 * 2)) { remove(channel); } } diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/DefaultLocalUserFactory.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/DefaultLocalUserFactory.java new file mode 100644 index 0000000000..5af5ed4c92 --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/DefaultLocalUserFactory.java @@ -0,0 +1,49 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal; + +import jakarta.inject.Singleton; +import jakarta.websocket.Session; + +import org.xwiki.component.annotation.Component; + +/** + * Default implementation of {@link LocalUserFactory}. + *

+ * Use the session id as user id. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +@Component +@Singleton +public class DefaultLocalUserFactory implements LocalUserFactory +{ + @Override + public LocalUser createLocalUser(Session session) + { + return new LocalUser(session, getId(session)); + } + + protected String getId(Session session) + { + return session.getId(); + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/HistoryKeeper.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/HistoryKeeper.java index 7d2ee6b319..350bdc398f 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/HistoryKeeper.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/HistoryKeeper.java @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.xwiki.component.annotation.Component; +import org.xwiki.netflux.internal.event.NetfluxMessageUserEvent; +import org.xwiki.observation.ObservationManager; /** * Holds the key of the history keeper fake user that is added to all Netflux channels. @@ -51,6 +53,9 @@ public class HistoryKeeper extends AbstractBot @Inject private MessageBuilder messageBuilder; + @Inject + private ObservationManager observation; + @Override public String getId() { @@ -60,7 +65,7 @@ public String getId() } @Override - public void onUserMessage(User sender, List message) + public void onUserMessage(LocalUser sender, List message) { // The history keeper responds only to GET_HISTORY messages. @@ -99,7 +104,7 @@ private void sendChannelHistory(User user, String channelKey) try { for (String msg : (Iterable) messages::iterator) { - user.getSession().getBasicRemote().sendText(msg); + this.observation.notify(new NetfluxMessageUserEvent(user.getName(), msg), null); } } catch (Exception e) { this.logger.debug("Failed to send channel history.", e); diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/LocalUser.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/LocalUser.java new file mode 100644 index 0000000000..6473531830 --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/LocalUser.java @@ -0,0 +1,54 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal; + +import jakarta.websocket.Session; + +/** + * A user accessing the current instance. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +public class LocalUser extends User +{ + private final Session session; + + /** + * Creates a new user with the specified name, using the given WebSocket session. + * + * @param session the WebSocket session used to communicate with the user + * @param name the identifier of the user + */ + public LocalUser(Session session, String name) + { + super(name); + + this.session = session; + } + + /** + * @return the WebSocket session + */ + public Session getSession() + { + return this.session; + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/LocalUserFactory.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/LocalUserFactory.java new file mode 100644 index 0000000000..c9b6658558 --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/LocalUserFactory.java @@ -0,0 +1,40 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal; + +import jakarta.websocket.Session; + +import org.xwiki.component.annotation.Role; + +/** + * Component in charge of generating a new local user for a given session. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +@Role +public interface LocalUserFactory +{ + /** + * @param session the WebSocket session + * @return the new instance of {@link LocalUser} + */ + LocalUser createLocalUser(Session session); +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Netflux.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Netflux.java new file mode 100644 index 0000000000..cde3f23837 --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Netflux.java @@ -0,0 +1,414 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import jakarta.websocket.CloseReason; +import jakarta.websocket.Session; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.slf4j.Logger; +import org.xwiki.component.annotation.Component; +import org.xwiki.netflux.internal.event.NetfluxCommandChannelEvent; +import org.xwiki.netflux.internal.event.NetfluxMessageUserEvent; +import org.xwiki.netflux.internal.event.NetfluxUserJoinEvent; +import org.xwiki.netflux.internal.event.NetfluxUserLeaveEvent; +import org.xwiki.netflux.internal.event.NetfluxUserTimeOfLastMessageUpdateEvent; +import org.xwiki.observation.ObservationManager; +import org.xwiki.websocket.AbstractPartialStringMessageHandler; + +/** + * The Netflux kernel. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +@Component(roles = Netflux.class) +@Singleton +public class Netflux +{ + // The client side keeps the connection alive by sending a PING message from time to time, using a timer + // (setTimeout). The browsers are slowing down timers used by inactive tabs / windows (that don't have + // the user focus). This is called timer throttling and can go up to 1 minute, which means inactive browser tabs + // won't be able to send PING messages more often than every minute. For this reason, we set the session idle + // timeout a little bit higher than the timer throttling value to make sure the WebSocket connection is not closed + // in background tabs. + // See https://developer.chrome.com/blog/timer-throttling-in-chrome-88/ + private static final long TIMEOUT_MILLISECONDS = 65000; + + private static final String NETFLUX_USER = "netflux.user"; + + private static final String COMMAND_LEAVE = "LEAVE"; + + private static final String COMMAND_JOIN = "JOIN"; + + private static final String ERROR_INVALID = "EINVAL"; + + private static final String ERROR_NO_ENTITY = "ENOENT"; + + private static final String ERROR_NOT_IN_CHAN = "NOT_IN_CHAN"; + + private final Object bigLock = new Object(); + + private final Map users = new HashMap<>(); + + @Inject + private ChannelStore channels; + + @Inject + private MessageBuilder messageBuilder; + + @Inject + private ObservationManager observation; + + @Inject + private LocalUserFactory localUserFactory; + + @Inject + private Logger logger; + + /** + * @param session the session that has just been activated. + */ + public void onOpen(Session session) + { + synchronized (this.bigLock) { + // Close the session if we don't receive any message from the user in TIMEOUT_MILLISECONDS. + session.setMaxIdleTimeout(TIMEOUT_MILLISECONDS); + + LocalUser user = getOrRegisterUser(session); + + // Send the IDENT message. + String identMessage = this.messageBuilder.buildDefault("", "IDENT", user.getName(), null); + if (!sendMessage(user, identMessage)) { + return; + } + + session.addMessageHandler(new AbstractPartialStringMessageHandler() + { + @Override + public void onMessage(String message) + { + synchronized (Netflux.this.bigLock) { + Netflux.this.onMessage(session, message); + } + } + }); + } + } + + /** + * @param session the session about to be closed + * @param closeReason the reason the session was closed + */ + public void onClose(Session session, CloseReason closeReason) + { + synchronized (this.bigLock) { + LocalUser user = getOrRegisterUser(session); + + this.logger.debug("Last message from [{}] received [{}ms] ago. Session idle timeout is [{}].", + user.getName(), System.currentTimeMillis() - user.getTimeOfLastMessage(), session.getMaxIdleTimeout()); + this.logger.debug("Disconnecting [{}] because [{}] ([{}])", user.getName(), closeReason.getReasonPhrase(), + closeReason.getCloseCode()); + + // We copy the set of channels because we're modifying it while iterating over it. + new LinkedList(user.getChannels()).forEach(channel -> this.observation + .notify(new NetfluxUserLeaveEvent(user.getName(), channel.getKey(), "Disconnected"), null)); + + this.users.remove(user.getName()); + } + } + + /** + * @param session the session in use when the error occurs + * @param thr the throwable representing the problem + */ + public void onError(Session session, Throwable thr) + { + this.logger.debug("Session closed with error.", thr); + onClose(session, + new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, ExceptionUtils.getRootCauseMessage(thr))); + } + + private LocalUser getOrRegisterUser(Session session) + { + LocalUser user = (LocalUser) session.getUserProperties().get(NETFLUX_USER); + if (user == null) { + // Register the user. + user = this.localUserFactory.createLocalUser(session); + session.getUserProperties().put(NETFLUX_USER, user); + + registerUser(user); + } + + return user; + } + + private void registerUser(User user) + { + this.users.put(user.getName(), user); + this.logger.debug("Registered user [{}]", user.getName()); + } + + private void onMessage(Session session, String message) + { + List msg = this.messageBuilder.decode(message); + if (msg == null) { + return; + } + + LocalUser user = getOrRegisterUser(session); + + // The time of the last message received from a user was initially used to close expired sessions (i.e. sessions + // in which we haven't received any message in the past TIMEOUT_MILLISECONDS). This is now done by setting the + // max idle timeout of the session to TIMEOUT_MILLISECONDS. We still keep track of the time of the last message + // mostly for debugging purposes. + this.observation + .notify(new NetfluxUserTimeOfLastMessageUpdateEvent(user.getName(), System.currentTimeMillis()), null); + + Integer seq = (Integer) msg.get(0); + String cmd = msg.get(1).toString(); + String obj = ""; + if (msg.size() >= 3) { + obj = Objects.toString(msg.get(2), null); + } + + if (COMMAND_JOIN.equals(cmd)) { + /* + * JOIN request: - Send a JACK - Join or create the channel - Send a JOIN message to the selected channel + */ + onCommandJoin(user, seq, obj); + } else if (COMMAND_LEAVE.equals(cmd)) { + /* + * LEAVE request: - Check if the request is correct - Send an ACK - Leave the channel - Send a LEAVE message + * to the selected channel + */ + onCommandLeave(user, seq, obj); + } else if (cmd.equals("PING")) { + /* + * PING: - Send an ACK + */ + pingUser(user, seq); + } else if (MessageBuilder.COMMAND_MSG.equals(cmd)) { + /* + * MSG (patch): - Send an ACK - Check if the history of the channel is requested - Yes : send the history - + * No : transfer the message to the recipient + */ + onCommandMessage(user, seq, obj, msg); + } + } + + private void onCommandJoin(LocalUser user, Integer seq, String channelKey) + { + // Expected channel key length is 48, see IdGenerator.generateChannelId(). + if (!StringUtils.isEmpty(channelKey) && channelKey.length() != 48) { + String errorMsg = this.messageBuilder.buildError(seq, ERROR_INVALID, "Invalid channel key"); + sendMessage(user, errorMsg); + return; + } + Channel channel = (channelKey == null) ? null : this.channels.get(channelKey); + // No key provided: create a new channel. + if (channel == null && StringUtils.isEmpty(channelKey)) { + channel = this.channels.create(); + } else if (channel == null) { + String errorMsg = this.messageBuilder.buildError(seq, ERROR_NO_ENTITY, + String.format("Channel [%s] not found", channelKey)); + sendMessage(user, errorMsg); + return; + } + String jackMsg = this.messageBuilder.buildJoinAck(seq, channel.getKey()); + sendMessage(user, jackMsg); + + // Notify that the user joined + this.observation.notify(new NetfluxUserJoinEvent(seq, user.getName(), channelKey), user); + } + + /** + * @param user the user who joined the channel + * @param channel the identifier of the channel + */ + public void userJoinedChannel(User user, Channel channel) + { + // The user might not exist yet (for example if it's a remote user from a different instance), so we make sure + // it's registered + if (!this.users.containsKey(user.getName())) { + registerUser(user); + } + + // Add the channel to the user + user.getChannels().add(channel); + + if (user instanceof LocalUser localUser) { + // The user that just joined the channel has to know what other users and bots are in the channel (for + // instance to find out the history keeper) so we send it a JOIN command for each member of the channel. + Set botsAndUsers = new LinkedHashSet<>(channel.getBots().keySet()); + botsAndUsers.addAll(channel.getUsers().keySet()); + for (String userOrBotId : botsAndUsers) { + String inChannelMsg = + this.messageBuilder.buildDefault(userOrBotId, COMMAND_JOIN, channel.getKey(), null); + sendMessage(localUser, inChannelMsg); + } + } + + // Add the user to the channel + channel.getUsers().put(user.getName(), user); + this.channels.prune(); + + // Inform the channel members of the join + String joinMsg = this.messageBuilder.buildDefault(user.getName(), COMMAND_JOIN, channel.getKey(), null); + sendChannelMessage(COMMAND_JOIN, user, channel, joinMsg); + } + + private void onCommandLeave(LocalUser user, Integer seq, String channelKey) + { + String errorMsg = null; + if (StringUtils.isEmpty(channelKey)) { + errorMsg = this.messageBuilder.buildError(seq, ERROR_INVALID, "Channel key is not specified"); + } else if (this.channels.get(channelKey) == null) { + errorMsg = this.messageBuilder.buildError(seq, ERROR_NO_ENTITY, channelKey); + } else if (!this.channels.get(channelKey).getUsers().containsKey(user.getName())) { + errorMsg = this.messageBuilder.buildError(seq, ERROR_NOT_IN_CHAN, channelKey); + } + if (errorMsg != null) { + sendMessage(user, errorMsg); + return; + } + String ackMsg = this.messageBuilder.buildAck(seq); + sendMessage(user, ackMsg); + + // Notify that the user left + this.observation.notify(new NetfluxUserLeaveEvent(user.getName(), channelKey, ""), null); + } + + /** + * @param user the user who left the channel + * @param channel the channel + * @param reason the reason why the user left the channel + */ + public void userLeftChannel(User user, Channel channel, String reason) + { + channel.getUsers().remove(user.getName()); + user.getChannels().remove(channel); + + String leaveMessage = this.messageBuilder.buildDefault(user.getName(), COMMAND_LEAVE, channel.getKey(), reason); + sendChannelMessage(COMMAND_LEAVE, user, channel, leaveMessage); + + // Remove the channel when there is no user anymore (the history keeper doesn't count). + if (channel.getUsers().isEmpty()) { + this.channels.remove(channel); + } + } + + /** + * @param user the user to ping + * @param sequence the sequence number of the message + */ + public void pingUser(LocalUser user, Integer sequence) + { + String ackMsg = this.messageBuilder.buildAck(sequence); + sendMessage(user, ackMsg); + } + + private void onCommandMessage(LocalUser user, Integer seq, String channelKeyOrUserName, List msg) + { + String ackMsg = this.messageBuilder.buildAck(seq); + sendMessage(user, ackMsg); + Optional bot = getBot(user, channelKeyOrUserName); + if (bot.isPresent()) { + // Send message to the specified bot. + bot.get().onUserMessage(user, msg); + } else if (this.channels.get(channelKeyOrUserName) != null) { + // Send message to the specified channel. + String msgMsg = this.messageBuilder.buildMessage(0, user.getName(), channelKeyOrUserName, msg.get(3)); + Channel chan = this.channels.get(channelKeyOrUserName); + sendChannelMessage(MessageBuilder.COMMAND_MSG, user, chan, msgMsg); + } else if (this.users.containsKey(channelKeyOrUserName)) { + // Send message to the specified user. + String msgMsg = this.messageBuilder.buildMessage(0, user.getName(), channelKeyOrUserName, msg.get(3)); + this.observation.notify(new NetfluxMessageUserEvent(channelKeyOrUserName, msgMsg), null); + } else if (!channelKeyOrUserName.isEmpty()) { + // Unknown channel / user / bot. + String errorMsg = this.messageBuilder.buildError(seq, ERROR_NO_ENTITY, channelKeyOrUserName); + sendMessage(user, errorMsg); + } + } + + private Optional getBot(User user, String id) + { + return user.getChannels().stream().map(channel -> channel.getBots().get(id)).filter(Objects::nonNull) + .findFirst(); + } + + /** + * @param userId the identifier of the user + * @return the {@link User} instance, or null of no user corresponding to the passed id could be found + */ + public User getUser(String userId) + { + return this.users.get(userId); + } + + /** + * @param user the local user to send the message to + * @param message the message to send + * @return true when the message was successfully sent + */ + @SuppressWarnings({"resource"}) + public boolean sendMessage(LocalUser user, String message) + { + try { + this.logger.debug("Sending to [{}] : [{}]", user.getName(), message); + user.getSession().getBasicRemote().sendText(message); + return true; + } catch (IOException e) { + this.logger.debug("Sending failed.", e); + onClose(user.getSession(), + new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, ExceptionUtils.getRootCauseMessage(e))); + return false; + } + } + + /** + * Broadcast a message to a channel. + * + * @param command the message type/command + * @param sender the sender + * @param channel the channel where the message is sent + * @param message the message + */ + private void sendChannelMessage(String command, User sender, Channel channel, String message) + { + // Send commands through events instead of directly so that other cluster members receive it too + this.observation.notify(new NetfluxCommandChannelEvent(command, channel.getKey(), sender.getName(), message), + null); + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/NetfluxEndpoint.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/NetfluxEndpoint.java index b351afb92d..6ed73ece1e 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/NetfluxEndpoint.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/NetfluxEndpoint.java @@ -19,16 +19,6 @@ */ package org.xwiki.netflux.internal; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; - import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -37,11 +27,7 @@ import jakarta.websocket.EndpointConfig; import jakarta.websocket.Session; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.slf4j.Logger; import org.xwiki.component.annotation.Component; -import org.xwiki.websocket.AbstractPartialStringMessageHandler; import org.xwiki.websocket.EndpointComponent; /** @@ -55,290 +41,24 @@ @Named("netflux") public class NetfluxEndpoint extends Endpoint implements EndpointComponent { - // The client side keeps the connection alive by sending a PING message from time to time, using a timer - // (setTimeout). The browsers are slowing down timers used by inactive tabs / windows (that don't have - // the user focus). This is called timer throttling and can go up to 1 minute, which means inactive browser tabs - // won't be able to send PING messages more often than every minute. For this reason, we set the session idle - // timeout a little bit higher than the timer throttling value to make sure the WebSocket connection is not closed - // in background tabs. - // See https://developer.chrome.com/blog/timer-throttling-in-chrome-88/ - private static final long TIMEOUT_MILLISECONDS = 65000; - - private static final String NETFLUX_USER = "netflux.user"; - - private static final String COMMAND_LEAVE = "LEAVE"; - - private static final String COMMAND_JOIN = "JOIN"; - - private static final String ERROR_INVALID = "EINVAL"; - - private static final String ERROR_NO_ENTITY = "ENOENT"; - - private final Object bigLock = new Object(); - - private final Map users = new HashMap<>(); - - @Inject - private Logger logger; - - @Inject - private IdGenerator idGenerator; - - @Inject - private ChannelStore channels; - @Inject - private MessageBuilder messageBuilder; + private Netflux netflux; @Override public void onOpen(Session session, EndpointConfig config) { - synchronized (this.bigLock) { - // Close the session if we don't receive any message from the user in TIMEOUT_MILLISECONDS. - session.setMaxIdleTimeout(TIMEOUT_MILLISECONDS); - - User user = getOrRegisterUser(session); - - // Send the IDENT message. - String identMessage = this.messageBuilder.buildDefault("", "IDENT", user.getName(), null); - if (!sendMessage(user, identMessage)) { - return; - } - - session.addMessageHandler(new AbstractPartialStringMessageHandler() - { - @Override - public void onMessage(String message) - { - synchronized (NetfluxEndpoint.this.bigLock) { - NetfluxEndpoint.this.onMessage(session, message); - } - } - }); - } + this.netflux.onOpen(session); } @Override public void onClose(Session session, CloseReason closeReason) { - synchronized (this.bigLock) { - User user = getOrRegisterUser(session); - - this.logger.debug("Last message from [{}] received [{}ms] ago. Session idle timeout is [{}].", - user.getName(), System.currentTimeMillis() - user.getTimeOfLastMessage(), session.getMaxIdleTimeout()); - this.logger.debug("Disconnecting [{}] because [{}] ([{}])", user.getName(), closeReason.getReasonPhrase(), - closeReason.getCloseCode()); - this.users.remove(user.getName()); - user.setConnected(false); - - // We copy the set of channels because we're modifying it while iterating over it. - new LinkedList(user.getChannels()).forEach(channel -> leaveChannel(user, channel, "Disconnected")); - } + this.netflux.onClose(session, closeReason); } @Override - public void onError(Session session, Throwable e) - { - this.logger.debug("Session closed with error.", e); - onClose(session, - new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, ExceptionUtils.getRootCauseMessage(e))); - } - - private User getOrRegisterUser(Session session) - { - User user = (User) session.getUserProperties().get(NETFLUX_USER); - if (user == null) { - // Register the user. - String userId = this.idGenerator.generateUserId(); - user = new User(session, userId); - this.users.put(userId, user); - session.getUserProperties().put(NETFLUX_USER, user); - this.logger.debug("Registered [{}]", userId); - } - return user; - } - - private void onMessage(Session session, String message) - { - List msg = this.messageBuilder.decode(message); - if (msg == null) { - return; - } - - User user = getOrRegisterUser(session); - // The time of the last message received from a user was initially used to close expired sessions (i.e. sessions - // in which we haven't received any message in the past TIMEOUT_MILLISECONDS). This is now done by setting the - // max idle timeout of the session to TIMEOUT_MILLISECONDS. We still keep track of the time of the last message - // mostly for debugging purposes. - user.setTimeOfLastMessage(System.currentTimeMillis()); - - Integer seq = (Integer) msg.get(0); - String cmd = msg.get(1).toString(); - String obj = ""; - if (msg.size() >= 3) { - obj = Objects.toString(msg.get(2), null); - } - - if (COMMAND_JOIN.equals(cmd)) { - /* - * JOIN request: - Send a JACK - Join or create the channel - Send a JOIN message to the selected channel - */ - onCommandJoin(user, seq, obj); - } else if (COMMAND_LEAVE.equals(cmd)) { - /* - * LEAVE request: - Check if the request is correct - Send an ACK - Leave the channel - Send a LEAVE message - * to the selected channel - */ - onCommandLeave(user, seq, obj); - } else if (cmd.equals("PING")) { - /* - * PING: - Send an ACK - */ - onCommandPing(user, seq); - } else if (MessageBuilder.COMMAND_MSG.equals(cmd)) { - /* - * MSG (patch): - Send an ACK - Check if the history of the channel is requested - Yes : send the history - - * No : transfer the message to the recipient - */ - onCommandMessage(user, seq, obj, msg); - } - } - - private void onCommandJoin(User user, Integer seq, String channelKey) - { - // Expected channel key length is 48, see IdGenerator.generateChannelId(). - if (!StringUtils.isEmpty(channelKey) && channelKey.length() != 48) { - String errorMsg = this.messageBuilder.buildError(seq, ERROR_INVALID, "Invalid channel key"); - sendMessage(user, errorMsg); - return; - } - Channel channel = (channelKey == null) ? null : this.channels.get(channelKey); - // No key provided: create a new channel. - if (channel == null && StringUtils.isEmpty(channelKey)) { - channel = this.channels.create(); - } else if (channel == null) { - String errorMsg = this.messageBuilder.buildError(seq, ERROR_NO_ENTITY, - String.format("Channel [%s] not found", channelKey)); - sendMessage(user, errorMsg); - return; - } - String jackMsg = this.messageBuilder.buildJoinAck(seq, channel.getKey()); - sendMessage(user, jackMsg); - user.getChannels().add(channel); - // The user that just joined the channel has to know what other users and bots are in the channel (for instance - // to find out the history keeper) so we send them a JOIN command for each member of the channel. - Set botsAndUsers = new LinkedHashSet<>(channel.getBots().keySet()); - botsAndUsers.addAll(channel.getUsers().keySet()); - for (String userOrBotId : botsAndUsers) { - String inChannelMsg = this.messageBuilder.buildDefault(userOrBotId, COMMAND_JOIN, channel.getKey(), null); - sendMessage(user, inChannelMsg); - } - channel.getUsers().put(user.getName(), user); - this.channels.prune(); - String joinMsg = this.messageBuilder.buildDefault(user.getName(), COMMAND_JOIN, channel.getKey(), null); - sendChannelMessage(COMMAND_JOIN, user, channel, joinMsg); - } - - private void onCommandLeave(User user, Integer seq, String channelKey) - { - String errorMsg = null; - if (StringUtils.isEmpty(channelKey)) { - errorMsg = this.messageBuilder.buildError(seq, ERROR_INVALID, "Channel key is not specified"); - } else if (this.channels.get(channelKey) == null) { - errorMsg = this.messageBuilder.buildError(seq, ERROR_NO_ENTITY, channelKey); - } else if (!this.channels.get(channelKey).getUsers().containsKey(user.getName())) { - errorMsg = this.messageBuilder.buildError(seq, "NOT_IN_CHAN", channelKey); - } - if (errorMsg != null) { - sendMessage(user, errorMsg); - return; - } - String ackMsg = this.messageBuilder.buildAck(seq); - sendMessage(user, ackMsg); - Channel channel = this.channels.get(channelKey); - leaveChannel(user, channel, ""); - } - - private void leaveChannel(User user, Channel channel, String reason) - { - channel.getUsers().remove(user.getName()); - user.getChannels().remove(channel); - - String leaveMessage = this.messageBuilder.buildDefault(user.getName(), COMMAND_LEAVE, channel.getKey(), reason); - sendChannelMessage(COMMAND_LEAVE, user, channel, leaveMessage); - - // Remove the channel when there is no user anymore (the history keeper doesn't count). - if (channel.getConnectedUsers().isEmpty()) { - this.channels.remove(channel); - } - } - - private void onCommandPing(User user, Integer seq) - { - String ackMsg = this.messageBuilder.buildAck(seq); - sendMessage(user, ackMsg); - } - - private void onCommandMessage(User user, Integer seq, String channelKeyOrUserName, List msg) - { - String ackMsg = this.messageBuilder.buildAck(seq); - sendMessage(user, ackMsg); - Optional bot = getBot(user, channelKeyOrUserName); - if (bot.isPresent()) { - // Send message to the specified bot. - bot.get().onUserMessage(user, msg); - } else if (this.channels.get(channelKeyOrUserName) != null) { - // Send message to the specified channel. - String msgMsg = this.messageBuilder.buildMessage(0, user.getName(), channelKeyOrUserName, msg.get(3)); - Channel chan = this.channels.get(channelKeyOrUserName); - sendChannelMessage(MessageBuilder.COMMAND_MSG, user, chan, msgMsg); - } else if (this.users.containsKey(channelKeyOrUserName)) { - // Send message to the specified user. - String msgMsg = this.messageBuilder.buildMessage(0, user.getName(), channelKeyOrUserName, msg.get(3)); - sendMessage(this.users.get(channelKeyOrUserName), msgMsg); - } else if (!channelKeyOrUserName.isEmpty()) { - // Unknown channel / user / bot. - String errorMsg = this.messageBuilder.buildError(seq, ERROR_NO_ENTITY, channelKeyOrUserName); - sendMessage(user, errorMsg); - } - } - - private Optional getBot(User user, String id) + public void onError(Session session, Throwable thr) { - return user.getChannels().stream().map(channel -> channel.getBots().get(id)).filter(Objects::nonNull) - .findFirst(); - } - - private boolean sendMessage(User user, String message) - { - try { - this.logger.debug("Sending to [{}] : [{}]", user.getName(), message); - user.getSession().getBasicRemote().sendText(message); - return true; - } catch (IOException e) { - this.logger.debug("Sending failed.", e); - onClose(user.getSession(), - new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, ExceptionUtils.getRootCauseMessage(e))); - return false; - } - } - - /** - * Broadcast a message to a channel. - * - * @param cmd the message type/command - * @param me the sender - * @param channel the channel where the message is sent - * @param message the message - */ - private void sendChannelMessage(String cmd, User me, Channel channel, String message) - { - // Broadcast the message to all the bots connected to the channel. - channel.getBots().values().forEach(bot -> bot.onChannelMessage(channel, me, cmd, message)); - - // Broadcast the message to all the users connected to the channel. - channel.getUsers().values().stream() - .filter(user -> !(MessageBuilder.COMMAND_MSG.equals(cmd) && user.equals(me))) - .forEach(user -> sendMessage(user, message)); + this.netflux.onError(session, thr); } } diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/User.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/User.java index 92bbb1f74b..c36a9e7c76 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/User.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/User.java @@ -22,8 +22,6 @@ import java.util.LinkedHashSet; import java.util.Set; -import jakarta.websocket.Session; - /** * Represents an user connected to a Netflux channel. * @@ -32,52 +30,28 @@ */ public class User { - private final Session session; - private final String name; private final Set channels = new LinkedHashSet<>(); - private boolean connected = true; - private long timeOfLastMessage = System.currentTimeMillis(); /** - * Creates a new user with the specified name, using the given WebSocket session. + * Creates a new user with the specified identifier. * - * @param session the WebSocket session used to communicate with the user - * @param name the user name + * @param name the identifier of the user */ - public User(Session session, String name) + public User(String name) { - this.session = session; this.name = name; } - /** - * @return whether this user is connected or not - */ - public boolean isConnected() - { - return connected; - } - - /** - * Sets whether this user is connected or not. - * - * @param connected {@code true} if the user is connected, {@code false} otherwise - */ - public void setConnected(boolean connected) - { - this.connected = connected; - } - /** * @return the time when this user sent their last message */ public long getTimeOfLastMessage() { - return timeOfLastMessage; + return this.timeOfLastMessage; } /** @@ -90,20 +64,12 @@ public void setTimeOfLastMessage(long timeOfLastMessage) this.timeOfLastMessage = timeOfLastMessage; } - /** - * @return the WebSocket session - */ - public Session getSession() - { - return session; - } - /** * @return the user name */ public String getName() { - return name; + return this.name; } /** @@ -111,6 +77,6 @@ public String getName() */ public Set getChannels() { - return channels; + return this.channels; } } diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/CommandListener.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/CommandListener.java new file mode 100644 index 0000000000..bb71532a81 --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/CommandListener.java @@ -0,0 +1,142 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal.event; + +import jakarta.inject.Inject; +import jakarta.inject.Named; +import jakarta.inject.Singleton; + +import org.xwiki.component.annotation.Component; +import org.xwiki.netflux.internal.Channel; +import org.xwiki.netflux.internal.ChannelStore; +import org.xwiki.netflux.internal.LocalUser; +import org.xwiki.netflux.internal.MessageBuilder; +import org.xwiki.netflux.internal.Netflux; +import org.xwiki.netflux.internal.User; +import org.xwiki.observation.AbstractEventListener; +import org.xwiki.observation.event.Event; + +/** + * Dispatch messages to local users. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +@Component +@Named(CommandListener.NAME) +@Singleton +public class CommandListener extends AbstractEventListener +{ + /** + * The unique identifier of the listener. + */ + public static final String NAME = "org.xwiki.internal.document.DocumentOverrideListener"; + + @Inject + protected ChannelStore channelStore; + + @Inject + protected Netflux netflux; + + /** + * The default constructor. + */ + public CommandListener() + { + super(NAME, new NetfluxUserLeaveEvent(), new NetfluxUserJoinEvent(), new NetfluxMessageUserEvent(), + new NetfluxCommandChannelEvent(), new NetfluxUserTimeOfLastMessageUpdateEvent()); + } + + @Override + public void onEvent(Event event, Object source, Object data) + { + if (event instanceof NetfluxUserLeaveEvent leaveCommand) { + userLeft(leaveCommand); + } else if (event instanceof NetfluxUserJoinEvent joinCommand) { + userJoined(joinCommand, (User) source); + } else if (event instanceof NetfluxMessageUserEvent messageEvent) { + userMessage(messageEvent); + } else if (event instanceof NetfluxCommandChannelEvent messageEvent) { + channelMessage(messageEvent); + } else if (event instanceof NetfluxUserTimeOfLastMessageUpdateEvent userEvent) { + userTimeOfLastMessageUpdated(userEvent); + } + } + + protected void channelMessage(NetfluxCommandChannelEvent event) + { + Channel channel = this.channelStore.get(event.getChannel()); + + if (channel != null) { + User sender = this.netflux.getUser(event.getSender()); + + if (sender != null) { + String message = event.getMessage(); + + // Broadcast the message to all the bots connected to the channel. + channel.getBots().values() + .forEach(bot -> bot.onChannelMessage(channel, sender, event.getCommand(), message)); + + // Broadcast the message to all the users connected to the channel. + channel.getUsers().values().stream() + .filter(user -> user instanceof LocalUser + && !(MessageBuilder.COMMAND_MSG.equals(event.getCommand()) && user.equals(sender))) + .forEach(user -> this.netflux.sendMessage((LocalUser) user, message)); + } + } + } + + protected void userMessage(NetfluxMessageUserEvent event) + { + User user = this.netflux.getUser(event.getUser()); + + if (user instanceof LocalUser localUser) { + this.netflux.sendMessage(localUser, event.getMessage()); + } + } + + protected void userJoined(NetfluxUserJoinEvent event, User user) + { + // The channel might not exist yet (for example in the case of a cluster), so we make sure it's created when it + // does not exist yet + Channel channel = this.channelStore.get(event.getChannel(), true); + + this.netflux.userJoinedChannel(user, channel); + } + + protected void userLeft(NetfluxUserLeaveEvent event) + { + User user = this.netflux.getUser(event.getUser()); + Channel channel = this.channelStore.get(event.getChannel()); + + if (user != null && channel != null) { + this.netflux.userLeftChannel(user, channel, event.getReason()); + } + } + + private void userTimeOfLastMessageUpdated(NetfluxUserTimeOfLastMessageUpdateEvent userEvent) + { + User user = this.netflux.getUser(userEvent.getUser()); + + if (user != null) { + user.setTimeOfLastMessage(userEvent.getTimeOfLastMessage()); + } + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxCommandChannelEvent.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxCommandChannelEvent.java new file mode 100644 index 0000000000..edc6b6c66f --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxCommandChannelEvent.java @@ -0,0 +1,103 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal.event; + +import java.io.Serializable; + +import org.xwiki.observation.event.Event; + +/** + * An event triggered to indicate listeners to send a command to the indicated channel. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +public class NetfluxCommandChannelEvent implements Event, Serializable +{ + private static final long serialVersionUID = 1L; + + private final String command; + + private final String channel; + + private final String sender; + + private final String message; + + /** + * @param command the command name + * @param channel the channel identifier + * @param user the user who sent the message + * @param message the message content + */ + public NetfluxCommandChannelEvent(String command, String channel, String user, String message) + { + this.command = command; + this.channel = channel; + this.sender = user; + this.message = message; + } + + /** + * Listen to all events. + */ + public NetfluxCommandChannelEvent() + { + this(null, null, null, null); + } + + /** + * @return the command name + */ + public String getCommand() + { + return this.command; + } + + /** + * @return the channel identifier + */ + public String getChannel() + { + return this.channel; + } + + /** + * @return the user who sent the command + */ + public String getSender() + { + return this.sender; + } + + /** + * @return the message content + */ + public String getMessage() + { + return this.message; + } + + @Override + public boolean matches(Object otherEvent) + { + return otherEvent instanceof NetfluxCommandChannelEvent; + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxMessageUserEvent.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxMessageUserEvent.java new file mode 100644 index 0000000000..70d7cf271c --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxMessageUserEvent.java @@ -0,0 +1,107 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal.event; + +import java.io.Serializable; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.xwiki.observation.event.Event; + +/** + * An event triggered to ping a user. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +public class NetfluxMessageUserEvent implements Event, Serializable +{ + private static final long serialVersionUID = 1L; + + private final String user; + + private final String message; + + /** + * @param user the identifier of the user + * @param message the message to send to the user + */ + public NetfluxMessageUserEvent(String user, String message) + { + this.user = user; + this.message = message; + } + + /** + * Listen to all events. + */ + public NetfluxMessageUserEvent() + { + this(null, null); + } + + /** + * @return the identifier of the user + */ + public String getUser() + { + return this.user; + } + + /** + * @return the message to send to the user + */ + public String getMessage() + { + return this.message; + } + + @Override + public boolean matches(Object otherEvent) + { + return otherEvent instanceof NetfluxMessageUserEvent; + } + + @Override + public int hashCode() + { + HashCodeBuilder builder = new HashCodeBuilder(); + + builder.append(getUser()); + builder.append(getMessage()); + + return builder.build(); + } + + @Override + public boolean equals(Object obj) + { + if (obj instanceof NetfluxMessageUserEvent messageEvent) { + EqualsBuilder builder = new EqualsBuilder(); + + builder.append(getUser(), messageEvent.getUser()); + builder.append(getMessage(), messageEvent.getMessage()); + + return builder.build(); + } + + return false; + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserJoinEvent.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserJoinEvent.java new file mode 100644 index 0000000000..fe5118ec08 --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserJoinEvent.java @@ -0,0 +1,97 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal.event; + +import java.io.Serializable; + +import org.xwiki.observation.event.Event; + +/** + * An event triggered when a user is joining a channel. + *

+ * The event also send the following parameters: + *

+ *
    + *
  • source: the {@link org.xwiki.netflux.internal.User}
  • + *
+ * + * @version $Id$ + * @since 17.10.0RC1 + */ +public class NetfluxUserJoinEvent implements Event, Serializable +{ + private static final long serialVersionUID = 1L; + + private final Integer sequence; + + private final String user; + + private final String channel; + + /** + * @param sequence the sequence number of the message + * @param user the identifier of the user + * @param channel the identifier of the channel + */ + public NetfluxUserJoinEvent(Integer sequence, String user, String channel) + { + this.user = user; + this.sequence = sequence; + this.channel = channel; + } + + /** + * Listen to all events. + */ + public NetfluxUserJoinEvent() + { + this(null, null, null); + } + + /** + * @return the sequence number of the message + */ + public Integer getSequence() + { + return this.sequence; + } + + /** + * @return the identifier of the user + */ + public String getUser() + { + return this.user; + } + + /** + * @return the identifier of the channel + */ + public String getChannel() + { + return this.channel; + } + + @Override + public boolean matches(Object otherEvent) + { + return otherEvent instanceof NetfluxUserJoinEvent; + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserLeaveEvent.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserLeaveEvent.java new file mode 100644 index 0000000000..f08a3c0610 --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserLeaveEvent.java @@ -0,0 +1,91 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal.event; + +import java.io.Serializable; + +import org.xwiki.observation.event.Event; + +/** + * An event triggered when a user is leaving a channel. + * + * @version $Id$ + * @since 17.10.0RC1 + */ +public class NetfluxUserLeaveEvent implements Event, Serializable +{ + private static final long serialVersionUID = 1L; + + private final String user; + + private final String channel; + + private final String reason; + + /** + * @param userId the identifier of the user + * @param channelKey the identifier of the channel + * @param reason the reason why the user left + */ + public NetfluxUserLeaveEvent(String userId, String channelKey, String reason) + { + this.user = userId; + this.channel = channelKey; + this.reason = reason; + } + + /** + * Listen to all events. + */ + public NetfluxUserLeaveEvent() + { + this(null, null, null); + } + + /** + * @return the identifier of the user + */ + public String getUser() + { + return this.user; + } + + /** + * @return the identifier of the channel + */ + public String getChannel() + { + return this.channel; + } + + /** + * @return the reason why the user left + */ + public String getReason() + { + return this.reason; + } + + @Override + public boolean matches(Object otherEvent) + { + return otherEvent instanceof NetfluxUserLeaveEvent; + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserTimeOfLastMessageUpdateEvent.java b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserTimeOfLastMessageUpdateEvent.java new file mode 100644 index 0000000000..ab7156591d --- /dev/null +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/event/NetfluxUserTimeOfLastMessageUpdateEvent.java @@ -0,0 +1,80 @@ +/* + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.xwiki.netflux.internal.event; + +import java.io.Serializable; + +import org.xwiki.observation.event.Event; + +/** + * An event triggered when the time when the last message was sent by this user is updated. + * + * @version $Id$ + * @since 17.10.1 + */ +public class NetfluxUserTimeOfLastMessageUpdateEvent implements Event, Serializable +{ + private static final long serialVersionUID = 1L; + + private final String user; + + private final Long timeOfLastMessage; + + /** + * @param user the identifier of the user + * @param timeOfLastMessage the time when the last message was sent by this user + */ + public NetfluxUserTimeOfLastMessageUpdateEvent(String user, long timeOfLastMessage) + { + this.user = user; + this.timeOfLastMessage = timeOfLastMessage; + } + + /** + * Listen to all events. + */ + public NetfluxUserTimeOfLastMessageUpdateEvent() + { + this.user = null; + this.timeOfLastMessage = null; + } + + /** + * @return the identifier of the user + */ + public String getUser() + { + return this.user; + } + + /** + * @return the time when the last message was sent by this user + */ + public Long getTimeOfLastMessage() + { + return this.timeOfLastMessage; + } + + @Override + public boolean matches(Object otherEvent) + { + return otherEvent instanceof NetfluxUserTimeOfLastMessageUpdateEvent; + } +} diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/main/resources/META-INF/components.txt b/xwiki-commons-core/xwiki-commons-netflux/src/main/resources/META-INF/components.txt index 6f53c1fa3c..7beba57373 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/main/resources/META-INF/components.txt +++ b/xwiki-commons-core/xwiki-commons-netflux/src/main/resources/META-INF/components.txt @@ -1,5 +1,8 @@ org.xwiki.netflux.internal.ChannelStore org.xwiki.netflux.internal.HistoryKeeper org.xwiki.netflux.internal.IdGenerator +org.xwiki.netflux.internal.DefaultLocalUserFactory org.xwiki.netflux.internal.MessageBuilder +org.xwiki.netflux.internal.Netflux org.xwiki.netflux.internal.NetfluxEndpoint +org.xwiki.netflux.internal.event.CommandListener diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/ChannelTest.java b/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/ChannelTest.java index 9453bda2e9..580a375517 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/ChannelTest.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/ChannelTest.java @@ -19,11 +19,6 @@ */ package org.xwiki.netflux.internal; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; - import java.util.Date; import java.util.HashSet; import java.util.Set; @@ -32,6 +27,11 @@ import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + /** * Unit tests for {@link Channel}. * @@ -41,7 +41,7 @@ class ChannelTest { @Test - void getConnectedUsers() + void getUsers() { long before = new Date().getTime(); Channel channel = new Channel("test"); @@ -53,20 +53,18 @@ void getConnectedUsers() assertTrue(channel.getMessages().isEmpty()); assertEquals(0, channel.getUsers().size()); - assertEquals(0, channel.getConnectedUsers().size()); - Session session = mock(Session.class); - channel.getUsers().put("alice", new User(null, "alice")); - channel.getUsers().put("bob", new User(session, "bob")); + Session bobSession = mock(Session.class); + User bob = new LocalUser(bobSession, "bob"); + channel.getUsers().put(bob.getName(), bob); - User carol = new User(session, "carol"); - carol.setConnected(false); - channel.getUsers().put("carol", carol); + Session carolSession = mock(Session.class); + User carol = new LocalUser(carolSession, "carol"); + channel.getUsers().put(carol.getName(), carol); - assertEquals(3, channel.getUsers().size()); - assertEquals(1, channel.getConnectedUsers().size()); - assertEquals("bob", channel.getConnectedUsers().iterator().next().getName()); + assertEquals(2, channel.getUsers().size()); + assertEquals("bob", channel.getUsers().values().iterator().next().getName()); } @Test diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/HistoryKeeperTest.java b/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/HistoryKeeperTest.java index 037afba6ad..e651503958 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/HistoryKeeperTest.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/HistoryKeeperTest.java @@ -27,6 +27,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.xwiki.netflux.internal.event.NetfluxMessageUserEvent; +import org.xwiki.observation.ObservationManager; import org.xwiki.test.annotation.ComponentList; import org.xwiki.test.junit5.mockito.ComponentTest; import org.xwiki.test.junit5.mockito.InjectMockComponents; @@ -54,13 +56,16 @@ class HistoryKeeperTest @MockComponent private MessageBuilder messageBuilder; + @MockComponent + private ObservationManager observation; + @Mock Session session; @Mock Basic basicRemote; - private User user; + private LocalUser user; private Channel channel = new Channel("test"); @@ -68,7 +73,7 @@ class HistoryKeeperTest void beforeEach() { when(this.session.getBasicRemote()).thenReturn(this.basicRemote); - this.user = new User(this.session, "alice"); + this.user = new LocalUser(this.session, "alice"); when(this.channels.get("test")).thenReturn(this.channel); } @@ -101,7 +106,7 @@ void onUserMessage() throws Exception when(this.messageBuilder.buildMessage(0, this.historyKeeper.getId(), this.user.getName(), "{\"state\":1, \"channel\":\"missing\"}")).thenReturn("end missing"); this.historyKeeper.onUserMessage(this.user, List.of(0, "MSG", this.historyKeeper.getId(), "valid")); - verify(this.basicRemote).sendText("end missing"); + verify(this.observation).notify(new NetfluxMessageUserEvent(this.user.getName(), "end missing"), null); // Get history message. when(this.messageBuilder.decode("valid")).thenReturn(List.of("GET_HISTORY", "test")); @@ -111,9 +116,9 @@ void onUserMessage() throws Exception this.historyKeeper.onUserMessage(this.user, List.of(0, "MSG", this.historyKeeper.getId(), "valid")); - verify(this.basicRemote).sendText("first"); - verify(this.basicRemote).sendText("second"); - verify(this.basicRemote).sendText("end"); + verify(this.observation).notify(new NetfluxMessageUserEvent(this.user.getName(), "first"), null); + verify(this.observation).notify(new NetfluxMessageUserEvent(this.user.getName(), "second"), null); + verify(this.observation).notify(new NetfluxMessageUserEvent(this.user.getName(), "end"), null); } @Test diff --git a/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/NetfluxEndpointTest.java b/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/NetfluxEndpointTest.java index ed55488194..101f06ecf9 100644 --- a/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/NetfluxEndpointTest.java +++ b/xwiki-commons-core/xwiki-commons-netflux/src/test/java/org/xwiki/netflux/internal/NetfluxEndpointTest.java @@ -28,10 +28,13 @@ import jakarta.websocket.RemoteEndpoint.Basic; import jakarta.websocket.Session; +import org.apache.commons.collections4.IterableUtils; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.xwiki.netflux.internal.event.CommandListener; +import org.xwiki.observation.internal.DefaultObservationManager; import org.xwiki.test.annotation.ComponentList; import org.xwiki.test.junit5.mockito.ComponentTest; import org.xwiki.test.junit5.mockito.InjectMockComponents; @@ -52,7 +55,8 @@ * @since 13.9RC1 */ @ComponentTest -@ComponentList({ChannelStore.class, MessageBuilder.class, IdGenerator.class}) +@ComponentList({Netflux.class, ChannelStore.class, MessageBuilder.class, IdGenerator.class, + DefaultObservationManager.class, CommandListener.class, DefaultLocalUserFactory.class}) class NetfluxEndpointTest { @InjectMockComponents @@ -72,6 +76,7 @@ void joinMessagePingLeave(MockitoComponentManager componentManager) throws Excep // Alice opens a new session. Session aliceSession = mockSession("alice"); + when(aliceSession.getId()).thenReturn("alisesession"); this.endPoint.onOpen(aliceSession, null); verify(aliceSession).addMessageHandler(this.messageHandlerCaptor.capture()); @@ -79,6 +84,7 @@ void joinMessagePingLeave(MockitoComponentManager componentManager) throws Excep // Bob opens a new session. Session bobSession = mockSession("bob"); + when(bobSession.getId()).thenReturn("bobSession"); this.endPoint.onOpen(bobSession, null); verify(bobSession).addMessageHandler(this.messageHandlerCaptor.capture()); @@ -91,15 +97,15 @@ void joinMessagePingLeave(MockitoComponentManager componentManager) throws Excep // Alice joins the first channel. aliceMessageHandler.onMessage(this.jsonConverter.encode(Arrays.asList(1, "JOIN", firstChannel.getKey()))); - assertEquals(1, firstChannel.getConnectedUsers().size()); - String aliceId = firstChannel.getConnectedUsers().get(0).getName(); + assertEquals(1, firstChannel.getUsers().size()); + String aliceId = IterableUtils.get(firstChannel.getUsers().values(), 0).getName(); // Bob joins the first channel. bobMessageHandler.onMessage(this.jsonConverter.encode(Arrays.asList(1, "JOIN", firstChannel.getKey()))); - assertEquals(2, firstChannel.getConnectedUsers().size()); + assertEquals(2, firstChannel.getUsers().size()); assertEquals(1, firstChannel.getBots().size()); - String bobId = firstChannel.getConnectedUsers().get(1).getName(); + String bobId = IterableUtils.get(firstChannel.getUsers().values(), 1).getName(); // Alice sends a message to the channel. aliceMessageHandler @@ -117,7 +123,7 @@ void joinMessagePingLeave(MockitoComponentManager componentManager) throws Excep // Both users join the second channel. bobMessageHandler.onMessage(this.jsonConverter.encode(Arrays.asList(3, "JOIN", secondChannel.getKey()))); aliceMessageHandler.onMessage(this.jsonConverter.encode(Arrays.asList(4, "JOIN", secondChannel.getKey()))); - assertEquals(2, secondChannel.getConnectedUsers().size()); + assertEquals(2, secondChannel.getUsers().size()); assertEquals(1, secondChannel.getBots().size()); // Bob tries to join a channel with an invalid key. @@ -142,15 +148,15 @@ void joinMessagePingLeave(MockitoComponentManager componentManager) throws Excep // Bob leaves the first channel. bobMessageHandler.onMessage(this.jsonConverter.encode(Arrays.asList(6, "LEAVE", firstChannel.getKey()))); - assertEquals(1, firstChannel.getConnectedUsers().size()); - assertEquals(2, secondChannel.getConnectedUsers().size()); + assertEquals(1, firstChannel.getUsers().size()); + assertEquals(2, secondChannel.getUsers().size()); // Close both sessions. this.endPoint.onClose(bobSession, new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Bye!")); this.endPoint.onError(aliceSession, null); - assertEquals(0, firstChannel.getConnectedUsers().size()); - assertEquals(0, secondChannel.getConnectedUsers().size()); + assertEquals(0, firstChannel.getUsers().size()); + assertEquals(0, secondChannel.getUsers().size()); // The history keeper is still connected. assertEquals(1, firstChannel.getBots().size()); @@ -236,6 +242,7 @@ void joinMessagePingLeave(MockitoComponentManager componentManager) throws Excep private Session mockSession(String name) { Session session = mock(Session.class, name); + when(session.getId()).thenReturn("sessionid"); when(session.getUserProperties()).thenReturn(new HashMap<>()); Basic basicRemote = mock(Basic.class); when(session.getBasicRemote()).thenReturn(basicRemote);