Skip to content

Commit

Permalink
Added startup, shutdown methods to IpSocket and added startup as a re… (
Browse files Browse the repository at this point in the history
#2261)

* Added startup, shutdown methods to IpSocket and added startup as a retry step

* Review fixes I

* Fixing static analysis checks

* Fixing spelling
  • Loading branch information
LeStarch authored Oct 24, 2023
1 parent 18bd52d commit 5e0b388
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 72 deletions.
44 changes: 33 additions & 11 deletions Drv/Ip/IpSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

namespace Drv {

IpSocket::IpSocket() : m_fd(-1), m_timeoutSeconds(0), m_timeoutMicroseconds(0), m_port(0), m_open(false) {
IpSocket::IpSocket() : m_fd(-1), m_timeoutSeconds(0), m_timeoutMicroseconds(0), m_port(0), m_open(false), m_started(false) {
::memset(m_hostname, 0, sizeof(m_hostname));
}

Expand Down Expand Up @@ -98,23 +98,45 @@ SocketIpStatus IpSocket::addressToIp4(const char* address, void* ip4) {
return SOCK_SUCCESS;
}

bool IpSocket::isStarted() {
bool is_started = false;
this->m_lock.lock();
is_started = this->m_started;
this->m_lock.unLock();
return is_started;
}

bool IpSocket::isOpened() {
bool is_open = false;
m_lock.lock();
is_open = m_open;
m_lock.unLock();
this->m_lock.lock();
is_open = this->m_open;
this->m_lock.unLock();
return is_open;
}

void IpSocket::close() {
m_lock.lock();
this->m_lock.lock();
if (this->m_fd != -1) {
(void)::shutdown(this->m_fd, SHUT_RDWR);
(void)::close(this->m_fd);
this->m_fd = -1;
}
m_open = false;
m_lock.unLock();
this->m_open = false;
this->m_lock.unLock();
}

void IpSocket::shutdown() {
this->close();
this->m_lock.lock();
this->m_started = false;
this->m_lock.unLock();
}

SocketIpStatus IpSocket::startup() {
this->m_lock.lock();
this->m_started = true;
this->m_lock.unLock();
return SOCK_SUCCESS;
}

SocketIpStatus IpSocket::open() {
Expand All @@ -128,10 +150,10 @@ SocketIpStatus IpSocket::open() {
return status;
}
// Lock to update values and "officially open"
m_lock.lock();
m_fd = fd;
m_open = true;
m_lock.unLock();
this->m_lock.lock();
this->m_fd = fd;
this->m_open = true;
this->m_lock.unLock();
return status;
}

Expand Down
30 changes: 29 additions & 1 deletion Drv/Ip/IpSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ enum SocketIpStatus {
SOCK_FAILED_TO_LISTEN = -10, //!< Failed to listen on socket
SOCK_FAILED_TO_ACCEPT = -11, //!< Failed to accept connection
SOCK_SEND_ERROR = -13, //!< Failed to send after configured retries
SOCK_NOT_STARTED = -14, //!< Socket has not been started
};

/**
* \brief Helper base-class for setting up Berkley sockets
* \brief Helper base-class for setting up Berkeley sockets
*
* Certain IP headers have conflicting definitions with the m_data member of various types in fprime. TcpHelper
* separates the ip setup from the incoming Fw::Buffer in the primary component class preventing this collision.
Expand Down Expand Up @@ -66,6 +67,13 @@ class IpSocket {
*/
SocketIpStatus configure(const char* hostname, const U16 port, const U32 send_timeout_seconds,
const U32 send_timeout_microseconds);
/**
* \brief Returns true when the socket is started
*
* Returns true when the socket is started up sufficiently to be actively listening to clients. Returns false
* otherwise. This means `startup()` was called and returned success.
*/
bool isStarted();

/**
* \brief check if IP socket has previously been opened
Expand All @@ -78,6 +86,16 @@ class IpSocket {
*/
bool isOpened();

/**
* \brief startup the socket, a no-op on unless this is server
*
* This will start-up the socket. In the case of most sockets, this is a no-op. On server sockets this binds to the
* server address and progresses through the `listen` step such that on `open` new clients may be accepted.
*
* \return status of startup
*/
virtual SocketIpStatus startup();

/**
* \brief open the IP socket for communications
*
Expand All @@ -92,6 +110,7 @@ class IpSocket {
* In the case of server components (TcpServer) this function will block until a client has connected.
*
* Note: delegates to openProtocol for protocol specific implementation
*
* \return status of open
*/
SocketIpStatus open();
Expand Down Expand Up @@ -135,6 +154,14 @@ class IpSocket {
*/
void close();

/**
* \brief shutdown the socket
*
* Closes the socket opened by the open call. In this case of the TcpServer, this does close server's listening
* port. This will shutdown all clients.
*/
virtual void shutdown();

PROTECTED:

/**
Expand Down Expand Up @@ -179,6 +206,7 @@ class IpSocket {
U32 m_timeoutMicroseconds;
U16 m_port; //!< IP address port used
bool m_open; //!< Have we successfully opened
bool m_started; //!< Have we successfully started the socket
char m_hostname[SOCKET_MAX_HOSTNAME_SIZE]; //!< Hostname to supply
};
} // namespace Drv
Expand Down
25 changes: 21 additions & 4 deletions Drv/Ip/SocketReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ void SocketReadTask::startSocketTask(const Fw::StringBase &name,
FW_ASSERT(Os::Task::TASK_OK == stat, static_cast<NATIVE_INT_TYPE>(stat));
}

SocketIpStatus SocketReadTask::startup() {
return this->getSocketHandler().startup();
}

SocketIpStatus SocketReadTask::open() {
SocketIpStatus status = this->getSocketHandler().open();
// Call connected any time the open is successful
Expand All @@ -45,6 +49,10 @@ SocketIpStatus SocketReadTask::open() {
return status;
}

void SocketReadTask::shutdown() {
this->getSocketHandler().shutdown();
}

void SocketReadTask::close() {
this->getSocketHandler().close();
}
Expand All @@ -55,23 +63,32 @@ Os::Task::TaskStatus SocketReadTask::joinSocketTask(void** value_ptr) {

void SocketReadTask::stopSocketTask() {
this->m_stop = true;
this->getSocketHandler().close(); // Break out of any receives
this->getSocketHandler().shutdown(); // Break out of any receives and fully shutdown
}

void SocketReadTask::readTask(void* pointer) {
FW_ASSERT(pointer);
SocketIpStatus status = SOCK_SUCCESS;
SocketReadTask* self = reinterpret_cast<SocketReadTask*>(pointer);
do {
// Open a network connection if it has not already been open
if ((not self->getSocketHandler().isStarted()) and (not self->m_stop) and
((status = self->startup()) != SOCK_SUCCESS)) {
Fw::Logger::logMsg("[WARNING] Failed to open port with status %d and errno %d\n", status, errno);
(void) Os::Task::delay(SOCKET_RETRY_INTERVAL_MS);
continue;
}

// Open a network connection if it has not already been open
if ((not self->getSocketHandler().isOpened()) and (not self->m_stop) and
((status = self->open()) != SOCK_SUCCESS)) {
Fw::Logger::logMsg("[WARNING] Failed to open port with status %d and errno %d\n", status, errno);
Os::Task::delay(SOCKET_RETRY_INTERVAL_MS);
(void) Os::Task::delay(SOCKET_RETRY_INTERVAL_MS);
continue;
}

// If the network connection is open, read from it
if (self->getSocketHandler().isOpened() and (not self->m_stop)) {
if (self->getSocketHandler().isStarted() and self->getSocketHandler().isOpened() and (not self->m_stop)) {
Fw::Buffer buffer = self->getBuffer();
U8* data = buffer.getData();
FW_ASSERT(data);
Expand All @@ -92,6 +109,6 @@ void SocketReadTask::readTask(void* pointer) {
// As long as not told to stop, and we are successful interrupted or ordered to retry, keep receiving
while (not self->m_stop &&
(status == SOCK_SUCCESS || status == SOCK_INTERRUPTED_TRY_AGAIN || self->m_reconnect));
self->getSocketHandler().close(); // Close the handler again, in case it reconnected
self->getSocketHandler().shutdown(); // Shutdown the port entirely
}
}; // namespace Drv
27 changes: 25 additions & 2 deletions Drv/Ip/SocketReadTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ class SocketReadTask {
const NATIVE_UINT_TYPE stack = Os::Task::TASK_DEFAULT,
const NATIVE_UINT_TYPE cpuAffinity = Os::Task::TASK_DEFAULT);

/**
* \brief startup the socket for communications
*
* Status of the socket handler.
*
* Note: this just delegates to the handler
*
* \return status of open, SOCK_SUCCESS for success, something else on error
*/
SocketIpStatus startup();

/**
* \brief open the socket for communications
*
Expand All @@ -70,13 +81,25 @@ class SocketReadTask {
/**
* \brief close the socket communications
*
* Typically stopping the socket read task will close the connection. However, in cases where the read task
* will not be started, this function may be used to close the socket.
* Typically stopping the socket read task will shutdown the connection. However, in cases where the read task
* will not be started, this function may be used to close the socket. This calls a full `close` on the client
* socket.
*
* Note: this just delegates to the handler
*/
void close();

/**
* \brief shutdown the socket communications
*
* Typically stopping the socket read task will shutdown the connection. However, in cases where the read task
* will not be started, this function may be used to close the socket. This calls a full `shutdown` on the client
* socket.
*
* Note: this just delegates to the handler
*/
void shutdown();

/**
* \brief stop the socket read task and close the associated socket.
*
Expand Down
8 changes: 4 additions & 4 deletions Drv/Ip/TcpClientSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace Drv {
/**
* \brief Helper for setting up Tcp using Berkley sockets as a client
* \brief Helper for setting up Tcp using Berkeley sockets as a client
*
* Certain IP headers have conflicting definitions with the m_data member of various types in fprime. TcpClientSocket
* separates the ip setup from the incoming Fw::Buffer in the primary component class preventing this collision.
Expand All @@ -35,21 +35,21 @@ class TcpClientSocket : public IpSocket {
* \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \return status of open
*/
SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd);
SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) override;
/**
* \brief Protocol specific implementation of send. Called directly with retry from send.
* \param data: data to send
* \param size: size of data to send
* \return: size of data sent, or -1 on error.
*/
I32 sendProtocol(const U8* const data, const U32 size);
I32 sendProtocol(const U8* const data, const U32 size) override;
/**
* \brief Protocol specific implementation of recv. Called directly with error handling from recv.
* \param data: data pointer to fill
* \param size: size of data buffer
* \return: size of data received, or -1 on error.
*/
I32 recvProtocol( U8* const data, const U32 size);
I32 recvProtocol( U8* const data, const U32 size) override;
};
} // namespace Drv

