Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
fantasy-peak committed May 21, 2024
1 parent a7a53f5 commit aefbf8d
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 55 deletions.
32 changes: 16 additions & 16 deletions out/bi_web/include/fantasy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class HelloWorldClient final {
frpc::DateTime date_time,
std::function<
void(std::string, Info, uint64_t, std::optional<std::string>)> cb) {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto snd_bufs =
makeRequestPacket<HelloWorldClientHelloWorldServer::hello_world>(
req_id,
Expand All @@ -140,7 +140,7 @@ class HelloWorldClient final {
void(std::string, Info, uint64_t, std::optional<std::string>)> cb,
const std::chrono::milliseconds& timeout,
std::function<void()> timeout_cb) {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto snd_bufs =
makeRequestPacket<HelloWorldClientHelloWorldServer::hello_world>(
req_id,
Expand Down Expand Up @@ -393,7 +393,8 @@ class HelloWorldClient final {

private:
template <HelloWorldClientHelloWorldServer type, typename T>
std::vector<zmq::message_t> makeRequestPacket(uint64_t req_id, T&& t) {
std::vector<zmq::message_t> makeRequestPacket(const std::string& req_id,
T&& t) {
auto header = std::make_tuple(req_id, type);
auto buffer = frpc::pack<decltype(header)>(header);
auto packet = frpc::pack<T>(std::forward<T>(t));
Expand All @@ -404,7 +405,7 @@ class HelloWorldClient final {
return snd_bufs;
}

void callTimeoutCallback(uint64_t req_id) {
void callTimeoutCallback(const std::string& req_id) {
std::unique_lock lk(m_mtx);
if (m_timeout_cb.find(req_id) == m_timeout_cb.end())
return;
Expand All @@ -422,7 +423,7 @@ class HelloWorldClient final {
}
try {
using FrpcHeader =
std::tuple<uint64_t, HelloWorldClientHelloWorldServer>;
std::tuple<std::string, HelloWorldClientHelloWorldServer>;
auto [req_id, req_type] =
frpc::unpack<FrpcHeader>(recv_bufs[0].data(),
recv_bufs[0].size());
Expand Down Expand Up @@ -467,9 +468,8 @@ class HelloWorldClient final {
std::unique_ptr<frpc::BiChannel> m_channel;
std::function<void(std::string)> m_error;
std::mutex m_mtx;
std::unordered_map<uint64_t, std::any> m_cb;
std::unordered_map<uint64_t, std::function<void()>> m_timeout_cb;
std::atomic_uint64_t m_req_id{0};
std::unordered_map<std::string, std::any> m_cb;
std::unordered_map<std::string, std::function<void()>> m_timeout_cb;
};

struct HelloWorldServerHandler {
Expand Down Expand Up @@ -663,7 +663,7 @@ class HelloWorldServer final {
}
try {
using FrpcHeader =
std::tuple<uint64_t, HelloWorldClientHelloWorldServer>;
std::tuple<std::string, HelloWorldClientHelloWorldServer>;
[[maybe_unused]] auto [req_id, req_type] =
frpc::unpack<FrpcHeader>(recv_bufs[1].data(),
recv_bufs[1].size());
Expand Down Expand Up @@ -1304,7 +1304,7 @@ class StreamClient final {
}

auto hello_world() {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto header =
std::make_tuple(req_id, StreamClientStreamServer::hello_world);

Expand Down Expand Up @@ -1383,7 +1383,7 @@ class StreamClient final {
}
try {
using FrpcHeader =
std::tuple<uint64_t, StreamClientStreamServer, bool>;
std::tuple<std::string, StreamClientStreamServer, bool>;
auto [req_id, req_type, is_close] =
frpc::unpack<FrpcHeader>(recv_bufs[0].data(),
recv_bufs[0].size());
Expand Down Expand Up @@ -1424,9 +1424,8 @@ class StreamClient final {
std::unique_ptr<frpc::BiChannel> m_channel;
std::function<void(std::string)> m_error;
std::mutex m_mtx;
std::unordered_map<uint64_t, std::any> m_cb;
std::unordered_map<uint64_t, std::function<void()>> m_close_cb;
std::atomic_uint64_t m_req_id{0};
std::unordered_map<std::string, std::any> m_cb;
std::unordered_map<std::string, std::function<void()>> m_close_cb;
std::unique_ptr<frpc::ContextPool> m_pool_ptr;
};

Expand Down Expand Up @@ -1570,7 +1569,8 @@ class StreamServer final {
return;
}
try {
using FrpcHeader = std::tuple<uint64_t, StreamClientStreamServer>;
using FrpcHeader =
std::tuple<std::string, StreamClientStreamServer>;
auto [req_id, req_type] =
frpc::unpack<FrpcHeader>(recv_bufs[1].data(),
recv_bufs[1].size());
Expand Down Expand Up @@ -1696,7 +1696,7 @@ class StreamServer final {
std::unique_ptr<frpc::BiChannel> m_channel;
std::mutex m_mtx;
std::unique_ptr<frpc::ContextPool> m_pool_ptr;
std::unordered_map<uint64_t, std::any> m_channel_mapping;
std::unordered_map<std::string, std::any> m_channel_mapping;
};

#endif
Expand Down
8 changes: 6 additions & 2 deletions out/bi_web/include/impl/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ struct ChannelConfig {
std::size_t channel_size{50000};
};

inline std::string uniqueAddr() {
inline std::string createUuid() {
uuid_t uuid;
char s[37];
uuid_generate_random(uuid);
uuid_unparse(uuid, s);
return "inproc://" + std::string(s);
return std::string(s);
}

inline std::string uniqueAddr() {
return "inproc://" + createUuid();
}

template <typename>
Expand Down
32 changes: 16 additions & 16 deletions out/include/fantasy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class HelloWorldClient final {
frpc::DateTime date_time,
std::function<
void(std::string, Info, uint64_t, std::optional<std::string>)> cb) {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto snd_bufs =
makeRequestPacket<HelloWorldClientHelloWorldServer::hello_world>(
req_id,
Expand All @@ -140,7 +140,7 @@ class HelloWorldClient final {
void(std::string, Info, uint64_t, std::optional<std::string>)> cb,
const std::chrono::milliseconds& timeout,
std::function<void()> timeout_cb) {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto snd_bufs =
makeRequestPacket<HelloWorldClientHelloWorldServer::hello_world>(
req_id,
Expand Down Expand Up @@ -393,7 +393,8 @@ class HelloWorldClient final {

private:
template <HelloWorldClientHelloWorldServer type, typename T>
std::vector<zmq::message_t> makeRequestPacket(uint64_t req_id, T&& t) {
std::vector<zmq::message_t> makeRequestPacket(const std::string& req_id,
T&& t) {
auto header = std::make_tuple(req_id, type);
auto buffer = frpc::pack<decltype(header)>(header);
auto packet = frpc::pack<T>(std::forward<T>(t));
Expand All @@ -404,7 +405,7 @@ class HelloWorldClient final {
return snd_bufs;
}

void callTimeoutCallback(uint64_t req_id) {
void callTimeoutCallback(const std::string& req_id) {
std::unique_lock lk(m_mtx);
if (m_timeout_cb.find(req_id) == m_timeout_cb.end())
return;
Expand All @@ -422,7 +423,7 @@ class HelloWorldClient final {
}
try {
using FrpcHeader =
std::tuple<uint64_t, HelloWorldClientHelloWorldServer>;
std::tuple<std::string, HelloWorldClientHelloWorldServer>;
auto [req_id, req_type] =
frpc::unpack<FrpcHeader>(recv_bufs[0].data(),
recv_bufs[0].size());
Expand Down Expand Up @@ -467,9 +468,8 @@ class HelloWorldClient final {
std::unique_ptr<frpc::BiChannel> m_channel;
std::function<void(std::string)> m_error;
std::mutex m_mtx;
std::unordered_map<uint64_t, std::any> m_cb;
std::unordered_map<uint64_t, std::function<void()>> m_timeout_cb;
std::atomic_uint64_t m_req_id{0};
std::unordered_map<std::string, std::any> m_cb;
std::unordered_map<std::string, std::function<void()>> m_timeout_cb;
};

struct HelloWorldServerHandler {
Expand Down Expand Up @@ -663,7 +663,7 @@ class HelloWorldServer final {
}
try {
using FrpcHeader =
std::tuple<uint64_t, HelloWorldClientHelloWorldServer>;
std::tuple<std::string, HelloWorldClientHelloWorldServer>;
[[maybe_unused]] auto [req_id, req_type] =
frpc::unpack<FrpcHeader>(recv_bufs[1].data(),
recv_bufs[1].size());
Expand Down Expand Up @@ -1304,7 +1304,7 @@ class StreamClient final {
}

auto hello_world() {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto header =
std::make_tuple(req_id, StreamClientStreamServer::hello_world);

Expand Down Expand Up @@ -1383,7 +1383,7 @@ class StreamClient final {
}
try {
using FrpcHeader =
std::tuple<uint64_t, StreamClientStreamServer, bool>;
std::tuple<std::string, StreamClientStreamServer, bool>;
auto [req_id, req_type, is_close] =
frpc::unpack<FrpcHeader>(recv_bufs[0].data(),
recv_bufs[0].size());
Expand Down Expand Up @@ -1424,9 +1424,8 @@ class StreamClient final {
std::unique_ptr<frpc::BiChannel> m_channel;
std::function<void(std::string)> m_error;
std::mutex m_mtx;
std::unordered_map<uint64_t, std::any> m_cb;
std::unordered_map<uint64_t, std::function<void()>> m_close_cb;
std::atomic_uint64_t m_req_id{0};
std::unordered_map<std::string, std::any> m_cb;
std::unordered_map<std::string, std::function<void()>> m_close_cb;
std::unique_ptr<frpc::ContextPool> m_pool_ptr;
};

Expand Down Expand Up @@ -1570,7 +1569,8 @@ class StreamServer final {
return;
}
try {
using FrpcHeader = std::tuple<uint64_t, StreamClientStreamServer>;
using FrpcHeader =
std::tuple<std::string, StreamClientStreamServer>;
auto [req_id, req_type] =
frpc::unpack<FrpcHeader>(recv_bufs[1].data(),
recv_bufs[1].size());
Expand Down Expand Up @@ -1696,7 +1696,7 @@ class StreamServer final {
std::unique_ptr<frpc::BiChannel> m_channel;
std::mutex m_mtx;
std::unique_ptr<frpc::ContextPool> m_pool_ptr;
std::unordered_map<uint64_t, std::any> m_channel_mapping;
std::unordered_map<std::string, std::any> m_channel_mapping;
};

#endif
Expand Down
8 changes: 6 additions & 2 deletions out/include/impl/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ struct ChannelConfig {
std::size_t channel_size{50000};
};

inline std::string uniqueAddr() {
inline std::string createUuid() {
uuid_t uuid;
char s[37];
uuid_generate_random(uuid);
uuid_unparse(uuid, s);
return "inproc://" + std::string(s);
return std::string(s);
}

inline std::string uniqueAddr() {
return "inproc://" + createUuid();
}

template <typename>
Expand Down
19 changes: 9 additions & 10 deletions template/cpp/bi.inja
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public:
{% for func in value.definitions %}

void {{func.func_name}}({{_format_args(func.inputs)}}, std::function<void({{_format_args_type(func.outputs)}})> cb) {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto snd_bufs = makeRequestPacket<{{value.caller}}{{value.callee}}::{{func.func_name}}>(
req_id,
std::make_tuple({{_format_args_name_and_move(func.inputs)}}));
Expand All @@ -69,7 +69,7 @@ public:
std::function<void({{_format_args_type(func.outputs)}})> cb,
const std::chrono::milliseconds& timeout,
std::function<void()> timeout_cb) {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto snd_bufs = makeRequestPacket<{{value.caller}}{{value.callee}}::{{func.func_name}}>(
req_id,
std::make_tuple({{_format_args_name_and_move(func.inputs)}}));
Expand Down Expand Up @@ -176,7 +176,7 @@ public:

private:
template <{{value.caller}}{{value.callee}} type, typename T>
std::vector<zmq::message_t> makeRequestPacket(uint64_t req_id, T&& t) {
std::vector<zmq::message_t> makeRequestPacket(const std::string& req_id, T&& t) {
auto header = std::make_tuple(req_id, type);
auto buffer = frpc::pack<decltype(header)>(header);
auto packet = frpc::pack<T>(std::forward<T>(t));
Expand All @@ -187,7 +187,7 @@ private:
return snd_bufs;
}

void callTimeoutCallback(uint64_t req_id) {
void callTimeoutCallback(const std::string& req_id) {
std::unique_lock lk(m_mtx);
if (m_timeout_cb.find(req_id) == m_timeout_cb.end())
return;
Expand All @@ -204,7 +204,7 @@ private:
return;
}
try {
using FrpcHeader = std::tuple<uint64_t, {{value.caller}}{{value.callee}}>;
using FrpcHeader = std::tuple<std::string, {{value.caller}}{{value.callee}}>;
auto [req_id, req_type] = frpc::unpack<FrpcHeader>(recv_bufs[0].data(), recv_bufs[0].size());
std::unique_lock lk(m_mtx);
if (m_cb.find(req_id) == m_cb.end())
Expand Down Expand Up @@ -237,10 +237,9 @@ private:
std::unique_ptr<frpc::BiChannel> m_channel;
std::function<void(std::string)> m_error;
std::mutex m_mtx;
std::unordered_map<uint64_t, std::any> m_cb;
std::unordered_map<uint64_t, std::function<void()>> m_timeout_cb;
std::atomic_uint64_t m_req_id{0};
};
std::unordered_map<std::string, std::any> m_cb;
std::unordered_map<std::string, std::function<void()>> m_timeout_cb;
};

struct {{value.callee}}Handler {
{% for func in value.definitions %}
Expand Down Expand Up @@ -372,7 +371,7 @@ private:
return;
}
try {
using FrpcHeader = std::tuple<uint64_t, {{value.caller}}{{value.callee}}>;
using FrpcHeader = std::tuple<std::string, {{value.caller}}{{value.callee}}>;
[[maybe_unused]] auto [req_id, req_type] = frpc::unpack<FrpcHeader>(recv_bufs[1].data(), recv_bufs[1].size());
switch(req_type) {
{% for func in value.definitions %}
Expand Down
13 changes: 6 additions & 7 deletions template/cpp/bi_stream.inja
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public:
{% for func in value.definitions %}

auto {{func.func_name}}() {
auto req_id = m_req_id.fetch_add(1);
auto req_id = frpc::createUuid();
auto header = std::make_tuple(req_id, {{value.caller}}{{value.callee}}::{{func.func_name}});

auto buffer = frpc::pack<decltype(header)>(header);
Expand Down Expand Up @@ -132,7 +132,7 @@ private:
return;
}
try {
using FrpcHeader = std::tuple<uint64_t, {{value.caller}}{{value.callee}}, bool>;
using FrpcHeader = std::tuple<std::string, {{value.caller}}{{value.callee}}, bool>;
auto [req_id, req_type, is_close] = frpc::unpack<FrpcHeader>(recv_bufs[0].data(), recv_bufs[0].size());
if (is_close) {
std::unique_lock lk(m_mtx);
Expand Down Expand Up @@ -171,9 +171,8 @@ private:
std::unique_ptr<frpc::BiChannel> m_channel;
std::function<void(std::string)> m_error;
std::mutex m_mtx;
std::unordered_map<uint64_t, std::any> m_cb;
std::unordered_map<uint64_t, std::function<void()>> m_close_cb;
std::atomic_uint64_t m_req_id{0};
std::unordered_map<std::string, std::any> m_cb;
std::unordered_map<std::string, std::function<void()>> m_close_cb;
std::unique_ptr<frpc::ContextPool> m_pool_ptr;
};

Expand Down Expand Up @@ -284,7 +283,7 @@ private:
return;
}
try {
using FrpcHeader = std::tuple<uint64_t, {{value.caller}}{{value.callee}}>;
using FrpcHeader = std::tuple<std::string, {{value.caller}}{{value.callee}}>;
auto [req_id, req_type] = frpc::unpack<FrpcHeader>(recv_bufs[1].data(), recv_bufs[1].size());
switch(req_type) {
{% for func in value.definitions %}
Expand Down Expand Up @@ -375,7 +374,7 @@ private:
std::unique_ptr<frpc::BiChannel> m_channel;
std::mutex m_mtx;
std::unique_ptr<frpc::ContextPool> m_pool_ptr;
std::unordered_map<uint64_t, std::any> m_channel_mapping;
std::unordered_map<std::string, std::any> m_channel_mapping;
};

#endif
Expand Down
Loading

0 comments on commit aefbf8d

Please sign in to comment.