Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.jetlang.remote.core.ErrorHandler;
import org.jetlang.remote.core.JetlangRemotingInputStream;
import org.jetlang.remote.core.JetlangRemotingProtocol;
import org.jetlang.remote.core.RawMsgHandler;
import org.jetlang.remote.core.RawMsgHandlerFactory;
import org.jetlang.remote.core.ReadTimeoutEvent;
import org.jetlang.remote.core.Serializer;
import org.jetlang.remote.core.SerializerFactory;
Expand All @@ -20,6 +22,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.jetlang.remote.core.RawMsgHandlerFactory.NULL_RAW_MSG_HANDLER_FACTORY;

public class JetlangClientHandler implements Acceptor.ClientHandler, ClientPublisher {

private final SerializerAdapter ser;
Expand All @@ -33,6 +37,7 @@ public class JetlangClientHandler implements Acceptor.ClientHandler, ClientPubli
private final BufferedSerializer globalBuffer;

private final Fiber globalSendFiber;
private final RawMsgHandlerFactory rawMsgHandlerFactory;

public interface FiberFactory {

Expand Down Expand Up @@ -66,13 +71,24 @@ public JetlangClientHandler(SerializerAdapter ser,
JetlangSessionConfig config,
FiberFactory fiberFactory,
ErrorHandler errorHandler) {
this(ser, channels, exec, config, fiberFactory, errorHandler, NULL_RAW_MSG_HANDLER_FACTORY);
}

public JetlangClientHandler(SerializerAdapter ser,
NewSessionHandler channels,
Executor exec,
JetlangSessionConfig config,
FiberFactory fiberFactory,
ErrorHandler errorHandler,
RawMsgHandlerFactory rawMsgHandlerFactory) {
this.ser = ser;
this.channels = channels;
this.exec = exec;
this.config = config;
this.fiberFactory = fiberFactory;
this.errorHandler = errorHandler;
this.globalSendFiber = fiberFactory.createGlobalSendFiber();
this.rawMsgHandlerFactory = rawMsgHandlerFactory;
this.globalSendFiber.start();
this.globalBuffer = ser.createBuffered();
}
Expand Down Expand Up @@ -168,7 +184,10 @@ private Runnable createRunnable(final ClientTcpSocket clientTcpSocket) throws IO
final TcpSocket socket = clientTcpSocket.getSocket();
final Fiber sendFiber = fiberFactory.createSendFiber(socket.getSocket());
final Serializer serializer = ser.createForSocket(socket);
final JetlangStreamSession session = new JetlangStreamSession(socket.getRemoteSocketAddress(), new SocketMessageStreamWriter(socket, ser.getCharset(), serializer.getWriter()), sendFiber, errorHandler);
final SocketMessageStreamWriter streamWriter = new SocketMessageStreamWriter(socket, ser.getCharset(), serializer.getWriter());
final RawMsgHandler rawMsgHandler = rawMsgHandlerFactory.rawMsgHandler();
final JetlangStreamSession session = new JetlangStreamSession(socket.getRemoteSocketAddress(), streamWriter,
sendFiber, errorHandler, rawMsgHandler);
return new Runnable() {
public void run() {
try {
Expand All @@ -177,7 +196,8 @@ public void run() {
channels.onNewSession(JetlangClientHandler.this, session);
session.startHeartbeat(config.getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS);
sendFiber.start();
JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(session, serializer.getReader(), ser.getCharset());

JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(session, serializer.getReader(), ser.getCharset(), rawMsgHandler.enabled());
JetlangRemotingInputStream state = new JetlangRemotingInputStream(socket.getInputStream(), protocol, onReadTimeout);
while (state.readFromStream()) {

Expand Down
13 changes: 12 additions & 1 deletion src/main/java/org/jetlang/remote/acceptor/JetlangNioSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.jetlang.fibers.NioFiber;
import org.jetlang.remote.core.MsgTypes;
import org.jetlang.remote.core.RawMsg;
import org.jetlang.remote.core.RawMsgHandler;

import java.nio.channels.SocketChannel;

Expand All @@ -10,6 +12,7 @@ public class JetlangNioSession extends JetlangBaseSession implements JetlangMess
private final NioJetlangSendFiber.ChannelState channel;
private final NioJetlangSendFiber sendFiber;
private final ErrorHandler errorHandler;
private final RawMsgHandler rawMsgHandler;

public interface ErrorHandler {

Expand All @@ -20,14 +23,22 @@ public interface ErrorHandler {
void onHandlerException(Exception failed);
}

public JetlangNioSession(NioFiber fiber, SocketChannel channel, NioJetlangSendFiber sendFiber, NioJetlangRemotingClientFactory.Id id, ErrorHandler errorHandler) {
public JetlangNioSession(NioFiber fiber, SocketChannel channel, NioJetlangSendFiber sendFiber,
NioJetlangRemotingClientFactory.Id id, ErrorHandler errorHandler,
RawMsgHandler rawMsgHandler) {
super(id);
this.errorHandler = errorHandler;
this.rawMsgHandler = rawMsgHandler;
this.channel = new NioJetlangSendFiber.ChannelState(channel, id, fiber);
this.sendFiber = sendFiber;
this.sendFiber.onNewSession(this.channel);
}

@Override
public void onRawMsg(RawMsg rawMsg) {
rawMsgHandler.onRawMsg(rawMsg);
}

@Override
public void onHandlerException(Exception failed) {
errorHandler.onHandlerException(failed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.jetlang.remote.core.ErrorHandler;
import org.jetlang.remote.core.JetlangRemotingProtocol;
import org.jetlang.remote.core.MsgTypes;
import org.jetlang.remote.core.RawMsg;
import org.jetlang.remote.core.RawMsgHandler;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -18,6 +20,7 @@ public class JetlangStreamSession extends JetlangBaseSession implements JetlangR
private final MessageStreamWriter socket;
private final Fiber sendFiber;
private final ErrorHandler errorHandler;
private final RawMsgHandler rawMsgHandler;
private final Set<String> subscriptions = Collections.synchronizedSet(new HashSet<String>());
private volatile boolean loggedOut;

Expand All @@ -26,11 +29,18 @@ public void run() {
}
};

public JetlangStreamSession(Object id, MessageStreamWriter socket, Fiber sendFiber, ErrorHandler errorHandler) {
public JetlangStreamSession(Object id, MessageStreamWriter socket, Fiber sendFiber,
ErrorHandler errorHandler, RawMsgHandler rawMsgHandler) {
super(id);
this.socket = socket;
this.sendFiber = sendFiber;
this.errorHandler = errorHandler;
this.rawMsgHandler = rawMsgHandler;
}

@Override
public void onRawMsg(RawMsg rawMsg) {
rawMsgHandler.onRawMsg(rawMsg);
}

public void startHeartbeat(int interval, TimeUnit unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.jetlang.fibers.NioFiber;
import org.jetlang.remote.core.JetlangRemotingProtocol;
import org.jetlang.remote.core.ObjectByteReader;
import org.jetlang.remote.core.RawMsgHandler;
import org.jetlang.remote.core.ReadTimeoutEvent;

import java.io.IOException;
Expand All @@ -23,10 +24,11 @@ public class NioJetlangChannelHandler implements NioChannelHandler {
private JetlangRemotingProtocol.State nextCommand;
private long lastReadMs = System.currentTimeMillis();

public NioJetlangChannelHandler(SocketChannel accept, JetlangMessageHandler session, ObjectByteReader reader, Runnable onEnd, Charset charset) {
public NioJetlangChannelHandler(SocketChannel accept, JetlangMessageHandler session, ObjectByteReader reader,
Runnable onEnd, Charset charset, RawMsgHandler rawMsgHandler) {
this.session = session;
this.onEnd = onEnd;
this.protocol = new JetlangRemotingProtocol(session, reader, charset);
this.protocol = new JetlangRemotingProtocol(session, reader, charset, rawMsgHandler.enabled());
this.accept = accept;
this.nextCommand = protocol.root;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.jetlang.core.Disposable;
import org.jetlang.fibers.NioControls;
import org.jetlang.fibers.NioFiber;
import org.jetlang.remote.core.RawMsgHandler;
import org.jetlang.remote.core.RawMsgHandlerFactory;
import org.jetlang.remote.core.Serializer;

import java.net.SocketAddress;
Expand All @@ -12,11 +14,14 @@
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

import static org.jetlang.remote.core.RawMsgHandlerFactory.NULL_RAW_MSG_HANDLER_FACTORY;

public class NioJetlangRemotingClientFactory implements NioAcceptorHandler.ClientFactory {

private final Serializer serializer;
private final JetlangSessionConfig config;
private final Handler handler;
private final RawMsgHandlerFactory rawMsgHandlerFactory;
private final NioJetlangSendFiber sendFiber;
private final Charset charset;

Expand All @@ -36,10 +41,18 @@ default void configureAcceptedClient(SelectionKey key, SocketChannel channel) th
void onHandlerException(Exception failed);
}

public NioJetlangRemotingClientFactory(Serializer serializer, JetlangSessionConfig config, Handler handler, NioJetlangSendFiber sendFiber, Charset charset) {
public NioJetlangRemotingClientFactory(Serializer serializer, JetlangSessionConfig config, Handler handler,
NioJetlangSendFiber sendFiber, Charset charset) {
this(serializer, config, handler, NULL_RAW_MSG_HANDLER_FACTORY, sendFiber, charset);
}

public NioJetlangRemotingClientFactory(Serializer serializer, JetlangSessionConfig config, Handler handler,
RawMsgHandlerFactory rawMsgHandlerFactory, NioJetlangSendFiber sendFiber,
Charset charset) {
this.serializer = serializer;
this.config = config;
this.handler = handler;
this.rawMsgHandlerFactory = rawMsgHandlerFactory;
this.sendFiber = sendFiber;
this.charset = charset;
}
Expand All @@ -52,6 +65,7 @@ public void onAccept(NioFiber fiber, NioControls controls, SelectionKey key, Soc
throw new RuntimeException(e);
}
Hb hb = new Hb();
RawMsgHandler rawMsgHandler = rawMsgHandlerFactory.rawMsgHandler();
final JetlangNioSession session = new JetlangNioSession(fiber, channel, sendFiber, new Id(channel), new JetlangNioSession.ErrorHandler() {
@Override
public void onUnhandledReplyMsg(int reqId, String dataTopicVal, Object readObject) {
Expand All @@ -67,12 +81,13 @@ public void onUnknownMessage(int read) {
public void onHandlerException(Exception failed) {
handler.onHandlerException(failed);
}
});
}, rawMsgHandler);

Runnable onClose = () -> {
hb.onClose();
session.onClose(new SessionCloseEvent());
};
final NioJetlangChannelHandler handler = new NioJetlangChannelHandler(channel, session, serializer.getReader(), onClose, charset);
final NioJetlangChannelHandler handler = new NioJetlangChannelHandler(channel, session, serializer.getReader(), onClose, charset, rawMsgHandler);
this.handler.onNewSession(session);
hb.startHb(fiber, session, handler, config);
controls.addHandler(handler);
Expand Down
31 changes: 25 additions & 6 deletions src/main/java/org/jetlang/remote/client/JetlangTcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.jetlang.remote.core.JetlangRemotingInputStream;
import org.jetlang.remote.core.JetlangRemotingProtocol;
import org.jetlang.remote.core.MsgTypes;
import org.jetlang.remote.core.RawMsg;
import org.jetlang.remote.core.RawMsgHandler;
import org.jetlang.remote.core.ReadTimeoutEvent;
import org.jetlang.remote.core.Serializer;
import org.jetlang.remote.core.SocketMessageStreamWriter;
Expand All @@ -34,7 +36,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.jetlang.remote.core.RawMsgHandler.NULL_RAW_MSG_HANDLER;

/**
*
*/
public class JetlangTcpClient implements JetlangClient {

Expand All @@ -43,6 +48,7 @@ public class JetlangTcpClient implements JetlangClient {
private final JetlangClientConfig config;
private final Serializer ser;
private final ErrorHandler errorHandler;
private final RawMsgHandler rawMsgHandler;
private static final Charset charset = Charset.forName("US-ASCII");
private final SocketConnector socketConnector;
private Disposable pendingConnect;
Expand All @@ -66,7 +72,7 @@ private <T> CloseableChannel<T> channel() {
private final Map<Integer, Req> pendingRequests = Collections.synchronizedMap(new HashMap<Integer, Req>());

private final SocketWriter socketWriter = new SocketWriter() {
public <T> boolean send(final String topic, final T msg){
public <T> boolean send(final String topic, final T msg) {
if (socket != null) {
try {
socket.write(topic, msg);
Expand All @@ -87,11 +93,21 @@ public JetlangTcpClient(SocketConnector socketConnector,
JetlangClientConfig config,
Serializer ser,
ErrorHandler errorHandler) {
this(socketConnector, sendFiber, config, ser, errorHandler, NULL_RAW_MSG_HANDLER);
}

public JetlangTcpClient(SocketConnector socketConnector,
Fiber sendFiber,
JetlangClientConfig config,
Serializer ser,
ErrorHandler errorHandler,
RawMsgHandler rawMsgHandler) {
this.socketConnector = socketConnector;
this.sendFiber = sendFiber;
this.config = config;
this.ser = ser;
this.errorHandler = errorHandler;
this.rawMsgHandler = rawMsgHandler;
}

private class RemoteSubscription<T> {
Expand Down Expand Up @@ -252,7 +268,7 @@ private void handleConnect(Socket newSocket) throws IOException {
final InputStream stream = newSocket.getInputStream();
final Runnable reader = new Runnable() {
public void run() {
final JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(protocolHandler, ser.getReader(), charset);
final JetlangRemotingProtocol protocol = new JetlangRemotingProtocol(protocolHandler, ser.getReader(), charset, rawMsgHandler.enabled());
final JetlangRemotingInputStream inputStream = new JetlangRemotingInputStream(stream, protocol, onReadTimeout);
try {
Connected.publish(new ConnectEvent());
Expand All @@ -275,6 +291,11 @@ public void onMessage(String dataTopicVal, Object readObject) {
publishData(dataTopicVal, readObject);
}

@Override
public void onRawMsg(RawMsg rawMsg) {
rawMsgHandler.onRawMsg(rawMsg);
}

public void onSubscriptionRequest(String val) {
errorHandler.onException(new IOException("SubscriptionNotSupported: " + val));
}
Expand Down Expand Up @@ -309,7 +330,6 @@ public void onRequestReply(int reqId, String dataTopicVal, Object readObject) {
}
};


private void handleReadExceptionOnSendFiber(final IOException e) {
Runnable exec = new Runnable() {
public void run() {
Expand Down Expand Up @@ -446,7 +466,6 @@ public void dispose() {
}
}


public Subscriber<CloseEvent> getCloseChannel() {
return Closed;
}
Expand All @@ -470,7 +489,7 @@ public <T> void publish(String topic, T msg) {
public <T> void publish(final String topic, final T msg, final Runnable onSend) {
Runnable r = new Runnable() {
public void run() {
if(socketWriter.send(topic, msg)){
if (socketWriter.send(topic, msg)) {
if (onSend != null)
onSend.run();
}
Expand All @@ -479,7 +498,7 @@ public void run() {
sendFiber.execute(r);
}

public void execOnSendThread(final Callback<SocketWriter> cb){
public void execOnSendThread(final Callback<SocketWriter> cb) {
Runnable r = new Runnable() {
public void run() {
cb.onMessage(socketWriter);
Expand Down
Loading