From 3bc4826396a822e236d3bdab9cf4f1db56257af0 Mon Sep 17 00:00:00 2001 From: daniel Date: Sun, 19 Aug 2018 19:49:00 -0500 Subject: [PATCH 1/7] Starting refactor --- .../edu/uniquindio/utils/command/Command.java | 37 -- .../utils/command/ThreadCommand.java | 42 -- .../communication/transfer/BytesTransfer.java | 38 -- .../CommunicationManagerWaitingResult.java | 492 -------------- .../communication/transfer/Communicator.java | 4 +- .../CommunicationManagerNetworkLAN.java | 170 ----- .../network/CommunicationManagerTCP.java | 611 +++++++++++++++--- .../transfer/network/MessagesReciever.java | 166 ----- ...tworkLAN.java => MulticastManagerUDP.java} | 20 +- .../network/UnicastBigManagerTCP.java | 160 ----- .../transfer/network/UnicastManagerTCP.java | 8 +- .../response/MessageProcessorGateway.java | 33 + .../MessageProcessorGatewayAsynchronous.java | 31 + .../MessagesReceiver.java} | 44 +- .../ReceiverMessageCommand.java | 11 +- .../ResponseReleaser.java} | 8 +- .../{ => response}/ReturnsManager.java | 2 +- .../ReturnsManagerCommunication.java | 6 +- .../{ => response}/WaitingResult.java | 6 +- 19 files changed, 661 insertions(+), 1228 deletions(-) delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/Command.java delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/ThreadCommand.java delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/BytesTransfer.java delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/CommunicationManagerWaitingResult.java delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerNetworkLAN.java delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MessagesReciever.java rename communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/{MulticastManagerNetworkLAN.java => MulticastManagerUDP.java} (89%) delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastBigManagerTCP.java create mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java create mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java rename communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/{Stoppable.java => response/MessagesReceiver.java} (50%) rename communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/{ => response}/ReceiverMessageCommand.java (88%) rename communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/{Responder.java => response/ResponseReleaser.java} (83%) rename communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/{ => response}/ReturnsManager.java (95%) rename communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/{ => response}/ReturnsManagerCommunication.java (93%) rename communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/{ => response}/WaitingResult.java (96%) diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/Command.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/Command.java deleted file mode 100644 index 04289d8..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/Command.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package co.edu.uniquindio.utils.command; - -/** - * The {@code Command} interface defines the basic methods for implementing the - * Command pattern - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ -public interface Command { - /** - * Execute the command action - */ - public void execute(); -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/ThreadCommand.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/ThreadCommand.java deleted file mode 100644 index 36c4edd..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/command/ThreadCommand.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package co.edu.uniquindio.utils.command; - -/** - * The ThreadCommand class is an implementation of the command - * pattern that execute the {@code Thread.start()} method - * - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0, 17/06/2010 - * @since 1.0 - * - */ -public abstract class ThreadCommand implements Command, Runnable { - /** - * Execute an thread - */ - public void execute() { - Thread thread = new Thread(this); - thread.start(); - } -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/BytesTransfer.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/BytesTransfer.java deleted file mode 100644 index 1e064f7..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/BytesTransfer.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package co.edu.uniquindio.utils.communication.transfer; - -import co.edu.uniquindio.utils.communication.message.Message; - -/** - * The BytesTransfer interfaz have all services to send big - * messages - * - * @author Daniel Pelaez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ -public interface BytesTransfer extends Communicator { - /** - * Send big massage - * - * @param bigMessage Message to send - */ - public void send(Message bigMessage); -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/CommunicationManagerWaitingResult.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/CommunicationManagerWaitingResult.java deleted file mode 100644 index a471fab..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/CommunicationManagerWaitingResult.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package co.edu.uniquindio.utils.communication.transfer; - -import co.edu.uniquindio.utils.communication.Observable; -import co.edu.uniquindio.utils.communication.Observer; -import co.edu.uniquindio.utils.communication.message.Message; -import co.edu.uniquindio.utils.communication.message.Message.SendType; - -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.util.Map; -import java.util.Set; - -/** - * The {@code CommunicationManagerWaitingResult} class is used to send messages - * regardless of the implementation of the way in which they are sent. Its - * abstract because the implementation of some methods could be different. - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0.2, 17/06/2010 - * @see CommunicationManager - * @see WaitingResult - * @see ReturnsManager - * @since 1.0.2 - */ -public abstract class CommunicationManagerWaitingResult implements - CommunicationManager, Responder { - - /** - * Property - */ - private static final String RESPONSE_TIME = "RESPONSE_TIME"; - - /** - * Observable of message - */ - protected Observable observableCommunication; - /** - * Multicast manager - */ - protected Communicator multicastManager; - /** - * Unicast manager - */ - protected Communicator unicastManager; - /** - * Unicast big message - */ - protected Communicator unicastBigManager; - /** - * Communication properties - */ - protected Map communicationProperties; - - /** - * Returns manager - */ - private ReturnsManager returnsManager; - protected MessageProcessor messageProcessor; - - /** - * Builds an CommunicationManagerWaitingResult - */ - protected CommunicationManagerWaitingResult() { - this.returnsManager = new ReturnsManagerCommunication(); - this.observableCommunication = new Observable(); - } - - /** - * Creates and sends a message specifying its type, the type of the response - * and the data. Used sendMessage(message) to send message. The - * typeReturn must to have a method by signed T valueOf(String) - * and class must to have constructor without parameters, or an constructor - * T(String) . If typeReturn is a Message class, the return is - * message object. If message response contains several params, you must set - * typeReturn like Message - * - * @param message The type of the message that will be sent. - * @param typeReturn The type of the response - * @return An object of the specified type. - */ - public T sendMessageUnicast(Message message, Class typeReturn) { - - /* - * Created WaitingResult for message sequence number to send - */ - WaitingResult waitingResult = returnsManager - .createWaitingResult(message.getSequenceNumber(), Long - .parseLong(communicationProperties.get(RESPONSE_TIME))); - - sendMessageUnicast(message); - - /* - * Waiting for response - */ - Message messageResponse = waitingResult.getResult(); - - /* - * Waiting for response - */ - return processResponse(messageResponse, typeReturn, null); - } - - /** - * Creates and sends a message specifying its type, the type of the response - * and the data. Used sendMessage(message) to send message. The - * typeReturn must to have a method by signed T valueOf(String) - * and class must to have constructor without parameters, or an constructor - * T(String) . If typeReturn is a Message class, the return is - * message object. The message response must contains a param called - * paramNameResult, the value of this is used for create result - * - * @param message The type of the message that will be sent. - * @param typeReturn The type of the response - * @param paramNameResult Param name of result - * @return An object of the specified type. - */ - public T sendMessageUnicast(Message message, Class typeReturn, - String paramNameResult) { - - /* - * Created WaitingResult for message sequence number to send - */ - WaitingResult waitingResult = returnsManager - .createWaitingResult(message.getSequenceNumber(), Long - .parseLong(communicationProperties.get(RESPONSE_TIME))); - - sendMessageUnicast(message); - - /* - * Waiting for response - */ - Message messageResponse = waitingResult.getResult(); - - /* - * Waiting for response - */ - return processResponse(messageResponse, typeReturn, paramNameResult); - } - - /** - * Sends a message specifying its type, the type of the response and the - * data. Used Communicator instance called unicastManager to send message - * - * @param message Messages to send - */ - public void sendMessageUnicast(Message message) { - unicastManager.send(message); - } - - /** - * Creates and sends a multicast message specifying its type, the type of - * the response and the data. Used - * sendMessageMultiCast(message) to send message. The - * typeReturn must to have a method by signed T valueOf(String) - * and class must to have constructor without parameters, or an constructor - * T(String) . If typeReturn is a Message class, the return is - * message object. The message response must contains a param called - * paramNameResult, the value of this is used for create result - * - * @param Type return - * @param message Message - * @param typeReturn Type return - * @return Response - */ - public T sendMessageMultiCast(Message message, Class typeReturn) { - - /* - * Created WaitingResult for message sequence number to send - */ - WaitingResult waitingResult = returnsManager - .createWaitingResult(message.getSequenceNumber(), Long - .parseLong(communicationProperties.get(RESPONSE_TIME))); - - sendMessageMultiCast(message); - - /* - * Waiting for response - */ - Message messageResponse = waitingResult.getResult(); - - /* - * Waiting for response - */ - return processResponse(messageResponse, typeReturn, null); - } - - /** - * Creates and sends a multicast message specifying its type, the type of - * the response and the data. Used - * sendMessageMultiCast(message) to send message. The - * typeReturn must to have a method by signed T valueOf(String) - * and class must to have constructor without parameters, or an constructor - * T(String) . If typeReturn is a Message class, the return is - * message object. If message response contains several params, you must set - * typeReturn like Message - * - * @param Type return - * @param message Message - * @param typeReturn Type return - * @param paramNameResult Param name of result - * @return Response - */ - public T sendMessageMultiCast(Message message, Class typeReturn, - String paramNameResult) { - - /* - * Created WaitingResult for message sequence number to send - */ - WaitingResult waitingResult = returnsManager - .createWaitingResult(message.getSequenceNumber(), Long - .parseLong(communicationProperties.get(RESPONSE_TIME))); - - sendMessageMultiCast(message); - - /* - * Waiting for response - */ - Message messageResponse = waitingResult.getResult(); - - /* - * Waiting for response - */ - return processResponse(messageResponse, typeReturn, paramNameResult); - } - - /** - * Sends a multicast message specifying its type, the type of the response - * and the data. Used Communicator instance called multicastManager to send - * message - * - * @param message Messages to send - */ - public void sendMessageMultiCast(Message message) { - multicastManager.send(message); - } - - /** - * Send big message. Used ByteTransfer instance called byteTransferManager - * to send message - * - * @param bigMessage Message to send - */ - public void sendBigMessage(Message bigMessage) { - unicastBigManager.send(bigMessage); - } - - /** - * Send request to reciever big message. Used BytesTransfer instance to send - * request - * - * @param message Request message - * @return Response - */ - public Message recieverBigMessage(Message message) { - - /* - * Created WaitingResult for message sequence number to send - */ - WaitingResult waitingResult = returnsManager - .createWaitingResult(message.getSequenceNumber(), -1); - - unicastBigManager.send(message); - - /* - * Waiting for response - */ - return (Message) waitingResult.getResult(); - } - - /** - * Stop all process - */ - public void stopAll() { - stop(); - multicastManager.stop(); - unicastManager.stop(); - unicastBigManager.stop(); - } - - /** - * Release return in wait. - * - * @param message Message - * @return True if return is release - */ - public boolean releaseResponse(Message message) { - if (message.getSendType().equals(SendType.RESPONSE)) { - returnsManager.releaseWaitingResult(message.getSequenceNumber(), - message); - - return true; - } else { - return false; - } - } - - /** - * Convert message to return type - * - * @param Return type - * @param message Message - * @param type Return type - * @return Return - */ - @SuppressWarnings("unchecked") - private T processResponse(Message message, Class type, - String paramNameResult) { - - T typeInstance = null; - - if (message == null) { - return null; - } - - if (type.equals(Message.class)) { - return (T) message; - } - - if (type.isInterface() || type.isAnnotation() || type.isArray()) { - throw new IllegalArgumentException("The type must a class (" - + type.getName() + ")"); - } - - Set params = message.getParamsKey(); - - String paramValue; - - if (paramNameResult == null) { - - if (params.size() != 1) { - throw new IllegalArgumentException( - "The message contains more than one parameter, you can not convert to " - + type.getName()); - } - - String paramName = (String) params.toArray()[0]; - - if (paramName == null || paramName.isEmpty()) { - throw new IllegalArgumentException( - "The message contains a param name null or empty"); - } - - paramValue = message.getParam(paramName); - } else { - - paramValue = message.getParam(paramNameResult); - } - - if (paramValue == null || paramValue.isEmpty()) { - return null; - } - - try { - Method valueOf = type - .getMethod("valueOf", String.class); - - typeInstance = (T) valueOf.invoke(null, paramValue); - } catch (Exception e) { - try { - - Constructor constructorString = type - .getDeclaredConstructor(String.class); - - typeInstance = constructorString.newInstance(paramValue); - } catch (Exception e1) { - throw new IllegalArgumentException( - "The method valueOf(String) not must to be invoked in class " - + type.getName(), e1); - } - } - - return typeInstance; - } - - /** - * Adds observer to communication - * - * @param observer Observer to add - */ - public void addObserver(Observer observer) { - observableCommunication.addObserver(observer); - } - - /** - * Remove observer to communication - * - * @param observer Observer to remove - */ - public void removeObserver(Observer observer) { - observableCommunication.removeObserver(observer); - } - - /** - * Remove observer by name - * - * @param name Observer name - */ - public void removeObserver(String name) { - observableCommunication.removeObserver(name); - } - - /** - * Initialize communication manager. Invoke to - * createUnicastManager(), - * createMulticastManager() and - * createTransferObjectManager() - */ - public void init() { - this.unicastManager = createUnicastManager(); - this.multicastManager = createMulticastManager(); - this.unicastBigManager = createUnicastBigManager(); - } - - /** - * Creates a Communicator instance for unicast - * - * @return Communicator - */ - protected abstract Communicator createUnicastManager(); - - /** - * Creates a Communicator instance for multicast - * - * @return Communicator - */ - protected abstract Communicator createMulticastManager(); - - /** - * Creates a Communicator instance for unicast big - * - * @return Communicator - */ - protected abstract Communicator createUnicastBigManager(); - - /** - * Stop this instance of communication manager - */ - protected abstract void stop(); - - /** - * Gets properties of communication - * - * @return CommunicationProperties - */ - public Map getCommunicationProperties() { - return communicationProperties; - } - - /** - * Sets properties for communication - * - * @param communicationProperties Properties - */ - public void setCommunicationProperties( - Map communicationProperties) { - this.communicationProperties = communicationProperties; - } - - @Override - public void addMessageProcessor(String name, MessageProcessor messageProcessor) { - this.messageProcessor = messageProcessor; - } - - @Override - public void removeMessageProcessor(String name) { - this.messageProcessor = null; - } - - public MessageProcessor getMessageProcessor() { - return messageProcessor; - } -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java index 6e3744a..289daf0 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java @@ -22,7 +22,7 @@ /** * The Communicator interface hava all services for to send and to - * reciever messages + * receiver messages * * @author Daniel Pelaez * @version 1.0, 17/06/2010 @@ -44,5 +44,5 @@ public interface Communicator extends Stoppable { * * @return Message */ - public Message reciever(); + public Message receiver(); } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerNetworkLAN.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerNetworkLAN.java deleted file mode 100644 index 5f24c40..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerNetworkLAN.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package co.edu.uniquindio.utils.communication.transfer.network; - -import co.edu.uniquindio.utils.communication.transfer.CommunicationManagerWaitingResult; -import co.edu.uniquindio.utils.communication.transfer.Communicator; -import org.apache.log4j.Logger; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * The CommunicationManagerNetworkLAN abstract class management - * send message multicast and recieves all messages from all sources. Required - * params: BUFFER_SIZE_MULTICAST, IP_MULTICAST and PORT_MULTICAST - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ -public abstract class CommunicationManagerNetworkLAN extends - CommunicationManagerWaitingResult { - protected final MessageSerialization messageSerialization; - - protected CommunicationManagerNetworkLAN(MessageSerialization messageSerialization) { - this.messageSerialization = messageSerialization; - } - - /** - * Properties for configuration CommunicationManagerNetworkLAN - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ - public enum CommunicationManagerNetworkLANProperties { - BUFFER_SIZE_MULTICAST, IP_MULTICAST, PORT_MULTICAST - } - - /** - * Logger - */ - private static final Logger logger = Logger - .getLogger(CommunicationManagerNetworkLAN.class); - - /** - * Reciever all messages - */ - private MessagesReciever messagesReciever; - - /* - * (non-Javadoc) - * - * @seeco.edu.uniquindio.utils.communication.transfer. - * CommunicationManagerWaitingResult#init () - */ - public void init() { - super.init(); - this.messagesReciever = new MessagesReciever(multicastManager, - unicastManager, unicastBigManager, this, this); - this.observableCommunication = messagesReciever; - } - - /** - * Creates a Communicator for multicast manager. Are required of params in - * communication properties: PORT_MULTICAST, IP_MULTICAST and - * BUFFER_SIZE_MULTICAST - */ - protected Communicator createMulticastManager() { - - int portMulticast; - String ipMulticast; - long bufferSize; - - if (communicationProperties - .containsKey(CommunicationManagerNetworkLANProperties.PORT_MULTICAST - .name())) { - portMulticast = Integer - .parseInt(communicationProperties - .get(CommunicationManagerNetworkLANProperties.PORT_MULTICAST - .name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property PORT_MULTICAST not found"); - - logger.error("Property PORT_MULTICAST no found", - illegalArgumentException); - - throw illegalArgumentException; - } - - if (communicationProperties - .containsKey(CommunicationManagerNetworkLANProperties.IP_MULTICAST - .name())) { - ipMulticast = communicationProperties - .get(CommunicationManagerNetworkLANProperties.IP_MULTICAST - .name()); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property IP_MULTICAST not found"); - - logger.error("Property IP_MULTICAST no found", - illegalArgumentException); - - throw illegalArgumentException; - } - - if (communicationProperties - .containsKey(CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST - .name())) { - bufferSize = Long - .parseLong(communicationProperties - .get(CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST - .name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property BUFFER_SIZE_MULTICAST not found"); - - logger.error("Property BUFFER_SIZE_MULTICAST no found", - illegalArgumentException); - - throw illegalArgumentException; - } - - try { - multicastManager = new MulticastManagerNetworkLAN(portMulticast, - InetAddress.getByName(ipMulticast), bufferSize, messageSerialization); - } catch (UnknownHostException e) { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Error of ipmulticast", e); - - logger.error("Error of ipmulticast", illegalArgumentException); - - throw illegalArgumentException; - } - - return multicastManager; - } - - /** - * Stop messages reciever and multicast manager - */ - protected void stop() { - messagesReciever.stop(); - multicastManager.stop(); - } - - - -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java index 7f37b60..c03c015 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java @@ -18,92 +18,555 @@ package co.edu.uniquindio.utils.communication.transfer.network; -import co.edu.uniquindio.utils.communication.transfer.Communicator; +import co.edu.uniquindio.utils.communication.Observable; +import co.edu.uniquindio.utils.communication.Observer; +import co.edu.uniquindio.utils.communication.message.Message; +import co.edu.uniquindio.utils.communication.transfer.*; +import co.edu.uniquindio.utils.communication.transfer.response.MessagesReceiver; +import co.edu.uniquindio.utils.communication.transfer.response.ResponseReleaser; +import co.edu.uniquindio.utils.communication.transfer.response.ReturnsManager; +import co.edu.uniquindio.utils.communication.transfer.response.ReturnsManagerCommunication; import org.apache.log4j.Logger; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Map; +import java.util.Set; + /** * The CommunicationManagerTCP class is an * CommunicationManagerNetworkLAN. Implemented the creation of * transfer object and unicast manager. Required params: PORT_TCP_RESOURCE and * PORT_TCP - * + * * @author Daniel Pelaez * @author Hector Hurtado * @author Daniel Lopez * @version 1.0, 17/06/2010 * @since 1.0 - * */ -public class CommunicationManagerTCP extends CommunicationManagerNetworkLAN { - public CommunicationManagerTCP(MessageSerialization messageSerialization) { - super(messageSerialization); - } - - /** - * The CommunicationManagerTCPProperties enum contains params - * required for communication - * - * @author dpelaez - * - */ - public enum CommunicationManagerTCPProperties { - PORT_TCP_RESOURCE, PORT_TCP - } - - /** - * Logger - */ - private static final Logger logger = Logger - .getLogger(CommunicationManagerTCP.class); - - /** - * Creates a BytesTransferManagerTCP instance. Required param in - * CommunicationProperties called PORT_TCP_RESOURCE - */ - protected Communicator createUnicastBigManager() { - int portTcp; - if (communicationProperties - .containsKey(CommunicationManagerTCPProperties.PORT_TCP_RESOURCE - .name())) { - portTcp = Integer.parseInt(communicationProperties - .get(CommunicationManagerTCPProperties.PORT_TCP_RESOURCE - .name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property PORT_TCP_RESOURCE not found"); - - logger.error("Property PORT_TCP_RESOURCE not found", - illegalArgumentException); - - throw illegalArgumentException; - } - unicastBigManager = new UnicastBigManagerTCP(portTcp, messageSerialization); - - return unicastBigManager; - } - - /** - * Creates a UnicastManagerTCP instance. Required param in - * CommunicationProperties called PORT_TCP. - */ - protected Communicator createUnicastManager() { - int portTcp; - if (communicationProperties - .containsKey(CommunicationManagerTCPProperties.PORT_TCP.name())) { - portTcp = Integer.parseInt(communicationProperties - .get(CommunicationManagerTCPProperties.PORT_TCP.name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property PORT_TCP not found"); - - logger.error("Property PORT_TCP not found", - illegalArgumentException); - - throw illegalArgumentException; - } - unicastManager = new UnicastManagerTCP(portTcp, messageSerialization); - - return unicastManager; - } +public class CommunicationManagerTCP implements + CommunicationManager, ResponseReleaser { + + /** + * The CommunicationManagerTCPProperties enum contains params + * required for communication + * + * @author dpelaez + */ + public enum CommunicationManagerTCPProperties { + PORT_TCP_RESOURCE, PORT_TCP + } + + /** + * Properties for configuration CommunicationManagerNetworkLAN + * + * @author Daniel Pelaez + * @author Hector Hurtado + * @author Daniel Lopez + * @version 1.0, 17/06/2010 + * @since 1.0 + */ + public enum CommunicationManagerNetworkLANProperties { + BUFFER_SIZE_MULTICAST, IP_MULTICAST, PORT_MULTICAST + } + + private static final Logger logger = Logger + .getLogger(CommunicationManagerTCP.class); + + private static final String RESPONSE_TIME = "RESPONSE_TIME"; + + private final MessageSerialization messageSerialization; + private final Observable observableCommunication; + private Communicator multicastManager; + private Communicator unicastManager; + private Map communicationProperties; + private ReturnsManager returnsManager; + private MessageProcessor messageProcessor; + private MessagesReceiver messagesReciever; + + public CommunicationManagerTCP(MessageSerialization messageSerialization) { + this.returnsManager = new ReturnsManagerCommunication(); + this.observableCommunication = new Observable(); + this.messageSerialization = messageSerialization; + } + + /** + * Creates a BytesTransferManagerTCP instance. Required param in + * CommunicationProperties called PORT_TCP_RESOURCE + */ + protected Communicator createUnicastBigManager() { + int portTcp; + if (communicationProperties + .containsKey(CommunicationManagerTCPProperties.PORT_TCP_RESOURCE + .name())) { + portTcp = Integer.parseInt(communicationProperties + .get(CommunicationManagerTCPProperties.PORT_TCP_RESOURCE + .name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property PORT_TCP_RESOURCE not found"); + + logger.error("Property PORT_TCP_RESOURCE not found", + illegalArgumentException); + + throw illegalArgumentException; + } + unicastBigManager = new UnicastBigManagerTCP(portTcp, messageSerialization); + + return unicastBigManager; + } + + /** + * Creates a UnicastManagerTCP instance. Required param in + * CommunicationProperties called PORT_TCP. + */ + protected Communicator createUnicastManager() { + int portTcp; + if (communicationProperties + .containsKey(CommunicationManagerTCPProperties.PORT_TCP.name())) { + portTcp = Integer.parseInt(communicationProperties + .get(CommunicationManagerTCPProperties.PORT_TCP.name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property PORT_TCP not found"); + + logger.error("Property PORT_TCP not found", + illegalArgumentException); + + throw illegalArgumentException; + } + unicastManager = new UnicastManagerTCP(portTcp, messageSerialization); + + return unicastManager; + } + + + /** + * Creates and sends a message specifying its type, the type of the response + * and the data. Used sendMessage(message) to send message. The + * typeReturn must to have a method by signed T valueOf(String) + * and class must to have constructor without parameters, or an constructor + * T(String) . If typeReturn is a Message class, the return is + * message object. If message response contains several params, you must set + * typeReturn like Message + * + * @param message The type of the message that will be sent. + * @param typeReturn The type of the response + * @return An object of the specified type. + */ + public T sendMessageUnicast(Message message, Class typeReturn) { + + /* + * Created WaitingResult for message sequence number to send + */ + WaitingResult waitingResult = returnsManager + .createWaitingResult(message.getSequenceNumber(), Long + .parseLong(communicationProperties.get(RESPONSE_TIME))); + + sendMessageUnicast(message); + + /* + * Waiting for response + */ + Message messageResponse = waitingResult.getResult(); + + /* + * Waiting for response + */ + return processResponse(messageResponse, typeReturn, null); + } + + /** + * Creates and sends a message specifying its type, the type of the response + * and the data. Used sendMessage(message) to send message. The + * typeReturn must to have a method by signed T valueOf(String) + * and class must to have constructor without parameters, or an constructor + * T(String) . If typeReturn is a Message class, the return is + * message object. The message response must contains a param called + * paramNameResult, the value of this is used for create result + * + * @param message The type of the message that will be sent. + * @param typeReturn The type of the response + * @param paramNameResult Param name of result + * @return An object of the specified type. + */ + public T sendMessageUnicast(Message message, Class typeReturn, + String paramNameResult) { + + /* + * Created WaitingResult for message sequence number to send + */ + WaitingResult waitingResult = returnsManager + .createWaitingResult(message.getSequenceNumber(), Long + .parseLong(communicationProperties.get(RESPONSE_TIME))); + + sendMessageUnicast(message); + + /* + * Waiting for response + */ + Message messageResponse = waitingResult.getResult(); + + /* + * Waiting for response + */ + return processResponse(messageResponse, typeReturn, paramNameResult); + } + + /** + * Sends a message specifying its type, the type of the response and the + * data. Used Communicator instance called unicastManager to send message + * + * @param message Messages to send + */ + public void sendMessageUnicast(Message message) { + unicastManager.send(message); + } + + /** + * Creates and sends a multicast message specifying its type, the type of + * the response and the data. Used + * sendMessageMultiCast(message) to send message. The + * typeReturn must to have a method by signed T valueOf(String) + * and class must to have constructor without parameters, or an constructor + * T(String) . If typeReturn is a Message class, the return is + * message object. The message response must contains a param called + * paramNameResult, the value of this is used for create result + * + * @param Type return + * @param message Message + * @param typeReturn Type return + * @return Response + */ + public T sendMessageMultiCast(Message message, Class typeReturn) { + + /* + * Created WaitingResult for message sequence number to send + */ + WaitingResult waitingResult = returnsManager + .createWaitingResult(message.getSequenceNumber(), Long + .parseLong(communicationProperties.get(RESPONSE_TIME))); + + sendMessageMultiCast(message); + + /* + * Waiting for response + */ + Message messageResponse = waitingResult.getResult(); + + /* + * Waiting for response + */ + return processResponse(messageResponse, typeReturn, null); + } + + /** + * Creates and sends a multicast message specifying its type, the type of + * the response and the data. Used + * sendMessageMultiCast(message) to send message. The + * typeReturn must to have a method by signed T valueOf(String) + * and class must to have constructor without parameters, or an constructor + * T(String) . If typeReturn is a Message class, the return is + * message object. If message response contains several params, you must set + * typeReturn like Message + * + * @param Type return + * @param message Message + * @param typeReturn Type return + * @param paramNameResult Param name of result + * @return Response + */ + public T sendMessageMultiCast(Message message, Class typeReturn, + String paramNameResult) { + + /* + * Created WaitingResult for message sequence number to send + */ + WaitingResult waitingResult = returnsManager + .createWaitingResult(message.getSequenceNumber(), Long + .parseLong(communicationProperties.get(RESPONSE_TIME))); + + sendMessageMultiCast(message); + + /* + * Waiting for response + */ + Message messageResponse = waitingResult.getResult(); + + /* + * Waiting for response + */ + return processResponse(messageResponse, typeReturn, paramNameResult); + } + + /** + * Sends a multicast message specifying its type, the type of the response + * and the data. Used Communicator instance called multicastManager to send + * message + * + * @param message Messages to send + */ + public void sendMessageMultiCast(Message message) { + multicastManager.send(message); + } + + /** + * Stop all process + */ + public void stopAll() { + messagesReciever.close(); + multicastManager.close(); + multicastManager.close(); + unicastManager.close(); + } + + /** + * Release return in wait. + * + * @param message Message + * @return True if return is release + */ + public boolean release(Message message) { + if (message.getSendType().equals(Message.SendType.RESPONSE)) { + returnsManager.releaseWaitingResult(message.getSequenceNumber(), + message); + + return true; + } else { + return false; + } + } + + /** + * Convert message to return type + * + * @param Return type + * @param message Message + * @param type Return type + * @return Return + */ + @SuppressWarnings("unchecked") + private T processResponse(Message message, Class type, + String paramNameResult) { + + T typeInstance = null; + + if (message == null) { + return null; + } + + if (type.equals(Message.class)) { + return (T) message; + } + + if (type.isInterface() || type.isAnnotation() || type.isArray()) { + throw new IllegalArgumentException("The type must a class (" + + type.getName() + ")"); + } + + Set params = message.getParamsKey(); + + String paramValue; + + if (paramNameResult == null) { + + if (params.size() != 1) { + throw new IllegalArgumentException( + "The message contains more than one parameter, you can not convert to " + + type.getName()); + } + + String paramName = (String) params.toArray()[0]; + + if (paramName == null || paramName.isEmpty()) { + throw new IllegalArgumentException( + "The message contains a param name null or empty"); + } + + paramValue = message.getParam(paramName); + } else { + + paramValue = message.getParam(paramNameResult); + } + + if (paramValue == null || paramValue.isEmpty()) { + return null; + } + + try { + Method valueOf = type + .getMethod("valueOf", String.class); + + typeInstance = (T) valueOf.invoke(null, paramValue); + } catch (Exception e) { + try { + + Constructor constructorString = type + .getDeclaredConstructor(String.class); + + typeInstance = constructorString.newInstance(paramValue); + } catch (Exception e1) { + throw new IllegalArgumentException( + "The method valueOf(String) not must to be invoked in class " + + type.getName(), e1); + } + } + + return typeInstance; + } + + /** + * Adds observer to communication + * + * @param observer Observer to add + */ + public void addObserver(Observer observer) { + observableCommunication.addObserver(observer); + } + + /** + * Remove observer to communication + * + * @param observer Observer to remove + */ + public void removeObserver(Observer observer) { + observableCommunication.removeObserver(observer); + } + + /** + * Remove observer by name + * + * @param name Observer name + */ + public void removeObserver(String name) { + observableCommunication.removeObserver(name); + } + + /** + * Gets properties of communication + * + * @return CommunicationProperties + */ + public Map getCommunicationProperties() { + return communicationProperties; + } + + /** + * Sets properties for communication + * + * @param communicationProperties Properties + */ + public void setCommunicationProperties( + Map communicationProperties) { + this.communicationProperties = communicationProperties; + } + + @Override + public void addMessageProcessor(String name, MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } + + @Override + public void removeMessageProcessor(String name) { + this.messageProcessor = null; + } + + public MessageProcessor getMessageProcessor() { + return messageProcessor; + } + + + /* + * (non-Javadoc) + * + * @seeco.edu.uniquindio.utils.communication.transfer. + * CommunicationManagerWaitingResult#init () + */ + public void init() { + this.unicastManager = createUnicastManager(); + this.multicastManager = createMulticastManager(); + this.messagesReciever = new MessagesReceiver(multicastManager, + unicastManager, unicastBigManager, this, this); + this.observableCommunication = messagesReciever; + } + + /** + * Creates a Communicator for multicast manager. Are required of params in + * communication properties: PORT_MULTICAST, IP_MULTICAST and + * BUFFER_SIZE_MULTICAST + */ + protected Communicator createMulticastManager() { + + int portMulticast; + String ipMulticast; + long bufferSize; + + if (communicationProperties + .containsKey(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.PORT_MULTICAST + .name())) { + portMulticast = Integer + .parseInt(communicationProperties + .get(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.PORT_MULTICAST + .name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property PORT_MULTICAST not found"); + + logger.error("Property PORT_MULTICAST no found", + illegalArgumentException); + + throw illegalArgumentException; + } + + if (communicationProperties + .containsKey(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.IP_MULTICAST + .name())) { + ipMulticast = communicationProperties + .get(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.IP_MULTICAST + .name()); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property IP_MULTICAST not found"); + + logger.error("Property IP_MULTICAST no found", + illegalArgumentException); + + throw illegalArgumentException; + } + + if (communicationProperties + .containsKey(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST + .name())) { + bufferSize = Long + .parseLong(communicationProperties + .get(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST + .name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property BUFFER_SIZE_MULTICAST not found"); + + logger.error("Property BUFFER_SIZE_MULTICAST no found", + illegalArgumentException); + + throw illegalArgumentException; + } + + try { + multicastManager = new MulticastManagerUDP(portMulticast, + InetAddress.getByName(ipMulticast), bufferSize, messageSerialization); + } catch (UnknownHostException e) { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Error of ipmulticast", e); + + logger.error("Error of ipmulticast", illegalArgumentException); + + throw illegalArgumentException; + } + + return multicastManager; + } } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MessagesReciever.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MessagesReciever.java deleted file mode 100644 index 356c89b..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MessagesReciever.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package co.edu.uniquindio.utils.communication.transfer.network; - -import co.edu.uniquindio.utils.communication.Observable; -import co.edu.uniquindio.utils.communication.message.Message; -import co.edu.uniquindio.utils.communication.transfer.*; - -/** - * The MessagesReciever class is an - * Observable and Stoppable. Implemented all - * for to reciever messages from three diferent sources - * - * @author Daniel Pelaez - * @version 1.0, 17/06/2010 - * @since 1.0 - * - */ -public class MessagesReciever extends Observable implements Stoppable { - /** - * Running recieving - */ - private boolean run; - /** - * Reciever thread for unicast - */ - private RecieverThreadMessage recieverUnicastMessage; - /** - * Reciever thread for multicast - */ - private RecieverThreadMessage recieverMulticastMessage; - /** - * Reciever thread for transfer bytes - */ - private RecieverThreadMessage recieverTransferBytesMessage; - - /** - * Responder message manager - */ - private Responder responder; - private final CommunicationManagerWaitingResult communicationManager; - - /** - * Builds a MessagesReciever. Creates three - * RecieverThreadMessage and starts - * @param multicastManager - * Communicator for multicast - * @param unicastManager - * Communicator for unicast - * @param transferBytesManager - * @param communicationManager - */ - public MessagesReciever(Communicator multicastManager, - Communicator unicastManager, Communicator transferBytesManager, - Responder responder, CommunicationManagerWaitingResult communicationManager) { - super(); - this.communicationManager = communicationManager; - this.run = true; - - if (unicastManager != null) { - recieverUnicastMessage = new RecieverThreadMessage(unicastManager); - recieverUnicastMessage.receptionStart(); - } - - if (multicastManager != null) { - recieverMulticastMessage = new RecieverThreadMessage( - multicastManager); - recieverMulticastMessage.receptionStart(); - } - - if (transferBytesManager != null) { - recieverTransferBytesMessage = new RecieverThreadMessage( - transferBytesManager); - recieverTransferBytesMessage.receptionStart(); - } - - this.responder = responder; - } - - /** - * Stopped - */ - public void stop() { - run = false; - } - - /** - * Creates a ReceiverMessageCommand. - * - * @param message - * Message to notify - */ - private void createRecieverThread(Message message) { - - ReceiverMessageCommand receiverMessageCommand; - receiverMessageCommand = new ReceiverMessageCommand(message, this, communicationManager); - - receiverMessageCommand.execute(); - } - - /** - * The RecieverThreadMessage class reads message from a - * Communicator and sending to destination. - * - * @author Daniel Pelaez - * @version 1.0, 17/06/2010 - * @since 1.0 - * - */ - class RecieverThreadMessage implements Runnable { - /** - * Communicator - */ - private Communicator communicator; - - /** - * Builds an RecieverThreadMessage - * - * @param communicator - * Communicator - */ - RecieverThreadMessage(Communicator communicator) { - this.communicator = communicator; - } - - /** - * Start reception - */ - public void receptionStart() { - Thread thread = new Thread(this); - thread.start(); - } - - /** - * Read message an invoke to createRecieverThread(message) - */ - public void run() { - Message message; - while (run) { - message = communicator.reciever(); - - if (!responder.releaseResponse(message)) { - - createRecieverThread(message); - } - } - } - } - -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerNetworkLAN.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java similarity index 89% rename from communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerNetworkLAN.java rename to communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java index f9e08cd..7acf172 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerNetworkLAN.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java @@ -28,20 +28,20 @@ import java.net.MulticastSocket; /** - * The MulticastManagerNetworkLAN class implemented the transfer + * The MulticastManagerUDP class implemented the transfer * messages on multicast UDP * * @author Daniel Pelaez * @version 1.0, 17/06/2010 * @since 1.0 */ -public class MulticastManagerNetworkLAN implements Communicator { +public class MulticastManagerUDP implements Communicator { /** * Logger */ private static final Logger logger = Logger - .getLogger(MulticastManagerNetworkLAN.class); + .getLogger(MulticastManagerUDP.class); /** * Is the size of the buffer used for receiving messages. @@ -72,14 +72,14 @@ public class MulticastManagerNetworkLAN implements Communicator { private final MessageSerialization messageSerialization; /** - * Builds a MulticastManagerNetworkLAN and started multicast socket + * Builds a MulticastManagerUDP and started multicast socket * @param portMulticast Port multicast * @param group Internet address multicast * @param bufferSize Buffer size for to reader * @param messageSerialization */ - public MulticastManagerNetworkLAN(int portMulticast, InetAddress group, - long bufferSize, MessageSerialization messageSerialization) { + public MulticastManagerUDP(int portMulticast, InetAddress group, + long bufferSize, MessageSerialization messageSerialization) { this.messageSerialization = messageSerialization; try { @@ -101,9 +101,9 @@ public MulticastManagerNetworkLAN(int portMulticast, InetAddress group, * (non-Javadoc) * * @see - * co.edu.uniquindio.utils.communication.transfer.Communicator#reciever() + * co.edu.uniquindio.utils.communication.transfer.Communicator#receiver() */ - public Message reciever() { + public Message receiver() { DatagramPacket datagramPacket; String string; Message message; @@ -168,9 +168,9 @@ public void setBufferSize(long bufferSize) { /* * (non-Javadoc) * - * @see co.edu.uniquindio.utils.communication.transfer.Stoppable#stop() + * @see co.edu.uniquindio.utils.communication.transfer.Stoppable#close() */ - public void stop() { + public void close() { multicastSocket.close(); } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastBigManagerTCP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastBigManagerTCP.java deleted file mode 100644 index 49e995a..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastBigManagerTCP.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package co.edu.uniquindio.utils.communication.transfer.network; - -import co.edu.uniquindio.utils.communication.message.Message; -import co.edu.uniquindio.utils.communication.transfer.Communicator; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.UnknownHostException; - -/** - * The UnicastBigManagerTCP class is an - * Communicator. Implemented the communication for network on TCP - * protocol. - * - * @author Daniel Pelaez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ -public class UnicastBigManagerTCP implements Communicator { - - /** - * Logger - */ - private static final Logger logger = Logger - .getLogger(UnicastBigManagerTCP.class); - /** - * The server socket that will be waiting for connection. - */ - private ServerSocket serverSocket; - - /** - * The value of the port used to create the socket. - */ - private int portTcp; - - private final MessageSerialization messageSerialization; - - public UnicastBigManagerTCP(int portTcp, MessageSerialization messageSerialization) { - this.portTcp = portTcp; - this.messageSerialization = messageSerialization; - - try { - this.serverSocket = new ServerSocket(portTcp); - } catch (IOException e) { - logger.error("The communication socket could not be created", e); - } - } - - /* - * (non-Javadoc) - * - * @see - * co.edu.uniquindio.utils.communication.transfer.Communicator#reciever() - */ - public Message reciever() { - String stringMessage; - Message message = null; - ObjectInputStream objectInputStream; - - try (Socket socket = serverSocket.accept()) { - objectInputStream = new ObjectInputStream(socket.getInputStream()); - stringMessage = (String) objectInputStream.readObject(); - - message = messageSerialization.decode(stringMessage); - - } catch (IOException e) { - logger.error("Error reading socket", e); - } catch (ClassNotFoundException e) { - logger.error("Error reading socket", e); - } - - return message; - } - - /* - * (non-Javadoc) - * - * @see - * co.edu.uniquindio.utils.communication.transfer.Communicator#send(co.edu - * .uniquindio.utils.communication.message.Message) - */ - public void send(Message message) { - - Socket socket = null; - try { - socket = new Socket(message.getAddress().getDestination(), portTcp); - - ObjectOutputStream objectOutputStream = new ObjectOutputStream( - socket.getOutputStream()); - objectOutputStream.writeObject(messageSerialization.encode(message)); - objectOutputStream.flush(); - - } catch (UnknownHostException e) { - logger.error("Error sending message", e); - } catch (IOException e) { - logger.error("Error sending message", e); - } finally { - try { - socket.close(); - } catch (IOException e) { - logger.error("Error closed socket", e); - } - } - - } - - /** - * Gets port TCP - * - * @return Port TCP - */ - public int getPortTcp() { - return portTcp; - } - - /** - * Sets port TCP - * - * @param portTcp Port TCP - */ - public void setPortTcp(int portTcp) { - this.portTcp = portTcp; - } - - /* - * (non-Javadoc) - * - * @see co.edu.uniquindio.utils.communication.transfer.Stoppable#stop() - */ - public void stop() { - try { - serverSocket.close(); - } catch (IOException e) { - logger.error("Error closed socked", e); - } - } - -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java index 6379bb0..9196dc5 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java @@ -77,9 +77,9 @@ public UnicastManagerTCP(int portTcp, MessageSerialization messageSerialization) * (non-Javadoc) * * @see - * co.edu.uniquindio.utils.communication.transfer.Communicator#reciever() + * co.edu.uniquindio.utils.communication.transfer.Communicator#receiver() */ - public Message reciever() { + public Message receiver() { String stringMessage; Message message = null; ObjectInputStream objectInputStream; @@ -143,9 +143,9 @@ public void setPortTcp(int portTcp) { /* * (non-Javadoc) * - * @see co.edu.uniquindio.utils.communication.transfer.Stoppable#stop() + * @see co.edu.uniquindio.utils.communication.transfer.Stoppable#close() */ - public void stop() { + public void close() { try { serverSocket.close(); } catch (IOException e) { diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java new file mode 100644 index 0000000..3c7d809 --- /dev/null +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java @@ -0,0 +1,33 @@ +package co.edu.uniquindio.utils.communication.transfer.response; + +import co.edu.uniquindio.utils.communication.Observable; +import co.edu.uniquindio.utils.communication.message.Message; +import co.edu.uniquindio.utils.communication.transfer.CommunicationManager; +import co.edu.uniquindio.utils.communication.transfer.MessageProcessor; + +public class MessageProcessorGateway implements MessageProcessor { + private final ResponseReleaser responseReleaser; + private final CommunicationManager communicationManager; + private final Observable observable; + + public MessageProcessorGateway(ResponseReleaser responseReleaser, CommunicationManager communicationManager, Observable observable) { + this.responseReleaser = responseReleaser; + this.communicationManager = communicationManager; + this.observable = observable; + } + + @Override + public Message process(Message message) { + if (!responseReleaser.release(message)) { + Message response = null; + if (communicationManager.getMessageProcessor() != null) { + response = communicationManager.getMessageProcessor().process(message); + } + if (response != null) { + communicationManager.sendMessageUnicast(response); + observable.notifyMessage(message); + } + } + return null; + } +} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java new file mode 100644 index 0000000..75c57b5 --- /dev/null +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java @@ -0,0 +1,31 @@ +package co.edu.uniquindio.utils.communication.transfer.response; + +import co.edu.uniquindio.utils.communication.message.Message; +import co.edu.uniquindio.utils.communication.transfer.MessageProcessor; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +public class MessageProcessorGatewayAsynchronous implements MessageProcessor, Closeable { + private final MessageProcessor messageProcessor; + private final ExecutorService executorService; + + public MessageProcessorGatewayAsynchronous(MessageProcessor messageProcessor, ExecutorService executorService) { + this.messageProcessor = messageProcessor; + this.executorService = executorService; + } + + @Override + public Message process(Message message) { + if (!executorService.isShutdown()) { + executorService.submit(() -> messageProcessor.process(message)); + } + return null; + } + + @Override + public void close() throws IOException { + executorService.shutdown(); + } +} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Stoppable.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessagesReceiver.java similarity index 50% rename from communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Stoppable.java rename to communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessagesReceiver.java index 5b975d1..01dfd0f 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Stoppable.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessagesReceiver.java @@ -16,21 +16,33 @@ * along with this program. If not, see . */ -package co.edu.uniquindio.utils.communication.transfer; - -/** - * The Stoppable interface hava all services to objects that is - * must to stop - * - * @author Daniel Pelaez - * @version 1.0, 17/06/2010 - * @since 1.0 - * - */ -public interface Stoppable { +package co.edu.uniquindio.utils.communication.transfer.response; + +import co.edu.uniquindio.utils.communication.transfer.Communicator; +import co.edu.uniquindio.utils.communication.transfer.MessageProcessor; + +import java.io.Closeable; + +public class MessagesReceiver implements Closeable, Runnable { + private boolean run; + private Communicator communicator; + private MessageProcessor messageProcessor; + + public MessagesReceiver(Communicator communicator, + MessageProcessor messageProcessor) { + this.run = true; + this.communicator = communicator; + this.messageProcessor = messageProcessor; + } + + public void close() { + run = false; + } + + public void run() { + while (run) { + messageProcessor.process(communicator.receiver()); + } + } - /** - * Stop all - */ - public void stop(); } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReceiverMessageCommand.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReceiverMessageCommand.java similarity index 88% rename from communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReceiverMessageCommand.java rename to communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReceiverMessageCommand.java index d753c56..8ac77f6 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReceiverMessageCommand.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReceiverMessageCommand.java @@ -16,11 +16,11 @@ * along with this program. If not, see . */ -package co.edu.uniquindio.utils.communication.transfer; +package co.edu.uniquindio.utils.communication.transfer.response; -import co.edu.uniquindio.utils.command.ThreadCommand; import co.edu.uniquindio.utils.communication.Observable; import co.edu.uniquindio.utils.communication.message.Message; +import co.edu.uniquindio.utils.communication.transfer.CommunicationManager; /** * The {@code ReceiverMessageCommand} class, is a Command implementation, that is @@ -31,10 +31,9 @@ * @author Hector Hurtado * @author Daniel Lopez * @version 1.0, 17/06/2010 - * @see ThreadCommand * @since 1.0 */ -public class ReceiverMessageCommand extends ThreadCommand { +public class ReceiverMessageCommand implements Runnable { /** * The message that has arrived */ @@ -45,7 +44,7 @@ public class ReceiverMessageCommand extends ThreadCommand { */ private Observable observable; - private final CommunicationManagerWaitingResult communicationManager; + private final CommunicationManager communicationManager; /** * The constructor of the class. This creates a {@code @@ -56,7 +55,7 @@ public class ReceiverMessageCommand extends ThreadCommand { * @param communicationManager */ public ReceiverMessageCommand(Message message, - Observable observable, CommunicationManagerWaitingResult communicationManager) { + Observable observable, CommunicationManager communicationManager) { this.message = message; this.observable = observable; this.communicationManager = communicationManager; diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Responder.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ResponseReleaser.java similarity index 83% rename from communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Responder.java rename to communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ResponseReleaser.java index bb5abfb..7107cd1 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Responder.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ResponseReleaser.java @@ -16,17 +16,17 @@ * along with this program. If not, see . */ -package co.edu.uniquindio.utils.communication.transfer; +package co.edu.uniquindio.utils.communication.transfer.response; import co.edu.uniquindio.utils.communication.message.Message; /** - * The Responder interface contains the methods for release response + * The ResponseReleaser interface contains the methods for release response * * @author Daniel Pelaez * */ -public interface Responder { +public interface ResponseReleaser { /** * Release response in wait @@ -35,5 +35,5 @@ public interface Responder { * Message to response * @return True if response was release */ - public boolean releaseResponse(Message message); + boolean release(Message message); } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReturnsManager.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManager.java similarity index 95% rename from communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReturnsManager.java rename to communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManager.java index cc3f241..9bdd88e 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReturnsManager.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManager.java @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -package co.edu.uniquindio.utils.communication.transfer; +package co.edu.uniquindio.utils.communication.transfer.response; /** * The ReturnsManager interface have all services for waiting diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReturnsManagerCommunication.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManagerCommunication.java similarity index 93% rename from communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReturnsManagerCommunication.java rename to communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManagerCommunication.java index 56d0e75..0340aaf 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/ReturnsManagerCommunication.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManagerCommunication.java @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -package co.edu.uniquindio.utils.communication.transfer; +package co.edu.uniquindio.utils.communication.transfer.response; import java.util.Collections; import java.util.HashMap; @@ -33,7 +33,7 @@ * @since 1.0 * @see WaitingResult */ -class ReturnsManagerCommunication implements ReturnsManager { +public class ReturnsManagerCommunication implements ReturnsManager { /** * The map of the WaitingResult references with a specific sequence number */ @@ -43,7 +43,7 @@ class ReturnsManagerCommunication implements ReturnsManager { * Create a synchronized instance of the class HashMap */ - ReturnsManagerCommunication() { + public ReturnsManagerCommunication() { results = Collections .synchronizedMap(new HashMap>()); } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/WaitingResult.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java similarity index 96% rename from communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/WaitingResult.java rename to communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java index 936fd64..74613e8 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/WaitingResult.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -package co.edu.uniquindio.utils.communication.transfer; +package co.edu.uniquindio.utils.communication.transfer.response; import org.apache.log4j.Logger; @@ -97,7 +97,7 @@ class WaitingResult extends Thread { } /** - * Executes the thread while stop==false and + * Executes the thread while close==false and * timeOut!=0. This is, while the response for the message has * not arrived or while the specific amount of time has not finished. If * timeOut is -1, waiting until that the response arribe @@ -149,7 +149,7 @@ public T getResult() { try { semaphore.acquire(); } catch (InterruptedException e) { - logger.error("Error to stop semaphore", e); + logger.error("Error to close semaphore", e); } return result; From 8e45a3e18b1d2c0b212ef4e63d86edd14fd49abb Mon Sep 17 00:00:00 2001 From: daniel Date: Mon, 20 Aug 2018 15:01:26 -0500 Subject: [PATCH 2/7] Using di to handle the dependencies --- .../communication/transfer/Communicator.java | 35 +- .../network/CommunicationManagerTCP.java | 335 +++--------------- .../CommunicationManagerTCPFactory.java | 33 +- .../transfer/network/MulticastManagerUDP.java | 121 +++++-- .../transfer/network/UnicastManagerTCP.java | 65 ++-- .../response/MessageProcessorGateway.java | 25 +- .../response/MessageResponseProcessor.java | 77 ++++ .../response/ReceiverMessageCommand.java | 77 ---- .../transfer/response/ResponseReleaser.java | 39 -- .../transfer/response/WaitingResult.java | 2 +- .../build.gradle | 4 +- .../SocketCommunicationAutoConfiguration.java | 10 +- 12 files changed, 321 insertions(+), 502 deletions(-) create mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageResponseProcessor.java delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReceiverMessageCommand.java delete mode 100644 communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ResponseReleaser.java diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java index 289daf0..68812b6 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/Communicator.java @@ -20,29 +20,32 @@ import co.edu.uniquindio.utils.communication.message.Message; +import java.io.Closeable; +import java.util.Map; + /** * The Communicator interface hava all services for to send and to * receiver messages - * + * * @author Daniel Pelaez * @version 1.0, 17/06/2010 * @since 1.0 - * */ -public interface Communicator extends Stoppable { +public interface Communicator extends Closeable { + + /** + * Send message + * + * @param message Message to send + */ + void send(Message message); - /** - * Send message - * - * @param message - * Message to send - */ - public void send(Message message); + /** + * Reciever message + * + * @return Message + */ + Message receiver(); - /** - * Reciever message - * - * @return Message - */ - public Message receiver(); + void start(Map properties); } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java index c03c015..7e19ccd 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCP.java @@ -21,19 +21,18 @@ import co.edu.uniquindio.utils.communication.Observable; import co.edu.uniquindio.utils.communication.Observer; import co.edu.uniquindio.utils.communication.message.Message; -import co.edu.uniquindio.utils.communication.transfer.*; +import co.edu.uniquindio.utils.communication.transfer.CommunicationManager; +import co.edu.uniquindio.utils.communication.transfer.Communicator; +import co.edu.uniquindio.utils.communication.transfer.MessageProcessor; +import co.edu.uniquindio.utils.communication.transfer.response.MessageResponseProcessor; import co.edu.uniquindio.utils.communication.transfer.response.MessagesReceiver; -import co.edu.uniquindio.utils.communication.transfer.response.ResponseReleaser; import co.edu.uniquindio.utils.communication.transfer.response.ReturnsManager; -import co.edu.uniquindio.utils.communication.transfer.response.ReturnsManagerCommunication; +import co.edu.uniquindio.utils.communication.transfer.response.WaitingResult; import org.apache.log4j.Logger; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.io.IOException; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ExecutorService; /** * The CommunicationManagerTCP class is an @@ -48,99 +47,34 @@ * @since 1.0 */ public class CommunicationManagerTCP implements - CommunicationManager, ResponseReleaser { + CommunicationManager { - /** - * The CommunicationManagerTCPProperties enum contains params - * required for communication - * - * @author dpelaez - */ - public enum CommunicationManagerTCPProperties { - PORT_TCP_RESOURCE, PORT_TCP - } - - /** - * Properties for configuration CommunicationManagerNetworkLAN - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ - public enum CommunicationManagerNetworkLANProperties { - BUFFER_SIZE_MULTICAST, IP_MULTICAST, PORT_MULTICAST - } private static final Logger logger = Logger .getLogger(CommunicationManagerTCP.class); private static final String RESPONSE_TIME = "RESPONSE_TIME"; - private final MessageSerialization messageSerialization; + private final Communicator unicastManager; + private final MessagesReceiver unicastMessagesReciever; + private final Communicator multicastManager; + private final MessagesReceiver multicastMessagesReciever; + private final MessageResponseProcessor messageResponseProcessor; private final Observable observableCommunication; - private Communicator multicastManager; - private Communicator unicastManager; - private Map communicationProperties; - private ReturnsManager returnsManager; + private final ReturnsManager returnsManager; + private final ExecutorService messagesReceiverExecutor; private MessageProcessor messageProcessor; - private MessagesReceiver messagesReciever; - - public CommunicationManagerTCP(MessageSerialization messageSerialization) { - this.returnsManager = new ReturnsManagerCommunication(); - this.observableCommunication = new Observable(); - this.messageSerialization = messageSerialization; - } - - /** - * Creates a BytesTransferManagerTCP instance. Required param in - * CommunicationProperties called PORT_TCP_RESOURCE - */ - protected Communicator createUnicastBigManager() { - int portTcp; - if (communicationProperties - .containsKey(CommunicationManagerTCPProperties.PORT_TCP_RESOURCE - .name())) { - portTcp = Integer.parseInt(communicationProperties - .get(CommunicationManagerTCPProperties.PORT_TCP_RESOURCE - .name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property PORT_TCP_RESOURCE not found"); - - logger.error("Property PORT_TCP_RESOURCE not found", - illegalArgumentException); - - throw illegalArgumentException; - } - unicastBigManager = new UnicastBigManagerTCP(portTcp, messageSerialization); - - return unicastBigManager; - } - - /** - * Creates a UnicastManagerTCP instance. Required param in - * CommunicationProperties called PORT_TCP. - */ - protected Communicator createUnicastManager() { - int portTcp; - if (communicationProperties - .containsKey(CommunicationManagerTCPProperties.PORT_TCP.name())) { - portTcp = Integer.parseInt(communicationProperties - .get(CommunicationManagerTCPProperties.PORT_TCP.name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property PORT_TCP not found"); - - logger.error("Property PORT_TCP not found", - illegalArgumentException); - - throw illegalArgumentException; - } - unicastManager = new UnicastManagerTCP(portTcp, messageSerialization); + private Map communicationProperties; - return unicastManager; + public CommunicationManagerTCP(Communicator unicastManager, MessagesReceiver unicastMessagesReciever, Communicator multicastManager, MessagesReceiver multicastMessagesReciever, MessageResponseProcessor messageResponseProcessor, Observable observableCommunication, ReturnsManager returnsManager, ExecutorService messagesReceiverExecutor) { + this.unicastManager = unicastManager; + this.unicastMessagesReciever = unicastMessagesReciever; + this.multicastManager = multicastManager; + this.multicastMessagesReciever = multicastMessagesReciever; + this.messageResponseProcessor = messageResponseProcessor; + this.observableCommunication = observableCommunication; + this.returnsManager = returnsManager; + this.messagesReceiverExecutor = messagesReceiverExecutor; } @@ -176,7 +110,7 @@ public T sendMessageUnicast(Message message, Class typeReturn) { /* * Waiting for response */ - return processResponse(messageResponse, typeReturn, null); + return messageResponseProcessor.process(messageResponse, typeReturn, null); } /** @@ -213,7 +147,7 @@ public T sendMessageUnicast(Message message, Class typeReturn, /* * Waiting for response */ - return processResponse(messageResponse, typeReturn, paramNameResult); + return messageResponseProcessor.process(messageResponse, typeReturn, paramNameResult); } /** @@ -260,7 +194,7 @@ public T sendMessageMultiCast(Message message, Class typeReturn) { /* * Waiting for response */ - return processResponse(messageResponse, typeReturn, null); + return messageResponseProcessor.process(messageResponse, typeReturn, null); } /** @@ -299,7 +233,7 @@ public T sendMessageMultiCast(Message message, Class typeReturn, /* * Waiting for response */ - return processResponse(messageResponse, typeReturn, paramNameResult); + return messageResponseProcessor.process(messageResponse, typeReturn, paramNameResult); } /** @@ -317,105 +251,15 @@ public void sendMessageMultiCast(Message message) { * Stop all process */ public void stopAll() { - messagesReciever.close(); - multicastManager.close(); - multicastManager.close(); - unicastManager.close(); - } - - /** - * Release return in wait. - * - * @param message Message - * @return True if return is release - */ - public boolean release(Message message) { - if (message.getSendType().equals(Message.SendType.RESPONSE)) { - returnsManager.releaseWaitingResult(message.getSequenceNumber(), - message); - - return true; - } else { - return false; - } - } - - /** - * Convert message to return type - * - * @param Return type - * @param message Message - * @param type Return type - * @return Return - */ - @SuppressWarnings("unchecked") - private T processResponse(Message message, Class type, - String paramNameResult) { - - T typeInstance = null; - - if (message == null) { - return null; - } - - if (type.equals(Message.class)) { - return (T) message; - } - - if (type.isInterface() || type.isAnnotation() || type.isArray()) { - throw new IllegalArgumentException("The type must a class (" - + type.getName() + ")"); - } - - Set params = message.getParamsKey(); - - String paramValue; - - if (paramNameResult == null) { - - if (params.size() != 1) { - throw new IllegalArgumentException( - "The message contains more than one parameter, you can not convert to " - + type.getName()); - } - - String paramName = (String) params.toArray()[0]; - - if (paramName == null || paramName.isEmpty()) { - throw new IllegalArgumentException( - "The message contains a param name null or empty"); - } - - paramValue = message.getParam(paramName); - } else { - - paramValue = message.getParam(paramNameResult); - } - - if (paramValue == null || paramValue.isEmpty()) { - return null; - } - try { - Method valueOf = type - .getMethod("valueOf", String.class); - - typeInstance = (T) valueOf.invoke(null, paramValue); - } catch (Exception e) { - try { - - Constructor constructorString = type - .getDeclaredConstructor(String.class); - - typeInstance = constructorString.newInstance(paramValue); - } catch (Exception e1) { - throw new IllegalArgumentException( - "The method valueOf(String) not must to be invoked in class " - + type.getName(), e1); - } + multicastManager.close(); + multicastMessagesReciever.close(); + unicastManager.close(); + unicastMessagesReciever.close(); + messagesReceiverExecutor.shutdown(); + } catch (IOException e) { + throw new IllegalStateException("Problem stopping communication", e); } - - return typeInstance; } /** @@ -445,25 +289,6 @@ public void removeObserver(String name) { observableCommunication.removeObserver(name); } - /** - * Gets properties of communication - * - * @return CommunicationProperties - */ - public Map getCommunicationProperties() { - return communicationProperties; - } - - /** - * Sets properties for communication - * - * @param communicationProperties Properties - */ - public void setCommunicationProperties( - Map communicationProperties) { - this.communicationProperties = communicationProperties; - } - @Override public void addMessageProcessor(String name, MessageProcessor messageProcessor) { this.messageProcessor = messageProcessor; @@ -486,87 +311,15 @@ public MessageProcessor getMessageProcessor() { * CommunicationManagerWaitingResult#init () */ public void init() { - this.unicastManager = createUnicastManager(); - this.multicastManager = createMulticastManager(); - this.messagesReciever = new MessagesReceiver(multicastManager, - unicastManager, unicastBigManager, this, this); - this.observableCommunication = messagesReciever; - } - - /** - * Creates a Communicator for multicast manager. Are required of params in - * communication properties: PORT_MULTICAST, IP_MULTICAST and - * BUFFER_SIZE_MULTICAST - */ - protected Communicator createMulticastManager() { - - int portMulticast; - String ipMulticast; - long bufferSize; - - if (communicationProperties - .containsKey(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.PORT_MULTICAST - .name())) { - portMulticast = Integer - .parseInt(communicationProperties - .get(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.PORT_MULTICAST - .name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property PORT_MULTICAST not found"); - - logger.error("Property PORT_MULTICAST no found", - illegalArgumentException); - - throw illegalArgumentException; - } - - if (communicationProperties - .containsKey(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.IP_MULTICAST - .name())) { - ipMulticast = communicationProperties - .get(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.IP_MULTICAST - .name()); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property IP_MULTICAST not found"); - - logger.error("Property IP_MULTICAST no found", - illegalArgumentException); - - throw illegalArgumentException; - } - - if (communicationProperties - .containsKey(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST - .name())) { - bufferSize = Long - .parseLong(communicationProperties - .get(CommunicationManagerNetworkLAN.CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST - .name())); - } else { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Property BUFFER_SIZE_MULTICAST not found"); - - logger.error("Property BUFFER_SIZE_MULTICAST no found", - illegalArgumentException); - - throw illegalArgumentException; - } - - try { - multicastManager = new MulticastManagerUDP(portMulticast, - InetAddress.getByName(ipMulticast), bufferSize, messageSerialization); - } catch (UnknownHostException e) { - IllegalArgumentException illegalArgumentException = new IllegalArgumentException( - "Error of ipmulticast", e); + this.unicastManager.start(communicationProperties); + this.multicastManager.start(communicationProperties); - logger.error("Error of ipmulticast", illegalArgumentException); - - throw illegalArgumentException; - } - - return multicastManager; + messagesReceiverExecutor.execute(unicastMessagesReciever); + messagesReceiverExecutor.execute(multicastMessagesReciever); } + public void init(Map communicationProperties){ + this.communicationProperties = communicationProperties; + init(); + } } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java index 18013bb..0e707a1 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java @@ -1,19 +1,28 @@ package co.edu.uniquindio.utils.communication.transfer.network; +import co.edu.uniquindio.utils.communication.Observable; +import co.edu.uniquindio.utils.communication.message.Message; import co.edu.uniquindio.utils.communication.transfer.CommunicationManager; import co.edu.uniquindio.utils.communication.transfer.CommunicationManagerFactory; +import co.edu.uniquindio.utils.communication.transfer.Communicator; +import co.edu.uniquindio.utils.communication.transfer.MessageProcessor; +import co.edu.uniquindio.utils.communication.transfer.response.*; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class CommunicationManagerTCPFactory implements CommunicationManagerFactory { private final MessageSerialization messageSerialization; + private final MessageResponseProcessor messageResponseProcessor; private final Map> instancesProperties; private final Map defaultProperties; - public CommunicationManagerTCPFactory(MessageSerialization messageSerialization, Map> instancesProperties) { + public CommunicationManagerTCPFactory(MessageSerialization messageSerialization, MessageResponseProcessor messageResponseProcessor, Map> instancesProperties) { this.messageSerialization = messageSerialization; + this.messageResponseProcessor = messageResponseProcessor; this.instancesProperties = instancesProperties; this.defaultProperties = new HashMap<>(); @@ -29,11 +38,27 @@ public CommunicationManagerTCPFactory(MessageSerialization messageSerialization, @Override public CommunicationManager newCommunicationManager(String name) { - CommunicationManagerTCP communication = new CommunicationManagerTCP(messageSerialization); + Map communicationProperties = Optional.ofNullable(instancesProperties.get(name)).orElse(defaultProperties); - communication.setCommunicationProperties(Optional.ofNullable(instancesProperties.get(name)).orElse(defaultProperties)); + ExecutorService messageProcessorExecutor = Executors.newCachedThreadPool(); + ExecutorService messagesReceiverExecutor = Executors.newFixedThreadPool(2); - communication.init(); + ReturnsManager returnsManager = new ReturnsManagerCommunication<>(); + Observable observable = new Observable<>(); + + MessageProcessorGateway messageProcessorGateway = new MessageProcessorGateway(returnsManager, observable); + MessageProcessor messageProcessorGatewayAsynchronous = new MessageProcessorGatewayAsynchronous(messageProcessorGateway, messageProcessorExecutor); + + Communicator multicastManager = new MulticastManagerUDP(messageSerialization); + Communicator unicastManager = new UnicastManagerTCP(messageSerialization); + MessagesReceiver unicastMessagesReceiver = new MessagesReceiver(unicastManager, messageProcessorGatewayAsynchronous); + MessagesReceiver multicastMessagesReceiver = new MessagesReceiver(multicastManager, messageProcessorGatewayAsynchronous); + + CommunicationManagerTCP communication = new CommunicationManagerTCP(unicastManager, unicastMessagesReceiver, multicastManager, multicastMessagesReceiver, messageResponseProcessor, observable, returnsManager, messagesReceiverExecutor); + + messageProcessorGateway.setCommunicationManager(communication); + + communication.init(communicationProperties); return communication; } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java index 7acf172..95718f6 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java @@ -26,6 +26,7 @@ import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; +import java.util.Map; /** * The MulticastManagerUDP class implemented the transfer @@ -37,6 +38,19 @@ */ public class MulticastManagerUDP implements Communicator { + /** + * Properties for configuration CommunicationManagerNetworkLAN + * + * @author Daniel Pelaez + * @author Hector Hurtado + * @author Daniel Lopez + * @version 1.0, 17/06/2010 + * @since 1.0 + */ + public enum CommunicationManagerNetworkLANProperties { + BUFFER_SIZE_MULTICAST, IP_MULTICAST, PORT_MULTICAST + } + /** * Logger */ @@ -73,28 +87,11 @@ public class MulticastManagerUDP implements Communicator { /** * Builds a MulticastManagerUDP and started multicast socket - * @param portMulticast Port multicast - * @param group Internet address multicast - * @param bufferSize Buffer size for to reader + * * @param messageSerialization */ - public MulticastManagerUDP(int portMulticast, InetAddress group, - long bufferSize, MessageSerialization messageSerialization) { + public MulticastManagerUDP(MessageSerialization messageSerialization) { this.messageSerialization = messageSerialization; - - try { - this.portMulticast = portMulticast; - - this.multicastSocket = new MulticastSocket(portMulticast); - - this.group = group; - - this.multicastSocket.joinGroup(group); - - this.buffer = new byte[(int) bufferSize]; - } catch (IOException e) { - logger.error("Error creating multicast socket", e); - } } /* @@ -126,6 +123,74 @@ public Message receiver() { return null; } + @Override + public void start(Map properties) { + try { + if (properties + .containsKey(CommunicationManagerNetworkLANProperties.PORT_MULTICAST + .name())) { + portMulticast = Integer + .parseInt(properties + .get(CommunicationManagerNetworkLANProperties.PORT_MULTICAST + .name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property PORT_MULTICAST not found"); + + logger.error("Property PORT_MULTICAST no found", + illegalArgumentException); + + throw illegalArgumentException; + } + + if (properties + .containsKey(CommunicationManagerNetworkLANProperties.IP_MULTICAST + .name())) { + group = InetAddress.getByName(properties + .get(CommunicationManagerNetworkLANProperties.IP_MULTICAST + .name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property IP_MULTICAST not found"); + + logger.error("Property IP_MULTICAST no found", + illegalArgumentException); + + throw illegalArgumentException; + } + + if (properties + .containsKey(CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST + .name())) { + bufferSize = Long + .parseLong(properties + .get(CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST + .name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property BUFFER_SIZE_MULTICAST not found"); + + logger.error("Property BUFFER_SIZE_MULTICAST no found", + illegalArgumentException); + + throw illegalArgumentException; + } + + this.portMulticast = portMulticast; + + this.multicastSocket = new MulticastSocket(portMulticast); + + this.group = group; + + this.multicastSocket.joinGroup(group); + + this.buffer = new byte[(int) bufferSize]; + } catch (IOException e) { + logger.error("Error creating multicast socket", e); + throw new IllegalStateException("Error creating multicast socket", e); + } + } + /* * (non-Javadoc) * @@ -147,24 +212,6 @@ public void send(Message message) { } } - /** - * Gets buffer size to reader - * - * @return Buffer size - */ - public long getBufferSize() { - return bufferSize; - } - - /** - * Sets buffer size to reader - * - * @param bufferSize Buffer size - */ - public void setBufferSize(long bufferSize) { - this.bufferSize = bufferSize; - } - /* * (non-Javadoc) * diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java index 9196dc5..03a5040 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java @@ -28,6 +28,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.UnknownHostException; +import java.util.Map; /** * The UnicastManagerTCP class implemented transfer message based @@ -39,6 +40,15 @@ */ public class UnicastManagerTCP implements Communicator { + /** + * The CommunicationManagerTCPProperties enum contains params + * required for communication + * + * @author dpelaez + */ + public enum CommunicationManagerTCPProperties { + PORT_TCP_RESOURCE, PORT_TCP + } /** * Logger */ @@ -59,18 +69,10 @@ public class UnicastManagerTCP implements Communicator { /** * Builds a UnicastManagerTCP * - * @param portTcp Port TCP number * @param messageSerialization */ - public UnicastManagerTCP(int portTcp, MessageSerialization messageSerialization) { - this.portTcp = portTcp; + public UnicastManagerTCP(MessageSerialization messageSerialization) { this.messageSerialization = messageSerialization; - - try { - this.serverSocket = new ServerSocket(portTcp); - } catch (IOException e) { - logger.error("Error creating server socket", e); - } } /* @@ -99,6 +101,31 @@ public Message receiver() { return message; } + @Override + public void start(Map properties) { + int portTcp; + if (properties + .containsKey(CommunicationManagerTCPProperties.PORT_TCP.name())) { + portTcp = Integer.parseInt(properties + .get(CommunicationManagerTCPProperties.PORT_TCP.name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property PORT_TCP not found"); + + logger.error("Property PORT_TCP not found", + illegalArgumentException); + + throw illegalArgumentException; + } + try { + this.serverSocket = new ServerSocket(portTcp); + } catch (IOException e) { + logger.error("Error creating server socket", e); + throw new IllegalStateException("Error creating server socket", e); + } + + } + /* * (non-Javadoc) * @@ -107,7 +134,7 @@ public Message receiver() { * .uniquindio.utils.communication.message.Message) */ public void send(Message message) { - try(Socket socket = new Socket(message.getAddress().getDestination(), portTcp)) { + try (Socket socket = new Socket(message.getAddress().getDestination(), portTcp)) { ObjectOutputStream objectOutputStream = new ObjectOutputStream( socket.getOutputStream()); @@ -122,24 +149,6 @@ public void send(Message message) { } - /** - * Gets port TCP - * - * @return Port TCP - */ - public int getPortTcp() { - return portTcp; - } - - /** - * Sets port TCP - * - * @param portTcp Port TCP - */ - public void setPortTcp(int portTcp) { - this.portTcp = portTcp; - } - /* * (non-Javadoc) * diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java index 3c7d809..8d1f9a6 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGateway.java @@ -4,21 +4,21 @@ import co.edu.uniquindio.utils.communication.message.Message; import co.edu.uniquindio.utils.communication.transfer.CommunicationManager; import co.edu.uniquindio.utils.communication.transfer.MessageProcessor; +import co.edu.uniquindio.utils.communication.transfer.network.CommunicationManagerTCP; public class MessageProcessorGateway implements MessageProcessor { - private final ResponseReleaser responseReleaser; - private final CommunicationManager communicationManager; + private final ReturnsManager responseReleaser; + private CommunicationManagerTCP communicationManager; private final Observable observable; - public MessageProcessorGateway(ResponseReleaser responseReleaser, CommunicationManager communicationManager, Observable observable) { + public MessageProcessorGateway(ReturnsManager responseReleaser, Observable observable) { this.responseReleaser = responseReleaser; - this.communicationManager = communicationManager; this.observable = observable; } @Override public Message process(Message message) { - if (!responseReleaser.release(message)) { + if (!release(message)) { Message response = null; if (communicationManager.getMessageProcessor() != null) { response = communicationManager.getMessageProcessor().process(message); @@ -30,4 +30,19 @@ public Message process(Message message) { } return null; } + + public boolean release(Message message) { + if (message.getSendType().equals(Message.SendType.RESPONSE)) { + responseReleaser.releaseWaitingResult(message.getSequenceNumber(), + message); + + return true; + } else { + return false; + } + } + + public void setCommunicationManager(CommunicationManagerTCP communicationManager) { + this.communicationManager = communicationManager; + } } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageResponseProcessor.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageResponseProcessor.java new file mode 100644 index 0000000..1998745 --- /dev/null +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageResponseProcessor.java @@ -0,0 +1,77 @@ +package co.edu.uniquindio.utils.communication.transfer.response; + +import co.edu.uniquindio.utils.communication.message.Message; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.Set; + +public class MessageResponseProcessor { + public T process(Message message, Class type, + String paramNameResult) { + T typeInstance; + + if (message == null/* || message.getMessageType().equals(EMPTY_MESSAGE_TYPE)*/) { + return null; + } + + if (type.equals(Message.class)) { + return type.cast(message); + } + + if (type.isInterface() || type.isAnnotation() || type.isArray()) { + throw new IllegalArgumentException("The type must a class (" + + type.getName() + ")"); + } + + Set params = message.getParamsKey(); + + String paramValue; + + if (paramNameResult == null) { + + if (params.size() != 1) { + throw new IllegalArgumentException( + "The message contains more than one parameter, you can not convert to " + + type.getName()); + } + + String paramName = (String) params.toArray()[0]; + + if (paramName == null || paramName.isEmpty()) { + throw new IllegalArgumentException( + "The message contains a param name null or empty"); + } + + paramValue = message.getParam(paramName); + } else { + + paramValue = message.getParam(paramNameResult); + } + + if (paramValue == null || paramValue.isEmpty()) { + return null; + } + + try { + Method valueOf = type + .getMethod("valueOf", String.class); + + typeInstance = type.cast(valueOf.invoke(null, paramValue)); + } catch (Exception e) { + try { + + Constructor constructorString = type + .getDeclaredConstructor(String.class); + + typeInstance = constructorString.newInstance(paramValue); + } catch (Exception e1) { + throw new IllegalArgumentException( + "The method valueOf(String) not must to be invoked in class " + + type.getName(), e1); + } + } + + return typeInstance; + } +} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReceiverMessageCommand.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReceiverMessageCommand.java deleted file mode 100644 index 8ac77f6..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReceiverMessageCommand.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package co.edu.uniquindio.utils.communication.transfer.response; - -import co.edu.uniquindio.utils.communication.Observable; -import co.edu.uniquindio.utils.communication.message.Message; -import co.edu.uniquindio.utils.communication.transfer.CommunicationManager; - -/** - * The {@code ReceiverMessageCommand} class, is a Command implementation, that is - * used for receiving a message and notify to the observer the arrive the - * message - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ -public class ReceiverMessageCommand implements Runnable { - /** - * The message that has arrived - */ - private Message message; - - /** - * The observer that is notified for the arrival the message - */ - private Observable observable; - - private final CommunicationManager communicationManager; - - /** - * The constructor of the class. This creates a {@code - * ReceiverMessageCommand} instance with the message that arrives and the - * observer that has to be notified - * - * @param message - * @param communicationManager - */ - public ReceiverMessageCommand(Message message, - Observable observable, CommunicationManager communicationManager) { - this.message = message; - this.observable = observable; - this.communicationManager = communicationManager; - } - - /** - * Notifies to the observer the arrived message - */ - public void run() { - Message response = null; - if (communicationManager.getMessageProcessor() != null) { - response = communicationManager.getMessageProcessor().process(message); - } - if (response != null) { - communicationManager.sendMessageUnicast(response); - observable.notifyMessage(message); - } - } -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ResponseReleaser.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ResponseReleaser.java deleted file mode 100644 index 7107cd1..0000000 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ResponseReleaser.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Communication project implement communication point to point and multicast - * Copyright (C) 2010 Daniel Pelaez, Daniel Lopez, Hector Hurtado - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package co.edu.uniquindio.utils.communication.transfer.response; - -import co.edu.uniquindio.utils.communication.message.Message; - -/** - * The ResponseReleaser interface contains the methods for release response - * - * @author Daniel Pelaez - * - */ -public interface ResponseReleaser { - - /** - * Release response in wait - * - * @param message - * Message to response - * @return True if response was release - */ - boolean release(Message message); -} diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java index 74613e8..56845c3 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java @@ -32,7 +32,7 @@ * @version 1.0, 17/06/2010 * @since 1.0 */ -class WaitingResult extends Thread { +public class WaitingResult extends Thread { /** * Logger diff --git a/main/starters/socket-communication-spring-boot-starter/build.gradle b/main/starters/socket-communication-spring-boot-starter/build.gradle index 414f55a..9833ee4 100644 --- a/main/starters/socket-communication-spring-boot-starter/build.gradle +++ b/main/starters/socket-communication-spring-boot-starter/build.gradle @@ -24,8 +24,8 @@ repositories { dependencies { - compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication', version: '2.0.0' - + //compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication', version: '2.0.0' + compile project(':communication:socket-communication') // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.4' diff --git a/main/starters/socket-communication-spring-boot-starter/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/starter/SocketCommunicationAutoConfiguration.java b/main/starters/socket-communication-spring-boot-starter/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/starter/SocketCommunicationAutoConfiguration.java index 4c08f76..80b9e71 100644 --- a/main/starters/socket-communication-spring-boot-starter/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/starter/SocketCommunicationAutoConfiguration.java +++ b/main/starters/socket-communication-spring-boot-starter/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/starter/SocketCommunicationAutoConfiguration.java @@ -5,6 +5,7 @@ import co.edu.uniquindio.utils.communication.transfer.network.CommunicationManagerTCPFactory; import co.edu.uniquindio.utils.communication.transfer.network.jackson.MessageJsonSerialization; import co.edu.uniquindio.utils.communication.transfer.network.MessageSerialization; +import co.edu.uniquindio.utils.communication.transfer.response.MessageResponseProcessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -22,8 +23,8 @@ public class SocketCommunicationAutoConfiguration { @Bean @ConditionalOnMissingBean - public CommunicationManagerFactory communicationManagerFactory(MessageSerialization messageSerialization) { - return new CommunicationManagerTCPFactory(messageSerialization, socketCommunicationProperties.getInstances()); + public CommunicationManagerFactory communicationManagerFactory(MessageSerialization messageSerialization, MessageResponseProcessor messageResponseProcessor) { + return new CommunicationManagerTCPFactory(messageSerialization, messageResponseProcessor, socketCommunicationProperties.getInstances()); } @Bean @@ -32,4 +33,9 @@ public MessageSerialization messageSerialization() { return new MessageJsonSerialization(new ObjectMapper()); } + @Bean + public MessageResponseProcessor messageResponseProcessor(){ + return new MessageResponseProcessor(); + } + } From b00655be8c565f2a0dfee2fc9a6d641948ad872d Mon Sep 17 00:00:00 2001 From: daniel Date: Mon, 20 Aug 2018 20:38:34 -0500 Subject: [PATCH 3/7] Refactoring --- .../transfer/network/MulticastManagerUDP.java | 32 ++++++------------- .../transfer/network/UnicastManagerTCP.java | 24 +++++++------- .../MessageProcessorGatewayAsynchronous.java | 2 +- .../response/ReturnsManagerCommunication.java | 4 +-- main/desktop-network-gui/build.gradle | 4 ++- .../spring-configuration-metadata.json | 5 --- 6 files changed, 27 insertions(+), 44 deletions(-) diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java index 95718f6..6f4bf5e 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java @@ -38,22 +38,10 @@ */ public class MulticastManagerUDP implements Communicator { - /** - * Properties for configuration CommunicationManagerNetworkLAN - * - * @author Daniel Pelaez - * @author Hector Hurtado - * @author Daniel Lopez - * @version 1.0, 17/06/2010 - * @since 1.0 - */ - public enum CommunicationManagerNetworkLANProperties { + public enum MulticastManagerUDPProperties { BUFFER_SIZE_MULTICAST, IP_MULTICAST, PORT_MULTICAST } - /** - * Logger - */ private static final Logger logger = Logger .getLogger(MulticastManagerUDP.class); @@ -118,20 +106,19 @@ public Message receiver() { return message; } catch (IOException e) { logger.error("Error reading multicast socket", e); + throw new IllegalStateException("Error reading multicast socket", e); } - - return null; } @Override public void start(Map properties) { try { if (properties - .containsKey(CommunicationManagerNetworkLANProperties.PORT_MULTICAST + .containsKey(MulticastManagerUDPProperties.PORT_MULTICAST .name())) { portMulticast = Integer .parseInt(properties - .get(CommunicationManagerNetworkLANProperties.PORT_MULTICAST + .get(MulticastManagerUDPProperties.PORT_MULTICAST .name())); } else { IllegalArgumentException illegalArgumentException = new IllegalArgumentException( @@ -144,10 +131,10 @@ public void start(Map properties) { } if (properties - .containsKey(CommunicationManagerNetworkLANProperties.IP_MULTICAST + .containsKey(MulticastManagerUDPProperties.IP_MULTICAST .name())) { group = InetAddress.getByName(properties - .get(CommunicationManagerNetworkLANProperties.IP_MULTICAST + .get(MulticastManagerUDPProperties.IP_MULTICAST .name())); } else { IllegalArgumentException illegalArgumentException = new IllegalArgumentException( @@ -160,11 +147,11 @@ public void start(Map properties) { } if (properties - .containsKey(CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST + .containsKey(MulticastManagerUDPProperties.BUFFER_SIZE_MULTICAST .name())) { bufferSize = Long .parseLong(properties - .get(CommunicationManagerNetworkLANProperties.BUFFER_SIZE_MULTICAST + .get(MulticastManagerUDPProperties.BUFFER_SIZE_MULTICAST .name())); } else { IllegalArgumentException illegalArgumentException = new IllegalArgumentException( @@ -208,7 +195,8 @@ public void send(Message message) { try { multicastSocket.send(datagramPacket); } catch (IOException e) { - logger.error("Error writting multicast socket", e); + logger.error("Error writing multicast socket", e); + throw new IllegalStateException("Error writing multicast socket", e); } } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java index 03a5040..c7c924d 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java @@ -27,7 +27,6 @@ import java.io.ObjectOutputStream; import java.net.ServerSocket; import java.net.Socket; -import java.net.UnknownHostException; import java.util.Map; /** @@ -41,14 +40,15 @@ public class UnicastManagerTCP implements Communicator { /** - * The CommunicationManagerTCPProperties enum contains params + * The UnicastManagerTCPProperties enum contains params * required for communication * * @author dpelaez */ - public enum CommunicationManagerTCPProperties { + public enum UnicastManagerTCPProperties { PORT_TCP_RESOURCE, PORT_TCP } + /** * Logger */ @@ -92,10 +92,9 @@ public Message receiver() { message = messageSerialization.decode(stringMessage); - } catch (IOException e) { - logger.error("Error reading socket", e); - } catch (ClassNotFoundException e) { + } catch (IOException | ClassNotFoundException e) { logger.error("Error reading socket", e); + throw new IllegalStateException("Error reading socket", e); } return message; @@ -103,11 +102,10 @@ public Message receiver() { @Override public void start(Map properties) { - int portTcp; if (properties - .containsKey(CommunicationManagerTCPProperties.PORT_TCP.name())) { + .containsKey(UnicastManagerTCPProperties.PORT_TCP.name())) { portTcp = Integer.parseInt(properties - .get(CommunicationManagerTCPProperties.PORT_TCP.name())); + .get(UnicastManagerTCPProperties.PORT_TCP.name())); } else { IllegalArgumentException illegalArgumentException = new IllegalArgumentException( "Property PORT_TCP not found"); @@ -141,10 +139,9 @@ public void send(Message message) { objectOutputStream.writeObject(messageSerialization.encode(message)); objectOutputStream.flush(); - } catch (UnknownHostException e) { - logger.error("Error writting socket", e); } catch (IOException e) { - logger.error("Error writting socket", e); + logger.error("Error writing socket " + message.getAddress(), e); + throw new IllegalStateException("Error writing socket " + message.getAddress(), e); } } @@ -158,7 +155,8 @@ public void close() { try { serverSocket.close(); } catch (IOException e) { - logger.error("Error closed server socket", e); + logger.error("Error closing server socket", e); + throw new IllegalStateException("Error closing server socket", e); } } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java index 75c57b5..ab7ea85 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/MessageProcessorGatewayAsynchronous.java @@ -19,7 +19,7 @@ public MessageProcessorGatewayAsynchronous(MessageProcessor messageProcessor, Ex @Override public Message process(Message message) { if (!executorService.isShutdown()) { - executorService.submit(() -> messageProcessor.process(message)); + executorService.execute(() -> messageProcessor.process(message)); } return null; } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManagerCommunication.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManagerCommunication.java index 0340aaf..63fdbac 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManagerCommunication.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/ReturnsManagerCommunication.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * The ReturnsManagerCommunication class is responsible for @@ -44,8 +45,7 @@ public class ReturnsManagerCommunication implements ReturnsManager { */ public ReturnsManagerCommunication() { - results = Collections - .synchronizedMap(new HashMap>()); + results = new ConcurrentHashMap<>(); } /** diff --git a/main/desktop-network-gui/build.gradle b/main/desktop-network-gui/build.gradle index 88f96d2..8f7875c 100644 --- a/main/desktop-network-gui/build.gradle +++ b/main/desktop-network-gui/build.gradle @@ -15,5 +15,7 @@ dependencies { compile group: 'com.github.estigma88', name: 'jdhtuq-chord-spring-boot-starter', version: '2.0.0' compile group: 'com.github.estigma88', name: 'jdhtuq-dhash-spring-boot-starter', version: '2.0.0' - compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication-spring-boot-starter', version: '2.0.0' + //compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication-spring-boot-starter', version: '2.0.0' + + compile project(':main:starters:socket-communication-spring-boot-starter') } \ No newline at end of file diff --git a/main/starters/socket-communication-spring-boot-starter/out/production/classes/META-INF/spring-configuration-metadata.json b/main/starters/socket-communication-spring-boot-starter/out/production/classes/META-INF/spring-configuration-metadata.json index baee918..03d27d9 100644 --- a/main/starters/socket-communication-spring-boot-starter/out/production/classes/META-INF/spring-configuration-metadata.json +++ b/main/starters/socket-communication-spring-boot-starter/out/production/classes/META-INF/spring-configuration-metadata.json @@ -5,11 +5,6 @@ "sourceType": "co.edu.uniquindio.utils.communication.transfer.network.starter.SocketCommunicationProperties", "name": "communication.socket", "type": "co.edu.uniquindio.utils.communication.transfer.network.starter.SocketCommunicationProperties" - }, - { - "sourceType": "co.edu.uniquindio.dhash.starter.DHashProperties", - "name": "p2p.dhash", - "type": "co.edu.uniquindio.dhash.starter.DHashProperties" } ], "properties": [ From 62b86fd800cf668d36c330c4402863658ba0d5ad Mon Sep 17 00:00:00 2001 From: daniel Date: Mon, 27 Aug 2018 19:50:56 -0500 Subject: [PATCH 4/7] Adding time to replicate data in ITs and modifying the socket connection to use connect method --- .../CommunicationManagerTCPFactory.java | 2 +- .../transfer/network/MulticastManagerUDP.java | 8 ++--- .../transfer/network/UnicastManagerTCP.java | 30 +++++++++++++------ .../ittest/resources/features/gets.feature | 1 + .../features/leave-all-nodes.feature | 1 + .../resources/features/leave-node.feature | 1 + .../resources/features/new-node.feature | 1 + .../resources/features/offline-node.feature | 1 + .../ittest/resources/features/puts.feature | 3 +- .../src/main/resources/application.yml | 2 +- .../src/main/resources/application.yml | 2 +- 11 files changed, 34 insertions(+), 18 deletions(-) diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java index 0e707a1..32e99e8 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/CommunicationManagerTCPFactory.java @@ -30,7 +30,7 @@ public CommunicationManagerTCPFactory(MessageSerialization messageSerialization, this.defaultProperties.put("BUFFER_SIZE_MULTICAST", "1024"); this.defaultProperties.put("IP_MULTICAST", "224.0.0.2"); this.defaultProperties.put("PORT_MULTICAST", "2000"); - this.defaultProperties.put("PORT_TCP_RESOURCE", "2004"); + this.defaultProperties.put("TIMEOUT_TCP_CONNECTION", "2000"); this.defaultProperties.put("PORT_TCP", "2005"); this.defaultProperties.put("PORT_UDP", "2006"); this.defaultProperties.put("BUFFER_SIZE_UDP", "1024"); diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java index 6f4bf5e..82c2963 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/MulticastManagerUDP.java @@ -91,7 +91,7 @@ public MulticastManagerUDP(MessageSerialization messageSerialization) { public Message receiver() { DatagramPacket datagramPacket; String string; - Message message; + Message message = null; datagramPacket = new DatagramPacket(buffer, buffer.length); @@ -102,12 +102,11 @@ public Message receiver() { .getLength()); message = messageSerialization.decode(string); - - return message; } catch (IOException e) { logger.error("Error reading multicast socket", e); - throw new IllegalStateException("Error reading multicast socket", e); } + + return message; } @Override @@ -196,7 +195,6 @@ public void send(Message message) { multicastSocket.send(datagramPacket); } catch (IOException e) { logger.error("Error writing multicast socket", e); - throw new IllegalStateException("Error writing multicast socket", e); } } diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java index c7c924d..740367c 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/network/UnicastManagerTCP.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Map; @@ -46,7 +47,7 @@ public class UnicastManagerTCP implements Communicator { * @author dpelaez */ public enum UnicastManagerTCPProperties { - PORT_TCP_RESOURCE, PORT_TCP + TIMEOUT_TCP_CONNECTION, PORT_TCP } /** @@ -64,6 +65,7 @@ public enum UnicastManagerTCPProperties { * The value of the port used to create the socket. */ private int portTcp; + private int timeoutTcpConnection; private final MessageSerialization messageSerialization; /** @@ -84,17 +86,15 @@ public UnicastManagerTCP(MessageSerialization messageSerialization) { public Message receiver() { String stringMessage; Message message = null; - ObjectInputStream objectInputStream; - try (Socket socket = serverSocket.accept()) { - objectInputStream = new ObjectInputStream(socket.getInputStream()); + try (Socket socket = serverSocket.accept(); + ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) { + stringMessage = (String) objectInputStream.readObject(); message = messageSerialization.decode(stringMessage); - } catch (IOException | ClassNotFoundException e) { logger.error("Error reading socket", e); - throw new IllegalStateException("Error reading socket", e); } return message; @@ -115,6 +115,19 @@ public void start(Map properties) { throw illegalArgumentException; } + if (properties + .containsKey(UnicastManagerTCPProperties.TIMEOUT_TCP_CONNECTION.name())) { + timeoutTcpConnection = Integer.parseInt(properties + .get(UnicastManagerTCPProperties.TIMEOUT_TCP_CONNECTION.name())); + } else { + IllegalArgumentException illegalArgumentException = new IllegalArgumentException( + "Property TIMEOUT_TCP_CONNECTION not found"); + + logger.error("Property TIMEOUT_TCP_CONNECTION not found", + illegalArgumentException); + + throw illegalArgumentException; + } try { this.serverSocket = new ServerSocket(portTcp); } catch (IOException e) { @@ -132,16 +145,15 @@ public void start(Map properties) { * .uniquindio.utils.communication.message.Message) */ public void send(Message message) { - try (Socket socket = new Socket(message.getAddress().getDestination(), portTcp)) { + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(message.getAddress().getDestination(), portTcp), timeoutTcpConnection); ObjectOutputStream objectOutputStream = new ObjectOutputStream( socket.getOutputStream()); objectOutputStream.writeObject(messageSerialization.encode(message)); - objectOutputStream.flush(); } catch (IOException e) { logger.error("Error writing socket " + message.getAddress(), e); - throw new IllegalStateException("Error writing socket " + message.getAddress(), e); } } diff --git a/it/socket-it/src/ittest/resources/features/gets.feature b/it/socket-it/src/ittest/resources/features/gets.feature index e4b48bf..cd7f6c9 100644 --- a/it/socket-it/src/ittest/resources/features/gets.feature +++ b/it/socket-it/src/ittest/resources/features/gets.feature @@ -61,6 +61,7 @@ Feature: A node requested all the resources | resource10.txt | Unpleasant astonished an diminution up partiality. Noisy an their of meant. Death means up civil do an offer wound of. Called square an in afraid direct. Resolution diminution conviction so mr at unpleasing simplicity no. No it as breakfast up conveying earnestly immediate principle. Him son disposed produced humoured overcame she bachelor improved. Studied however out wishing but inhabit fortune windows. | And I use the "172.16.0.43" as a gateway And I put resources into the network + And I wait for stabilizing after 20 seconds Scenario: The node "172.16.0.43" requested all the resources Given I use the "172.16.0.43" as a gateway diff --git a/it/socket-it/src/ittest/resources/features/leave-all-nodes.feature b/it/socket-it/src/ittest/resources/features/leave-all-nodes.feature index 75c78c3..fe32117 100644 --- a/it/socket-it/src/ittest/resources/features/leave-all-nodes.feature +++ b/it/socket-it/src/ittest/resources/features/leave-all-nodes.feature @@ -61,6 +61,7 @@ Feature: Every node left the network one by one | resource10.txt | Unpleasant astonished an diminution up partiality. Noisy an their of meant. Death means up civil do an offer wound of. Called square an in afraid direct. Resolution diminution conviction so mr at unpleasing simplicity no. No it as breakfast up conveying earnestly immediate principle. Him son disposed produced humoured overcame she bachelor improved. Studied however out wishing but inhabit fortune windows. | And I use the "172.16.0.43" as a gateway And I put resources into the network + And I wait for stabilizing after 20 seconds Scenario: Every node left the network one by one Given The "172.16.0.4" left the network diff --git a/it/socket-it/src/ittest/resources/features/leave-node.feature b/it/socket-it/src/ittest/resources/features/leave-node.feature index aa11816..44639ff 100644 --- a/it/socket-it/src/ittest/resources/features/leave-node.feature +++ b/it/socket-it/src/ittest/resources/features/leave-node.feature @@ -61,6 +61,7 @@ Feature: A node left the network | resource10.txt | Unpleasant astonished an diminution up partiality. Noisy an their of meant. Death means up civil do an offer wound of. Called square an in afraid direct. Resolution diminution conviction so mr at unpleasing simplicity no. No it as breakfast up conveying earnestly immediate principle. Him son disposed produced humoured overcame she bachelor improved. Studied however out wishing but inhabit fortune windows. | And I use the "172.16.0.43" as a gateway And I put resources into the network + And I wait for stabilizing after 20 seconds Scenario: The node "172.16.0.10" left the network Given The "172.16.0.10" left the network diff --git a/it/socket-it/src/ittest/resources/features/new-node.feature b/it/socket-it/src/ittest/resources/features/new-node.feature index aeb5110..3f4eaa5 100644 --- a/it/socket-it/src/ittest/resources/features/new-node.feature +++ b/it/socket-it/src/ittest/resources/features/new-node.feature @@ -41,6 +41,7 @@ Feature: A node is add to the network | resource10.txt | Unpleasant astonished an diminution up partiality. Noisy an their of meant. Death means up civil do an offer wound of. Called square an in afraid direct. Resolution diminution conviction so mr at unpleasing simplicity no. No it as breakfast up conveying earnestly immediate principle. Him son disposed produced humoured overcame she bachelor improved. Studied however out wishing but inhabit fortune windows. | And I use the "172.16.0.43" as a gateway And I put resources into the network + And I wait for stabilizing after 20 seconds Scenario: The node "172.16.0.10" is add to the network Given The "172.16.0.10" is added to the network diff --git a/it/socket-it/src/ittest/resources/features/offline-node.feature b/it/socket-it/src/ittest/resources/features/offline-node.feature index f0c3325..133cf93 100644 --- a/it/socket-it/src/ittest/resources/features/offline-node.feature +++ b/it/socket-it/src/ittest/resources/features/offline-node.feature @@ -61,6 +61,7 @@ Feature: A node is offline | resource10.txt | Unpleasant astonished an diminution up partiality. Noisy an their of meant. Death means up civil do an offer wound of. Called square an in afraid direct. Resolution diminution conviction so mr at unpleasing simplicity no. No it as breakfast up conveying earnestly immediate principle. Him son disposed produced humoured overcame she bachelor improved. Studied however out wishing but inhabit fortune windows. | And I use the "172.16.0.43" as a gateway And I put resources into the network + And I wait for stabilizing after 20 seconds Scenario: The node "172.16.0.10" is offline Given The "172.16.0.10" is offline diff --git a/it/socket-it/src/ittest/resources/features/puts.feature b/it/socket-it/src/ittest/resources/features/puts.feature index 0482276..06317e3 100644 --- a/it/socket-it/src/ittest/resources/features/puts.feature +++ b/it/socket-it/src/ittest/resources/features/puts.feature @@ -63,7 +63,8 @@ Feature: I put resources into the network | resource10.txt | Unpleasant astonished an diminution up partiality. Noisy an their of meant. Death means up civil do an offer wound of. Called square an in afraid direct. Resolution diminution conviction so mr at unpleasing simplicity no. No it as breakfast up conveying earnestly immediate principle. Him son disposed produced humoured overcame she bachelor improved. Studied however out wishing but inhabit fortune windows. | Given I use the "172.16.0.43" as a gateway When I put resources into the network - Then The resources are put in the following nodes: + Then I wait for stabilizing after 20 seconds + And The resources are put in the following nodes: | resource1.txt | 172.16.0.7,172.16.0.21 | | resource2.txt | 172.16.0.10,172.16.0.7 | | resource3.txt | 172.16.0.12,172.16.0.43 | diff --git a/it/socket-it/src/main/resources/application.yml b/it/socket-it/src/main/resources/application.yml index 1ba566c..a49e942 100644 --- a/it/socket-it/src/main/resources/application.yml +++ b/it/socket-it/src/main/resources/application.yml @@ -14,7 +14,7 @@ communication: BUFFER_SIZE_MULTICAST: 1024 IP_MULTICAST: 224.0.0.1 PORT_MULTICAST: 22011 - PORT_TCP_RESOURCE: 22012 + TIMEOUT_TCP_CONNECTION: 2000 PORT_TCP: 22013 PORT_UDP: 22014 BUFFER_SIZE_UDP: 1024 diff --git a/main/desktop-network-gui/src/main/resources/application.yml b/main/desktop-network-gui/src/main/resources/application.yml index 6dd92ff..c7d9361 100644 --- a/main/desktop-network-gui/src/main/resources/application.yml +++ b/main/desktop-network-gui/src/main/resources/application.yml @@ -11,7 +11,7 @@ communication: BUFFER_SIZE_MULTICAST: 1024 IP_MULTICAST: 224.0.0.1 PORT_MULTICAST: 2000 - PORT_TCP_RESOURCE: 2001 + TIMEOUT_TCP_CONNECTION: 2000 PORT_TCP: 2002 PORT_UDP: 2003 BUFFER_SIZE_UDP: 1024 \ No newline at end of file From 7fdc72893231d5b064889f2bcf2b178e5db6812c Mon Sep 17 00:00:00 2001 From: daniel Date: Mon, 27 Aug 2018 21:07:28 -0500 Subject: [PATCH 5/7] Removing additional thread and using CountDownLatch for blocking --- .../transfer/response/WaitingResult.java | 81 +++++-------------- 1 file changed, 20 insertions(+), 61 deletions(-) diff --git a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java index 56845c3..4f9ab65 100644 --- a/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java +++ b/communication/socket-communication/src/main/java/co/edu/uniquindio/utils/communication/transfer/response/WaitingResult.java @@ -20,7 +20,8 @@ import org.apache.log4j.Logger; -import java.util.concurrent.Semaphore; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * The {@code WaitingResult} class is responsible for waiting for a response to @@ -32,7 +33,7 @@ * @version 1.0, 17/06/2010 * @since 1.0 */ -public class WaitingResult extends Thread { +public class WaitingResult{ /** * Logger @@ -51,15 +52,10 @@ public class WaitingResult extends Thread { private T result; /** - * This semaphore is blocked when asked for the response to a certain + * This countDownLatch is blocked when asked for the response to a certain * message, and is released when the response to the message arrives */ - private Semaphore semaphore; - - /** - * This variable is used for Knowing if a response for a message has arrived - */ - private boolean stop; + private CountDownLatch countDownLatch; /** * This variable is the specific amount of time that the class waits for a @@ -78,11 +74,10 @@ public class WaitingResult extends Thread { * * @param sequence . The sequence number of the message */ - WaitingResult(long sequence, ReturnsManager returnsManager) { + private WaitingResult(long sequence, ReturnsManager returnsManager) { this.sequence = sequence; this.returnsManager = returnsManager; - this.stop = false; - this.semaphore = new Semaphore(0); + this.countDownLatch = new CountDownLatch(1); } /** @@ -96,60 +91,26 @@ public class WaitingResult extends Thread { this.timeOut = timeOut; } - /** - * Executes the thread while close==false and - * timeOut!=0. This is, while the response for the message has - * not arrived or while the specific amount of time has not finished. If - * timeOut is -1, waiting until that the response arribe - */ - public void run() { - - if (timeOut != -1) { - - while (!stop && timeOut != 0) { - try { - sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - timeOut--; - } - - } else { - - while (!stop) { - try { - sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - } - - if (!stop) { - - logger - .debug("Timeout waiting for a response for number sequence= '" - + sequence + "'"); - - logger.debug("Timeout waiting for a response"); - - returnsManager.releaseWaitingResult(sequence, null); - } - } - /** * This method is used for getting the response of the message * * @return Returns the response for the message */ public T getResult() { - start(); try { - semaphore.acquire(); + if(countDownLatch.await(timeOut, TimeUnit.MILLISECONDS)){ + logger + .debug("Response arrives for number sequence= '" + + sequence + "'"); + }else{ + logger + .debug("Timeout waiting for a response for number sequence= '" + + sequence + "'"); + + returnsManager.releaseWaitingResult(sequence, null); + } } catch (InterruptedException e) { - logger.error("Error to close semaphore", e); + logger.error("Error to close countDownLatch", e); } return result; @@ -163,9 +124,7 @@ public T getResult() { public void setResult(T result) { this.result = result; - semaphore.release(); - - stop = true; + countDownLatch.countDown(); } } From 5463a4d928a7bae1ef000d4ce3b146a5534c900e Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 28 Aug 2018 17:23:01 -0500 Subject: [PATCH 6/7] Linking module versions --- communication/socket-communication/build.gradle | 2 +- main/desktop-network-gui/build.gradle | 10 ++++------ main/desktop-structure-gui/build.gradle | 4 ++-- .../build.gradle | 6 +++--- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/communication/socket-communication/build.gradle b/communication/socket-communication/build.gradle index 6bd5fe4..47f6e1e 100644 --- a/communication/socket-communication/build.gradle +++ b/communication/socket-communication/build.gradle @@ -9,7 +9,7 @@ repositories { } group = 'com.github.estigma88' -version = '2.0.0' +version = '2.1.0' dependencies { compile group: 'com.github.estigma88', name: 'jdhtuq-communication-api', version: '2.0.0' diff --git a/main/desktop-network-gui/build.gradle b/main/desktop-network-gui/build.gradle index 8067ada..36a5e5f 100644 --- a/main/desktop-network-gui/build.gradle +++ b/main/desktop-network-gui/build.gradle @@ -8,14 +8,12 @@ repositories { mavenCentral() } -version = '2.0.2' +version = '2.0.4' dependencies { compile("org.springframework.boot:spring-boot-starter:1.5.10.RELEASE") - //compile group: 'com.github.estigma88', name: 'jdhtuq-chord-spring-boot-starter', version: '2.0.0' - //compile group: 'com.github.estigma88', name: 'jdhtuq-dhash-spring-boot-starter', version: '2.0.0' - compile project(':main:starters:chord-spring-boot-starter') - compile project(':main:starters:dhash-spring-boot-starter') - compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication-spring-boot-starter', version: '2.0.0' + compile group: 'com.github.estigma88', name: 'jdhtuq-chord-spring-boot-starter', version: '2.0.1' + compile group: 'com.github.estigma88', name: 'jdhtuq-dhash-spring-boot-starter', version: '2.0.1' + compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication-spring-boot-starter', version: '2.1.0' } \ No newline at end of file diff --git a/main/desktop-structure-gui/build.gradle b/main/desktop-structure-gui/build.gradle index abd4859..5a70a3a 100644 --- a/main/desktop-structure-gui/build.gradle +++ b/main/desktop-structure-gui/build.gradle @@ -13,7 +13,7 @@ version = '2.0.2' dependencies { compile("org.springframework.boot:spring-boot-starter:1.5.10.RELEASE") - compile project(':main:starters:chord-spring-boot-starter') - compile project(':main:starters:dhash-spring-boot-starter') + compile group: 'com.github.estigma88', name: 'jdhtuq-chord-spring-boot-starter', version: '2.0.1' + compile group: 'com.github.estigma88', name: 'jdhtuq-dhash-spring-boot-starter', version: '2.0.1' compile group: 'com.github.estigma88', name: 'jdhtuq-data-structure-communication-spring-boot-starter', version: '2.0.0' } \ No newline at end of file diff --git a/main/starters/socket-communication-spring-boot-starter/build.gradle b/main/starters/socket-communication-spring-boot-starter/build.gradle index 9833ee4..fb6dd3e 100644 --- a/main/starters/socket-communication-spring-boot-starter/build.gradle +++ b/main/starters/socket-communication-spring-boot-starter/build.gradle @@ -14,7 +14,7 @@ plugins { } group = 'com.github.estigma88' -version = '2.0.0' +version = '2.1.0' sourceCompatibility = 1.8 @@ -24,8 +24,8 @@ repositories { dependencies { - //compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication', version: '2.0.0' - compile project(':communication:socket-communication') + compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication', version: '2.1.0' + // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.4' From cd271dfa2217b1903b17e1063fab890e8342e059 Mon Sep 17 00:00:00 2001 From: daniel Date: Wed, 29 Aug 2018 20:06:32 -0500 Subject: [PATCH 7/7] Updating readme and improvement exception management in socket it --- README.md | 6 +++--- .../co/edu/uniquindio/dht/it/socket/test/MessageClient.java | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index d38d887..ad72ade 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,11 @@ Add the following dependencies to your project to use Chord, DHash and Communica #### Gradle example -- Chord: `compile group: 'com.github.estigma88', name: 'jdhtuq-chord-spring-boot-starter', version: '2.0.0'` -- DHash: `compile group: 'com.github.estigma88', name: 'jdhtuq-dhash-spring-boot-starter', version: '2.0.0'` +- Chord: `compile group: 'com.github.estigma88', name: 'jdhtuq-chord-spring-boot-starter', version: '2.0.1'` +- DHash: `compile group: 'com.github.estigma88', name: 'jdhtuq-dhash-spring-boot-starter', version: '2.0.1'` - Communication: Use one of the following implementations - Data Structure: `compile group: 'com.github.estigma88', name: 'jdhtuq-data-structure-communication-spring-boot-starter', version: '2.0.0'` - - Sockets: `compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication-spring-boot-starter', version: '2.0.0'` + - Sockets: `compile group: 'com.github.estigma88', name: 'jdhtuq-socket-communication-spring-boot-starter', version: '2.1.0'` ## More info [More Info](https://github.com/estigma88/jdhtuq/wiki) diff --git a/it/socket-it/src/main/java/co/edu/uniquindio/dht/it/socket/test/MessageClient.java b/it/socket-it/src/main/java/co/edu/uniquindio/dht/it/socket/test/MessageClient.java index 1dde179..480e733 100644 --- a/it/socket-it/src/main/java/co/edu/uniquindio/dht/it/socket/test/MessageClient.java +++ b/it/socket-it/src/main/java/co/edu/uniquindio/dht/it/socket/test/MessageClient.java @@ -33,8 +33,7 @@ public Message send(Message request) { return messageSerialization.decode(stringMessage); } catch (IOException | ClassNotFoundException e) { logger.error("Error writting socket", e); + throw new IllegalStateException("Error writting socket to: " + request.getAddress() + " in port: " + portTcp, e); } - - return null; } }