diff --git a/uia.comm/pom.xml b/uia.comm/pom.xml
index 6781e1c..ca0465c 100644
--- a/uia.comm/pom.xml
+++ b/uia.comm/pom.xml
@@ -3,7 +3,7 @@
4.0.0
org.uia.solution
uia-comm
- 0.3.0
+ 0.3.1
jar
uia-comm
https://github.com/uia4j/uia-comm
diff --git a/uia.comm/src/main/java/uia/comm/SocketClient.java b/uia.comm/src/main/java/uia/comm/SocketClient.java
index fe67086..9a303ab 100644
--- a/uia.comm/src/main/java/uia/comm/SocketClient.java
+++ b/uia.comm/src/main/java/uia/comm/SocketClient.java
@@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -57,7 +58,7 @@ public class SocketClient implements ProtocolEventHandler,
private final HashMap> callIns;
- private final HashMap callOuts;
+ private final ConcurrentHashMap callOuts;
private final String aliasName;
@@ -103,9 +104,9 @@ public SocketClient(final Protocol protocol, final Message
this.protocol.addMessageHandler(this);
this.manager = manager;
this.callIns = new HashMap>();
- this.callOuts = new HashMap();
+ this.callOuts = new ConcurrentHashMap();
this.started = false;
- this.maxCache = 10 * 1000; // 10K
+ this.maxCache = 20 * 1024; // 20K
}
public int getMaxCache() {
@@ -113,7 +114,7 @@ public int getMaxCache() {
}
public void setMaxCache(int maxCache) {
- this.maxCache = Math.min(2000000, Math.max(16, maxCache)); // 2M
+ this.maxCache = Math.max(16, maxCache);
}
@Override
@@ -244,7 +245,7 @@ public synchronized boolean tryConnect() {
}
catch (Exception ex) {
if (this.clientPort > 0) {
- logger.error(String.format("%s> connect to %s:%s(%d) failure. %s",
+ logger.error(String.format("%s> connect to %s:%s(%d) failed. %s",
this.aliasName,
this.addr,
this.port,
@@ -252,7 +253,7 @@ public synchronized boolean tryConnect() {
ex.getMessage()));
}
else {
- logger.error(String.format("%s> connect to %s:%s failure. %s",
+ logger.error(String.format("%s> connect to %s:%s failed. %s",
this.aliasName,
this.addr,
this.port,
@@ -317,7 +318,7 @@ public boolean send(final byte[] data, int times) throws SocketException {
return this.controller.send(data, times);
}
catch (Exception ex) {
- logger.error(String.format("%s> send %s failure. ex:%s",
+ logger.error(String.format("%s> send %s failed. ex:%s",
this.aliasName,
ByteUtils.toHexString(data, 100),
ex.getMessage()));
@@ -340,10 +341,7 @@ public byte[] send(final byte[] data, String txId, long timeout, int retry) thro
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try {
- synchronized (this.callOuts) {
- this.callOuts.put(txId, callout);
- }
-
+ this.callOuts.put(txId, callout);
if (this.controller.send(data, retry)) {
try {
Future future = threadPool.submit(callout);
@@ -355,15 +353,13 @@ public byte[] send(final byte[] data, String txId, long timeout, int retry) thro
}
}
else {
- logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100)));
- throw new SocketException(this.aliasName + "> send failure");
+ logger.debug(String.format("%s> send %s failed", this.aliasName, ByteUtils.toHexString(data, 100)));
+ throw new SocketException(this.aliasName + "> send failed");
}
}
finally {
threadPool.shutdown();
- synchronized (this.callOuts) {
- this.callOuts.remove(txId);
- }
+ this.callOuts.remove(txId);
}
}
@@ -373,15 +369,13 @@ public boolean send(final byte[] data, final MessageCallOut callOut, long timeou
}
@Override
- public boolean send(final byte[] data, final MessageCallOut callOut, long timeout, int retry) throws SocketException {
+ public boolean send(final byte[] data, MessageCallOut callOut, long timeout, int retry) throws SocketException {
if (!this.started) {
throw new SocketException(this.aliasName + "> is not started.");
}
final String tx = callOut.getTxId();
- synchronized (this.callOuts) {
- this.callOuts.put(tx, callOut);
- }
+ this.callOuts.put(tx, callOut);
if (this.controller.send(data, retry)) {
Timer timer = new Timer();
@@ -389,15 +383,10 @@ public boolean send(final byte[] data, final MessageCallOut callOut, long timeou
@Override
public void run() {
- MessageCallOut out = null;
- synchronized (SocketClient.this.callOuts) {
- if (SocketClient.this.callOuts.containsKey(tx)) {
- logger.debug(String.format("%s> tx:%s callOut timeout", SocketClient.this.aliasName, callOut.getTxId()));
- out = SocketClient.this.callOuts.remove(tx);
- }
- }
- if (out != null) {
+ MessageCallOut out = SocketClient.this.callOuts.remove(tx);
+ if(out != null) {
try {
+ logger.info(String.format("%s> tx:%s callOut timeout", SocketClient.this.aliasName, out.getTxId()));
out.timeout();
}
catch (Exception ex) {
@@ -410,10 +399,8 @@ public void run() {
return true;
}
else {
- synchronized (this.callOuts) {
- this.callOuts.remove(tx);
- }
- logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100)));
+ this.callOuts.remove(tx);
+ logger.debug(String.format("%s> send %s failed", this.aliasName, ByteUtils.toHexString(data, 100)));
return false;
}
}
@@ -460,16 +447,12 @@ public void run() {
}
else {
String tx = this.manager.findTx(received);
- final MessageCallOut callOut = this.callOuts.get(tx);
+ final MessageCallOut callOut = this.callOuts.remove(tx);
if (callOut == null) {
logger.debug(String.format("%s> cmd:%s tx:%s callout reply missing", this.aliasName, cmd, tx));
return;
}
- synchronized (this.callOuts) {
- this.callOuts.remove(tx);
- }
-
logger.debug(String.format("%s> cmd:%s tx:%s callout reply", this.aliasName, cmd, tx));
new Thread(new Runnable() {
diff --git a/uia.comm/src/main/java/uia/comm/SocketDataController.java b/uia.comm/src/main/java/uia/comm/SocketDataController.java
index 112cc0b..9798a57 100644
--- a/uia.comm/src/main/java/uia/comm/SocketDataController.java
+++ b/uia.comm/src/main/java/uia/comm/SocketDataController.java
@@ -100,6 +100,7 @@ public synchronized boolean send(byte[] data, int times) {
int cnt = this.ch.write(ByteBuffer.wrap(encoded));
if (cnt == encoded.length) {
logger.debug(String.format("%s> send %s", this.name, ByteUtils.toHexString(encoded, 100)));
+ Thread.sleep(100);
return true;
}
else {
diff --git a/uia.comm/src/main/java/uia/comm/SocketDataSelector.java b/uia.comm/src/main/java/uia/comm/SocketDataSelector.java
index 35bab95..f75bef1 100644
--- a/uia.comm/src/main/java/uia/comm/SocketDataSelector.java
+++ b/uia.comm/src/main/java/uia/comm/SocketDataSelector.java
@@ -38,7 +38,7 @@ public class SocketDataSelector {
/**
*
- * @throws IOException Raise when open failure.
+ * @throws IOException Raise when open failed.
*/
public SocketDataSelector() throws IOException {
this.selector = Selector.open();
@@ -48,7 +48,7 @@ public SocketDataSelector() throws IOException {
* Register controller to specific channel.
* @param ch Socket channel.
* @param controller Socket data controller.
- * @throws ClosedChannelException Raise when register channel failure.
+ * @throws ClosedChannelException Raise when register channel failed.
*/
public void register(SocketChannel ch, SocketDataController controller) throws ClosedChannelException {
ch.register(this.selector, SelectionKey.OP_READ, controller);
diff --git a/uia.comm/src/main/java/uia/comm/SocketServer.java b/uia.comm/src/main/java/uia/comm/SocketServer.java
index b1a06d7..0c2b591 100644
--- a/uia.comm/src/main/java/uia/comm/SocketServer.java
+++ b/uia.comm/src/main/java/uia/comm/SocketServer.java
@@ -30,6 +30,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -64,11 +65,11 @@ public enum ConnectionStyle {
private final TreeMap> callIns;
- private final TreeMap> clientCallouts;
+ private final ConcurrentHashMap> clientCallouts;
private final ArrayList listeners;
- private final TreeMap controllers;
+ private final ConcurrentHashMap controllers;
private final String aliasName;
@@ -97,7 +98,7 @@ public SocketServer(Protocol protocol, int port, MessageMa
* @param port Socket port.
* @param manager Protocol manager.
* @param aliasName Alias name.
- * @throws Exception Raise construct failure.
+ * @throws Exception Raise construction failed.
*/
public SocketServer(Protocol protocol, int port, MessageManager manager, String aliasName, ConnectionStyle connectionStyle) throws Exception {
this.aliasName = aliasName;
@@ -105,15 +106,15 @@ public SocketServer(Protocol protocol, int port, MessageMa
this.protocol.addMessageHandler(this);
this.manager = manager;
this.callIns = new TreeMap>();
- this.clientCallouts = new TreeMap>();
+ this.clientCallouts = new ConcurrentHashMap>();
this.started = false;
this.connectionStyle = connectionStyle;
- this.controllers = new TreeMap();
+ this.controllers = new ConcurrentHashMap();
this.listeners = new ArrayList();
this.idleTime = 60000;
this.port = port;
- this.maxCache = 10 * 1000; // 10K
+ this.maxCache = 20 * 1024; // 20K
}
public int getMaxCache() {
@@ -121,7 +122,7 @@ public int getMaxCache() {
}
public void setMaxCache(int maxCache) {
- this.maxCache = Math.min(2000000, Math.max(16, maxCache)); // 2M
+ this.maxCache = Math.max(16, maxCache);
}
public int getClientCount() {
@@ -242,17 +243,17 @@ public byte[] send(final String clientName, final byte[] data, String txId, long
MessageCallOutConcurrent callout = new MessageCallOutConcurrent(txId, timeout);
ExecutorService threadPool = Executors.newSingleThreadExecutor();
- TreeMap callOuts = this.clientCallouts.get(clientName);
- if (callOuts == null) {
- callOuts = new TreeMap();
- this.clientCallouts.put(clientName, callOuts);
+ ConcurrentHashMap callOuts = null;
+ synchronized(this.clientCallouts) {
+ callOuts = this.clientCallouts.get(clientName);
+ if (callOuts == null) {
+ callOuts = new ConcurrentHashMap();
+ this.clientCallouts.put(clientName, callOuts);
+ }
}
+ callOuts.put(txId, callout);
try {
- synchronized (callOuts) {
- callOuts.put(txId, callout);
- }
-
if (controller.send(data, 1)) {
logger.debug(String.format("%s> %s> send %s", this.aliasName, clientName, ByteUtils.toHexString(data, 100)));
try {
@@ -260,19 +261,17 @@ public byte[] send(final String clientName, final byte[] data, String txId, long
return future.get();
}
catch (Exception e) {
- logger.error(String.format("%s> %s> reply failure", this.aliasName, clientName));
- throw new SocketException(String.format("%s> %s> reply failure", this.aliasName, clientName));
+ logger.error(String.format("%s> %s> reply failed", this.aliasName, clientName));
+ throw new SocketException(String.format("%s> %s> reply failed", this.aliasName, clientName));
}
}
else {
- logger.debug(String.format("%s> %s> send %s failure", this.aliasName, clientName, ByteUtils.toHexString(data, 100)));
- throw new SocketException(String.format("%s> %s> send failure", this.aliasName, clientName));
+ logger.debug(String.format("%s> %s> send %s failed", this.aliasName, clientName, ByteUtils.toHexString(data, 100)));
+ throw new SocketException(String.format("%s> %s> send failed", this.aliasName, clientName));
}
}
finally {
- synchronized (callOuts) {
- callOuts.remove(txId);
- }
+ callOuts.remove(txId);
}
}
@@ -286,7 +285,7 @@ public byte[] send(final String clientName, final byte[] data, String txId, long
* @return Send success or not.
* @throws SocketException Raise if not started or client missing.
*/
- public boolean send(final String clientName, final byte[] data, final MessageCallOut callOut, final long timeout) throws SocketException {
+ public boolean send(final String clientName, final byte[] data, MessageCallOut callOut, long timeout) throws SocketException {
if (!this.started) {
throw new SocketException(this.aliasName + "> is not started.");
}
@@ -295,28 +294,33 @@ public boolean send(final String clientName, final byte[] data, final MessageCal
if (controller == null) {
throw new SocketException(clientName + "> missing");
}
-
- TreeMap callOuts = this.clientCallouts.get(clientName);
- if (callOuts == null) {
- callOuts = new TreeMap();
- this.clientCallouts.put(clientName, callOuts);
+
+ ConcurrentHashMap callOuts = null;
+ synchronized(this.clientCallouts) {
+ callOuts = this.clientCallouts.get(clientName);
+ if (callOuts == null) {
+ callOuts = new ConcurrentHashMap();
+ this.clientCallouts.put(clientName, callOuts);
+ }
}
final String tx = callOut.getTxId();
- synchronized (callOuts) {
- callOuts.put(tx, callOut);
- }
-
- final TreeMap callOutsRef = callOuts;
+ callOuts.put(tx, callOut);
+
+ final ConcurrentHashMap callOutsRef = callOuts;
if (controller.send(data, 1)) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
- synchronized (callOutsRef) {
- if (callOutsRef.containsKey(tx)) {
- callOutsRef.remove(tx);
- callOut.timeout();
+ MessageCallOut out = callOutsRef.remove(tx);
+ if (out != null) {
+ try {
+ logger.info(String.format("%s> tx:%s callOut timeout", clientName, out.getTxId()));
+ out.timeout();
+ }
+ catch(Exception ex) {
+
}
}
}
@@ -325,9 +329,7 @@ public void run() {
return true;
}
else {
- synchronized (callOuts) {
- callOuts.remove(tx);
- }
+ callOuts.remove(tx);
return false;
}
}
@@ -399,23 +401,25 @@ public void run() {
* @param clientName Client name.
*/
public void disconnect(String clientName) {
- SocketDataController controller = null;
- synchronized (this.controllers) {
- controller = this.controllers.remove(clientName);
- }
+ final SocketDataController controller = this.controllers.remove(clientName);
- this.callIns.remove(clientName);
this.clientCallouts.remove(clientName);
if (controller != null) {
- // logger.info(String.format("%s> %s disconnected", this.aliasName, clientName));
+ logger.info(String.format("%s> %s disconnected, count:%s", this.aliasName, clientName, this.controllers.size()));
SelectionKey key = controller.getChannel().keyFor(this.serverSelector);
if (key != null) {
key.cancel();
}
controller.stop();
- raiseDisconnected(controller);
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ raiseDisconnected(controller);
+ }
+ }).start();
}
}
@@ -525,7 +529,7 @@ public void run() {
}).start();
}
else {
- TreeMap callOuts = this.clientCallouts.get(monitor.getController().getName());
+ ConcurrentHashMap callOuts = this.clientCallouts.get(monitor.getController().getName());
if (callOuts == null) {
logger.debug(String.format("%s> %s> not found",
this.aliasName,
@@ -534,7 +538,7 @@ public void run() {
}
String tx = this.manager.findTx(received);
- final MessageCallOut callOut = callOuts.get(tx);
+ final MessageCallOut callOut = callOuts.remove(tx);
if (callOut == null) {
logger.debug(String.format("%s> %s> %s cmd:%s tx:%s callOut reply missing",
this.aliasName,
@@ -545,10 +549,6 @@ public void run() {
return;
}
- synchronized (callOuts) {
- callOuts.remove(tx);
- }
-
logger.debug(String.format("%s> %s> %s cmd:%s tx:%s callOut reply",
this.aliasName,
monitor.getController().getName(),
@@ -662,7 +662,7 @@ private void clientConnected(SocketChannel client) {
}
}
- SocketDataController controller = new SocketDataController(
+ final SocketDataController controller = new SocketDataController(
clientId,
client,
this.manager,
@@ -678,14 +678,19 @@ private void clientConnected(SocketChannel client) {
// use server selector
client.register(this.serverSelector, SelectionKey.OP_READ, controller);
- logger.info(String.format("%s> %s controller added", this.aliasName, clientId));
+ logger.info(String.format("%s> %s connected, count:%s", this.aliasName, clientId, this.controllers.size()));
- raiseConnected(controller);
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ raiseConnected(controller);
+ }
+ }).start();
+
}
catch (Exception ex) {
- ex.printStackTrace();
- logger.error(String.format("%s> client connected failure.", this.aliasName), ex);
+ logger.error(String.format("%s> client connection failed.", this.aliasName), ex);
}
}
diff --git a/uia.comm/src/test/java/uia/comm/DatagramTest.java b/uia.comm/src/test/java/uia/comm/DatagramTest.java
index e440316..72a5069 100644
--- a/uia.comm/src/test/java/uia/comm/DatagramTest.java
+++ b/uia.comm/src/test/java/uia/comm/DatagramTest.java
@@ -2,7 +2,7 @@
import org.junit.Test;
-import uia.comm.my.MyManager;
+import uia.comm.my.ClientManager;
import uia.comm.protocol.ng.NGProtocol;
public class DatagramTest {
@@ -12,14 +12,14 @@ public void test() throws Exception {
DatagramServer server = new DatagramServer(
new NGProtocol(),
5678,
- new MyManager(),
+ new ClientManager(),
"server");
server.connect();
DatagramClient client = new DatagramClient(
new NGProtocol(),
- new MyManager(),
+ new ClientManager(),
"client");
client.connect("localhost", 5678);
diff --git a/uia.comm/src/test/java/uia/comm/HTSocketTest.java b/uia.comm/src/test/java/uia/comm/HTSocketTest.java
deleted file mode 100644
index 60a8eb3..0000000
--- a/uia.comm/src/test/java/uia/comm/HTSocketTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*******************************************************************************
- * Copyright 2017 UIA
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package uia.comm;
-
-import org.apache.log4j.PropertyConfigurator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import uia.comm.my.MyManager;
-import uia.comm.my.ToClientRequest;
-import uia.comm.my.ToServerRequest;
-import uia.comm.protocol.ht.HTProtocol;
-
-/**
- *
- * @author Kyle K. Lin
- *
- */
-public class HTSocketTest {
-
- private static final int PORT = 3234;
-
- private final HTProtocol clientProtocol;
-
- private final SocketServer server;
-
- private final MyManager manager;
-
- private final ToClientRequest toClientReq;
-
- private final ToServerRequest toServerReq;
-
- private String clientId;
-
- public HTSocketTest() throws Exception {
- PropertyConfigurator.configure("log4j.properties");
-
- this.manager = new MyManager();
- this.toClientReq = new ToClientRequest();
- this.toServerReq = new ToServerRequest();
- this.clientProtocol = new HTProtocol(
- new byte[] { (byte) 0x8a },
- new byte[] { (byte) 0xa8 });
-
- HTProtocol serverProtocol = new HTProtocol(
- new byte[] { (byte) 0x8a },
- new byte[] { (byte) 0xa8 });
- this.server = new SocketServer(serverProtocol, PORT, this.manager, "svr");
- this.server.registerCallin(this.toServerReq);
- this.server.addServerListener(new SocketServerListener() {
-
- @Override
- public void connected(SocketDataController controller) {
- HTSocketTest.this.clientId = controller.getName();
- }
-
- @Override
- public void disconnected(SocketDataController controller) {
- }
-
- });
- }
-
- @Before
- public void before() throws Exception {
- this.server.start();
- Thread.sleep(500);
- }
-
- @After
- public void after() throws Exception {
- this.server.stop();
- }
-
- @Test
- public void testInOut() throws Exception {
- SocketClient client = new SocketClient(this.clientProtocol, this.manager, "c1", PORT + 1);
- client.registerCallin(this.toClientReq);
- client.connect("localhost", PORT);
-
- // client 2 server
- // ABC2
- boolean s1 = client.send(
- new byte[] { (byte) 0x8a, 0x41, 0x42, 0x43, 0x32, (byte) 0xa8 },
- this.toServerReq,
- 1000);
- Assert.assertTrue(s1);
-
- // server 2 client
- // ABC1
- boolean s2 = this.server.send(
- this.clientId,
- new byte[] { (byte) 0x8a, 0x41, 0x42, 0x43, 0x31, (byte) 0xa8 },
- this.toClientReq,
- 1000);
- Assert.assertTrue(s2);
-
- // close
- Thread.sleep(500);
- client.disconnect();
- }
-}
diff --git a/uia.comm/src/test/java/uia/comm/SocketClientTest.java b/uia.comm/src/test/java/uia/comm/SocketClientTest.java
new file mode 100644
index 0000000..0b7240f
--- /dev/null
+++ b/uia.comm/src/test/java/uia/comm/SocketClientTest.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Copyright 2017 UIA
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package uia.comm;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import uia.comm.SocketServer.ConnectionStyle;
+import uia.comm.my.ClientManager;
+import uia.comm.my.ClientRequest;
+import uia.comm.my.ServerManager;
+import uia.comm.my.ServerRequest;
+import uia.comm.protocol.ht.HTProtocol;
+
+/**
+ *
+ * @author Kyle K. Lin
+ *
+ */
+public class SocketClientTest {
+
+ private int size = 100000;
+
+ public SocketClientTest() {
+ PropertyConfigurator.configure("log4j.properties");
+ }
+
+ @Test
+ public void testOnlyOne() throws Exception {
+ SocketServer server = create("OnlyOne", 2236, ConnectionStyle.ONLYONE);
+ server.start();
+
+ SocketClient client = new SocketClient(
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
+ new ClientManager(),
+ "clnt");
+ client.setMaxCache(2000000);
+ client.registerCallin(new ServerRequest(client.getName(), this.size));
+ client.connect("localhost", 2236);
+ Thread.sleep(1000);
+ Assert.assertEquals(1, server.getClientCount());
+
+ for(int i =0; i<5; i++) {
+ ClientRequest req = new ClientRequest("clnt", this.size);
+ String tx = "" + (i % 10);
+ try {
+ byte[] data = req.sampling(tx);
+ System.out.println("clnt, " + tx + "> request: " + data.length);
+ client.send(data, req, 500);
+ } catch (Exception e) {
+ System.out.println("clnt, " + tx + "> broken");
+ }
+ }
+
+ Thread.sleep(4000);
+
+ server.stop();
+ }
+
+ private SocketServer create(String name, int port, ConnectionStyle cs) throws Exception {
+ final SocketServer server = new SocketServer(
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
+ port,
+ new ServerManager(),
+ name,
+ cs);
+ server.registerCallin(new ClientRequest(name, this.size));
+ server.setMaxCache(2000000);
+ return server;
+ }
+}
diff --git a/uia.comm/src/test/java/uia/comm/SocketServerTest.java b/uia.comm/src/test/java/uia/comm/SocketServerTest.java
index c4b8947..e108bb6 100644
--- a/uia.comm/src/test/java/uia/comm/SocketServerTest.java
+++ b/uia.comm/src/test/java/uia/comm/SocketServerTest.java
@@ -23,8 +23,10 @@
import org.junit.Test;
import uia.comm.SocketServer.ConnectionStyle;
-import uia.comm.my.MyManager;
-import uia.comm.protocol.ng.NGProtocol;
+import uia.comm.my.ClientManager;
+import uia.comm.my.ServerManager;
+import uia.comm.my.ServerRequest;
+import uia.comm.protocol.ht.HTProtocol;
/**
*
@@ -32,6 +34,8 @@
*
*/
public class SocketServerTest {
+
+ private int size = 100000;
public SocketServerTest() {
PropertyConfigurator.configure("log4j.properties");
@@ -39,31 +43,45 @@ public SocketServerTest() {
@Test
public void testOnlyOne() throws Exception {
- SocketServer server = create("OnlyOne", 2234, ConnectionStyle.ONLYONE);
+ SocketServer server = create("OnlyOne", 2236, ConnectionStyle.ONLYONE);
server.start();
for (int i = 0; i < 4; i++) {
- SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "oo-" + i);
- Assert.assertTrue(client.connect("localhost", 2234));
- Thread.sleep(4000);
+ SocketClient client = new SocketClient(
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
+ new ClientManager(),
+ "clnt" + i);
+ client.setMaxCache(2000000);
+ client.registerCallin(new ServerRequest(client.getName(), this.size));
+ client.connect("localhost", 2236);
+ Thread.sleep(1000);
Assert.assertEquals(1, server.getClientCount());
+ Thread.sleep(5000);
}
+ Thread.sleep(120000);
server.stop();
}
@Test
public void testOneEachClient() throws Exception {
- SocketServer server = create("OneEachClient", 2235, ConnectionStyle.ONE_EACH_CLIENT);
+ SocketServer server = create("OneEachClient", 2236, ConnectionStyle.ONE_EACH_CLIENT);
server.start();
for (int i = 0; i < 4; i++) {
- SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "oec-" + i);
- Assert.assertTrue(client.connect("localhost", 2235));
- Thread.sleep(4000);
+ SocketClient client = new SocketClient(
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
+ new ClientManager(),
+ "clnt" + i);
+ client.setMaxCache(2000000);
+ client.registerCallin(new ServerRequest(client.getName(), this.size));
+ client.connect("localhost", 2236);
+ Thread.sleep(1000);
Assert.assertEquals(1, server.getClientCount());
+ Thread.sleep(5000);
}
+ Thread.sleep(120000);
server.stop();
}
@@ -72,24 +90,40 @@ public void testNormalType() throws Exception {
SocketServer server = create("Normal", 2236, ConnectionStyle.NORMAL);
server.start();
- for (int i = 0; i < 4; i++) {
- SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "n-" + i);
- Assert.assertTrue(client.connect("localhost", 2236));
- Thread.sleep(4000);
+ for (int i = 0; i < 10; i++) {
+ SocketClient client = new SocketClient(
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
+ new ClientManager(),
+ "clnt" + i);
+ client.setMaxCache(2000000);
+ client.registerCallin(new ServerRequest(client.getName(), this.size));
+ client.connect("localhost", 2236);
Assert.assertEquals(i + 1, server.getClientCount());
+ Thread.sleep(256);
}
+ Thread.sleep(120000);
server.stop();
}
@Test
public void testIdle() throws Exception {
- SocketServer server = create("Idle", 2238, ConnectionStyle.NORMAL);
+ final SocketServer server = new SocketServer(
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
+ 2236,
+ new ServerManager(),
+ "Idle1500",
+ ConnectionStyle.NORMAL);
+ server.setMaxCache(2000000);
server.setIdleTime(1500);
server.start();
- SocketClient client = new SocketClient(new NGProtocol(), new MyManager(), "clnt");
- Assert.assertTrue(client.connect("localhost", 2238));
+ SocketClient client = new SocketClient(
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
+ new ClientManager(),
+ "clnt");
+ Assert.assertTrue(client.connect("localhost", 2236));
+
Thread.sleep(500);
Assert.assertEquals(1, server.getClientCount());
Thread.sleep(3500);
@@ -100,15 +134,28 @@ public void testIdle() throws Exception {
private SocketServer create(String name, int port, ConnectionStyle cs) throws Exception {
final SocketServer server = new SocketServer(
- new NGProtocol(),
+ new HTProtocol("BEGIN_".getBytes(), "_END".getBytes()),
port,
- new MyManager(),
+ new ServerManager(),
name,
cs);
+ server.setMaxCache(2000000);
server.addServerListener(new SocketServerListener() {
@Override
public void connected(SocketDataController controller) {
+ ServerRequest req = new ServerRequest(controller.getName(), SocketServerTest.this.size);
+ for(int i =0; i<100; i++) {
+ String tx = "" + (i % 10);
+ try {
+ byte[] data = req.sampling(tx);
+ System.out.println(controller.getName() + ", " + tx + "> request: " + data.length);
+ server.send(controller.getName(), data, req, 500);
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ System.out.println(controller.getName() + ", " + tx + "> broken");
+ }
+ }
}
@Override
diff --git a/uia.comm/src/test/java/uia/comm/my/MyManager.java b/uia.comm/src/test/java/uia/comm/my/ClientManager.java
similarity index 87%
rename from uia.comm/src/test/java/uia/comm/my/MyManager.java
rename to uia.comm/src/test/java/uia/comm/my/ClientManager.java
index d9437b1..fa1e393 100644
--- a/uia.comm/src/test/java/uia/comm/my/MyManager.java
+++ b/uia.comm/src/test/java/uia/comm/my/ClientManager.java
@@ -27,22 +27,22 @@
* @author Kyle K. Lin
*
*/
-public class MyManager implements MessageManager {
+public class ClientManager implements MessageManager {
@Override
public boolean isCallIn(String cmd) {
- return "ABC".equals(cmd);
+ return "SVRREQ".equals(cmd);
}
@Override
public String findCmd(byte[] data) {
- String cmd = new String(Arrays.copyOfRange(data, 1, 4));
+ String cmd = new String(Arrays.copyOfRange(data, 6, 12));
return cmd;
}
@Override
public String findTx(byte[] data) {
- String tx = new String(Arrays.copyOfRange(data, 4, 5));
+ String tx = new String(Arrays.copyOfRange(data, 12, 13));
return tx;
}
diff --git a/uia.comm/src/test/java/uia/comm/my/ToClientRequest.java b/uia.comm/src/test/java/uia/comm/my/ClientRequest.java
similarity index 57%
rename from uia.comm/src/test/java/uia/comm/my/ToClientRequest.java
rename to uia.comm/src/test/java/uia/comm/my/ClientRequest.java
index 0febaa3..2facb79 100644
--- a/uia.comm/src/test/java/uia/comm/my/ToClientRequest.java
+++ b/uia.comm/src/test/java/uia/comm/my/ClientRequest.java
@@ -29,41 +29,62 @@
* @author Kyle K. Lin
*
*/
-public class ToClientRequest implements MessageCallIn, MessageCallOut {
+public class ClientRequest implements MessageCallIn, MessageCallOut {
+
+ private String name;
+
+ private String message;
+
+ private String tx;
+
+ public ClientRequest(String name, int size) {
+ this.name = name;
+ StringBuilder body = new StringBuilder();
+ for(int i=0; i< size; i++) {
+ body.append("0");
+ }
+ this.message = body.append("_END").toString();
+ }
+
+ public byte[] sampling(String tx) {
+ this.tx = tx;
+ return ("BEGIN_CNTREQ" + this.tx + this.message).getBytes();
+ }
- @Override
+ @Override
public String getCmdName() {
- return "ABC";
+ return "CNTREQ";
}
@Override
public String getTxId() {
- return "1";
+ return this.tx;
}
@Override
public void execute(byte[] request, SocketDataController controller) {
- // DEF12
- try {
- Thread.sleep(150);
- boolean r = controller.send(new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x31, 0x32, (byte) 0xa8 }, 1);
- Assert.assertTrue(r);
- }
- catch (InterruptedException e) {
-
- }
+ try {
+ String tx = new String(new byte[] { request[12] });
+ long t = System.currentTimeMillis() % 1500;
+ if(t > 495) {
+ System.out.println(this.name + ", " + tx + "> sleep: " + t);
+ }
+ Thread.sleep(t);
+
+ controller.send(("BEGIN_CNTRSP" + tx + this.message).getBytes(), 1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
@Override
public void execute(byte[] reply) {
- Assert.assertArrayEquals(
- new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x31, 0x32, (byte) 0xa8 },
- reply);
+ System.out.println(this.name + ", " + this.tx + "> reply: " + reply.length);
}
@Override
public void timeout() {
- Assert.assertTrue(false);
+ System.out.println(this.name + ", " + this.tx + "> timeout");
}
}
diff --git a/uia.comm/src/test/java/uia/comm/my/ToServerRequest.java b/uia.comm/src/test/java/uia/comm/my/ServerManager.java
similarity index 58%
rename from uia.comm/src/test/java/uia/comm/my/ToServerRequest.java
rename to uia.comm/src/test/java/uia/comm/my/ServerManager.java
index ad2c81d..b2fc283 100644
--- a/uia.comm/src/test/java/uia/comm/my/ToServerRequest.java
+++ b/uia.comm/src/test/java/uia/comm/my/ServerManager.java
@@ -18,45 +18,46 @@
*******************************************************************************/
package uia.comm.my;
-import org.junit.Assert;
+import java.util.Arrays;
-import uia.comm.MessageCallIn;
-import uia.comm.MessageCallOut;
-import uia.comm.SocketDataController;
+import uia.comm.MessageManager;
/**
*
* @author Kyle K. Lin
*
*/
-public class ToServerRequest implements MessageCallOut, MessageCallIn {
+public class ServerManager implements MessageManager {
@Override
- public String getCmdName() {
- return "ABC";
+ public boolean isCallIn(String cmd) {
+ return "CNTREQ".equals(cmd);
}
@Override
- public String getTxId() {
- return "2";
+ public String findCmd(byte[] data) {
+ String cmd = new String(Arrays.copyOfRange(data, 6, 12));
+ return cmd;
}
@Override
- public void execute(byte[] reply) {
- Assert.assertArrayEquals(
- new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x32, 0x31, (byte) 0xa8 },
- reply);
+ public String findTx(byte[] data) {
+ String tx = new String(Arrays.copyOfRange(data, 12, 13));
+ return tx;
}
@Override
- public void timeout() {
- Assert.assertTrue(false);
+ public byte[] decode(byte[] data) {
+ return data;
}
@Override
- public synchronized void execute(byte[] request, SocketDataController controller) {
- // DEF21
- boolean r = controller.send(new byte[] { (byte) 0x8a, 0x44, 0x45, 0x46, 0x32, 0x31, (byte) 0xa8 }, 1);
- Assert.assertTrue(r);
+ public byte[] encode(byte[] data) {
+ return data;
+ }
+
+ @Override
+ public boolean validate(byte[] data) {
+ return true;
}
}
diff --git a/uia.comm/src/test/java/uia/comm/my/ServerRequest.java b/uia.comm/src/test/java/uia/comm/my/ServerRequest.java
new file mode 100644
index 0000000..123e6e2
--- /dev/null
+++ b/uia.comm/src/test/java/uia/comm/my/ServerRequest.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * Copyright 2017 UIA
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package uia.comm.my;
+
+import org.junit.Assert;
+
+import uia.comm.MessageCallIn;
+import uia.comm.MessageCallOut;
+import uia.comm.SocketDataController;
+
+/**
+ *
+ * @author Kyle K. Lin
+ *
+ */
+public class ServerRequest implements MessageCallIn, MessageCallOut {
+
+ private String name;
+
+ private String message;
+
+ private String tx;
+
+ public ServerRequest(String name, int size) {
+ this.name = name;
+ StringBuilder body = new StringBuilder();
+ for(int i=0; i< size; i++) {
+ body.append("0");
+ }
+ this.message = body.append("_END").toString();
+ }
+
+ public byte[] sampling(String tx) {
+ this.tx = tx;
+ return ("BEGIN_SVRREQ" + this.tx + this.message).getBytes();
+ }
+
+ @Override
+ public String getCmdName() {
+ return "SVRREQ";
+ }
+
+ @Override
+ public String getTxId() {
+ return this.tx;
+ }
+
+ @Override
+ public void execute(byte[] request, SocketDataController controller) {
+ try {
+ String tx = new String(new byte[] { request[12] });
+ long t = System.currentTimeMillis() % 1500;
+ if(t > 495) {
+ System.out.println(this.name + ", " + tx + "> sleep: " + t);
+ }
+ Thread.sleep(t);
+
+ boolean r = controller.send(("BEGIN_SVRRSP" + tx + this.message).getBytes(), 1);
+ Assert.assertTrue(r);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void execute(byte[] reply) {
+ System.out.println(this.name + ", " + this.tx + "> reply: " + reply.length);
+ }
+
+ @Override
+ public void timeout() {
+ System.out.println(this.name + ", " + this.tx + "> timeout");
+ }
+
+}