Skip to content

Commit c475ee2

Browse files
MindgamesnlDuckelekuukgithub-actions[bot]
authored
Develop (#14)
* Updated version in readme, added mvn central badge, fix typo * Added multiply example * Optimized all imports * Log errors thrown by implementations (#8) * Version bump * Add JaCoCo badge * Bump jedis due to org.json cve * Update develop (#10) * Release (#9) * Updated version in readme, added mvn central badge, fix typo * Added multiply example * Optimized all imports * Log errors thrown by implementations (#8) * Version bump --------- Co-authored-by: Duckelekuuk <[email protected]> Co-authored-by: Duckelekuuk <[email protected]> * Add JaCoCo badge --------- Co-authored-by: Duckelekuuk <[email protected]> Co-authored-by: Duckelekuuk <[email protected]> Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> * Make redis ignore itself, if configured (#13) --------- Co-authored-by: Duckelekuuk <[email protected]> Co-authored-by: Duckelekuuk <[email protected]> Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 406aeba commit c475ee2

File tree

8 files changed

+75
-33
lines changed

8 files changed

+75
-33
lines changed

meteor-common/src/main/java/dev/pixelib/meteor/base/interfaces/SubscriptionHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.pixelib.meteor.base.interfaces;
22

3+
@FunctionalInterface
34
public interface SubscriptionHandler {
45

56
boolean onPacket(byte[] packet) throws Exception;

meteor-core/src/main/java/dev/pixelib/meteor/core/proxy/PendingInvocation.java

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public void run() {
8686
}
8787

8888
isTimedOut.set(true);
89+
8990
this.completable.completeExceptionally(
9091
new InvocationTimedOutException(invocationDescriptor.getMethodName(), invocationDescriptor.getNamespace(), timeoutSeconds)
9192
);

meteor-core/src/main/java/dev/pixelib/meteor/core/trackers/OutgoingInvocationTracker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public void completeInvocation(InvocationResponse invocationResponse) {
5454
// do we have a pending invocation for this invocation id?
5555
PendingInvocation<?> pendingInvocation = pendingInvocations.get(invocationResponse.getInvocationId());
5656
if (pendingInvocation == null) {
57-
throw new IllegalStateException("No pending invocation found for invocation id " + invocationResponse.getInvocationId());
57+
throw new IllegalStateException("No pending invocation found for invocation id " + invocationResponse.getInvocationId() + ". Data: " + invocationResponse.getResult());
58+
//return;
5859
}
5960

6061
pendingInvocation.complete(invocationResponse.getResult());

meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisPacketListener.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.ConcurrentHashMap;
1111
import java.util.concurrent.ExecutorService;
1212
import java.util.concurrent.Executors;
13+
import java.util.function.Consumer;
1314
import java.util.logging.Level;
1415
import java.util.logging.Logger;
1516

@@ -18,10 +19,10 @@ public class RedisPacketListener extends JedisPubSub {
1819
private final Logger logger;
1920

2021
private final ExecutorService jedisThreadPool = Executors.newCachedThreadPool();
21-
private final Map<String, Set<SubscriptionHandler>> messageBrokers = new ConcurrentHashMap<>();
22+
private final Map<String, Set<StringMessageBroker>> messageBrokers = new ConcurrentHashMap<>();
2223
private final Collection<String> customSubscribedChannels = ConcurrentHashMap.newKeySet();
2324

24-
public RedisPacketListener(SubscriptionHandler messageBroker, String startChannel, Logger logger) {
25+
public RedisPacketListener(StringMessageBroker messageBroker, String startChannel, Logger logger) {
2526
this.logger = logger;
2627
registerBroker(startChannel, messageBroker);
2728
customSubscribedChannels.add(startChannel);
@@ -31,14 +32,14 @@ public RedisPacketListener(SubscriptionHandler messageBroker, String startChanne
3132
public void onMessage(String channel, String message) {
3233
messageBrokers.get(channel).forEach(subscriptionHandler -> {
3334
try {
34-
subscriptionHandler.onPacket(Base64.getDecoder().decode(message));
35+
subscriptionHandler.onRedisMessage(message);
3536
} catch (Exception exception) {
3637
logger.log(Level.SEVERE, "Error while handling packet", exception);
3738
}
3839
});
3940
}
4041

41-
public void subscribe(String channel, SubscriptionHandler onReceive) {
42+
public void subscribe(String channel, StringMessageBroker onReceive) {
4243
registerBroker(channel, onReceive);
4344

4445
if (customSubscribedChannels.add(channel)) {
@@ -55,7 +56,7 @@ public Collection<String> getCustomSubscribedChannels() {
5556
return customSubscribedChannels;
5657
}
5758

58-
private void registerBroker(String channel, SubscriptionHandler onReceive) {
59+
private void registerBroker(String channel, StringMessageBroker onReceive) {
5960
messageBrokers.computeIfAbsent(channel, key -> ConcurrentHashMap.newKeySet()).add(onReceive);
6061
}
6162
}

meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisSubscriptionThread.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
import java.util.concurrent.CompletionException;
1010
import java.util.concurrent.ExecutorService;
1111
import java.util.concurrent.Executors;
12+
import java.util.function.Consumer;
1213
import java.util.logging.Level;
1314
import java.util.logging.Logger;
1415

1516
public class RedisSubscriptionThread {
1617

17-
private final SubscriptionHandler messageBroker;
18+
private final StringMessageBroker messageBroker;
1819
private final Logger logger;
1920
private final String defaultChannel;
2021
private final JedisPool jedisPool;
@@ -28,7 +29,7 @@ public class RedisSubscriptionThread {
2829
return thread;
2930
});
3031

31-
public RedisSubscriptionThread(SubscriptionHandler messageBroker, Logger logger, String channel, JedisPool jedisPool) {
32+
public RedisSubscriptionThread(StringMessageBroker messageBroker, Logger logger, String channel, JedisPool jedisPool) {
3233
this.messageBroker = messageBroker;
3334
this.logger = logger;
3435
this.defaultChannel = channel;
@@ -69,18 +70,18 @@ public CompletableFuture<Boolean> start() {
6970

7071
}
7172

72-
public void subscribe(String channel, SubscriptionHandler onReceive) {
73-
jedisPacketListener.subscribe(channel, onReceive);
74-
75-
}
76-
7773
public void stop() {
7874
if (isStopping) return;
7975
isStopping = true;
8076
jedisPacketListener.stop();
8177
listenerThread.shutdownNow();
8278
}
8379

80+
public void subscribe(String channel, StringMessageBroker onReceive) {
81+
jedisPacketListener.subscribe(channel, onReceive);
82+
83+
}
84+
8485
private CompletableFuture<Boolean> isSubscribed() {
8586
return CompletableFuture.supplyAsync(() -> {
8687
final int maxAttempts = 5;

meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisTransport.java

+35-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.io.IOException;
1010
import java.util.Base64;
1111
import java.util.Locale;
12+
import java.util.UUID;
13+
import java.util.function.Consumer;
1214
import java.util.logging.Logger;
1315

1416
public class RedisTransport implements RpcTransport {
@@ -18,6 +20,8 @@ public class RedisTransport implements RpcTransport {
1820
private final JedisPool jedisPool;
1921
private final String topic;
2022
private RedisSubscriptionThread redisSubscriptionThread;
23+
private final UUID transportId = UUID.randomUUID();
24+
private boolean ignoreSelf = true;
2125

2226
public RedisTransport(JedisPool jedisPool, String topic) {
2327
this.jedisPool = jedisPool;
@@ -34,14 +38,22 @@ public RedisTransport(String host, int port, String topic) {
3438
this.topic = topic;
3539
}
3640

41+
public RedisTransport withIgnoreSelf(boolean ignoreSelf) {
42+
this.ignoreSelf = ignoreSelf;
43+
return this;
44+
}
45+
3746
@Override
3847
public void send(Direction direction, byte[] bytes) {
3948
if (jedisPool.isClosed()) {
4049
throw new IllegalStateException("Jedis pool is closed");
4150
}
4251

4352
try (Jedis connection = jedisPool.getResource()) {
44-
connection.publish(getTopicName(direction), Base64.getEncoder().encodeToString(bytes));
53+
connection.publish(
54+
getTopicName(direction),
55+
transportId + Base64.getEncoder().encodeToString(bytes)
56+
);
4557
}
4658
}
4759

@@ -51,11 +63,31 @@ public void subscribe(Direction direction, SubscriptionHandler onReceive) {
5163
throw new IllegalStateException("Jedis pool is closed");
5264
}
5365

66+
StringMessageBroker wrappedHandler = (message) -> {
67+
byte[] bytes = message.getBytes();
68+
// only split after UUID, which always has a length of 36
69+
byte[] uuid = new byte[36];
70+
System.arraycopy(bytes, 0, uuid, 0, 36);
71+
72+
if (ignoreSelf && transportId.toString().equals(new String(uuid))) {
73+
return false;
74+
}
75+
76+
byte[] data = new byte[bytes.length - 36];
77+
System.arraycopy(bytes, 36, data, 0, data.length);
78+
79+
try {
80+
return onReceive.onPacket(Base64.getDecoder().decode(data));
81+
} catch (Exception e) {
82+
throw new RuntimeException(e);
83+
}
84+
};
85+
5486
if (redisSubscriptionThread == null) {
55-
redisSubscriptionThread = new RedisSubscriptionThread(onReceive, logger, getTopicName(direction), jedisPool);
87+
redisSubscriptionThread = new RedisSubscriptionThread(wrappedHandler, logger, getTopicName(direction), jedisPool);
5688
redisSubscriptionThread.start().join();
5789
} else {
58-
redisSubscriptionThread.subscribe(getTopicName(direction), onReceive);
90+
redisSubscriptionThread.subscribe(getTopicName(direction), wrappedHandler);
5991
}
6092
}
6193

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package dev.pixelib.meteor.transport.redis;
2+
3+
@FunctionalInterface
4+
public interface StringMessageBroker {
5+
6+
boolean onRedisMessage(String message) throws Exception;
7+
8+
}

meteor-jedis/src/test/java/dev/pixelib/meteor/transport/redis/RedisPacketListenerTest.java

+14-17
Original file line numberDiff line numberDiff line change
@@ -21,45 +21,42 @@
2121
class RedisPacketListenerTest {
2222

2323
@Mock
24-
SubscriptionHandler subscriptionHandler;
24+
StringMessageBroker subscriptionHandler;
2525

2626
@Test
2727
void onMessage_withValidChannel() throws Exception {
2828
String topic = "test";
29-
String message = "message";
30-
byte[] expected = Base64.getDecoder().decode(message);
29+
String expected = "message";
3130

3231
RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger());
3332

34-
redisPacketListener.onMessage(topic, message);
35-
verify(subscriptionHandler, times(1)).onPacket(expected);
36-
verify(subscriptionHandler).onPacket(argThat(argument -> {
37-
assertArrayEquals(expected,argument);
33+
redisPacketListener.onMessage(topic, expected);
34+
verify(subscriptionHandler, times(1)).onRedisMessage(expected);
35+
verify(subscriptionHandler).onRedisMessage(argThat(argument -> {
36+
assertEquals(expected,argument);
3837
return true;
3938
}));
4039
}
4140

4241
@Test
4342
void onMessage_throwException() throws Exception {
4443
String topic = "test";
45-
String message = "message";
46-
byte[] expected = Base64.getDecoder().decode(message);
47-
44+
String expected = "message";
4845

49-
SubscriptionHandler handler = new SubscriptionHandler() {
46+
StringMessageBroker handler = new StringMessageBroker() {
5047
@Override
51-
public boolean onPacket(byte[] packet) throws Exception {
48+
public boolean onRedisMessage(String message) throws Exception {
5249
throw new NullPointerException();
5350
}
5451
};
5552

56-
SubscriptionHandler handlerSub = spy(handler);
53+
StringMessageBroker handlerSub = spy(handler);
5754
RedisPacketListener redisPacketListener = new RedisPacketListener(handlerSub, topic, Logger.getAnonymousLogger());
5855

59-
redisPacketListener.onMessage(topic, message);
60-
verify(handlerSub, times(1)).onPacket(expected);
61-
verify(handlerSub).onPacket(argThat(argument -> {
62-
assertArrayEquals(expected,argument);
56+
redisPacketListener.onMessage(topic, expected);
57+
verify(handlerSub, times(1)).onRedisMessage(expected);
58+
verify(handlerSub).onRedisMessage(argThat(argument -> {
59+
assertEquals(expected,argument);
6360
return true;
6461
}));
6562
}

0 commit comments

Comments
 (0)