Skip to content

Commit

Permalink
[mod] fix build warning
Browse files Browse the repository at this point in the history
Signed-off-by: tsymiar <[email protected]>
  • Loading branch information
tsymiar committed Dec 24, 2024
1 parent ebd95f2 commit e145744
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 35 deletions.
31 changes: 15 additions & 16 deletions src/scadup/Broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
using namespace Scadup;

static struct MsgQue g_msgQue;
static Message g_message{};

char G_FlagValue[][0xc] = { "NONE", "BROKER", "PUBLISHER", "SUBSCRIBER", };
const char* GET_VAL(G_ScaFlag x) { return (x >= NONE && x < MAX_VAL) ? G_FlagValue[x] : G_FlagValue[0]; };
const char* GET_FLAG(G_ScaFlag x) { return (x >= NONE && x < MAX_VAL) ? G_FlagValue[x] : G_FlagValue[0]; };

void signalCatch(int value)
{
Expand Down Expand Up @@ -99,7 +97,7 @@ SOCKET Scadup::socket2Broker(const char* ip, unsigned short port, uint64_t& ssid
if (head.size == sizeof(head) && head.flag == BROKER)
ssid = head.ssid;
else
LOGW("Error flag %s, size=%d", GET_VAL(head.flag), head.size);
LOGW("Mismatch flag %s, size %u.", GET_FLAG(head.flag), head.size);
} else {
if (size == 0) {
LOGE("Connection closed by peer, close %d", socket);
Expand Down Expand Up @@ -236,12 +234,13 @@ void Broker::taskAllot(Networks& works, const Network& work)

int Broker::ProxyTask(Networks& works, const Network& work)
{
LOGI("start message proxy task, works(%d), for %s:%u.",
LOGI("start proxy task, works(%d), address %s:%u, size %u.",
works[work.head.flag >= MAX_VAL && work.head.flag < MAX_VAL ? work.head.flag : NONE].size(),
work.IP, work.PORT);
work.IP, work.PORT, work.head.size);
const size_t sz1 = sizeof(Message::Payload::status);
size_t left = work.head.size - HEAD_SIZE;
Message* msg = new(g_message) Message();
static Message sval{};
Message* msg = new(sval) Message;
msg->payload.content = new char[left - sz1];
memset(msg->payload.content, 0, left - sz1);
size_t len = 0;
Expand All @@ -256,7 +255,7 @@ int Broker::ProxyTask(Networks& works, const Network& work)
return -1;
}
} else if (got == 0) {
LOGW("Connection closed by peer");
LOGW("Connection closed by peer.");
delete[] msg->payload.content;
return -1;
} else {
Expand All @@ -280,7 +279,7 @@ int Broker::ProxyTask(Networks& works, const Network& work)
if (message != nullptr) {
Message msg = *reinterpret_cast<Message*>(message);
if (msg.head.flag != PUBLISHER) {
LOGW("Message invalid(%d), len=%d!", msg.head.flag, msg.head.size);
LOGW("Message invalid(%d), len=%u!", msg.head.flag, msg.head.size);
return -1;
}
if (msg.head.size > 0) {
Expand All @@ -294,7 +293,7 @@ int Broker::ProxyTask(Networks& works, const Network& work)
size_t len = ::send(sub.socket, buff, size, MSG_NOSIGNAL);
if (len == 0 || (len < 0 && errno == EPIPE)) {
setOffline(works, sub.socket);
LOGE("Write to sock[%d], size %zu failed!", sub.socket, msg.head.size);
LOGE("Write to sock[%d], size %u failed!", sub.socket, msg.head.size);
break;
} else {
if (len == HEAD_SIZE + sz1) {
Expand All @@ -304,7 +303,7 @@ int Broker::ProxyTask(Networks& works, const Network& work)
left -= len;
}
} while (left > 0);
LOGI("writes message to subscriber[%s:%u], size %zu!", sub.IP, sub.PORT, msg.head.size);
LOGI("writes message to subscriber[%s:%u], size %u!", sub.IP, sub.PORT, msg.head.size);
} else {
LOGW("No valid subscriber of topic %04x!", sub.head.topic);
}
Expand All @@ -313,7 +312,7 @@ int Broker::ProxyTask(Networks& works, const Network& work)
Delete(msg.payload.content)
queue_pop(&g_msgQue);
} else {
LOGW("Message size(%d) invalid!", msg.head.size);
LOGW("Message size(%u) invalid!", msg.head.size);
}
} else {
LOGW("MsgQue is null!");
Expand Down Expand Up @@ -342,7 +341,7 @@ void Broker::setOffline(Networks& works, SOCKET socket)

void Broker::checkAlive(Networks& works, bool* active)
{
LOGI("start Network checking task at %p.", active);
LOGI("start alive checking task at %p.", active);
while (active != nullptr && (*active)) {
wait(Time100ms * 3);
std::lock_guard<std::mutex> lock(m_lock);
Expand All @@ -362,7 +361,7 @@ void Broker::checkAlive(Networks& works, bool* active)
auto next = std::next(it);
works.erase(it);
it = next;
LOGI("works key(%s) is null deleted! now size=%d", GET_VAL(it->first), works.size());
LOGI("works key(%s) is null deleted! now size=%d", GET_FLAG(it->first), works.size());
} else {
++it;
}
Expand Down Expand Up @@ -425,8 +424,8 @@ int Broker::broker()
m_networks[head.flag].emplace_back(work);
}
taskAllot(m_networks, work);
LOGI("a new %s (%s:%d) %d set to Networks, topic=0x%04x, ssid=0x%04x, size=%d.",
GET_VAL(head.flag), work.IP, work.PORT, work.socket, head.topic, ssid, head.size);
LOGI("a new %s (%s:%d) %d set to Networks, topic=0x%04x, ssid=0x%04x, size=%u.",
GET_FLAG(head.flag), work.IP, work.PORT, work.socket, head.topic, ssid, head.size);
} else {
if (0 == size || errno == EINVAL || (size < 0 && errno != EAGAIN)) {
LOGE("Recv fail(%ld), ssid=%llu, close %d: %s", size, head.ssid, sockNew, strerror(errno));
Expand Down
21 changes: 11 additions & 10 deletions src/scadup/Subscriber.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "Scadup.h"

using namespace Scadup;
extern const char* GET_VAL(G_ScaFlag x);
extern const char* GET_FLAG(G_ScaFlag x);
bool Subscriber::m_exit = false;

void Subscriber::setup(const char* ip, unsigned short port)
Expand Down Expand Up @@ -64,7 +64,7 @@ ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
::close(m_socket);
return -3;
}
LOGI("MQ writes %ld [%lld] %s.", len, msg.head.ssid, GET_VAL(msg.head.flag));
LOGI("MQ writes %ld [%lld] %s.", len, msg.head.ssid, GET_FLAG(msg.head.flag));
continue;
}
if (msg.head.size > size) {
Expand All @@ -84,18 +84,19 @@ ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
msg.payload.status[0] = 'O';
msg.payload.status[1] = 'K';
msg.payload.status[2] = '\0';
auto* pMsg = reinterpret_cast<Message*>(new char[size + len]);
if (pMsg != nullptr) {
memcpy(pMsg, &msg, sizeof(Message));
auto* message = reinterpret_cast<Message*>(new char[size + len]);
if (message != nullptr) {
memcpy(message, &msg.head, HEAD_SIZE);
memcpy(reinterpret_cast<char*>(message) + HEAD_SIZE, msg.payload.status, sizeof(Message::Payload::status));
if (len > 0) {
pMsg->payload.content = body;
pMsg->payload.content[len - 1] = '\0';
message->payload.content = body;
message->payload.content[len - 1] = '\0';
}
if (callback != nullptr) {
callback(*pMsg);
callback(*message);
}
LOGI("message payload = [%s]-[%s]", pMsg->payload.status, pMsg->payload.content);
Delete(pMsg);
LOGI("message payload = [%s]-[%s]", message->payload.status, message->payload.content);
Delete(message);
}
}
Delete(body);
Expand Down
26 changes: 24 additions & 2 deletions src/utils/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ threadpool::threadpool() : m_stopPool(false) { }

threadpool::~threadpool() { }

void threadpool::start(size_t threadsnum)
void threadpool::start(size_t threads)
{
for (size_t i = 0; i < threadsnum; ++i) {
for (size_t i = 0; i < threads; ++i) {
m_workers.emplace_back([this] {
for (;;) {
std::function<void()> task;
Expand Down Expand Up @@ -35,6 +35,28 @@ void threadpool::enqueue(F&& f)
m_condition.notify_one();
}

template<class F, class... Args>
auto threadpool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> fres = task->get_future();
{
std::unique_lock<std::mutex> lock(m_queueMutex);

// don't allow enqueueing after stopping the pool
if (m_stopPool)
throw std::runtime_error("enqueue on stopped ThreadPool");

m_tasks.emplace([task]() { (*task)(); });
}
m_condition.notify_one();
return fres;
}

void threadpool::stop()
{
{
Expand Down
15 changes: 10 additions & 5 deletions src/utils/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <future>
#include <stdexcept>

class threadpool {
public:
Expand All @@ -17,14 +19,17 @@ class threadpool {
template<class F>
void enqueue(F&& f);

void start(size_t threadsnum);
template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args);

void start(size_t threads);
void stop();

private:
std::vector<std::thread> m_workers;
std::queue<std::function<void()>> m_tasks;
std::mutex m_queueMutex;
std::condition_variable m_condition;
std::vector<std::thread> m_workers = {};
std::queue<std::function<void()>> m_tasks = {};
std::mutex m_queueMutex{};
std::condition_variable m_condition{};
std::atomic<bool> m_stopPool;
};

Expand Down
4 changes: 2 additions & 2 deletions test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using namespace std;
using namespace Scadup;

extern const char* GET_VAL(Scadup::G_ScaFlag x);
extern const char* GET_FLAG(Scadup::G_ScaFlag x);

static void usage()
{
Expand Down Expand Up @@ -39,7 +39,7 @@ int main(int argc, char* argv[])
IP = "127.0.0.1";
cout << "IP is null when parse 'scadup.cfg', default: " << IP << endl;
}
cout << argv[0] << ": start [" << flag << "](" << GET_VAL(flag) << ")" << endl;
cout << argv[0] << ": " << GET_FLAG(flag) << " test start." << endl;
uint32_t topic = 0x1234;
if (argc > 2) {
topic = strtol(argv[2], NULL, 16);
Expand Down

0 comments on commit e145744

Please sign in to comment.