Expand Down
36 changes: 27 additions & 9 deletions Drv/Ip/TcpServerSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <Fw/Logger/Logger.hpp>
#include <FpConfig.hpp>


#ifdef TGT_OS_TYPE_VXWORKS
#include <socket.h>
#include <inetLib.h>
Expand Down Expand Up @@ -44,7 +43,6 @@ SocketIpStatus TcpServerSocket::startup() {
NATIVE_INT_TYPE serverFd = -1;
struct sockaddr_in address;
this->close();

// Acquire a socket, or return error
if ((serverFd = ::socket(AF_INET, SOCK_STREAM, 0)) == -1) {
return SOCK_FAILED_TO_GET_SOCKET;
Expand All @@ -68,34 +66,54 @@ SocketIpStatus TcpServerSocket::startup() {
::close(serverFd);
return SOCK_FAILED_TO_BIND;
}
m_base_fd = serverFd;
Fw::Logger::logMsg("Listening for single client at %s:%hu\n", reinterpret_cast<POINTER_CAST>(m_hostname), m_port);
// TCP requires listening on a the socket. Second argument prevents queueing of anything more than a single client.
if (::listen(serverFd, 0) < 0) {
::close(serverFd);
return SOCK_FAILED_TO_LISTEN; // What we have here is a failure to communicate
}
return SOCK_SUCCESS;
this->m_lock.lock();
m_base_fd = serverFd;
this->m_lock.unLock();

return this->IpSocket::startup();
}

void TcpServerSocket::shutdown() {
(void)::shutdown(this->m_base_fd, SHUT_RDWR);
(void)::close(this->m_base_fd);
m_base_fd = -1;
this->close();
this->m_lock.lock();
if (this->m_base_fd != -1) {
(void)::shutdown(this->m_base_fd, SHUT_RDWR);
(void)::close(this->m_base_fd);
this->m_base_fd = -1;
}
this->m_lock.unLock();
this->IpSocket::shutdown();
}

SocketIpStatus TcpServerSocket::openProtocol(NATIVE_INT_TYPE& fd) {
NATIVE_INT_TYPE clientFd = -1;
NATIVE_INT_TYPE serverFd = -1;

// Check started before allowing open
if (not this->isStarted()) {
return SOCK_NOT_STARTED;
}

this->m_lock.lock();
serverFd = this->m_base_fd;
this->m_lock.unLock();

// TCP requires accepting on a the socket to get the client socket file descriptor.
if ((clientFd = ::accept(m_base_fd, nullptr, nullptr)) < 0) {
clientFd = ::accept(serverFd, nullptr, nullptr);
if (clientFd < 0) {
return SOCK_FAILED_TO_ACCEPT; // What we have here is a failure to communicate
}
// Setup client send timeouts
if (IpSocket::setupTimeouts(clientFd) != SOCK_SUCCESS) {
::close(clientFd);
return SOCK_FAILED_TO_SET_SOCKET_OPTIONS;
}

Fw::Logger::logMsg("Accepted client at %s:%hu\n", reinterpret_cast<POINTER_CAST>(m_hostname), m_port);
fd = clientFd;
return SOCK_SUCCESS;
Expand Down
Loading

0 comments on commit 5e0b388

Please sign in to comment.