From f80a3f945de7ffc9f25c205688371f0c0cb60ac2 Mon Sep 17 00:00:00 2001 From: "Kyle K. Lin" Date: Mon, 17 Dec 2018 09:42:15 +0800 Subject: [PATCH] 1. default maxCache: 20K 2. Server: thread control 3. Callout: thread control --- uia.comm/pom.xml | 2 +- .../src/main/java/uia/comm/SocketClient.java | 57 +++------ .../java/uia/comm/SocketDataController.java | 1 + .../java/uia/comm/SocketDataSelector.java | 4 +- .../src/main/java/uia/comm/SocketServer.java | 121 +++++++++--------- .../src/test/java/uia/comm/DatagramTest.java | 6 +- .../src/test/java/uia/comm/HTSocketTest.java | 120 ----------------- .../test/java/uia/comm/SocketClientTest.java | 88 +++++++++++++ .../test/java/uia/comm/SocketServerTest.java | 85 +++++++++--- .../my/{MyManager.java => ClientManager.java} | 8 +- ...oClientRequest.java => ClientRequest.java} | 55 +++++--- ...oServerRequest.java => ServerManager.java} | 39 +++--- .../test/java/uia/comm/my/ServerRequest.java | 91 +++++++++++++ 13 files changed, 397 insertions(+), 280 deletions(-) delete mode 100644 uia.comm/src/test/java/uia/comm/HTSocketTest.java create mode 100644 uia.comm/src/test/java/uia/comm/SocketClientTest.java rename uia.comm/src/test/java/uia/comm/my/{MyManager.java => ClientManager.java} (87%) rename uia.comm/src/test/java/uia/comm/my/{ToClientRequest.java => ClientRequest.java} (57%) rename uia.comm/src/test/java/uia/comm/my/{ToServerRequest.java => ServerManager.java} (58%) create mode 100644 uia.comm/src/test/java/uia/comm/my/ServerRequest.java diff --git a/uia.comm/pom.xml b/uia.comm/pom.xml index 6781e1c..ca0465c 100644 --- a/uia.comm/pom.xml +++ b/uia.comm/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.uia.solution uia-comm - 0.3.0 + 0.3.1 jar uia-comm https://github.com/uia4j/uia-comm diff --git a/uia.comm/src/main/java/uia/comm/SocketClient.java b/uia.comm/src/main/java/uia/comm/SocketClient.java index fe67086..9a303ab 100644 --- a/uia.comm/src/main/java/uia/comm/SocketClient.java +++ b/uia.comm/src/main/java/uia/comm/SocketClient.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -57,7 +58,7 @@ public class SocketClient implements ProtocolEventHandler, private final HashMap> callIns; - private final HashMap callOuts; + private final ConcurrentHashMap callOuts; private final String aliasName; @@ -103,9 +104,9 @@ public SocketClient(final Protocol protocol, final Message this.protocol.addMessageHandler(this); this.manager = manager; this.callIns = new HashMap>(); - this.callOuts = new HashMap(); + this.callOuts = new ConcurrentHashMap(); this.started = false; - this.maxCache = 10 * 1000; // 10K + this.maxCache = 20 * 1024; // 20K } public int getMaxCache() { @@ -113,7 +114,7 @@ public int getMaxCache() { } public void setMaxCache(int maxCache) { - this.maxCache = Math.min(2000000, Math.max(16, maxCache)); // 2M + this.maxCache = Math.max(16, maxCache); } @Override @@ -244,7 +245,7 @@ public synchronized boolean tryConnect() { } catch (Exception ex) { if (this.clientPort > 0) { - logger.error(String.format("%s> connect to %s:%s(%d) failure. %s", + logger.error(String.format("%s> connect to %s:%s(%d) failed. %s", this.aliasName, this.addr, this.port, @@ -252,7 +253,7 @@ public synchronized boolean tryConnect() { ex.getMessage())); } else { - logger.error(String.format("%s> connect to %s:%s failure. %s", + logger.error(String.format("%s> connect to %s:%s failed. %s", this.aliasName, this.addr, this.port, @@ -317,7 +318,7 @@ public boolean send(final byte[] data, int times) throws SocketException { return this.controller.send(data, times); } catch (Exception ex) { - logger.error(String.format("%s> send %s failure. ex:%s", + logger.error(String.format("%s> send %s failed. ex:%s", this.aliasName, ByteUtils.toHexString(data, 100), ex.getMessage())); @@ -340,10 +341,7 @@ public byte[] send(final byte[] data, String txId, long timeout, int retry) thro ExecutorService threadPool = Executors.newSingleThreadExecutor(); try { - synchronized (this.callOuts) { - this.callOuts.put(txId, callout); - } - + this.callOuts.put(txId, callout); if (this.controller.send(data, retry)) { try { Future future = threadPool.submit(callout); @@ -355,15 +353,13 @@ public byte[] send(final byte[] data, String txId, long timeout, int retry) thro } } else { - logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100))); - throw new SocketException(this.aliasName + "> send failure"); + logger.debug(String.format("%s> send %s failed", this.aliasName, ByteUtils.toHexString(data, 100))); + throw new SocketException(this.aliasName + "> send failed"); } } finally { threadPool.shutdown(); - synchronized (this.callOuts) { - this.callOuts.remove(txId); - } + this.callOuts.remove(txId); } } @@ -373,15 +369,13 @@ public boolean send(final byte[] data, final MessageCallOut callOut, long timeou } @Override - public boolean send(final byte[] data, final MessageCallOut callOut, long timeout, int retry) throws SocketException { + public boolean send(final byte[] data, MessageCallOut callOut, long timeout, int retry) throws SocketException { if (!this.started) { throw new SocketException(this.aliasName + "> is not started."); } final String tx = callOut.getTxId(); - synchronized (this.callOuts) { - this.callOuts.put(tx, callOut); - } + this.callOuts.put(tx, callOut); if (this.controller.send(data, retry)) { Timer timer = new Timer(); @@ -389,15 +383,10 @@ public boolean send(final byte[] data, final MessageCallOut callOut, long timeou @Override public void run() { - MessageCallOut out = null; - synchronized (SocketClient.this.callOuts) { - if (SocketClient.this.callOuts.containsKey(tx)) { - logger.debug(String.format("%s> tx:%s callOut timeout", SocketClient.this.aliasName, callOut.getTxId())); - out = SocketClient.this.callOuts.remove(tx); - } - } - if (out != null) { + MessageCallOut out = SocketClient.this.callOuts.remove(tx); + if(out != null) { try { + logger.info(String.format("%s> tx:%s callOut timeout", SocketClient.this.aliasName, out.getTxId())); out.timeout(); } catch (Exception ex) { @@ -410,10 +399,8 @@ public void run() { return true; } else { - synchronized (this.callOuts) { - this.callOuts.remove(tx); - } - logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100))); + this.callOuts.remove(tx); + logger.debug(String.format("%s> send %s failed", this.aliasName, ByteUtils.toHexString(data, 100))); return false; } } @@ -460,16 +447,12 @@ public void run() { } else { String tx = this.manager.findTx(received); - final MessageCallOut callOut = this.callOuts.get(tx); + final MessageCallOut callOut = this.callOuts.remove(tx); if (callOut == null) { logger.debug(String.format("%s> cmd:%s tx:%s callout reply missing", this.aliasName, cmd, tx)); return; } - synchronized (this.callOuts) { - this.callOuts.remove(tx); - } - logger.debug(String.format("%s> cmd:%s tx:%s callout reply", this.aliasName, cmd, tx)); new Thread(new Runnable() { diff --git a/uia.comm/src/main/java/uia/comm/SocketDataController.java b/uia.comm/src/main/java/uia/comm/SocketDataController.java index 112cc0b..9798a57 100644 --- a/uia.comm/src/main/java/uia/comm/SocketDataController.java +++ b/uia.comm/src/main/java/uia/comm/SocketDataController.java @@ -100,6 +100,7 @@ public synchronized boolean send(byte[] data, int times) { int cnt = this.ch.write(ByteBuffer.wrap(encoded)); if (cnt == encoded.length) { logger.debug(String.format("%s> send %s", this.name, ByteUtils.toHexString(encoded, 100))); + Thread.sleep(100); return true; } else { diff --git a/uia.comm/src/main/java/uia/comm/SocketDataSelector.java b/uia.comm/src/main/java/uia/comm/SocketDataSelector.java index 35bab95..f75bef1 100644 --- a/uia.comm/src/main/java/uia/comm/SocketDataSelector.java +++ b/uia.comm/src/main/java/uia/comm/SocketDataSelector.java @@ -38,7 +38,7 @@ public class SocketDataSelector { /** * - * @throws IOException Raise when open failure. + * @throws IOException Raise when open failed. */ public SocketDataSelector() throws IOException { this.selector = Selector.open(); @@ -48,7 +48,7 @@ public SocketDataSelector() throws IOException { * Register controller to specific channel. * @param ch Socket channel. * @param controller Socket data controller. - * @throws ClosedChannelException Raise when register channel failure. + * @throws ClosedChannelException Raise when register channel failed. */ public void register(SocketChannel ch, SocketDataController controller) throws ClosedChannelException { ch.register(this.selector, SelectionKey.OP_READ, controller); diff --git a/uia.comm/src/main/java/uia/comm/SocketServer.java b/uia.comm/src/main/java/uia/comm/SocketServer.java index b1a06d7..0c2b591 100644 --- a/uia.comm/src/main/java/uia/comm/SocketServer.java +++ b/uia.comm/src/main/java/uia/comm/SocketServer.java @@ -30,6 +30,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -64,11 +65,11 @@ public enum ConnectionStyle { private final TreeMap> callIns; - private final TreeMap> clientCallouts; + private final ConcurrentHashMap> clientCallouts; private final ArrayList listeners; - private final TreeMap controllers; + private final ConcurrentHashMap controllers; private final String aliasName; @@ -97,7 +98,7 @@ public SocketServer(Protocol protocol, int port, MessageMa * @param port Socket port. * @param manager Protocol manager. * @param aliasName Alias name. - * @throws Exception Raise construct failure. + * @throws Exception Raise construction failed. */ public SocketServer(Protocol protocol, int port, MessageManager manager, String aliasName, ConnectionStyle connectionStyle) throws Exception { this.aliasName = aliasName; @@ -105,15 +106,15 @@ public SocketServer(Protocol protocol, int port, MessageMa this.protocol.addMessageHandler(this); this.manager = manager; this.callIns = new TreeMap>(); - this.clientCallouts = new TreeMap>(); + this.clientCallouts = new ConcurrentHashMap>(); this.started = false; this.connectionStyle = connectionStyle; - this.controllers = new TreeMap(); + this.controllers = new ConcurrentHashMap(); this.listeners = new ArrayList(); this.idleTime = 60000; this.port = port; - this.maxCache = 10 * 1000; // 10K + this.maxCache = 20 * 1024; // 20K } public int getMaxCache() { @@ -121,7 +122,7 @@ public int getMaxCache() { } public void setMaxCache(int maxCache) { - this.maxCache = Math.min(2000000, Math.max(16, maxCache)); // 2M + this.maxCache = Math.max(16, maxCache); } public int getClientCount() { @@ -242,17 +243,17 @@ public byte[] send(final String clientName, final byte[] data, String txId, long MessageCallOutConcurrent callout = new MessageCallOutConcurrent(txId, timeout); ExecutorService threadPool = Executors.newSingleThreadExecutor(); - TreeMap callOuts = this.clientCallouts.get(clientName); - if (callOuts == null) { - callOuts = new TreeMap(); - this.clientCallouts.put(clientName, callOuts); + ConcurrentHashMap callOuts = null; + synchronized(this.clientCallouts) { + callOuts = this.clientCallouts.get(clientName); + if (callOuts == null) { + callOuts = new ConcurrentHashMap(); + this.clientCallouts.put(clientName, callOuts); + } } + callOuts.put(txId, callout); try { - synchronized (callOuts) { - callOuts.put(txId, callout); - } - if (controller.send(data, 1)) { logger.debug(String.format("%s> %s> send %s", this.aliasName, clientName, ByteUtils.toHexString(data, 100))); try { @@ -260,19 +261,17 @@ public byte[] send(final String clientName, final byte[] data, String txId, long return future.get(); } catch (Exception e) { - logger.error(String.format("%s> %s> reply failure", this.aliasName, clientName)); - throw new SocketException(String.format("%s> %s> reply failure", this.aliasName, clientName)); + logger.error(String.format("%s> %s> reply failed", this.aliasName, clientName)); + throw new SocketException(String.format("%s> %s> reply failed", this.aliasName, clientName)); } } else { - logger.debug(String.format("%s> %s> send %s failure", this.aliasName, clientName, ByteUtils.toHexString(data, 100))); - throw new SocketException(String.format("%s> %s> send failure", this.aliasName, clientName)); + logger.debug(String.format("%s> %s> send %s failed", this.aliasName, clientName, ByteUtils.toHexString(data, 100))); + throw new SocketException(String.format("%s> %s> send failed", this.aliasName, clientName)); } } finally { - synchronized (callOuts) { - callOuts.remove(txId); - } + callOuts.remove(txId); } } @@ -286,7 +285,7 @@ public byte[] send(final String clientName, final byte[] data, String txId, long * @return Send success or not. * @throws SocketException Raise if not started or client missing. */ - public boolean send(final String clientName, final byte[] data, final MessageCallOut callOut, final long timeout) throws SocketException { + public boolean send(final String clientName, final byte[] data, MessageCallOut callOut, long timeout) throws SocketException { if (!this.started) { throw new SocketException(this.aliasName + "> is not started."); } @@ -295,28 +294,33 @@ public boolean send(final String clientName, final byte[] data, final MessageCal if (controller == null) { throw new SocketException(clientName + "> missing"); } - - TreeMap callOuts = this.clientCallouts.get(clientName); - if (callOuts == null) { - callOuts = new TreeMap(); - this.clientCallouts.put(clientName, callOuts); + + ConcurrentHashMap callOuts = null; + synchronized(this.clientCallouts) { + callOuts = this.clientCallouts.get(clientName); + if (callOuts == null) { + callOuts = new ConcurrentHashMap(); + this.clientCallouts.put(clientName, callOuts); + } } final String tx = callOut.getTxId(); - synchronized (callOuts) { - callOuts.put(tx, callOut); - } - - final TreeMap callOutsRef = callOuts; + callOuts.put(tx, callOut); + + final ConcurrentHashMap callOutsRef = callOuts; if (controller.send(data, 1)) { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { - synchronized (callOutsRef) { - if (callOutsRef.containsKey(tx)) { - callOutsRef.remove(tx); - callOut.timeout(); + MessageCallOut out = callOutsRef.remove(tx); + if (out != null) { + try { + logger.info(String.format("%s> tx:%s callOut timeout", clientName, out.getTxId())); + out.timeout(); + } + catch(Exception ex) { + } } } @@ -325,9 +329,7 @@ public void run() { return true; } else { - synchronized (callOuts) { - callOuts.remove(tx); - } + callOuts.remove(tx); return false; } } @@ -399,23 +401,25 @@ public void run() { * @param clientName Client name. */ public void disconnect(String clientName) { - SocketDataController controller = null; - synchronized (this.controllers) { - controller = this.controllers.remove(clientName); - } + final SocketDataController controller = this.controllers.remove(clientName); - this.callIns.remove(clientName); this.clientCallouts.remove(clientName); if (controller != null) { - // logger.info(String.format("%s> %s disconnected", this.aliasName, clientName)); + logger.info(String.format("%s> %s disconnected, count:%s", this.aliasName, clientName, this.controllers.size())); SelectionKey key = controller.getChannel().keyFor(this.serverSelector); if (key != null) { key.cancel(); } controller.stop(); - raiseDisconnected(controller); + new Thread(new Runnable() { + + @Override + public void run() { + raiseDisconnected(controller); + } + }).start(); } } @@ -525,7 +529,7 @@ public void run() { }).start(); } else { - TreeMap callOuts = this.clientCallouts.get(monitor.getController().getName()); + ConcurrentHashMap callOuts = this.clientCallouts.get(monitor.getController().getName()); if (callOuts == null) { logger.debug(String.format("%s> %s> not found", this.aliasName, @@ -534,7 +538,7 @@ public void run() { } String tx = this.manager.findTx(received); - final MessageCallOut callOut = callOuts.get(tx); + final MessageCallOut callOut = callOuts.remove(tx); if (callOut == null) { logger.debug(String.format("%s> %s> %s cmd:%s tx:%s callOut reply missing", this.aliasName, @@ -545,10 +549,6 @@ public void run() { return; } - synchronized (callOuts) { - callOuts.remove(tx); - } - logger.debug(String.format("%s> %s> %s cmd:%s tx:%s callOut reply", this.aliasName, monitor.getController().getName(), @@ -662,7 +662,7 @@ private void clientConnected(SocketChannel client) { } } - SocketDataController controller = new SocketDataController( + final SocketDataController controller = new SocketDataController( clientId, client, this.manager, @@ -678,14 +678,19 @@ private void clientConnected(SocketChannel client) { // use server selector client.register(this.serverSelector, SelectionKey.OP_READ, controller); - logger.info(String.format("%s> %s controller added", this.aliasName, clientId)); + logger.info(String.format("%s> %s connected, count:%s", this.aliasName, clientId, this.controllers.size())); - raiseConnected(controller); + new Thread(new Runnable() { + @Override + public void run() { + raiseConnected(controller); + } + }).start(); + } catch (Exception ex) { - ex.printStackTrace(); - logger.error(String.format("%s> client connected failure.", this.aliasName), ex); + logger.error(String.format("%s> client connection failed.", this.aliasName), ex); } } diff --git a/uia.comm/src/test/java/uia/comm/DatagramTest.java b/uia.comm/src/test/java/uia/comm/DatagramTest.java index e440316..72a5069 100644 --- a/uia.comm/src/test/java/uia/comm/DatagramTest.java +++ b/uia.comm/src/test/java/uia/comm/DatagramTest.java @@ -2,7 +2,7 @@ import org.junit.Test; -import uia.comm.my.MyManager; +import uia.comm.my.ClientManager; import uia.comm.protocol.ng.NGProtocol; public class DatagramTest { @@ -12,14 +12,14 @@ public void test() throws Exception { DatagramServer server = new DatagramServer( new NGProtocol(), 5678, - new MyManager(), + new ClientManager(), "server"); server.connect(); DatagramClient client = new DatagramClient( new NGProtocol(), - new MyManager(), + new ClientManager(), "client"); client.connect("localhost", 5678); diff --git a/uia.comm/src/test/java/uia/comm/HTSocketTest.java b/uia.comm/src/test/java/uia/comm/HTSocketTest.java deleted file mode 100644 index 60a8eb3..0000000 --- a/uia.comm/src/test/java/uia/comm/HTSocketTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/******************************************************************************* - * Copyright 2017 UIA - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package uia.comm; - -import org.apache.log4j.PropertyConfigurator; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import uia.comm.my.MyManager; -import uia.comm.my.ToClientRequest; -import uia.comm.my.ToServerRequest; -import uia.comm.protocol.ht.HTProtocol; - -/** - * - * @author Kyle K. Lin - * - */ -public class HTSocketTest { - - private static final int PORT = 3234; - - private final HTProtocol clientProtocol; - - private final SocketServer server; - - private final MyManager manager; - - private final ToClientRequest toClientReq; - - private final ToServerRequest toServerReq; - - private String clientId; - - public HTSocketTest() throws Exception { - PropertyConfigurator.configure("log4j.properties"); - - this.manager = new MyManager(); - this.toClientReq = new ToClientRequest(); - this.toServerReq = new ToServerRequest(); - this.clientProtocol = new HTProtocol( - new byte[] { (byte) 0x8a }, - new byte[] { (byte) 0xa8 }); - - HTProtocol serverProtocol = new HTProtocol( - new byte[] { (byte) 0x8a }, - new byte[] { (byte) 0xa8 }); - this.server = new SocketServer(serverProtocol, PORT, this.manager, "svr"); - this.server.registerCallin(this.toServerReq); - this.server.addServerListener(new SocketServerListener() { - - @Override - public void connected(SocketDataController controller) { - HTSocketTest.this.clientId = controller.getName(); - } - - @Override - public void disconnected(SocketDataController controller) { - } - - }); - } - - @Before - public void before() throws Exception { - this.server.start(); - Thread.sleep(500); - } - - @After - public void after() throws Exception { - this.server.stop(); - } - - @Test - public void testInOut() throws Exception { - SocketClient client = new SocketClient(this.clientProtocol, this.manager, "c1", PORT + 1); - client.registerCallin(this.toClientReq); - client.connect("localhost", PORT); - - // client 2 server - // ABC2 - boolean s1 = client.send( - new byte[] { (byte) 0x8a, 0x41, 0x42, 0x43, 0x32, (byte) 0xa8 }, - this.toServerReq, - 1000); - Assert.assertTrue(s1); - - // server 2 client - // ABC1 - boolean s2 = this.server.send( - this.clientId, - new byte[] { (byte) 0x8a, 0x41, 0x42, 0x43, 0x31, (byte) 0xa8 }, - this.toClientReq, - 1000); - Assert.assertTrue(s2); - - // close - Thread.sleep(500); - client.disconnect(); - } -} diff --git a/uia.comm/src/test/java/uia/comm/SocketClientTest.java b/uia.comm/src/test/java/uia/comm/SocketClientTest.java new file mode 100644 index 0000000..0b7240f --- /dev/null +++ b/uia.comm/src/test/java/uia/comm/SocketClientTest.java @@ -0,0 +1,88 @@ +/******************************************************************************* + * Copyright 2017 UIA + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package uia.comm; + +import org.apache.log4j.PropertyConfigurator; +import org.junit.Assert; +import org.junit.Test; + +import uia.comm.SocketServer.ConnectionStyle; +import uia.comm.my.ClientManager; +import uia.comm.my.ClientRequest; +import uia.comm.my.ServerManager; +import uia.comm.my.ServerRequest; +import uia.comm.protocol.ht.HTProtocol; + +/** + * + * @author Kyle K. Lin + * + */ +public class SocketClientTest { + + private int size = 100000; + + public SocketClientTest() { + PropertyConfigurator.configure("log4j.properties"); + } + + @Test + public void testOnlyOne() throws Exception { + SocketServer server = create("OnlyOne", 2236, ConnectionStyle.ONLYONE); + server.start(); + + SocketClient client = new SocketClient( + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), + new ClientManager(), + "clnt"); + client.setMaxCache(2000000); + client.registerCallin(new ServerRequest(client.getName(), this.size)); + client.connect("localhost", 2236); + Thread.sleep(1000); + Assert.assertEquals(1, server.getClientCount()); + + for(int i =0; i<5; i++) { + ClientRequest req = new ClientRequest("clnt", this.size); + String tx = "" + (i % 10); + try { + byte[] data = req.sampling(tx); + System.out.println("clnt, " + tx + "> request: " + data.length); + client.send(data, req, 500); + } catch (Exception e) { + System.out.println("clnt, " + tx + "> broken"); + } + } + + Thread.sleep(4000); + + server.stop(); + } + + private SocketServer create(String name, int port, ConnectionStyle cs) throws Exception { + final SocketServer server = new SocketServer( + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), + port, + new ServerManager(), + name, + cs); + server.registerCallin(new ClientRequest(name, this.size)); + server.setMaxCache(2000000); + return server; + } +} diff --git a/uia.comm/src/test/java/uia/comm/SocketServerTest.java b/uia.comm/src/test/java/uia/comm/SocketServerTest.java index c4b8947..e108bb6 100644 --- a/uia.comm/src/test/java/uia/comm/SocketServerTest.java +++ b/uia.comm/src/test/java/uia/comm/SocketServerTest.java @@ -23,8 +23,10 @@ import org.junit.Test; import uia.comm.SocketServer.ConnectionStyle; -import uia.comm.my.MyManager; -import uia.comm.protocol.ng.NGProtocol; +import uia.comm.my.ClientManager; +import uia.comm.my.ServerManager; +import uia.comm.my.ServerRequest; +import uia.comm.protocol.ht.HTProtocol; /** * @@ -32,6 +34,8 @@ * */ public class SocketServerTest { + + private int size = 100000; public SocketServerTest() { PropertyConfigurator.configure("log4j.properties"); @@ -39,31 +43,45 @@ public SocketServerTest() { @Test public void testOnlyOne() throws Exception { - SocketServer server = create("OnlyOne", 2234, ConnectionStyle.ONLYONE); + SocketServer server = create("OnlyOne", 2236, ConnectionStyle.ONLYONE); server.start(); for (int i = 0; i < 4; i++) { - SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "oo-" + i); - Assert.assertTrue(client.connect("localhost", 2234)); - Thread.sleep(4000); + SocketClient client = new SocketClient( + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), + new ClientManager(), + "clnt" + i); + client.setMaxCache(2000000); + client.registerCallin(new ServerRequest(client.getName(), this.size)); + client.connect("localhost", 2236); + Thread.sleep(1000); Assert.assertEquals(1, server.getClientCount()); + Thread.sleep(5000); } + Thread.sleep(120000); server.stop(); } @Test public void testOneEachClient() throws Exception { - SocketServer server = create("OneEachClient", 2235, ConnectionStyle.ONE_EACH_CLIENT); + SocketServer server = create("OneEachClient", 2236, ConnectionStyle.ONE_EACH_CLIENT); server.start(); for (int i = 0; i < 4; i++) { - SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "oec-" + i); - Assert.assertTrue(client.connect("localhost", 2235)); - Thread.sleep(4000); + SocketClient client = new SocketClient( + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), + new ClientManager(), + "clnt" + i); + client.setMaxCache(2000000); + client.registerCallin(new ServerRequest(client.getName(), this.size)); + client.connect("localhost", 2236); + Thread.sleep(1000); Assert.assertEquals(1, server.getClientCount()); + Thread.sleep(5000); } + Thread.sleep(120000); server.stop(); } @@ -72,24 +90,40 @@ public void testNormalType() throws Exception { SocketServer server = create("Normal", 2236, ConnectionStyle.NORMAL); server.start(); - for (int i = 0; i < 4; i++) { - SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "n-" + i); - Assert.assertTrue(client.connect("localhost", 2236)); - Thread.sleep(4000); + for (int i = 0; i < 10; i++) { + SocketClient client = new SocketClient( + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), + new ClientManager(), + "clnt" + i); + client.setMaxCache(2000000); + client.registerCallin(new ServerRequest(client.getName(), this.size)); + client.connect("localhost", 2236); Assert.assertEquals(i + 1, server.getClientCount()); + Thread.sleep(256); } + Thread.sleep(120000); server.stop(); } @Test public void testIdle() throws Exception { - SocketServer server = create("Idle", 2238, ConnectionStyle.NORMAL); + final SocketServer server = new SocketServer( + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), + 2236, + new ServerManager(), + "Idle1500", + ConnectionStyle.NORMAL); + server.setMaxCache(2000000); server.setIdleTime(1500); server.start(); - SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "clnt"); - Assert.assertTrue(client.connect("localhost", 2238)); + SocketClient client = new SocketClient( + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), + new ClientManager(), + "clnt"); + Assert.assertTrue(client.connect("localhost", 2236)); + Thread.sleep(500); Assert.assertEquals(1, server.getClientCount()); Thread.sleep(3500); @@ -100,15 +134,28 @@ public void testIdle() throws Exception { private SocketServer create(String name, int port, ConnectionStyle cs) throws Exception { final SocketServer server = new SocketServer( - new NGProtocol(), + new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()), port, - new MyManager(), + new ServerManager(), name, cs); + server.setMaxCache(2000000); server.addServerListener(new SocketServerListener() { @Override public void connected(SocketDataController controller) { + ServerRequest req = new ServerRequest(controller.getName(), SocketServerTest.this.size); + for(int i =0; i<100; i++) { + String tx = "" + (i % 10); + try { + byte[] data = req.sampling(tx); + System.out.println(controller.getName() + ", " + tx + "> request: " + data.length); + server.send(controller.getName(), data, req, 500); + Thread.sleep(1000); + } catch (Exception e) { + System.out.println(controller.getName() + ", " + tx + "> broken"); + } + } } @Override diff --git a/uia.comm/src/test/java/uia/comm/my/MyManager.java b/uia.comm/src/test/java/uia/comm/my/ClientManager.java similarity index 87% rename from uia.comm/src/test/java/uia/comm/my/MyManager.java rename to uia.comm/src/test/java/uia/comm/my/ClientManager.java index d9437b1..fa1e393 100644 --- a/uia.comm/src/test/java/uia/comm/my/MyManager.java +++ b/uia.comm/src/test/java/uia/comm/my/ClientManager.java @@ -27,22 +27,22 @@ * @author Kyle K. Lin * */ -public class MyManager implements MessageManager { +public class ClientManager implements MessageManager { @Override public boolean isCallIn(String cmd) { - return "ABC".equals(cmd); + return "SVRREQ".equals(cmd); } @Override public String findCmd(byte[] data) { - String cmd = new String(Arrays.copyOfRange(data, 1, 4)); + String cmd = new String(Arrays.copyOfRange(data, 6, 12)); return cmd; } @Override public String findTx(byte[] data) { - String tx = new String(Arrays.copyOfRange(data, 4, 5)); + String tx = new String(Arrays.copyOfRange(data, 12, 13)); return tx; } diff --git a/uia.comm/src/test/java/uia/comm/my/ToClientRequest.java b/uia.comm/src/test/java/uia/comm/my/ClientRequest.java similarity index 57% rename from uia.comm/src/test/java/uia/comm/my/ToClientRequest.java rename to uia.comm/src/test/java/uia/comm/my/ClientRequest.java index 0febaa3..2facb79 100644 --- a/uia.comm/src/test/java/uia/comm/my/ToClientRequest.java +++ b/uia.comm/src/test/java/uia/comm/my/ClientRequest.java @@ -29,41 +29,62 @@ * @author Kyle K. Lin * */ -public class ToClientRequest implements MessageCallIn, MessageCallOut { +public class ClientRequest implements MessageCallIn, MessageCallOut { + + private String name; + + private String message; + + private String tx; + + public ClientRequest(String name, int size) { + this.name = name; + StringBuilder body = new StringBuilder(); + for(int i=0; i< size; i++) { + body.append("0"); + } + this.message = body.append("_END").toString(); + } + + public byte[] sampling(String tx) { + this.tx = tx; + return ("BEGIN_CNTREQ" + this.tx + this.message).getBytes(); + } - @Override + @Override public String getCmdName() { - return "ABC"; + return "CNTREQ"; } @Override public String getTxId() { - return "1"; + return this.tx; } @Override public void execute(byte[] request, SocketDataController controller) { - // DEF12 - try { - Thread.sleep(150); - boolean r = controller.send(new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x31, 0x32, (byte) 0xa8 }, 1); - Assert.assertTrue(r); - } - catch (InterruptedException e) { - - } + try { + String tx = new String(new byte[] { request[12] }); + long t = System.currentTimeMillis() % 1500; + if(t > 495) { + System.out.println(this.name + ", " + tx + "> sleep: " + t); + } + Thread.sleep(t); + + controller.send(("BEGIN_CNTRSP" + tx + this.message).getBytes(), 1); + } catch (Exception e) { + e.printStackTrace(); + } } @Override public void execute(byte[] reply) { - Assert.assertArrayEquals( - new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x31, 0x32, (byte) 0xa8 }, - reply); + System.out.println(this.name + ", " + this.tx + "> reply: " + reply.length); } @Override public void timeout() { - Assert.assertTrue(false); + System.out.println(this.name + ", " + this.tx + "> timeout"); } } diff --git a/uia.comm/src/test/java/uia/comm/my/ToServerRequest.java b/uia.comm/src/test/java/uia/comm/my/ServerManager.java similarity index 58% rename from uia.comm/src/test/java/uia/comm/my/ToServerRequest.java rename to uia.comm/src/test/java/uia/comm/my/ServerManager.java index ad2c81d..b2fc283 100644 --- a/uia.comm/src/test/java/uia/comm/my/ToServerRequest.java +++ b/uia.comm/src/test/java/uia/comm/my/ServerManager.java @@ -18,45 +18,46 @@ *******************************************************************************/ package uia.comm.my; -import org.junit.Assert; +import java.util.Arrays; -import uia.comm.MessageCallIn; -import uia.comm.MessageCallOut; -import uia.comm.SocketDataController; +import uia.comm.MessageManager; /** * * @author Kyle K. Lin * */ -public class ToServerRequest implements MessageCallOut, MessageCallIn { +public class ServerManager implements MessageManager { @Override - public String getCmdName() { - return "ABC"; + public boolean isCallIn(String cmd) { + return "CNTREQ".equals(cmd); } @Override - public String getTxId() { - return "2"; + public String findCmd(byte[] data) { + String cmd = new String(Arrays.copyOfRange(data, 6, 12)); + return cmd; } @Override - public void execute(byte[] reply) { - Assert.assertArrayEquals( - new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x32, 0x31, (byte) 0xa8 }, - reply); + public String findTx(byte[] data) { + String tx = new String(Arrays.copyOfRange(data, 12, 13)); + return tx; } @Override - public void timeout() { - Assert.assertTrue(false); + public byte[] decode(byte[] data) { + return data; } @Override - public synchronized void execute(byte[] request, SocketDataController controller) { - // DEF21 - boolean r = controller.send(new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x32, 0x31, (byte) 0xa8 }, 1); - Assert.assertTrue(r); + public byte[] encode(byte[] data) { + return data; + } + + @Override + public boolean validate(byte[] data) { + return true; } } diff --git a/uia.comm/src/test/java/uia/comm/my/ServerRequest.java b/uia.comm/src/test/java/uia/comm/my/ServerRequest.java new file mode 100644 index 0000000..123e6e2 --- /dev/null +++ b/uia.comm/src/test/java/uia/comm/my/ServerRequest.java @@ -0,0 +1,91 @@ +/******************************************************************************* + * Copyright 2017 UIA + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package uia.comm.my; + +import org.junit.Assert; + +import uia.comm.MessageCallIn; +import uia.comm.MessageCallOut; +import uia.comm.SocketDataController; + +/** + * + * @author Kyle K. Lin + * + */ +public class ServerRequest implements MessageCallIn, MessageCallOut { + + private String name; + + private String message; + + private String tx; + + public ServerRequest(String name, int size) { + this.name = name; + StringBuilder body = new StringBuilder(); + for(int i=0; i< size; i++) { + body.append("0"); + } + this.message = body.append("_END").toString(); + } + + public byte[] sampling(String tx) { + this.tx = tx; + return ("BEGIN_SVRREQ" + this.tx + this.message).getBytes(); + } + + @Override + public String getCmdName() { + return "SVRREQ"; + } + + @Override + public String getTxId() { + return this.tx; + } + + @Override + public void execute(byte[] request, SocketDataController controller) { + try { + String tx = new String(new byte[] { request[12] }); + long t = System.currentTimeMillis() % 1500; + if(t > 495) { + System.out.println(this.name + ", " + tx + "> sleep: " + t); + } + Thread.sleep(t); + + boolean r = controller.send(("BEGIN_SVRRSP" + tx + this.message).getBytes(), 1); + Assert.assertTrue(r); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void execute(byte[] reply) { + System.out.println(this.name + ", " + this.tx + "> reply: " + reply.length); + } + + @Override + public void timeout() { + System.out.println(this.name + ", " + this.tx + "> timeout"); + } + +}