Skip to content

Commit

Permalink
1. default maxCache: 20K
Browse files Browse the repository at this point in the history
2. Server: thread control
3. Callout: thread control
  • Loading branch information
gazer2kanlin committed Dec 17, 2018
1 parent b6fefed commit f80a3f9
Show file tree
Hide file tree
Showing 13 changed files with 397 additions and 280 deletions.
2 changes: 1 addition & 1 deletion uia.comm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.uia.solution</groupId>
<artifactId>uia-comm</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
<packaging>jar</packaging>
<name>uia-comm</name>
<url>https://github.com/uia4j/uia-comm</url>
Expand Down
57 changes: 20 additions & 37 deletions uia.comm/src/main/java/uia/comm/SocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class SocketClient implements ProtocolEventHandler<SocketDataController>,

private final HashMap<String, MessageCallIn<SocketDataController>> callIns;

private final HashMap<String, MessageCallOut> callOuts;
private final ConcurrentHashMap<String, MessageCallOut> callOuts;

private final String aliasName;

Expand Down Expand Up @@ -103,17 +104,17 @@ public SocketClient(final Protocol<SocketDataController> protocol, final Message
this.protocol.addMessageHandler(this);
this.manager = manager;
this.callIns = new HashMap<String, MessageCallIn<SocketDataController>>();
this.callOuts = new HashMap<String, MessageCallOut>();
this.callOuts = new ConcurrentHashMap<String, MessageCallOut>();
this.started = false;
this.maxCache = 10 * 1000; // 10K
this.maxCache = 20 * 1024; // 20K
}

public int getMaxCache() {
return this.maxCache;
}

public void setMaxCache(int maxCache) {
this.maxCache = Math.min(2000000, Math.max(16, maxCache)); // 2M
this.maxCache = Math.max(16, maxCache);
}

@Override
Expand Down Expand Up @@ -244,15 +245,15 @@ public synchronized boolean tryConnect() {
}
catch (Exception ex) {
if (this.clientPort > 0) {
logger.error(String.format("%s> connect to %s:%s(%d) failure. %s",
logger.error(String.format("%s> connect to %s:%s(%d) failed. %s",
this.aliasName,
this.addr,
this.port,
this.clientPort,
ex.getMessage()));
}
else {
logger.error(String.format("%s> connect to %s:%s failure. %s",
logger.error(String.format("%s> connect to %s:%s failed. %s",
this.aliasName,
this.addr,
this.port,
Expand Down Expand Up @@ -317,7 +318,7 @@ public boolean send(final byte[] data, int times) throws SocketException {
return this.controller.send(data, times);
}
catch (Exception ex) {
logger.error(String.format("%s> send %s failure. ex:%s",
logger.error(String.format("%s> send %s failed. ex:%s",
this.aliasName,
ByteUtils.toHexString(data, 100),
ex.getMessage()));
Expand All @@ -340,10 +341,7 @@ public byte[] send(final byte[] data, String txId, long timeout, int retry) thro
ExecutorService threadPool = Executors.newSingleThreadExecutor();

try {
synchronized (this.callOuts) {
this.callOuts.put(txId, callout);
}

this.callOuts.put(txId, callout);
if (this.controller.send(data, retry)) {
try {
Future<byte[]> future = threadPool.submit(callout);
Expand All @@ -355,15 +353,13 @@ public byte[] send(final byte[] data, String txId, long timeout, int retry) thro
}
}
else {
logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100)));
throw new SocketException(this.aliasName + "> send failure");
logger.debug(String.format("%s> send %s failed", this.aliasName, ByteUtils.toHexString(data, 100)));
throw new SocketException(this.aliasName + "> send failed");
}
}
finally {
threadPool.shutdown();
synchronized (this.callOuts) {
this.callOuts.remove(txId);
}
this.callOuts.remove(txId);
}
}

Expand All @@ -373,31 +369,24 @@ public boolean send(final byte[] data, final MessageCallOut callOut, long timeou
}

@Override
public boolean send(final byte[] data, final MessageCallOut callOut, long timeout, int retry) throws SocketException {
public boolean send(final byte[] data, MessageCallOut callOut, long timeout, int retry) throws SocketException {
if (!this.started) {
throw new SocketException(this.aliasName + "> is not started.");
}

final String tx = callOut.getTxId();
synchronized (this.callOuts) {
this.callOuts.put(tx, callOut);
}
this.callOuts.put(tx, callOut);

if (this.controller.send(data, retry)) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {

@Override
public void run() {
MessageCallOut out = null;
synchronized (SocketClient.this.callOuts) {
if (SocketClient.this.callOuts.containsKey(tx)) {
logger.debug(String.format("%s> tx:%s callOut timeout", SocketClient.this.aliasName, callOut.getTxId()));
out = SocketClient.this.callOuts.remove(tx);
}
}
if (out != null) {
MessageCallOut out = SocketClient.this.callOuts.remove(tx);
if(out != null) {
try {
logger.info(String.format("%s> tx:%s callOut timeout", SocketClient.this.aliasName, out.getTxId()));
out.timeout();
}
catch (Exception ex) {
Expand All @@ -410,10 +399,8 @@ public void run() {
return true;
}
else {
synchronized (this.callOuts) {
this.callOuts.remove(tx);
}
logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100)));
this.callOuts.remove(tx);
logger.debug(String.format("%s> send %s failed", this.aliasName, ByteUtils.toHexString(data, 100)));
return false;
}
}
Expand Down Expand Up @@ -460,16 +447,12 @@ public void run() {
}
else {
String tx = this.manager.findTx(received);
final MessageCallOut callOut = this.callOuts.get(tx);
final MessageCallOut callOut = this.callOuts.remove(tx);
if (callOut == null) {
logger.debug(String.format("%s> cmd:%s tx:%s callout reply missing", this.aliasName, cmd, tx));
return;
}

synchronized (this.callOuts) {
this.callOuts.remove(tx);
}

logger.debug(String.format("%s> cmd:%s tx:%s callout reply", this.aliasName, cmd, tx));
new Thread(new Runnable() {

Expand Down
1 change: 1 addition & 0 deletions uia.comm/src/main/java/uia/comm/SocketDataController.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public synchronized boolean send(byte[] data, int times) {
int cnt = this.ch.write(ByteBuffer.wrap(encoded));
if (cnt == encoded.length) {
logger.debug(String.format("%s> send %s", this.name, ByteUtils.toHexString(encoded, 100)));
Thread.sleep(100);
return true;
}
else {
Expand Down
4 changes: 2 additions & 2 deletions uia.comm/src/main/java/uia/comm/SocketDataSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SocketDataSelector {

/**
*
* @throws IOException Raise when open failure.
* @throws IOException Raise when open failed.
*/
public SocketDataSelector() throws IOException {
this.selector = Selector.open();
Expand All @@ -48,7 +48,7 @@ public SocketDataSelector() throws IOException {
* Register controller to specific channel.
* @param ch Socket channel.
* @param controller Socket data controller.
* @throws ClosedChannelException Raise when register channel failure.
* @throws ClosedChannelException Raise when register channel failed.
*/
public void register(SocketChannel ch, SocketDataController controller) throws ClosedChannelException {
ch.register(this.selector, SelectionKey.OP_READ, controller);
Expand Down
Loading

0 comments on commit f80a3f9

Please sign in to comment.