Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
feat: http loop-accept
Browse files Browse the repository at this point in the history
  • Loading branch information
gaowanlu committed Apr 15, 2024
1 parent f62236b commit 2245f61
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 77 deletions.
3 changes: 2 additions & 1 deletion bin/config/main.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
ip = 0.0.0.0
port = 20023
threads = 6
max_conn = 1000
max_conn = 10000
# using wait_time setting tick time, 10ms
wait_time = 10
accept_per_tick = 50
http_static_dir = /tubekit_static
lua_dir = ./lua
task_type = HTTP_TASK
Expand Down
3 changes: 3 additions & 0 deletions src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ void server::config(const std::string &ip,
size_t threads,
size_t connects,
size_t wait_time,
size_t accept_per_tick,
std::string task_type,
std::string http_static_dir,
std::string lua_dir,
Expand All @@ -297,6 +298,8 @@ void server::config(const std::string &ip,
set_threads(threads);
set_connects(connects);
set_wait_time(wait_time);
set_accept_per_tick(accept_per_tick);

set_task_type(task_type);
set_http_static_dir(http_static_dir);
set_lua_dir(lua_dir);
Expand Down
11 changes: 11 additions & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ namespace tubekit
void set_threads(size_t threads);
void set_connects(size_t connects);
void set_wait_time(size_t wait_time);
inline void set_accept_per_tick(size_t accept_per_tick)
{
m_accept_per_tick = accept_per_tick;
}
inline size_t get_accept_per_tick() const
{
return m_accept_per_tick;
}
void set_task_type(std::string task_type);

void set_http_static_dir(std::string http_static_dir);
Expand All @@ -46,6 +54,7 @@ namespace tubekit
size_t threads,
size_t connects,
size_t wait_time,
size_t accept_per_tick,
std::string task_type,
std::string http_static_dir,
std::string lua_dir,
Expand All @@ -64,6 +73,8 @@ namespace tubekit
size_t m_threads{0};
size_t m_connects{0};
size_t m_wait_time{0};
size_t m_accept_per_tick{0};

std::string m_http_static_dir{};
std::string m_lua_dir{};

Expand Down
157 changes: 81 additions & 76 deletions src/socket/socket_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ void socket_handler::handle()
return;
}

const size_t accept_per_tick = singleton<tubekit::server::server>::instance()->get_accept_per_tick();

time::time socket_handler_time;
socket_handler_time.update();
uint64_t lastest_tick_time = socket_handler_time.get_seconds();
Expand Down Expand Up @@ -261,95 +263,98 @@ void socket_handler::handle()
// There is a new socket connection
if (m_server == now_loop_socket)
{
int socket_fd = m_server->accept(); // Gets the socket_fd for the new connection
if (socket_fd <= 0)
{
continue;
}
socket *socket_object = alloc_socket();
if (socket_object == nullptr)
for (size_t accept_loop_idx = 0; accept_loop_idx < accept_per_tick; accept_loop_idx++)
{
::close(socket_fd);
continue;
}
++gid_seq;
// Almost impossible to process 4294967295 connections in one second
uint64_t loop_gid = (lastest_tick_time << 32) + gid_seq;
socket_object->m_sockfd = socket_fd;
socket_object->set_gid(loop_gid);
socket_object->close_callback = nullptr;
socket_object->set_non_blocking();
socket_object->set_linger(false, 0);
socket_object->set_send_buffer(65536);
socket_object->set_recv_buffer(65536);

if (singleton<server::server>::instance()->get_use_ssl())
{
bool ssl_err = false;
SSL *ssl_instance = SSL_new(singleton<server::server>::instance()->get_ssl_ctx());
if (!ssl_instance)
int socket_fd = m_server->accept(); // Gets the socket_fd for the new connection
if (socket_fd <= 0)
{
ssl_err = true;
LOG_ERROR("SSL_new return NULL");
break; // stop accept
}
if (!ssl_err && 1 != SSL_set_fd(ssl_instance, socket_object->m_sockfd))
socket *socket_object = alloc_socket();
if (socket_object == nullptr)
{
ssl_err = true;
LOG_ERROR("SSL_set_fd error: %s", ERR_error_string(ERR_get_error(), nullptr));
::close(socket_fd);
break; // stop accept
}
// ssl_instance bind to socket_object
socket_object->set_ssl_instance(ssl_instance);
if (ssl_err)
++gid_seq;
// Almost impossible to process 4294967295 connections in one second
uint64_t loop_gid = (lastest_tick_time << 32) + gid_seq;
socket_object->m_sockfd = socket_fd;
socket_object->set_gid(loop_gid);
socket_object->close_callback = nullptr;
socket_object->set_non_blocking();
socket_object->set_linger(false, 0);
socket_object->set_send_buffer(65536);
socket_object->set_recv_buffer(65536);

if (singleton<server::server>::instance()->get_use_ssl())
{
LOG_ERROR("SSL ERR");
push_wait_remove(socket_object);
continue;
bool ssl_err = false;
SSL *ssl_instance = SSL_new(singleton<server::server>::instance()->get_ssl_ctx());
if (!ssl_instance)
{
ssl_err = true;
LOG_ERROR("SSL_new return NULL");
}
if (!ssl_err && 1 != SSL_set_fd(ssl_instance, socket_object->m_sockfd))
{
ssl_err = true;
LOG_ERROR("SSL_set_fd error: %s", ERR_error_string(ERR_get_error(), nullptr));
}
// ssl_instance bind to socket_object
socket_object->set_ssl_instance(ssl_instance);
if (ssl_err)
{
LOG_ERROR("SSL ERR");
push_wait_remove(socket_object);
continue;
}
}
}

// create connection layer instance
connection::connection *p_connection = singleton<connection_mgr>::instance()->create();

if (p_connection == nullptr)
{
LOG_ERROR("p_connection == nullptr");
push_wait_remove(socket_object);
continue;
}
else
{
p_connection->reuse();
p_connection->set_socket_ptr(socket_object);
p_connection->set_gid(loop_gid);
}
// create connection layer instance
connection::connection *p_connection = singleton<connection_mgr>::instance()->create();

bool res = false;
singleton<connection_mgr>::instance()->insert(
loop_gid, {socket_object, p_connection},
[&res](uint64_t key, std::pair<tubekit::socket::socket *, tubekit::connection::connection *> value)
if (p_connection == nullptr)
{
res = true;
},
nullptr);
LOG_ERROR("p_connection == nullptr");
push_wait_remove(socket_object);
break; // stop accept
}
else
{
p_connection->reuse();
p_connection->set_socket_ptr(socket_object);
p_connection->set_gid(loop_gid);
}

if (false == res)
{
LOG_ERROR("singleton<connection_mgr>::instance()->insert error");
singleton<connection_mgr>::instance()->release(p_connection);
push_wait_remove(socket_object);
continue;
}
bool res = false;
singleton<connection_mgr>::instance()->insert(
loop_gid, {socket_object, p_connection},
[&res](uint64_t key, std::pair<tubekit::socket::socket *, tubekit::connection::connection *> value)
{
res = true;
},
nullptr);

// on_new_connection hook will be executed when it's get_ssl_accepted status first
// if not using openssl
if (!singleton<server::server>::instance()->get_use_ssl())
{
// triger new connection hook
singleton<connection_mgr>::instance()->on_new_connection(loop_gid);
}
if (false == res)
{
LOG_ERROR("singleton<connection_mgr>::instance()->insert error");
singleton<connection_mgr>::instance()->release(p_connection);
push_wait_remove(socket_object);
continue;
}

// on_new_connection hook will be executed when it's get_ssl_accepted status first
// if not using openssl
if (!singleton<server::server>::instance()->get_use_ssl())
{
// triger new connection hook
singleton<connection_mgr>::instance()->on_new_connection(loop_gid);
}

// first connected, try listen write and process
do_task(loop_gid, true, true);
// first connected, try listen write and process
do_task(loop_gid, true, true);
} // for (size_t accept_loop_idx; accept_loop_idx < accept_per_tick; accept_loop_idx++)
}
else // already connection socket has event happen
{
Expand Down
3 changes: 3 additions & 0 deletions src/system/system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ void system::init()
const int threads = (*ini)["server"]["threads"];
const int max_conn = (*ini)["server"]["max_conn"];
const int wait_time = (*ini)["server"]["wait_time"];
const int accept_per_tick = (*ini)["server"]["accept_per_tick"];

const string task_type = (*ini)["server"]["task_type"];

const string http_static_dir = (*ini)["server"]["http_static_dir"];
Expand Down Expand Up @@ -76,6 +78,7 @@ void system::init()
port,
threads,
max_conn,
accept_per_tick,
wait_time,
task_type,
http_static_dir,
Expand Down

0 comments on commit 2245f61

Please sign in to comment.