diff --git a/docs/javadoc/index-all.html b/docs/javadoc/index-all.html index 5ce2b6d0..b13b3bdd 100644 --- a/docs/javadoc/index-all.html +++ b/docs/javadoc/index-all.html @@ -3894,7 +3894,7 @@

W

 
write(JsonWriter, Object) - Method in class org.arl.fjage.remote.EnumTypeAdapter
 
-
WSHandler(WebSocketConnector) - Constructor for class org.arl.fjage.connectors.WebSocketConnector.WSHandler
+
WSHandler(WebSocketConnector) - Constructor for class org.arl.fjage.connectors.WebSocketConnector.WSHandler
 
wsHandlers - Variable in class org.arl.fjage.connectors.WebSocketConnector
 
diff --git a/docs/javadoc/org/arl/fjage/connectors/WebSocketConnector.WSHandler.html b/docs/javadoc/org/arl/fjage/connectors/WebSocketConnector.WSHandler.html index e27539e5..e1497ff3 100644 --- a/docs/javadoc/org/arl/fjage/connectors/WebSocketConnector.WSHandler.html +++ b/docs/javadoc/org/arl/fjage/connectors/WebSocketConnector.WSHandler.html @@ -98,7 +98,7 @@

Class WebSocketConn
  • java.lang.Object
  • @@ -132,7 +132,7 @@

    Constructor Summary

    Constructor and Description -WSHandler(WebSocketConnector conn)  +WSHandler(WebSocketConnector conn)  @@ -188,7 +188,7 @@

    Methods inherited from class java.lang.Object

    Constructor Detail

    - + @@ -173,7 +173,7 @@

    Field Summary

    name  -protected org.arl.fjage.connectors.WebSocketConnector.OutputThread +protected org.arl.fjage.connectors.WebSocketHubConnector.OutputThread outThread  @@ -383,7 +383,7 @@

    wsHandlers

    diff --git a/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java b/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java index 726a9645..2adcb2e3 100644 --- a/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java +++ b/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java @@ -1,268 +1,4 @@ -/****************************************************************************** - -Copyright (c) 2018, Mandar Chitre - -This file is part of fjage which is released under Simplified BSD License. -See file LICENSE.txt or go to http://www.opensource.org/licenses/BSD-3-Clause -for full license details. - -******************************************************************************/ - package org.arl.fjage.connectors; -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Logger; -import org.eclipse.jetty.websocket.api.*; -import org.eclipse.jetty.websocket.server.WebSocketHandler; -import org.eclipse.jetty.server.handler.ContextHandler; -import org.eclipse.jetty.websocket.servlet.*; -import org.eclipse.jetty.websocket.api.annotations.*; - -/** - * Web socket connector. - */ -public class WebSocketConnector implements Connector, WebSocketCreator { - - protected String name; - protected boolean linemode = false; - protected WebServer server; - protected ContextHandler handler; - protected List wsHandlers = new CopyOnWriteArrayList(); - protected OutputThread outThread = null; - protected PseudoInputStream pin = new PseudoInputStream(); - protected PseudoOutputStream pout = new PseudoOutputStream(); - protected ConnectionListener listener = null; - protected Logger log = Logger.getLogger(getClass().getName()); - - /** - * Create a web socket connector and add it to a web server running on a - * given port. If a web server isn't already created, this will start the - * web server. - */ - public WebSocketConnector(int port, String context) { - init(port, context, -1); - } - - /** - * Create a web socket connector and add it to a web server running on a - * given port. If a web server isn't already created, this will start the - * web server. - */ - public WebSocketConnector(int port, String context, boolean linemode) { - init(port, context, -1); - this.linemode = linemode; - } - - /** - * Create a web socket connector and add it to a web server running on a - * given port. If a web server isn't already created, this will start the - * web server. - */ - public WebSocketConnector(int port, String context, int maxMsgSize) { - init(port, context, maxMsgSize); - } - - /** - * Create a web socket connector and add it to a web server running on a - * given port. If a web server isn't already created, this will start the - * web server. - */ - public WebSocketConnector(int port, String context, boolean linemode, int maxMsgSize) { - init(port, context, maxMsgSize); - this.linemode = linemode; - } - - protected void init(int port, String context, int maxMsgSize) { - try { - name = "ws://"+InetAddress.getLocalHost().getHostAddress()+":"+port+context; - } catch (UnknownHostException ex) { - name = "ws://0.0.0.0:"+port+context; - } - server = WebServer.getInstance(port); - handler = new ContextHandler(context); - handler.setHandler(new WebSocketHandler() { - @Override - public void configure(WebSocketServletFactory factory) { - factory.setCreator(WebSocketConnector.this); - if (maxMsgSize > 0) factory.getPolicy().setMaxTextMessageSize(maxMsgSize); - } - }); - server.add(handler); - server.start(); - outThread = new OutputThread(); - outThread.start(); - } - - @Override - public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) { - return new WSHandler(this); - } - - @Override - public String getName() { - return name; - } - - @Override - public InputStream getInputStream() { - return pin; - } - - @Override - public OutputStream getOutputStream() { - return pout; - } - - @Override - public void setConnectionListener(ConnectionListener listener) { - this.listener = listener; - } - - @Override - public boolean isReliable() { - return true; - } - - @Override - public boolean waitOutputCompletion(long timeout) { - return true; - } - - @Override - public void close() { - outThread.close(); - outThread = null; - server.remove(handler); - server = null; - handler = null; - pin.close(); - pout.close(); - pin = null; - pout = null; - } - - @Override - public String toString() { - return name; - } - - // thread to monitor incoming data on output stream and write to TCP clients - - private class OutputThread extends Thread { - - OutputThread() { - setName(getClass().getSimpleName()+":"+name); - setDaemon(true); - setPriority(MIN_PRIORITY); - } - - @Override - public void run() { - while (true) { - String s; - if (linemode) { - s = pout.readLine(); - if (s == null) break; - } else { - byte[] buf = pout.readAvailable(); - if (buf == null) break; - s = new String(buf); - } - for (WSHandler t: wsHandlers) - t.write(s); - try { - Thread.sleep(10); - } catch (InterruptedException ex) { - break; - } - } - } - - void close() { - try { - if (pout != null) { - pout.close(); - join(); - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - - } - - // servlets to manage web socket connections - - @WebSocket(maxIdleTime = Integer.MAX_VALUE) - public class WSHandler { - - Session session = null; - WebSocketConnector conn; - - public WSHandler(WebSocketConnector conn) { - this.conn = conn; - } - - @OnWebSocketConnect - public void onConnect(Session session) { - log.fine("New connection from "+session.getRemoteAddress()); - this.session = session; - wsHandlers.add(this); - if (listener != null) listener.connected(conn); - } - - @OnWebSocketClose - public void onClose(int statusCode, String reason) { - log.fine("Connection from "+session.getRemoteAddress()+" closed"); - session = null; - wsHandlers.remove(this); - } - - @OnWebSocketError - public void onError(Throwable t) { - log.warning(t.toString()); - } - - @OnWebSocketMessage - public void onMessage(String message) { - byte[] buf = message.getBytes(); - synchronized (pin) { - for (int c : buf) { - if (c < 0) c += 256; - if (c == 4) continue; // ignore ^D - try { - pin.write(c); - } catch (IOException ex) { - // do nothing - } - } - } - } - - void write(String s) { - try { - if (session != null && session.isOpen()) { - Future f = session.getRemote().sendStringByFuture(s); - try { - f.get(500, TimeUnit.MILLISECONDS); - } catch (TimeoutException e){ - log.fine("Sending timed out. Closing connection to " + session.getRemoteAddress()); - session.disconnect(); - } catch (Exception e){ - log.warning(e.toString()); - } - } - } catch (Exception e) { - log.warning(e.toString()); - } - } - - } - +public class WebSocketConnector { } diff --git a/src/main/java/org/arl/fjage/connectors/WebSocketHubConnector.java b/src/main/java/org/arl/fjage/connectors/WebSocketHubConnector.java new file mode 100644 index 00000000..5f900396 --- /dev/null +++ b/src/main/java/org/arl/fjage/connectors/WebSocketHubConnector.java @@ -0,0 +1,268 @@ +/****************************************************************************** + +Copyright (c) 2018, Mandar Chitre + +This file is part of fjage which is released under Simplified BSD License. +See file LICENSE.txt or go to http://www.opensource.org/licenses/BSD-3-Clause +for full license details. + +******************************************************************************/ + +package org.arl.fjage.connectors; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; +import org.eclipse.jetty.websocket.api.*; +import org.eclipse.jetty.websocket.server.WebSocketHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.websocket.servlet.*; +import org.eclipse.jetty.websocket.api.annotations.*; + +/** + * Web socket connector. + */ +public class WebSocketHubConnector implements Connector, WebSocketCreator { + + protected String name; + protected boolean linemode = false; + protected WebServer server; + protected ContextHandler handler; + protected List wsHandlers = new CopyOnWriteArrayList(); + protected OutputThread outThread = null; + protected PseudoInputStream pin = new PseudoInputStream(); + protected PseudoOutputStream pout = new PseudoOutputStream(); + protected ConnectionListener listener = null; + protected Logger log = Logger.getLogger(getClass().getName()); + + /** + * Create a web socket connector and add it to a web server running on a + * given port. If a web server isn't already created, this will start the + * web server. + */ + public WebSocketHubConnector(int port, String context) { + init(port, context, -1); + } + + /** + * Create a web socket connector and add it to a web server running on a + * given port. If a web server isn't already created, this will start the + * web server. + */ + public WebSocketHubConnector(int port, String context, boolean linemode) { + init(port, context, -1); + this.linemode = linemode; + } + + /** + * Create a web socket connector and add it to a web server running on a + * given port. If a web server isn't already created, this will start the + * web server. + */ + public WebSocketHubConnector(int port, String context, int maxMsgSize) { + init(port, context, maxMsgSize); + } + + /** + * Create a web socket connector and add it to a web server running on a + * given port. If a web server isn't already created, this will start the + * web server. + */ + public WebSocketHubConnector(int port, String context, boolean linemode, int maxMsgSize) { + init(port, context, maxMsgSize); + this.linemode = linemode; + } + + protected void init(int port, String context, int maxMsgSize) { + try { + name = "ws://"+InetAddress.getLocalHost().getHostAddress()+":"+port+context; + } catch (UnknownHostException ex) { + name = "ws://0.0.0.0:"+port+context; + } + server = WebServer.getInstance(port); + handler = new ContextHandler(context); + handler.setHandler(new WebSocketHandler() { + @Override + public void configure(WebSocketServletFactory factory) { + factory.setCreator(WebSocketHubConnector.this); + if (maxMsgSize > 0) factory.getPolicy().setMaxTextMessageSize(maxMsgSize); + } + }); + server.add(handler); + server.start(); + outThread = new OutputThread(); + outThread.start(); + } + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) { + return new WSHandler(this); + } + + @Override + public String getName() { + return name; + } + + @Override + public InputStream getInputStream() { + return pin; + } + + @Override + public OutputStream getOutputStream() { + return pout; + } + + @Override + public void setConnectionListener(ConnectionListener listener) { + this.listener = listener; + } + + @Override + public boolean isReliable() { + return true; + } + + @Override + public boolean waitOutputCompletion(long timeout) { + return true; + } + + @Override + public void close() { + outThread.close(); + outThread = null; + server.remove(handler); + server = null; + handler = null; + pin.close(); + pout.close(); + pin = null; + pout = null; + } + + @Override + public String toString() { + return name; + } + + // thread to monitor incoming data on output stream and write to TCP clients + + private class OutputThread extends Thread { + + OutputThread() { + setName(getClass().getSimpleName()+":"+name); + setDaemon(true); + setPriority(MIN_PRIORITY); + } + + @Override + public void run() { + while (true) { + String s; + if (linemode) { + s = pout.readLine(); + if (s == null) break; + } else { + byte[] buf = pout.readAvailable(); + if (buf == null) break; + s = new String(buf); + } + for (WSHandler t: wsHandlers) + t.write(s); + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + break; + } + } + } + + void close() { + try { + if (pout != null) { + pout.close(); + join(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + } + + // servlets to manage web socket connections + + @WebSocket(maxIdleTime = Integer.MAX_VALUE) + public class WSHandler { + + Session session = null; + WebSocketHubConnector conn; + + public WSHandler(WebSocketHubConnector conn) { + this.conn = conn; + } + + @OnWebSocketConnect + public void onConnect(Session session) { + log.fine("New connection from "+session.getRemoteAddress()); + this.session = session; + wsHandlers.add(this); + if (listener != null) listener.connected(conn); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) { + log.fine("Connection from "+session.getRemoteAddress()+" closed"); + session = null; + wsHandlers.remove(this); + } + + @OnWebSocketError + public void onError(Throwable t) { + log.warning(t.toString()); + } + + @OnWebSocketMessage + public void onMessage(String message) { + byte[] buf = message.getBytes(); + synchronized (pin) { + for (int c : buf) { + if (c < 0) c += 256; + if (c == 4) continue; // ignore ^D + try { + pin.write(c); + } catch (IOException ex) { + // do nothing + } + } + } + } + + void write(String s) { + try { + if (session != null && session.isOpen()) { + Future f = session.getRemote().sendStringByFuture(s); + try { + f.get(500, TimeUnit.MILLISECONDS); + } catch (TimeoutException e){ + log.fine("Sending timed out. Closing connection to " + session.getRemoteAddress()); + session.disconnect(); + } catch (Exception e){ + log.warning(e.toString()); + } + } + } catch (Exception e) { + log.warning(e.toString()); + } + } + + } + +} diff --git a/src/main/java/org/arl/fjage/connectors/WebSocketServer.java b/src/main/java/org/arl/fjage/connectors/WebSocketServer.java new file mode 100644 index 00000000..cd12455b --- /dev/null +++ b/src/main/java/org/arl/fjage/connectors/WebSocketServer.java @@ -0,0 +1,38 @@ +package org.arl.fjage.connectors; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.UnknownHostException; +import java.util.logging.Logger; + +public class WebSocketServer implements Closeable { + + protected int port; + protected WebServer server = null; + protected ConnectionListener listener; + protected String name = null; + protected Logger log = Logger.getLogger(getClass().getName()); + + /** + * Create a TCP server running on a specified port. + * + * @param port TCP port number (0 to autoselect). + */ + public WebSocketServer(int port, String context, WebServer server, ConnectionListener listener) { + this.port = port; + this.server = server; + this.listener = listener; + try { + name = "ws://"+ InetAddress.getLocalHost().getHostAddress()+":"+port+context; + } catch (UnknownHostException ex) { + name = "ws://0.0.0.0:"+port+context; + } + } + + @Override + public void close() throws IOException { +// server.remove(); + } +} diff --git a/src/main/java/org/arl/fjage/remote/ConnectionHandler.java b/src/main/java/org/arl/fjage/remote/ConnectionHandler.java index 74b61734..7bf17077 100644 --- a/src/main/java/org/arl/fjage/remote/ConnectionHandler.java +++ b/src/main/java/org/arl/fjage/remote/ConnectionHandler.java @@ -67,7 +67,7 @@ public ConnectionHandler(Connector conn, RemoteContainer container, Firewall fw) * @return true if the connection is alive, false otherwise. */ public boolean isConnectionAlive() { - if (conn instanceof WebSocketConnector) return true; + if (conn instanceof WebSocketHubConnector) return true; return alive; } diff --git a/src/test/groovy/org/arl/fjage/test/fjagecTest.groovy b/src/test/groovy/org/arl/fjage/test/fjagecTest.groovy index beb51149..813cd6be 100644 --- a/src/test/groovy/org/arl/fjage/test/fjagecTest.groovy +++ b/src/test/groovy/org/arl/fjage/test/fjagecTest.groovy @@ -18,9 +18,9 @@ class fjagecTest { def platform = new RealTimePlatform() def container = new MasterContainer(platform, 5081) WebServer.getInstance(8080).add("/", "/org/arl/fjage/web") - Connector conn = new WebSocketConnector(8080, "/shell/ws") + Connector conn = new WebSocketHubConnector(8080, "/shell/ws") def shell = new ShellAgent(new ConsoleShell(conn), new GroovyScriptEngine()) - container.addConnector(new WebSocketConnector(8080, "/ws", true)) + container.addConnector(new WebSocketHubConnector(8080, "/ws", true)) container.add 'shell', shell container.add('test', new Agent(){ @Override diff --git a/src/test/groovy/org/arl/fjage/test/fjagejsTest.groovy b/src/test/groovy/org/arl/fjage/test/fjagejsTest.groovy index 8b9385ca..d1a28c8b 100644 --- a/src/test/groovy/org/arl/fjage/test/fjagejsTest.groovy +++ b/src/test/groovy/org/arl/fjage/test/fjagejsTest.groovy @@ -11,15 +11,11 @@ for full license details. package org.arl.fjage.test import org.arl.fjage.* -import org.arl.fjage.param.* -import org.arl.fjage.connectors.WebServer -import org.arl.fjage.connectors.WebSocketConnector +import org.arl.fjage.connectors.WebSocketHubConnector import org.arl.fjage.remote.MasterContainer import org.arl.fjage.shell.EchoScriptEngine import org.arl.fjage.shell.ShellAgent import org.junit.Test -import org.arl.fjage.test.ParamServerAgent -import org.arl.fjage.test.EchoServerAgent import static org.junit.Assert.assertTrue import static org.junit.Assert.assertEquals @@ -43,7 +39,7 @@ class fjagejsTest { ] def platform = new RealTimePlatform() def container = new MasterContainer(platform, 5081) - container.addConnector(new WebSocketConnector(8080, "/ws", true)) + container.addConnector(new WebSocketHubConnector(8080, "/ws", true)) container.add("shell", new ShellAgent(new EchoScriptEngine())) platform.start() container.add('echo', new EchoServerAgent())