diff --git a/src/main/java/org/jetlang/remote/acceptor/JetlangClientHandler.java b/src/main/java/org/jetlang/remote/acceptor/JetlangClientHandler.java index 6d0ed0d..3c1e87b 100644 --- a/src/main/java/org/jetlang/remote/acceptor/JetlangClientHandler.java +++ b/src/main/java/org/jetlang/remote/acceptor/JetlangClientHandler.java @@ -5,6 +5,8 @@ import org.jetlang.remote.core.ErrorHandler; import org.jetlang.remote.core.JetlangRemotingInputStream; import org.jetlang.remote.core.JetlangRemotingProtocol; +import org.jetlang.remote.core.RawMsgHandler; +import org.jetlang.remote.core.RawMsgHandlerFactory; import org.jetlang.remote.core.ReadTimeoutEvent; import org.jetlang.remote.core.Serializer; import org.jetlang.remote.core.SerializerFactory; @@ -20,6 +22,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.jetlang.remote.core.RawMsgHandlerFactory.NULL_RAW_MSG_HANDLER_FACTORY; + public class JetlangClientHandler implements Acceptor.ClientHandler, ClientPublisher { private final SerializerAdapter ser; @@ -33,6 +37,7 @@ public class JetlangClientHandler implements Acceptor.ClientHandler, ClientPubli private final BufferedSerializer globalBuffer; private final Fiber globalSendFiber; + private final RawMsgHandlerFactory rawMsgHandlerFactory; public interface FiberFactory { @@ -66,6 +71,16 @@ public JetlangClientHandler(SerializerAdapter ser, JetlangSessionConfig config, FiberFactory fiberFactory, ErrorHandler errorHandler) { + this(ser, channels, exec, config, fiberFactory, errorHandler, NULL_RAW_MSG_HANDLER_FACTORY); + } + + public JetlangClientHandler(SerializerAdapter ser, + NewSessionHandler channels, + Executor exec, + JetlangSessionConfig config, + FiberFactory fiberFactory, + ErrorHandler errorHandler, + RawMsgHandlerFactory rawMsgHandlerFactory) { this.ser = ser; this.channels = channels; this.exec = exec; @@ -73,6 +88,7 @@ public JetlangClientHandler(SerializerAdapter ser, this.fiberFactory = fiberFactory; this.errorHandler = errorHandler; this.globalSendFiber = fiberFactory.createGlobalSendFiber(); + this.rawMsgHandlerFactory = rawMsgHandlerFactory; this.globalSendFiber.start(); this.globalBuffer = ser.createBuffered(); } @@ -168,7 +184,10 @@ private Runnable createRunnable(final ClientTcpSocket clientTcpSocket) throws IO final TcpSocket socket = clientTcpSocket.getSocket(); final Fiber sendFiber = fiberFactory.createSendFiber(socket.getSocket()); final Serializer serializer = ser.createForSocket(socket); - final JetlangStreamSession session = new JetlangStreamSession(socket.getRemoteSocketAddress(), new SocketMessageStreamWriter(socket, ser.getCharset(), serializer.getWriter()), sendFiber, errorHandler); + final SocketMessageStreamWriter streamWriter = new SocketMessageStreamWriter(socket, ser.getCharset(), serializer.getWriter()); + final RawMsgHandler rawMsgHandler = rawMsgHandlerFactory.rawMsgHandler(); + final JetlangStreamSession session = new JetlangStreamSession(socket.getRemoteSocketAddress(), streamWriter, + sendFiber, errorHandler, rawMsgHandler); return new Runnable() { public void run() { try { @@ -177,7 +196,8 @@ public void run() { channels.onNewSession(JetlangClientHandler.this, session); session.startHeartbeat(config.getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS); sendFiber.start(); - JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(session, serializer.getReader(), ser.getCharset()); + + JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(session, serializer.getReader(), ser.getCharset(), rawMsgHandler.enabled()); JetlangRemotingInputStream state = new JetlangRemotingInputStream(socket.getInputStream(), protocol, onReadTimeout); while (state.readFromStream()) { diff --git a/src/main/java/org/jetlang/remote/acceptor/JetlangNioSession.java b/src/main/java/org/jetlang/remote/acceptor/JetlangNioSession.java index a99ac35..b3f2867 100644 --- a/src/main/java/org/jetlang/remote/acceptor/JetlangNioSession.java +++ b/src/main/java/org/jetlang/remote/acceptor/JetlangNioSession.java @@ -2,6 +2,8 @@ import org.jetlang.fibers.NioFiber; import org.jetlang.remote.core.MsgTypes; +import org.jetlang.remote.core.RawMsg; +import org.jetlang.remote.core.RawMsgHandler; import java.nio.channels.SocketChannel; @@ -10,6 +12,7 @@ public class JetlangNioSession extends JetlangBaseSession implements JetlangMess private final NioJetlangSendFiber.ChannelState channel; private final NioJetlangSendFiber sendFiber; private final ErrorHandler errorHandler; + private final RawMsgHandler rawMsgHandler; public interface ErrorHandler { @@ -20,14 +23,22 @@ public interface ErrorHandler { void onHandlerException(Exception failed); } - public JetlangNioSession(NioFiber fiber, SocketChannel channel, NioJetlangSendFiber sendFiber, NioJetlangRemotingClientFactory.Id id, ErrorHandler errorHandler) { + public JetlangNioSession(NioFiber fiber, SocketChannel channel, NioJetlangSendFiber sendFiber, + NioJetlangRemotingClientFactory.Id id, ErrorHandler errorHandler, + RawMsgHandler rawMsgHandler) { super(id); this.errorHandler = errorHandler; + this.rawMsgHandler = rawMsgHandler; this.channel = new NioJetlangSendFiber.ChannelState(channel, id, fiber); this.sendFiber = sendFiber; this.sendFiber.onNewSession(this.channel); } + @Override + public void onRawMsg(RawMsg rawMsg) { + rawMsgHandler.onRawMsg(rawMsg); + } + @Override public void onHandlerException(Exception failed) { errorHandler.onHandlerException(failed); diff --git a/src/main/java/org/jetlang/remote/acceptor/JetlangStreamSession.java b/src/main/java/org/jetlang/remote/acceptor/JetlangStreamSession.java index 5582e93..2978f31 100644 --- a/src/main/java/org/jetlang/remote/acceptor/JetlangStreamSession.java +++ b/src/main/java/org/jetlang/remote/acceptor/JetlangStreamSession.java @@ -5,6 +5,8 @@ import org.jetlang.remote.core.ErrorHandler; import org.jetlang.remote.core.JetlangRemotingProtocol; import org.jetlang.remote.core.MsgTypes; +import org.jetlang.remote.core.RawMsg; +import org.jetlang.remote.core.RawMsgHandler; import java.io.IOException; import java.util.Collections; @@ -18,6 +20,7 @@ public class JetlangStreamSession extends JetlangBaseSession implements JetlangR private final MessageStreamWriter socket; private final Fiber sendFiber; private final ErrorHandler errorHandler; + private final RawMsgHandler rawMsgHandler; private final Set subscriptions = Collections.synchronizedSet(new HashSet()); private volatile boolean loggedOut; @@ -26,11 +29,18 @@ public void run() { } }; - public JetlangStreamSession(Object id, MessageStreamWriter socket, Fiber sendFiber, ErrorHandler errorHandler) { + public JetlangStreamSession(Object id, MessageStreamWriter socket, Fiber sendFiber, + ErrorHandler errorHandler, RawMsgHandler rawMsgHandler) { super(id); this.socket = socket; this.sendFiber = sendFiber; this.errorHandler = errorHandler; + this.rawMsgHandler = rawMsgHandler; + } + + @Override + public void onRawMsg(RawMsg rawMsg) { + rawMsgHandler.onRawMsg(rawMsg); } public void startHeartbeat(int interval, TimeUnit unit) { diff --git a/src/main/java/org/jetlang/remote/acceptor/NioJetlangChannelHandler.java b/src/main/java/org/jetlang/remote/acceptor/NioJetlangChannelHandler.java index 6e9bdff..b47d811 100644 --- a/src/main/java/org/jetlang/remote/acceptor/NioJetlangChannelHandler.java +++ b/src/main/java/org/jetlang/remote/acceptor/NioJetlangChannelHandler.java @@ -5,6 +5,7 @@ import org.jetlang.fibers.NioFiber; import org.jetlang.remote.core.JetlangRemotingProtocol; import org.jetlang.remote.core.ObjectByteReader; +import org.jetlang.remote.core.RawMsgHandler; import org.jetlang.remote.core.ReadTimeoutEvent; import java.io.IOException; @@ -23,10 +24,11 @@ public class NioJetlangChannelHandler implements NioChannelHandler { private JetlangRemotingProtocol.State nextCommand; private long lastReadMs = System.currentTimeMillis(); - public NioJetlangChannelHandler(SocketChannel accept, JetlangMessageHandler session, ObjectByteReader reader, Runnable onEnd, Charset charset) { + public NioJetlangChannelHandler(SocketChannel accept, JetlangMessageHandler session, ObjectByteReader reader, + Runnable onEnd, Charset charset, RawMsgHandler rawMsgHandler) { this.session = session; this.onEnd = onEnd; - this.protocol = new JetlangRemotingProtocol(session, reader, charset); + this.protocol = new JetlangRemotingProtocol(session, reader, charset, rawMsgHandler.enabled()); this.accept = accept; this.nextCommand = protocol.root; } diff --git a/src/main/java/org/jetlang/remote/acceptor/NioJetlangRemotingClientFactory.java b/src/main/java/org/jetlang/remote/acceptor/NioJetlangRemotingClientFactory.java index d295df3..c79ce84 100644 --- a/src/main/java/org/jetlang/remote/acceptor/NioJetlangRemotingClientFactory.java +++ b/src/main/java/org/jetlang/remote/acceptor/NioJetlangRemotingClientFactory.java @@ -3,6 +3,8 @@ import org.jetlang.core.Disposable; import org.jetlang.fibers.NioControls; import org.jetlang.fibers.NioFiber; +import org.jetlang.remote.core.RawMsgHandler; +import org.jetlang.remote.core.RawMsgHandlerFactory; import org.jetlang.remote.core.Serializer; import java.net.SocketAddress; @@ -12,11 +14,14 @@ import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; +import static org.jetlang.remote.core.RawMsgHandlerFactory.NULL_RAW_MSG_HANDLER_FACTORY; + public class NioJetlangRemotingClientFactory implements NioAcceptorHandler.ClientFactory { private final Serializer serializer; private final JetlangSessionConfig config; private final Handler handler; + private final RawMsgHandlerFactory rawMsgHandlerFactory; private final NioJetlangSendFiber sendFiber; private final Charset charset; @@ -36,10 +41,18 @@ default void configureAcceptedClient(SelectionKey key, SocketChannel channel) th void onHandlerException(Exception failed); } - public NioJetlangRemotingClientFactory(Serializer serializer, JetlangSessionConfig config, Handler handler, NioJetlangSendFiber sendFiber, Charset charset) { + public NioJetlangRemotingClientFactory(Serializer serializer, JetlangSessionConfig config, Handler handler, + NioJetlangSendFiber sendFiber, Charset charset) { + this(serializer, config, handler, NULL_RAW_MSG_HANDLER_FACTORY, sendFiber, charset); + } + + public NioJetlangRemotingClientFactory(Serializer serializer, JetlangSessionConfig config, Handler handler, + RawMsgHandlerFactory rawMsgHandlerFactory, NioJetlangSendFiber sendFiber, + Charset charset) { this.serializer = serializer; this.config = config; this.handler = handler; + this.rawMsgHandlerFactory = rawMsgHandlerFactory; this.sendFiber = sendFiber; this.charset = charset; } @@ -52,6 +65,7 @@ public void onAccept(NioFiber fiber, NioControls controls, SelectionKey key, Soc throw new RuntimeException(e); } Hb hb = new Hb(); + RawMsgHandler rawMsgHandler = rawMsgHandlerFactory.rawMsgHandler(); final JetlangNioSession session = new JetlangNioSession(fiber, channel, sendFiber, new Id(channel), new JetlangNioSession.ErrorHandler() { @Override public void onUnhandledReplyMsg(int reqId, String dataTopicVal, Object readObject) { @@ -67,12 +81,13 @@ public void onUnknownMessage(int read) { public void onHandlerException(Exception failed) { handler.onHandlerException(failed); } - }); + }, rawMsgHandler); + Runnable onClose = () -> { hb.onClose(); session.onClose(new SessionCloseEvent()); }; - final NioJetlangChannelHandler handler = new NioJetlangChannelHandler(channel, session, serializer.getReader(), onClose, charset); + final NioJetlangChannelHandler handler = new NioJetlangChannelHandler(channel, session, serializer.getReader(), onClose, charset, rawMsgHandler); this.handler.onNewSession(session); hb.startHb(fiber, session, handler, config); controls.addHandler(handler); diff --git a/src/main/java/org/jetlang/remote/client/JetlangTcpClient.java b/src/main/java/org/jetlang/remote/client/JetlangTcpClient.java index d3eecb4..abf3d67 100644 --- a/src/main/java/org/jetlang/remote/client/JetlangTcpClient.java +++ b/src/main/java/org/jetlang/remote/client/JetlangTcpClient.java @@ -16,6 +16,8 @@ import org.jetlang.remote.core.JetlangRemotingInputStream; import org.jetlang.remote.core.JetlangRemotingProtocol; import org.jetlang.remote.core.MsgTypes; +import org.jetlang.remote.core.RawMsg; +import org.jetlang.remote.core.RawMsgHandler; import org.jetlang.remote.core.ReadTimeoutEvent; import org.jetlang.remote.core.Serializer; import org.jetlang.remote.core.SocketMessageStreamWriter; @@ -34,7 +36,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.jetlang.remote.core.RawMsgHandler.NULL_RAW_MSG_HANDLER; + /** + * */ public class JetlangTcpClient implements JetlangClient { @@ -43,6 +48,7 @@ public class JetlangTcpClient implements JetlangClient { private final JetlangClientConfig config; private final Serializer ser; private final ErrorHandler errorHandler; + private final RawMsgHandler rawMsgHandler; private static final Charset charset = Charset.forName("US-ASCII"); private final SocketConnector socketConnector; private Disposable pendingConnect; @@ -66,7 +72,7 @@ private CloseableChannel channel() { private final Map pendingRequests = Collections.synchronizedMap(new HashMap()); private final SocketWriter socketWriter = new SocketWriter() { - public boolean send(final String topic, final T msg){ + public boolean send(final String topic, final T msg) { if (socket != null) { try { socket.write(topic, msg); @@ -87,11 +93,21 @@ public JetlangTcpClient(SocketConnector socketConnector, JetlangClientConfig config, Serializer ser, ErrorHandler errorHandler) { + this(socketConnector, sendFiber, config, ser, errorHandler, NULL_RAW_MSG_HANDLER); + } + + public JetlangTcpClient(SocketConnector socketConnector, + Fiber sendFiber, + JetlangClientConfig config, + Serializer ser, + ErrorHandler errorHandler, + RawMsgHandler rawMsgHandler) { this.socketConnector = socketConnector; this.sendFiber = sendFiber; this.config = config; this.ser = ser; this.errorHandler = errorHandler; + this.rawMsgHandler = rawMsgHandler; } private class RemoteSubscription { @@ -252,7 +268,7 @@ private void handleConnect(Socket newSocket) throws IOException { final InputStream stream = newSocket.getInputStream(); final Runnable reader = new Runnable() { public void run() { - final JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(protocolHandler, ser.getReader(), charset); + final JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(protocolHandler, ser.getReader(), charset, rawMsgHandler.enabled()); final JetlangRemotingInputStream inputStream = new JetlangRemotingInputStream(stream, protocol, onReadTimeout); try { Connected.publish(new ConnectEvent()); @@ -275,6 +291,11 @@ public void onMessage(String dataTopicVal, Object readObject) { publishData(dataTopicVal, readObject); } + @Override + public void onRawMsg(RawMsg rawMsg) { + rawMsgHandler.onRawMsg(rawMsg); + } + public void onSubscriptionRequest(String val) { errorHandler.onException(new IOException("SubscriptionNotSupported: " + val)); } @@ -309,7 +330,6 @@ public void onRequestReply(int reqId, String dataTopicVal, Object readObject) { } }; - private void handleReadExceptionOnSendFiber(final IOException e) { Runnable exec = new Runnable() { public void run() { @@ -446,7 +466,6 @@ public void dispose() { } } - public Subscriber getCloseChannel() { return Closed; } @@ -470,7 +489,7 @@ public void publish(String topic, T msg) { public void publish(final String topic, final T msg, final Runnable onSend) { Runnable r = new Runnable() { public void run() { - if(socketWriter.send(topic, msg)){ + if (socketWriter.send(topic, msg)) { if (onSend != null) onSend.run(); } @@ -479,7 +498,7 @@ public void run() { sendFiber.execute(r); } - public void execOnSendThread(final Callback cb){ + public void execOnSendThread(final Callback cb) { Runnable r = new Runnable() { public void run() { cb.onMessage(socketWriter); diff --git a/src/main/java/org/jetlang/remote/core/JetlangRemotingProtocol.java b/src/main/java/org/jetlang/remote/core/JetlangRemotingProtocol.java index 0291add..c4319b9 100644 --- a/src/main/java/org/jetlang/remote/core/JetlangRemotingProtocol.java +++ b/src/main/java/org/jetlang/remote/core/JetlangRemotingProtocol.java @@ -12,17 +12,8 @@ public class JetlangRemotingProtocol { private final Charset charset; private final DataRequest dataRequest = new DataRequest(); private final DataRequestReply dataRequestReply = new DataRequestReply(); - private final DataReader d = new DataReader() { - @Override - protected State onObject(String dataTopicVal, Object readObject) { - try { - session.onMessage(dataTopicVal, readObject); - } catch (Exception failed) { - session.onHandlerException(failed); - } - return root; - } - }; + private final DataReader dataReader; + public final State root = new State() { public int getRequiredBytes() { return 1; @@ -42,7 +33,7 @@ public State run() { execEvent(session::onLogout); return this; case MsgTypes.Data: - return d.first.first; + return dataReader.run(); case MsgTypes.DataRequest: return dataRequest.reqIdSt; case MsgTypes.DataReply: @@ -98,6 +89,8 @@ public interface Handler { void onMessage(String dataTopicVal, Object readObject); + void onRawMsg(RawMsg rawMsg); + void onSubscriptionRequest(String val); void onRequest(int reqId, String dataTopicVal, Object readObject); @@ -115,11 +108,12 @@ public interface Handler { void onHandlerException(Exception failed); } - public JetlangRemotingProtocol(Handler session, ObjectByteReader reader, Charset charset) { + public JetlangRemotingProtocol(Handler session, ObjectByteReader reader, Charset charset, boolean enableRawMsgs) { this.session = session; this.charset = charset; this.buffer = ByteBuffer.wrap(this.bufferArray); this.reader = reader; + this.dataReader = enableRawMsgs ? new RawDataReader() : new MessageReader(); } public interface State { @@ -155,7 +149,61 @@ public State run() { protected abstract State onString(String val) throws IOException; } - private abstract class DataReader { + public interface DataReader { + State run(); + } + + private class RawDataReader implements DataReader { + private int topicSize; + private int dataSize; + + private int start; + private int length; + + final RawMsg rawMsg = new RawMsg() { + @Override + public void read(ByteBuffer destination) { + for (int i = 0; i < length; i++ ) { + byte b = buffer.get(); + destination.put(b); + } + } + }; + + @Override + public State run() { + this.start = buffer.position(); + + int pos = buffer.position(); + topicSize = buffer.get(pos); + pos += 1; + pos += topicSize; + dataSize = buffer.getInt(pos); + pos += 4; + pos += dataSize; + + this.length = (pos) - start; + + session.onRawMsg(rawMsg); + buffer.position(start + length); // must always set position + + return root; + } + } + + private class MessageReader extends ObjectReader { + @Override + protected State onObject(String dataTopicVal, Object readObject) { + try { + session.onMessage(dataTopicVal, readObject); + } catch (Exception failed) { + session.onHandlerException(failed); + } + return root; + } + }; + + private abstract class ObjectReader implements DataReader { private int dataSizeVal; private String dataTopicVal; State dataSizeRead = new State() { @@ -187,6 +235,11 @@ protected State onString(String val) { } }; + @Override + public State run() { + return first.first; + } + protected abstract State onObject(String dataTopicVal, Object readObject) throws IOException; } @@ -208,7 +261,7 @@ protected void handleRequest(int reqId, String dataTopicVal, Object readObject) private abstract class DataRequestBase { int reqId; - DataReader data = new DataReader() { + final ObjectReader data = new ObjectReader() { @Override protected State onObject(String dataTopicVal, Object readObject) { try { @@ -222,14 +275,14 @@ protected State onObject(String dataTopicVal, Object readObject) { protected abstract void handleRequest(int reqId, String dataTopicVal, Object readObject); - State reqIdSt = new State() { + final State reqIdSt = new State() { public int getRequiredBytes() { return 4; } public State run() { reqId = buffer.getInt(); - return data.first.first; + return data.run(); } }; } diff --git a/src/main/java/org/jetlang/remote/core/RawMsg.java b/src/main/java/org/jetlang/remote/core/RawMsg.java new file mode 100644 index 0000000..35771f4 --- /dev/null +++ b/src/main/java/org/jetlang/remote/core/RawMsg.java @@ -0,0 +1,7 @@ +package org.jetlang.remote.core; + +import java.nio.ByteBuffer; + +public interface RawMsg { + void read(ByteBuffer destination); +} diff --git a/src/main/java/org/jetlang/remote/core/RawMsgHandler.java b/src/main/java/org/jetlang/remote/core/RawMsgHandler.java new file mode 100644 index 0000000..dc5804f --- /dev/null +++ b/src/main/java/org/jetlang/remote/core/RawMsgHandler.java @@ -0,0 +1,20 @@ +package org.jetlang.remote.core; + +public interface RawMsgHandler { + + boolean enabled(); + + void onRawMsg(RawMsg rawMsg); + + RawMsgHandler NULL_RAW_MSG_HANDLER = new RawMsgHandler() { + @Override + public boolean enabled() { + return false; + } + + @Override + public void onRawMsg(RawMsg rawMsg) { + throw new UnsupportedOperationException(); + } + }; +} diff --git a/src/main/java/org/jetlang/remote/core/RawMsgHandlerFactory.java b/src/main/java/org/jetlang/remote/core/RawMsgHandlerFactory.java new file mode 100644 index 0000000..75e3edd --- /dev/null +++ b/src/main/java/org/jetlang/remote/core/RawMsgHandlerFactory.java @@ -0,0 +1,15 @@ +package org.jetlang.remote.core; + +import static org.jetlang.remote.core.RawMsgHandler.NULL_RAW_MSG_HANDLER; + +public interface RawMsgHandlerFactory { + RawMsgHandler rawMsgHandler(); + + RawMsgHandlerFactory NULL_RAW_MSG_HANDLER_FACTORY = new RawMsgHandlerFactory() { + @Override + public RawMsgHandler rawMsgHandler() { + return NULL_RAW_MSG_HANDLER; + } + }; +} + diff --git a/src/test/java/org/jetlang/remote/IntegrationBase.java b/src/test/java/org/jetlang/remote/IntegrationBase.java index ebc7c69..d17ad29 100644 --- a/src/test/java/org/jetlang/remote/IntegrationBase.java +++ b/src/test/java/org/jetlang/remote/IntegrationBase.java @@ -4,17 +4,43 @@ import org.jetlang.core.Disposable; import org.jetlang.core.SynchronousDisposingExecutor; import org.jetlang.fibers.ThreadFiber; -import org.jetlang.remote.acceptor.*; -import org.jetlang.remote.client.*; +import org.jetlang.remote.acceptor.Acceptor; +import org.jetlang.remote.acceptor.ClientPublisher; +import org.jetlang.remote.acceptor.JetlangClientHandler; +import org.jetlang.remote.acceptor.JetlangFiberSession; +import org.jetlang.remote.acceptor.JetlangSession; +import org.jetlang.remote.acceptor.JetlangSessionConfig; +import org.jetlang.remote.acceptor.LogoutEvent; +import org.jetlang.remote.acceptor.NewFiberSessionHandler; +import org.jetlang.remote.acceptor.NewSessionHandler; +import org.jetlang.remote.acceptor.SerializerAdapter; +import org.jetlang.remote.acceptor.SessionCloseEvent; +import org.jetlang.remote.acceptor.SessionMessage; +import org.jetlang.remote.acceptor.SessionRequest; +import org.jetlang.remote.acceptor.SessionTopic; +import org.jetlang.remote.client.CloseEvent; +import org.jetlang.remote.client.ConnectEvent; +import org.jetlang.remote.client.JetlangClient; +import org.jetlang.remote.client.JetlangClientConfig; +import org.jetlang.remote.client.JetlangTcpClient; +import org.jetlang.remote.client.LogoutResult; +import org.jetlang.remote.client.SocketConnector; +import org.jetlang.remote.client.TimeoutControls; import org.jetlang.remote.core.ErrorHandler; import org.jetlang.remote.core.HeartbeatEvent; import org.jetlang.remote.core.JavaSerializer; +import org.jetlang.remote.core.RawMsg; +import org.jetlang.remote.core.RawMsgHandler; import org.jetlang.remote.core.ReadTimeoutEvent; +import org.jetlang.web.NioReader; import org.junit.After; import org.junit.Test; import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; import java.net.ServerSocket; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -22,7 +48,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public abstract class IntegrationBase { @@ -369,7 +397,6 @@ public void onNewSession(ClientPublisher pub, JetlangFiberSession session) { @Test public void requestReply() throws IOException { - NewSessionHandler sessionCallback = new NewSessionHandler() { public void onNewSession(ClientPublisher pub, JetlangSession jetlangSession) { Callback onRequest = new Callback() { @@ -411,7 +438,7 @@ public void onMessage(SessionRequest sessionRequest) { @Test - public void regression() throws IOException, InterruptedException { + public void regression() throws Exception { final EventAssert subscriptionReceived = new EventAssert(1); Callback onTopic = new Callback() { public void onMessage(SessionTopic message) { @@ -478,6 +505,102 @@ private JetlangClient createClient() { return new JetlangTcpClient(conn, new ThreadFiber(), clientConfig, new JavaSerializer(), new ErrorHandler.SysOut()); } + @Test + public void rawMsgRegression() throws Exception { + final EventAssert subscriptionReceived = new EventAssert(1); + Callback onTopic = new Callback() { + public void onMessage(SessionTopic message) { + message.publish("mymsg"); + } + }; + subscriptionReceived.onMessage(onTopic); + final EventAssert logoutEvent = new EventAssert(1); + final EventAssert> serverMessageReceive = new EventAssert>(1); + final EventAssert unsubscribeReceive = new EventAssert(1); + final EventAssert serverSessionClose = new EventAssert(1); + + NewFiberSessionHandler handlerFactory = new NewFiberSessionHandler() { + public void onNewSession(ClientPublisher pub, JetlangFiberSession session) { + subscriptionReceived.subscribe(session.getSubscriptionRequestChannel(), session.getFiber()); + logoutEvent.subscribe(session.getLogoutChannel(), session.getFiber()); + serverMessageReceive.subscribe(session.getSessionMessageChannel(), session.getFiber()); + unsubscribeReceive.subscribe(session.getUnsubscribeChannel(), session.getFiber()); + serverSessionClose.subscribe(session.getSessionCloseChannel(), session.getFiber()); + assertEquals(session.getSessionId(), session.getSessionId()); + } + }; + + Acceptor acceptor = createAcceptor(wrap(handlerFactory)); + Thread runner = new Thread(acceptor); + runner.start(); + + EventAssert clientMsgReceive = new EventAssert(1); + JetlangClient client = createRawMsgClient(clientMsgReceive); + EventAssert clientConnect = EventAssert.expect(1, client.getConnectChannel()); + EventAssert clientClose = EventAssert.expect(1, client.getCloseChannel()); + + Disposable unsubscribe = client.subscribe("newtopic", clientMsgReceive.asSubscribable()); + client.start(); + + subscriptionReceived.assertEvent(); + assertEquals("newtopic", subscriptionReceived.takeFromReceived().getTopic()); + clientConnect.assertEvent(); + clientMsgReceive.assertEvent(); + client.publish("toServer", "myclientmessage"); + serverMessageReceive.assertEvent(); + SessionMessage sessionMessage = serverMessageReceive.takeFromReceived(); + assertEquals("toServer", sessionMessage.getTopic()); + assertEquals("myclientmessage", sessionMessage.getMessage()); + unsubscribe.dispose(); + unsubscribeReceive.assertEvent(); + assertEquals("newtopic", unsubscribeReceive.takeFromReceived()); + + LogoutResult closeLatch = client.close(true); + + assertTrue(closeLatch.await(10, TimeUnit.SECONDS)); + logoutEvent.assertEvent(); + serverSessionClose.assertEvent(); + clientClose.assertEvent(); + assertEquals(CloseEvent.GracefulDisconnect.class, clientClose.received.take().getClass()); + assertEquals(0, handler.clientCount()); + acceptor.stop(); + service.shutdownNow(); + } + + private JetlangClient createRawMsgClient(EventAssert clientMsgReceive) { + ByteBuffer byteBuffer = NioReader.bufferAllocate(4096); + return new JetlangTcpClient(conn, new ThreadFiber(), clientConfig, new JavaSerializer(), new ErrorHandler.SysOut(), new RawMsgHandler() { + @Override + public boolean enabled() { + return true; + } + + @Override + public void onRawMsg(RawMsg rawMsg) { + rawMsg.read(byteBuffer); + + byteBuffer.flip(); + int topicSize = byteBuffer.get(); + + final char[] chars = new char[topicSize]; + for (int i = 0; i < topicSize; i++) { + chars[i] = (char) byteBuffer.get(); + } + String topic = new String(chars); + int size = byteBuffer.getInt(); + + try { + String msg = (String) new ObjectInputStream(new ByteBufferInputStream(byteBuffer)).readObject(); + clientMsgReceive.receiveMessage(msg); + } catch (Throwable t) { + throw new RuntimeException(t); + } + + byteBuffer.clear(); + } + }); + } + private Acceptor createAcceptor(NewSessionHandler newSession) throws IOException { handler = new JetlangClientHandler(serializerFactory, newSession, service, sessionConfig, new JetlangClientHandler.FiberFactory.ThreadFiberFactory(), @@ -487,4 +610,24 @@ private Acceptor createAcceptor(NewSessionHandler newSession) throws IOException new Acceptor.ErrorHandler.SysOut(), handler); } + + public class ByteBufferInputStream extends InputStream { + private final ByteBuffer buffer; + + public ByteBufferInputStream(ByteBuffer buffer) { + this.buffer = buffer; + } + + @Override + public int read() { + return buffer.get() & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) { + final int pos = buffer.position(); + buffer.get(b, off, len); + return buffer.position() - pos; + } + } }