Skip to content

Commit

Permalink
[mod] fix build issue, modify readme
Browse files Browse the repository at this point in the history
Signed-off-by: tsymiar <[email protected]>
  • Loading branch information
tsymiar committed Dec 21, 2024
1 parent 5e72ba3 commit 7411ebb
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 52 deletions.
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/bin/bash
if [ "$1" == "clean" ]; then
rm -rvf .vs* build jniLibs lib out
export TERM=xterm
clear
# export TERM=xterm
# clear
else
if [ ! -d build ]; then
mkdir build
Expand Down
Binary file removed doc/client-server.png
Binary file not shown.
Binary file modified doc/message-queue.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 6 additions & 17 deletions doc/readme.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
# Describing

* Run as server

When instantiates the `Scadup` class, you can call `Initialize(PORT)` to initialize;
then call `Start()` to start a server, you can also register callbacks to deal received data by calling
`registerCallback()/appendCallback` with a parameter in `TASK_CALLBACK` format.

* Run as client

Call `Initialize(IP, PORT)` to initialize, then call `Connect()` to start a client.

![client-server](client-server.png)

* Run as broker

Call `Initialize(PORT)` to initialize, then call `Broker()` to start a broker server proxy.
Call `setup(PORT)` to make a socket, then call `broker()` to start a broker server proxy.

* Run as publisher

Call `Initialize(IP, PORT)` to initialize,
then call `Publisher(topic, payload)` to publish `payload` over `topic` to broker.
Call `setup(IP, PORT)` to make a socket,
then call `publish(topic, payload)` to publish `payload` over `topic` to broker.
IP/PORT is the broker ip/port, it is a short connection.

* Run as subscriber

Call `Initialize(IP, PORT)` to initialize, then call `Subscriber(topic)` to run as subscriber;
the `topic` is a mark string to match the connection of publisher, so we can get message from the `topic`.
Call `setup(IP, PORT)` to make a socket, then call `subscribe(topic)` to run as a subscriber;
the `topic` is a mark for broker to identify the publisher, then we can get message from the `topic`.

It works like below:
![message-queue](message-queue.png)

