Skip to content

Commit

Permalink
Add support push,pull
Browse files Browse the repository at this point in the history
  • Loading branch information
fantasy-peak committed Apr 11, 2024
1 parent ef4c102 commit b9e2e3a
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 25 deletions.
39 changes: 28 additions & 11 deletions out/frpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,12 @@ struct UniChannel final {
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.connect(config.addr);
if (config.socktype == zmq::socket_type::sub)
m_socket.set(zmq::sockopt::subscribe, "");
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

UniChannel(std::shared_ptr<zmq::context_t> context,
Expand All @@ -515,7 +520,12 @@ struct UniChannel final {
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.connect(config.addr);
if (config.socktype == zmq::socket_type::sub)
m_socket.set(zmq::sockopt::subscribe, "");
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

~UniChannel() {
Expand All @@ -524,10 +534,6 @@ struct UniChannel final {
m_thread.join();
}

void subscribe(const std::string& topic = "") {
m_socket.set(zmq::sockopt::subscribe, topic);
}

void start() {
m_running = true;
m_thread = std::thread([this] {
Expand Down Expand Up @@ -1035,7 +1041,6 @@ class HelloWorldReceiver final {
}

void start() {
m_channel->subscribe("");
m_channel->start();
}

Expand All @@ -1050,7 +1055,10 @@ class HelloWorldReceiver final {
static auto create(ChannelConfig& config,
std::variant<std::shared_ptr<CoroHelloWorldReceiverHandler>, std::shared_ptr<HelloWorldReceiverHandler>> handler,
std::function<void(std::string)> error) {
config.socktype = zmq::socket_type::sub;
if ((config.socktype != zmq::socket_type::sub) && config.socktype != zmq::socket_type::pull)
config.socktype = zmq::socket_type::sub;
if (config.socktype == zmq::socket_type::sub)
config.bind = false;
return std::make_unique<HelloWorldReceiver>(config, std::move(handler), std::move(error));
}

Expand Down Expand Up @@ -1140,7 +1148,10 @@ class HelloWorldSender final {
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.bind(config.addr);
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

HelloWorldSender(const ChannelConfig& config,
Expand All @@ -1153,14 +1164,20 @@ class HelloWorldSender final {
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.bind(config.addr);
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

~HelloWorldSender() {
}

static auto create(ChannelConfig& config) {
config.socktype = zmq::socket_type::pub;
if ((config.socktype != zmq::socket_type::pub) && config.socktype != zmq::socket_type::push)
config.socktype = zmq::socket_type::pub;
if (config.socktype == zmq::socket_type::pub)
config.bind = true;
return std::make_unique<HelloWorldSender>(config);
}

Expand Down
21 changes: 16 additions & 5 deletions template/cpp/uni.inja
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public:
}

void start() {
m_channel->subscribe("");
m_channel->start();
}

Expand All @@ -97,7 +96,10 @@ public:
static auto create(ChannelConfig& config,
std::variant<std::shared_ptr<CoroHelloWorldReceiverHandler>, std::shared_ptr<HelloWorldReceiverHandler>> handler,
std::function<void(std::string)> error) {
config.socktype = zmq::socket_type::sub;
if ((config.socktype != zmq::socket_type::sub) && config.socktype != zmq::socket_type::pull)
config.socktype = zmq::socket_type::sub;
if (config.socktype == zmq::socket_type::sub)
config.bind = false;
return std::make_unique<{{value.callee}}>(config, std::move(handler), std::move(error));
}

Expand Down Expand Up @@ -167,7 +169,10 @@ public:
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.bind(config.addr);
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

{{value.caller}}(const ChannelConfig& config,
Expand All @@ -180,13 +185,19 @@ public:
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.bind(config.addr);
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

~{{value.caller}}() {}

static auto create(ChannelConfig& config) {
config.socktype = zmq::socket_type::pub;
if ((config.socktype != zmq::socket_type::pub) && config.socktype != zmq::socket_type::push)
config.socktype = zmq::socket_type::pub;
if (config.socktype == zmq::socket_type::pub)
config.bind = true;
return std::make_unique<{{value.caller}}>(config);
}

Expand Down
18 changes: 12 additions & 6 deletions template/cpp/uni_channel.inja
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ public:
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.connect(config.addr);
if (config.socktype == zmq::socket_type::sub)
m_socket.set(zmq::sockopt::subscribe, "");
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

UniChannel(std::shared_ptr<zmq::context_t> context,
Expand All @@ -42,7 +47,12 @@ public:
m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt);
m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl);
}
m_socket.connect(config.addr);
if (config.socktype == zmq::socket_type::sub)
m_socket.set(zmq::sockopt::subscribe, "");
if (config.bind)
m_socket.bind(config.addr);
else
m_socket.connect(config.addr);
}

~UniChannel() {
Expand All @@ -51,10 +61,6 @@ public:
m_thread.join();
}

void subscribe(const std::string& topic = "") {
m_socket.set(zmq::sockopt::subscribe, topic);
}

void start() {
m_running = true;
m_thread = std::thread([this] {
Expand Down
55 changes: 52 additions & 3 deletions test/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>
#include <string>
#include <tuple>
#include <zmq.hpp>

#include "frpc.hpp"

Expand Down Expand Up @@ -63,20 +64,63 @@ struct CoroHelloWorldReceiver : public frpc::CoroHelloWorldReceiverHandler {
#endif

struct HelloWorldReceiverHandler : public frpc::HelloWorldReceiverHandler {
HelloWorldReceiverHandler() = default;
HelloWorldReceiverHandler(const std::string& label)
: label(label) {
}

virtual void hello_world(std::string in) override {
spdlog::info("HelloWorldReceiverHandler::hello_world: {}", in);
spdlog::info("HelloWorldReceiverHandler::hello_world: {}, {}", label, in);
return;
}
virtual void notice(int32_t in, std::string info) override {
spdlog::info("HelloWorldReceiverHandler::notice: {}: {}", in, info);
spdlog::info("HelloWorldReceiverHandler::notice: {}, {}: {}", label, in, info);
return;
}

std::string label{"test"};
};

auto test_push_pull() {
start([] {
frpc::ChannelConfig pub_config{};
pub_config.socktype = zmq::socket_type::push;
pub_config.bind = true;
pub_config.addr = "tcp://127.0.0.1:5877";
auto sender = frpc::HelloWorldSender::create(pub_config);
int i = 10;
while (i--) {
sender->hello_world(std::to_string(i) + "_frpc_push_01");
sender->notice(i, "hello world");
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
frpc::ChannelConfig sub_config{};
sub_config.addr = "tcp://127.0.0.1:5877";
sub_config.socktype = zmq::socket_type::pull;
sub_config.bind = false;
auto receiver = frpc::HelloWorldReceiver::create(
sub_config,
std::make_shared<HelloWorldReceiverHandler>("pull-01"),
[](auto error) {
spdlog::error("{}", error);
});
receiver->start();
auto receiver1 = frpc::HelloWorldReceiver::create(
sub_config,
std::make_shared<HelloWorldReceiverHandler>("pull-02"),
[](auto error) {
spdlog::error("{}", error);
});
receiver1->start();
std::this_thread::sleep_for(std::chrono::seconds(15));
}

auto test_pub_sub() {
start([] {
frpc::ChannelConfig pub_config{};
pub_config.addr = "tcp://127.0.0.1:5877";
pub_config.socktype = zmq::socket_type::pub;
auto sender = frpc::HelloWorldSender::create(pub_config);
int i = 10;
while (i--) {
Expand All @@ -87,6 +131,7 @@ auto test_pub_sub() {
});

frpc::ChannelConfig sub_config{};
sub_config.socktype = zmq::socket_type::sub;
sub_config.addr = "tcp://127.0.0.1:5877";
auto receiver = frpc::HelloWorldReceiver::create(
sub_config,
Expand Down Expand Up @@ -214,7 +259,7 @@ struct CoroStreamServerHandler : public frpc::CoroStreamServerHandler {
std::shared_ptr<frpc::Stream<void(std::string)>> outs) {
start([outs = std::move(outs)]() mutable {
for (int i = 0; i < 5; i++) {
outs->operator()(std::string("stream_server_") + std::to_string(i));
outs->operator()(std::string("stream_server_") + std::to_string(i));
std::this_thread::sleep_for(std::chrono::seconds(1));
}
outs->close();
Expand Down Expand Up @@ -277,7 +322,11 @@ void test_coro_stream(auto& pool) {
#endif

int main() {
spdlog::info("----------------test_push_pull--------------------------");
test_push_pull();
spdlog::info("----------------test_pub_sub----------------------------");
auto [receiver, monitor] = test_pub_sub();
std::this_thread::sleep_for(std::chrono::seconds(800));
test_bi();
std::this_thread::sleep_for(std::chrono::seconds(10));
spdlog::info("----------------test coro--------------------------");
Expand Down

0 comments on commit b9e2e3a

Please sign in to comment.