Skip to content

Commit

Permalink
Merge branch 'release/v0.4.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
gazer2kanlin committed Jul 11, 2019
2 parents eb65c6e + 144fd58 commit 4cf9de1
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 484 deletions.
12 changes: 7 additions & 5 deletions uia.comm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.uia.solution</groupId>
<artifactId>uia-comm</artifactId>
<version>0.3.3</version>
<version>0.4.0</version>
<packaging>jar</packaging>
<name>uia-comm</name>
<url>https://github.com/uia4j/uia-comm</url>
Expand Down Expand Up @@ -103,14 +103,16 @@
</plugin>
</plugins>
</build>
<scm>
<developerConnection>Kyle K. Lin</developerConnection>
<url>https://github.com/uia4j/uia-comm</url>
</scm>
<distributionManagement>
<repository>
<id>bintray-my-maven</id>
<url>https://api.bintray.com/maven/uia4j/maven/uia-comm/;publish=1</url>
</repository>
</distributionManagement>
<issueManagement>
<url>https://github.com/uia4j/uia-comm/issues</url>
</issueManagement>
<organization>
<url>https://github.com/uia4j</url>
</organization>
</project>
260 changes: 93 additions & 167 deletions uia.comm/src/main/java/uia/comm/DatagramClient.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
/*******************************************************************************
* Copyright 2017 UIA
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package uia.comm;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.DatagramChannel;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.log4j.Logger;

Expand All @@ -19,6 +31,12 @@
import uia.comm.protocol.ProtocolMonitor;
import uia.utils.ByteUtils;

/**
* UDP client.
*
* @author Kyle K. Lin
*
*/
public class DatagramClient implements ProtocolEventHandler<DatagramDataController>, CommClient<DatagramDataController> {

private final static Logger logger = Logger.getLogger(DatagramClient.class);
Expand All @@ -29,19 +47,21 @@ public class DatagramClient implements ProtocolEventHandler<DatagramDataControll

private final HashMap<String, MessageCallIn<DatagramDataController>> callIns;

private final HashMap<String, MessageCallOut> callOuts;

private final String aliasName;

private int broadcastPort;

private boolean started;

private DatagramDataController controller;

private DatagramChannel ch;

private String addr;

private int port;
private String listenAddress;

private int listenPort;

private int maxCache;

/**
* The constructor.
Expand All @@ -56,59 +76,53 @@ public DatagramClient(final Protocol<DatagramDataController> protocol, final Mes
this.protocol.addMessageHandler(this);
this.manager = manager;
this.callIns = new HashMap<String, MessageCallIn<DatagramDataController>>();
this.callOuts = new HashMap<String, MessageCallOut>();
this.started = false;
this.maxCache = 20 * 1024; // 20K
}

@Override
public String getName() {
return this.aliasName;
public int getMaxCache() {
return this.maxCache;
}

/**
* Get address.
* @return Address.
*/
public String getAddr() {
return this.addr;
public void setMaxCache(int maxCache) {
this.maxCache = Math.max(16, maxCache);
}

/**
* Set address.
* @param addr Address.
*/
public void setAddr(String addr) {
this.addr = addr;
}
public int getBroadcastPort() {
return broadcastPort;
}

/**
* Get port no.
* @return Port no.
*/
public int getPort() {
return this.port;
}
public void setBroadcastPort(int broadcastPort) {
this.broadcastPort = broadcastPort;
}

/**
* Set port no.
* @param port Port no.
*/
public void setPort(int port) {
this.port = port;
public String getListenAddress() {
return listenAddress;
}

public void setListenAddress(String listenAddress) {
this.listenAddress = listenAddress;
}

public int getListenPort() {
return listenPort;
}

public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}

@Override
public String getName() {
return this.aliasName;
}

/**
* Connect to specific socket server.
*
* @param address Address.
* @param port Port no.
* @return True if connect success or connected already.
*/
public synchronized boolean connect(String address, int port) {
public synchronized boolean connect(String listenAddress, int listenPort, int broadcastPort) {
disconnect();

this.addr = address;
this.port = port;
this.listenAddress = listenAddress;
this.listenPort = listenPort;
this.broadcastPort = broadcastPort;
this.started = false;

return tryConnect();
Expand All @@ -127,41 +141,44 @@ public boolean isIdle(int timeout) {
* @return Connected or not.
*/
public synchronized boolean tryConnect() {
if (this.addr == null) {
this.started = false;
return false;
}

if (this.started) {
return true;
}

try {
this.ch = DatagramChannel.open();
this.ch.connect(new InetSocketAddress(InetAddress.getByName(this.addr), this.port));

/** Android API 24 above
this.ch = DatagramChannel.open(StandardProtocolFamily.INET);
this.ch.setOption(StandardSocketOptions.SO_RCVBUF, 2 * this.maxCache);
this.ch.setOption(StandardSocketOptions.SO_SNDBUF, 2 * this.maxCache);
*/
this.ch.socket().setBroadcast(true);
this.ch.configureBlocking(false);
this.ch.socket().bind(new InetSocketAddress(this.listenAddress, this.listenPort));
this.controller = new DatagramDataController(
this.aliasName,
this.broadcastPort,
this.ch,
this.manager,
this.protocol.createMonitor(this.aliasName));

logger.info(String.format("%s> connect to %s:%s",
this.aliasName,
this.addr,
this.port));

this.controller.setMaxCache(this.maxCache);
this.controller.start();
this.started = true;

logger.info(String.format("%s> listen on %s:%s, broadcastPort:%s",
this.aliasName,
this.listenAddress,
this.listenPort,
this.broadcastPort));

return true;
}
catch (Exception ex) {
logger.error(String.format("%s> connect to %s:%s failure. %s",
logger.error(String.format("%s> listen failed on %s:%s, broadcastPort:%s",
this.aliasName,
this.addr,
this.port,
ex.getMessage()));

logger.error(ex);
this.listenAddress,
this.listenPort,
this.broadcastPort), ex);
disconnect();
return false;
}
Expand Down Expand Up @@ -189,6 +206,7 @@ public synchronized void disconnect() {
}

try {
this.controller.stop();
logger.info(String.format("%s> disconnect", this.aliasName));
}
catch (Exception ex) {
Expand Down Expand Up @@ -228,95 +246,22 @@ public boolean send(final byte[] data, int times) throws SocketException {

@Override
public byte[] send(final byte[] data, String txId, long timeout) throws SocketException {
return send(data, txId, timeout, 1);
throw new SocketException("Not support on UDP. Use send(byte[], int).");
}

@Override
public byte[] send(final byte[] data, String txId, long timeout, int retry) throws SocketException {
if (!this.started) {
throw new SocketException(this.aliasName + "> is not started.");
}

MessageCallOutConcurrent callout = new MessageCallOutConcurrent(getName(), txId, timeout);
ExecutorService threadPool = Executors.newSingleThreadExecutor();

try {
synchronized (this.callOuts) {
this.callOuts.put(txId, callout);
}

if (this.controller.send(data, retry)) {
try {
Future<byte[]> future = threadPool.submit(callout);
return future.get();
}
catch (Exception e) {
logger.error(String.format("%s> callout failed", this.aliasName), e);
return null;
}
}
else {
logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100)));
throw new SocketException(this.aliasName + "> send failure");
}
}
finally {
threadPool.shutdown();
synchronized (this.callOuts) {
this.callOuts.remove(txId);
}
}
throw new SocketException("Not support on UDP. Use send(byte[], int).");
}

@Override
public boolean send(final byte[] data, final MessageCallOut callOut, long timeout) throws SocketException {
return send(data, callOut, timeout, 1);
throw new SocketException("Not support on UDP. Use send(byte[], int).");
}

@Override
public boolean send(final byte[] data, final MessageCallOut callOut, long timeout, int retry) throws SocketException {
if (!this.started) {
throw new SocketException(this.aliasName + "> is not started.");
}

final String tx = callOut.getTxId();
synchronized (this.callOuts) {
this.callOuts.put(tx, callOut);
}

if (this.controller.send(data, retry)) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {

@Override
public void run() {
MessageCallOut out = null;
synchronized (DatagramClient.this.callOuts) {
if (DatagramClient.this.callOuts.containsKey(tx)) {
logger.debug(String.format("%s> tx:%s callOut timeout", DatagramClient.this.aliasName, callOut.getTxId()));
out = DatagramClient.this.callOuts.remove(tx);
}
}
if (out != null) {
try {
out.timeout();
}
catch (Exception ex) {

}
}
}

}, timeout);
return true;
}
else {
synchronized (this.callOuts) {
this.callOuts.remove(tx);
}
logger.debug(String.format("%s> send %s failure", this.aliasName, ByteUtils.toHexString(data, 100)));
return false;
}
throw new SocketException("Not support on UDP. Use send(byte[], int).");
}

@Override
Expand Down Expand Up @@ -360,26 +305,7 @@ public void run() {
}).start();
}
else {
String tx = this.manager.findTx(received);
final MessageCallOut callOut = this.callOuts.get(tx);
if (callOut == null) {
logger.debug(String.format("%s> cmd:%s tx:%s callout reply missing", this.aliasName, cmd, tx));
return;
}

synchronized (this.callOuts) {
this.callOuts.remove(tx);
}

logger.debug(String.format("%s> cmd:%s tx:%s callout reply", this.aliasName, cmd, tx));
new Thread(new Runnable() {

@Override
public void run() {
callOut.execute(received);
}

}).start();
logger.error(String.format("%s> cmd:%s callIn NOT FOUND", this.aliasName, cmd));
}
}

Expand Down
Loading

0 comments on commit 4cf9de1

Please sign in to comment.