Skip to content

Commit 782676b

Browse files
committed
XCOMMONS-3425: The Netflux WebSocket endpoint delays replies for no good reason
1 parent 34eb701 commit 782676b

File tree

9 files changed

+160
-227
lines changed

9 files changed

+160
-227
lines changed

xwiki-commons-core/xwiki-commons-netflux/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
<properties>
3535
<!-- Name to display by the Extension Manager -->
3636
<xwiki.extension.name>Netflux Server</xwiki.extension.name>
37-
<xwiki.jacoco.instructionRatio>0.88</xwiki.jacoco.instructionRatio>
37+
<xwiki.jacoco.instructionRatio>0.92</xwiki.jacoco.instructionRatio>
3838
</properties>
3939
<dependencies>
4040
<dependency>

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/HistoryKeeper.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Iterator;
2323
import java.util.LinkedList;
2424
import java.util.List;
25+
import java.util.stream.Stream;
2526

2627
import jakarta.inject.Inject;
2728
import jakarta.inject.Named;
@@ -48,7 +49,7 @@ public class HistoryKeeper extends AbstractBot
4849
private ChannelStore channels;
4950

5051
@Inject
51-
private MessageDispatcher dispatcher;
52+
private MessageBuilder messageBuilder;
5253

5354
@Override
5455
public String getId()
@@ -61,32 +62,48 @@ public String getId()
6162
@Override
6263
public void onUserMessage(User sender, List<Object> message)
6364
{
65+
// The history keeper responds only to GET_HISTORY messages.
66+
6467
if (message.size() <= 3) {
65-
// Unsupported message format.
68+
// Doesn't match the GET_HISTORY message format. Ignore.
6669
return;
6770
}
6871

6972
List<Object> messageBody;
7073
try {
71-
messageBody = this.dispatcher.decode(message.get(3).toString());
74+
messageBody = this.messageBuilder.decode(message.get(3).toString());
7275
} catch (Exception e) {
7376
this.logger.debug("Failed to parse message body.", e);
7477
return;
7578
}
7679

7780
if (messageBody.size() < 2 || !"GET_HISTORY".equals(messageBody.get(0))) {
78-
// Unsupported message format.
81+
// It is either not a GET_HISTORY message or a GET_HISTORY message that doesn't specify the channel for
82+
// which to return the history. Ignore.
7983
return;
8084
}
8185

82-
String channelKey = (String) messageBody.get(1);
86+
sendChannelHistory(sender, (String) messageBody.get(1));
87+
}
88+
89+
private void sendChannelHistory(User user, String channelKey)
90+
{
91+
Stream<String> messages = Stream.of();
8392
Channel channel = this.channels.get(channelKey);
8493
if (channel != null) {
85-
channel.getMessages().forEach(msgStr -> this.dispatcher.addMessage(sender, msgStr));
94+
messages = Stream.concat(messages, channel.getMessages().stream());
8695
}
8796
String endHistoryBody = "{\"state\":1, \"channel\":\"" + channelKey + "\"}";
88-
String endHistoryMessage = this.dispatcher.buildMessage(0, getId(), sender.getName(), endHistoryBody);
89-
this.dispatcher.addMessage(sender, endHistoryMessage);
97+
String endHistoryMessage = this.messageBuilder.buildMessage(0, getId(), user.getName(), endHistoryBody);
98+
messages = Stream.concat(messages, Stream.of(endHistoryMessage));
99+
100+
try {
101+
for (String msg : (Iterable<String>) messages::iterator) {
102+
user.getSession().getBasicRemote().sendText(msg);
103+
}
104+
} catch (Exception e) {
105+
this.logger.debug("Failed to send channel history.", e);
106+
}
90107
}
91108

92109
@Override
@@ -95,7 +112,7 @@ public void onChannelMessage(Channel channel, User sender, String messageType, S
95112
// We keep only command messages in the channel history. Note that the channel history is replayed when a user
96113
// joins a channel, so we don't want to replay messages like JOIN or LEAVE (an user can join and leave a channel
97114
// multiple times).
98-
if (MessageDispatcher.COMMAND_MSG.equals(messageType)) {
115+
if (MessageBuilder.COMMAND_MSG.equals(messageType)) {
99116
this.logger.debug("Added in history: [{}]", message);
100117
if (isCheckpoint(message)) {
101118
// Prune old messages from memory.
@@ -118,7 +135,7 @@ public void onChannelMessage(Channel channel, User sender, String messageType, S
118135

119136
private boolean isCheckpoint(String message)
120137
{
121-
List<Object> msg = this.dispatcher.decode(message);
138+
List<Object> msg = this.messageBuilder.decode(message);
122139
if (!msg.isEmpty()) {
123140
Object lastItem = msg.get(msg.size() - 1);
124141
if (lastItem instanceof String s) {
Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@
3131
import org.xwiki.component.annotation.Component;
3232

3333
/**
34-
* The component responsible for dispatching the Netflux messages.
34+
* The component responsible for building the Netflux messages.
3535
*
3636
* @version $Id$
3737
* @since 15.10.11
3838
* @since 16.4.1
3939
* @since 16.5.0RC1
4040
*/
41-
@Component(roles = MessageDispatcher.class)
41+
@Component(roles = MessageBuilder.class)
4242
@Singleton
43-
public class MessageDispatcher
43+
public class MessageBuilder
4444
{
4545
/**
4646
* The standard message type.
@@ -52,18 +52,6 @@ public class MessageDispatcher
5252

5353
private final JsonConverter converter = new JsonConverter();
5454

55-
/**
56-
* Add a message to the sending queue of a given user.
57-
*
58-
* @param toUser the target user
59-
* @param message the message to add
60-
*/
61-
public void addMessage(User toUser, String message)
62-
{
63-
this.logger.debug("Adding message to [{}]: [{}]", toUser.getName(), message);
64-
toUser.getMessagesToBeSent().add(message);
65-
}
66-
6755
/**
6856
* Build a message to be sent to a user.
6957
*

0 commit comments

Comments
 (0)