Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
update: protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
gaowanlu committed Oct 28, 2023
1 parent ae8eaf7 commit ef88e99
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 56 deletions.
4 changes: 2 additions & 2 deletions bin/config/main.ini
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[server]
ip = 0.0.0.0
port = 20023
threads = 3
threads = 1
max_conn = 20000
wait_time = 10
#task_type = HTTP_TASK
task_type = STREAM_TASK
# now support HTTP_TASK or STREAM_TASK
daemon = 1
daemon = 0

[client]
threads = 10
34 changes: 5 additions & 29 deletions client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ int main(int argc, const char **argv)
std::cout << "arg<2" << std::endl;
return 1;
}
ProtoMessageHead message;
ProtoPackage message;
ProtoExampleReq exampleReq;
std::string send_str(argv[1]);
exampleReq.set_testcontext(send_str);
message.set_cmd(ProtoCmd::EXAMPLE_REQ);

const char *server_ip = "61.171.51.135";
const char *server_ip = "172.29.94.203";
int server_port = 20023;

int client_socket = socket(AF_INET, SOCK_STREAM, 0);
Expand All @@ -54,26 +54,13 @@ int main(int argc, const char **argv)
google::protobuf::io::ArrayOutputStream output_stream_1(&body_str[0], body_str.size());
google::protobuf::io::CodedOutputStream coded_output_1(&output_stream_1);
exampleReq.SerializeToCodedStream(&coded_output_1);
message.set_bodylen(body_str.size());
message.set_body(body_str);
std::string data = message.SerializePartialAsString();

uint64_t headLen = data.size();
headLen = htobe64(headLen);
uint64_t packageLen = data.size();

for (int i = 0; i < 5; i++)
{

if (send(client_socket, &headLen, sizeof(headLen), 0) != sizeof(headLen))
{
perror("Send Head failed");
close(client_socket);
return 1;
}
else
{
printf("Send HeadLen succ\n");
}

if (send(client_socket, data.c_str(), data.size(), 0) != data.size())
{
perror("Send Head failed");
Expand All @@ -85,18 +72,7 @@ int main(int argc, const char **argv)
printf("Send Head succ\n");
}

if (send(client_socket, body_str.c_str(), body_str.size(), 0) != body_str.size())
{
perror("Send Body failed");
close(client_socket);
return 1;
}
else
{
printf("Send Body succ\n");
}

printf("Send all bytes head %ld body %ld all %ld\n", data.size(), body_str.size(), data.size() + body_str.size());
printf("Send all bytes package %ld body %ld all %ld\n", data.size(), body_str.size(), data.size() + body_str.size());
}

close(client_socket);
Expand Down
4 changes: 2 additions & 2 deletions protocol/proto_message_head.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
syntax = "proto3";

message ProtoMessageHead{
message ProtoPackage{
int32 cmd = 1;
int32 bodyLen = 2;
bytes body = 2;
}
35 changes: 12 additions & 23 deletions src/app/stream_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ namespace tubekit::app
tubekit::thread::mutex global_player_mutex;
}

int process_protocol(tubekit::connection::stream_connection &m_stream_connection, int cmd, std::string &&packet)
int process_protocol(tubekit::connection::stream_connection &m_stream_connection, ProtoPackage &package)
{
// EXAMPLE_REQ
if (cmd == ProtoCmd::EXAMPLE_REQ)
if (package.cmd() == ProtoCmd::EXAMPLE_REQ)
{
ProtoExampleReq exampleReq;
if (exampleReq.ParsePartialFromArray(packet.c_str(), packet.size()))
if (exampleReq.ParseFromString(package.body()))
{
std::cout << exampleReq.testcontext() << std::endl;
// std::cout << exampleReq.testcontext() << std::endl;
}
else
{
Expand All @@ -58,39 +58,28 @@ void stream_app::process_connection(tubekit::connection::stream_connection &m_st
{
char *tmp_buffer = all_data_buffer + offset;
uint64_t data_len = all_data_len - offset;
if (data_len <= sizeof(uint64_t))
if (data_len == 0)
{
break;
}

uint64_t headLen = *(uint64_t *)tmp_buffer;
headLen = be64toh(headLen);
if (headLen + sizeof(uint64_t) > data_len)
{
break;
}

ProtoMessageHead protoHead;
if (!protoHead.ParseFromArray(tmp_buffer + sizeof(uint64_t), headLen))
ProtoPackage protoPackage;
if (!protoPackage.ParseFromArray(tmp_buffer, data_len))
{
// std::cout << "protoPackage.ParseFromArray failed" << std::endl;
m_stream_connection.mark_close();
break;
}
uint32_t bodyLen = protoHead.bodylen();
uint32_t cmd = protoHead.cmd();

if (data_len < sizeof(uint64_t) + headLen + bodyLen)
{
break;
}

if (0 != process_protocol(m_stream_connection, cmd, std::move(std::string(tmp_buffer + sizeof(uint64_t) + headLen, bodyLen))))
if (0 != process_protocol(m_stream_connection, protoPackage))
{
// std::cout << "process_protocol failed" << std::endl;
m_stream_connection.mark_close();
m_stream_connection.m_recv_buffer.clear();
break;
}
offset += sizeof(uint64_t) + headLen + bodyLen;
// std::cout << "datalen " << data_len << " package size " << protoPackage.ByteSizeLong() << std::endl;
offset += protoPackage.ByteSizeLong();
}

if (!m_stream_connection.m_recv_buffer.read_ptr_move_n(offset))
Expand Down

0 comments on commit ef88e99

Please sign in to comment.