## Usage
Expand Down
19 changes: 10 additions & 9 deletions src/scadup/Broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int Scadup::connect(const char* ip, unsigned short port, unsigned int total)
{
SOCKET socket = ::socket(AF_INET, SOCK_STREAM, 0);
if (socket < 0) {
LOGE("Generating socket (%s).",
LOGE("Generating socket to connect(%s).",
(errno != 0 ? strerror(errno) : std::to_string(socket).c_str()));
return -1;
}
Expand All @@ -72,7 +72,7 @@ int Scadup::connect(const char* ip, unsigned short port, unsigned int total)
unsigned int tries = 0;
while (::connect(socket, reinterpret_cast<struct sockaddr*>(&local), sizeof(local)) == (-1)) {
if (tries < total) {
wait(Wait100ms * (long)pow(2, tries));
wait(Time100ms * (long)pow(2, tries));
tries++;
} else {
LOGE("Retrying to connect (times=%d, %s).", tries, (errno != 0 ? strerror(errno) : "No error"));
Expand Down Expand Up @@ -117,7 +117,7 @@ int Broker::setup(unsigned short port)

SOCKET socket = ::socket(AF_INET, SOCK_STREAM, 0);
if (socket < 0) {
LOGE("Generating socket (%s).",
LOGE("Generating socket to setup(%s).",
(errno != 0 ? strerror(errno) : std::to_string(socket).c_str()));
return -1;
}
Expand Down Expand Up @@ -214,7 +214,7 @@ void Broker::taskAllot(Networks& works, const Network& work)
break;
} else {
}
wait(Wait100ms);
wait(Time100ms);
}
}, work.socket);
if (task.joinable())
Expand Down Expand Up @@ -322,7 +322,7 @@ void Broker::checkAlive(Networks& works, bool* active)
{
LOGI("start Network checking task at %p.", active);
while (active != nullptr && (*active)) {
wait(Wait100ms * 3);
wait(Time100ms * 3);
std::lock_guard<std::mutex> lock(m_lock);
for (auto& work : works) {
std::vector<Network>& vec = work.second;
Expand All @@ -340,7 +340,7 @@ void Broker::checkAlive(Networks& works, bool* active)
auto next = std::next(it);
works.erase(it);
it = next;
LOGI("works key(%s) is null deleted! remain=%d", GET_VAL(it->first), works.size());
LOGI("works key(%s) is null deleted! now size=%d", GET_VAL(it->first), works.size());
} else {
++it;
}
Expand Down Expand Up @@ -369,7 +369,8 @@ int Broker::broker()
Network work = {};
getpeername(sockNew, reinterpret_cast<struct sockaddr*>(&peer), &socklen);
char addr[INET_ADDRSTRLEN];
strncpy(work.IP, inet_ntop(AF_INET, &peer.sin_addr, addr, sizeof(addr)), INET_ADDRSTRLEN);
const char* ip = inet_ntop(AF_INET, &peer.sin_addr, addr, INET_ADDRSTRLEN);
strncpy(work.IP, ip, INET_ADDRSTRLEN);
work.PORT = ntohs(peer.sin_port);
time_t t{};
time(&t);
Expand Down Expand Up @@ -402,8 +403,8 @@ int Broker::broker()
m_networks[head.flag].emplace_back(work);
}
taskAllot(m_networks, work);
LOGI("a new %s (%s:%d) %d set to Networks, topic=0x%04x, size=%d.",
GET_VAL(head.flag), work.IP, work.PORT, work.socket, head.topic, head.size);
LOGI("a new %s (%s:%d) %d set to Networks, topic=0x%04x, ssid=0x%04x, size=%d.",
GET_VAL(head.flag), work.IP, work.PORT, work.socket, head.topic, ssid, head.size);
} else {
if (0 == size || errno == EINVAL || (size < 0 && errno != EAGAIN)) {
LOGE("Recv fail(%ld), ssid=%llu, close %d: %s", size, head.ssid, sockNew, strerror(errno));
Expand Down
6 changes: 3 additions & 3 deletions src/scadup/Publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ssize_t Publisher::broadcast(const uint8_t* data, size_t len)

int Publisher::publish(uint32_t topic, const std::string& payload, ...)
{
LOGI("begin publish to BROKER, ssid=%llu, msg=\"%s\"", m_ssid, payload.c_str());
LOGI("begin publish to BROKER, ssid=0x%04x, msg=\"%s\"", m_ssid, payload.c_str());
size_t size = payload.size();
if (size == 0) {
LOGW("Payload was empty!");
Expand All @@ -54,8 +54,8 @@ int Publisher::publish(uint32_t topic, const std::string& payload, ...)

ssize_t bytes = broadcast(message, msgLen);
LOGI("broadcast message size expect=%d, bytes=%d.", msgLen, bytes);
wait(1000);
wait(Time100ms);
Delete(message);

return 0;
return bytes;
}
16 changes: 8 additions & 8 deletions src/scadup/Scadup.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ extern "C" {
}

using SOCKET = int;
const unsigned int Wait100ms = 100;
const unsigned int Time100ms = 100;
#define write(x,y,z) ::send(x,(char*)(y),z,MSG_NOSIGNAL)
#define Delete(ptr) { if (ptr != nullptr) { delete[] ptr; ptr = nullptr; } }

Expand All @@ -51,7 +51,7 @@ namespace Scadup {
G_ScaFlag flag;
uint32_t size;
uint32_t topic;
volatile uint64_t ssid; //ssid = (port | key | ip)
volatile uint64_t ssid; // ssid = (port | key | ip)
} __attribute__((aligned(4)));
struct Message {
Header head{};
Expand Down Expand Up @@ -104,20 +104,20 @@ namespace Scadup {
private:
ssize_t broadcast(const uint8_t*, size_t);
private:
SOCKET m_socket;
uint64_t m_ssid;
SOCKET m_socket = 0;
uint64_t m_ssid = 0;
};
class Subscriber {
public:
void setup(const char*, unsigned short = 9999);
ssize_t subscribe(uint32_t, RECV_CALLBACK = nullptr);
void exit();
void close();
static void exit();
private:
void keepalive(SOCKET, bool&);
private:
uint64_t m_ssid;
uint32_t m_topic;
static bool m_exit;
uint64_t m_ssid = 0;
SOCKET m_socket = 0;
bool m_exit = false;
};
}
29 changes: 17 additions & 12 deletions src/scadup/Subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using namespace Scadup;
extern const char* GET_VAL(G_ScaFlag x);
bool Subscriber::m_exit = false;

void Subscriber::setup(const char* ip, unsigned short port)
{
Expand All @@ -16,14 +17,14 @@ void Subscriber::setup(const char* ip, unsigned short port)

ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
{
LOGI("subscribe topic=0x%04x, ssid=%llu", topic, m_ssid);
LOGI("subscribe topic=0x%04x, ssid=0x%04x", topic, m_ssid);
Header head{};
head.flag = SUBSCRIBER;
head.ssid = m_ssid;
head.topic = topic;
size_t len = ::send(m_socket, reinterpret_cast<char*>(&head), HEAD_SIZE, 0);
if (len == 0 || (len < 0 && errno == EPIPE)) {
close(m_socket);
::close(m_socket);
LOGE("Write to sock %d, ssid %llu failed!", m_socket, m_ssid);
return -1;
}
Expand All @@ -33,14 +34,14 @@ ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
LOGW("Subscribe will exit");
break;
}
wait(Wait100ms);
wait(Time100ms);
Message msg = {};
const size_t size = HEAD_SIZE + sizeof(Message::Payload::status);
memset(static_cast<void*>(&msg), 0, size);
ssize_t len = ::recv(m_socket, reinterpret_cast<char*>(&msg), size, MSG_WAITALL);
if (len == 0 || (len < 0 && errno != EAGAIN)) {
LOGE("Receive msg fail[%ld], %s", len, strerror(errno));
close(m_socket);
::close(m_socket);
return -2;
}
if (memcmp(reinterpret_cast<char*>(&msg), "Scadup", 7) == 0)
Expand All @@ -54,7 +55,7 @@ ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
len = writes(m_socket, reinterpret_cast<uint8_t*>(&msg), size);
if (len < 0) {
LOGE("Writes %s", strerror(errno));
close(m_socket);
::close(m_socket);
return -3;
}
LOGI("MQ writes %ld [%lld] %s.", len, msg.head.ssid, GET_VAL(msg.head.flag));
Expand All @@ -70,7 +71,7 @@ ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
len = ::recv(m_socket, body, length, 0);
if (len < 0 || (len == 0 && errno != EINTR)) {
LOGE("Receive body fail, %s", strerror(errno));
close(m_socket);
::close(m_socket);
Delete(body);
return -5;
} else {
Expand Down Expand Up @@ -106,23 +107,27 @@ void Subscriber::keepalive(SOCKET socket, bool& exit)
head.flag = SUBSCRIBER;
size_t len = ::send(socket, reinterpret_cast<char*>(&head), HEAD_SIZE, 0);
if (len == 0 || (len < 0 && errno == EPIPE)) {
close(socket);
::close(socket);
LOGE("Write to sock[%d], cmd %zu failed!", socket, head.cmd);
break;
}
wait(Wait100ms * 3);
wait(Time100ms * 3);
}
}

void Subscriber::exit()
void Subscriber::close()
{
Header head{};
head.cmd = 0xff;
::send(m_socket, &head, HEAD_SIZE, 0);
m_exit = true;
wait(Wait100ms);
wait(Time100ms);
if (m_socket > 0) {
close(m_socket);
::close(m_socket);
m_socket = 0;
}
}

void Subscriber::exit()
{
m_exit = true;
}
6 changes: 5 additions & 1 deletion src/utils/FileUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ std::string FileUtils::GetFileStringContent(const std::string& filename)
file.seekg(0, std::ios::end);
content.resize(file.tellg());
file.seekg(0, std::ios::beg);
file.read(&content[0], content.size());
size_t size = content.size();
if (size > 0x1000000) {
size = 0x1000000;
}
file.read(&content[0], size);
file.close();
}
return content;
Expand Down
4 changes: 4 additions & 0 deletions src/utils/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ inline const char* basename(const char* name)
#endif
#ifdef __ANDROID__
#include <android/log.h>
#ifdef __cplusplus
#define _LOG_(level, fmt, ...) __android_log_print(level, LOG_TAG,"(%s:%d)[%s]: " fmt, basename(const_cast<char*>(__FILE__)), __LINE__, __FUNCTION__, ##__VA_ARGS__)
#else
#define _LOG_(level, fmt, ...) __android_log_print(level, LOG_TAG,"(%s:%d)[%s]: " fmt, basename(__FILE__), __LINE__, __FUNCTION__, ##__VA_ARGS__)
#endif
#define LOGD(fmt, ...) _LOG_(ANDROID_LOG_DEBUG, fmt, ##__VA_ARGS__)
#define LOGI(fmt, ...) _LOG_(ANDROID_LOG_INFO, fmt, ##__VA_ARGS__)
#define LOGW(fmt, ...) _LOG_(ANDROID_LOG_WARN, fmt, ##__VA_ARGS__)
Expand Down

0 comments on commit 7411ebb

Please sign in to comment.