Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
feat: better frame
Browse files Browse the repository at this point in the history
  • Loading branch information
gaowanlu committed Mar 14, 2024
1 parent c269bae commit 4e7ea0a
Show file tree
Hide file tree
Showing 24 changed files with 366 additions and 271 deletions.
33 changes: 20 additions & 13 deletions src/app/websocket_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,8 @@ void websocket_app::process_connection(tubekit::connection::websocket_connection
}
frame.payload_data = std::move(payload_data);

// broadcast
singleton<connection_mgr>::instance()->for_each(
[&frame](connection::connection &conn) -> void
{
websocket_connection *ptr_conn = static_cast<websocket_connection *>(&conn);
websocket_app::send_packet(*ptr_conn, frame.payload_data.c_str(), frame.payload_length, false);
});

// websocket_app::send_packet(m_websocket_connection, frame.payload_data.c_str(), frame.payload_length, false);
// pingpong
websocket_app::send_packet(nullptr, frame.payload_data.c_str(), frame.payload_length, m_websocket_connection.get_gid());
// frame.payload_data.push_back(0);
// LOG_ERROR("%s", frame.payload_data.c_str());
m_websocket_connection.m_recv_buffer.read_ptr_move_n(index - start_index + frame.payload_length);
Expand All @@ -240,7 +233,7 @@ void websocket_app::on_new_connection(tubekit::connection::websocket_connection
LOG_ERROR("on_new_connection");
}

bool websocket_app::send_packet(tubekit::connection::websocket_connection &m_websocket_connection, const char *data, size_t data_len, bool use_safe)
bool websocket_app::send_packet(tubekit::connection::websocket_connection *m_websocket_connection, const char *data, size_t data_len, uint64_t gid /*= 0*/)
{
if (!data)
{
Expand Down Expand Up @@ -272,9 +265,23 @@ bool websocket_app::send_packet(tubekit::connection::websocket_connection &m_web

frame.insert(frame.end(), data, data + data_len);

if (!use_safe)
if (0 == gid && m_websocket_connection)
{
return m_websocket_connection.send((const char *)frame.data(), frame.size());
return m_websocket_connection->send((const char *)frame.data(), frame.size());
}
return singleton<connection_mgr>::instance()->safe_send(m_websocket_connection.get_socket_ptr(), (const char *)frame.data(), frame.size());
else
{
bool res = false;
singleton<connection_mgr>::instance()->if_exist(
gid,
[&res, &frame](uint64_t key, std::pair<tubekit::socket::socket *, tubekit::connection::connection *> value)
{
tubekit::connection::websocket_connection *p_wsconn = (tubekit::connection::websocket_connection *)(value.second);
res = p_wsconn->send((const char *)frame.data(), frame.size());
},
nullptr);
return res;
}

return false;
}
3 changes: 2 additions & 1 deletion src/app/websocket_app.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <string>
#include <cstdint>
#include "connection/websocket_connection.h"

namespace tubekit
Expand All @@ -12,7 +13,7 @@ namespace tubekit
static void process_connection(tubekit::connection::websocket_connection &m_websocket_connection);
static void on_close_connection(tubekit::connection::websocket_connection &m_websocket_connection);
static void on_new_connection(tubekit::connection::websocket_connection &m_websocket_connection);
static bool send_packet(tubekit::connection::websocket_connection &m_websocket_connection, const char *data, size_t data_len, bool use_safe);
static bool send_packet(tubekit::connection::websocket_connection *m_websocket_connection, const char *data, size_t data_len, uint64_t gid = 0);

static int on_init();
static void on_stop();
Expand Down
234 changes: 117 additions & 117 deletions src/connection/connection_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,175 +13,173 @@ using tubekit::app::websocket_app;
using tubekit::connection::connection;
using tubekit::connection::connection_mgr;
using tubekit::connection::http_connection;
using tubekit::connection::safe_mapping;
using tubekit::connection::stream_connection;
using tubekit::connection::websocket_connection;
using tubekit::task::task_type;
using tubekit::utility::object_pool;
using tubekit::utility::singleton;

connection_mgr::connection_mgr()
safe_mapping::safe_mapping()
{
}

connection_mgr::~connection_mgr()
safe_mapping::~safe_mapping()
{
}

bool connection_mgr::add(void *index_ptr, connection *conn_ptr)
void safe_mapping::if_exist(uint64_t gid,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> succ_callback,
std::function<void(uint64_t)> failed_callback)
{
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res != m_map.end() && res->second == conn_ptr)
lock.lock();
auto iter = gid2pair.find(gid);
if (iter != gid2pair.end())
{
return true;
if (succ_callback)
{
succ_callback(iter->first, iter->second);
}
}
if (res != m_map.end())
else
{
return false;
if (failed_callback)
{
failed_callback(gid);
}
}
m_map.insert({index_ptr, conn_ptr});
return true;
lock.unlock();
}

void connection_mgr::on_new_connection(void *index_ptr)
void safe_mapping::remove(uint64_t gid,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> succ_callback,
std::function<void(uint64_t)> failed_callback)
{
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res == m_map.end())
{
return;
}
if (!res->second)
{
return;
}
if (is_stream(res->second))
{
stream_app::on_new_connection(*convert_to_stream(res->second));
lock.lock();
auto iter = gid2pair.find(gid);
if (iter != gid2pair.end())
{
auto first = iter->first;
auto second = iter->second;
gid2pair.erase(iter);
if (succ_callback)
{
succ_callback(first, second);
}
}
if (is_websocket(res->second))
else
{
websocket_app::on_new_connection(*convert_to_websocket(res->second));
if (failed_callback)
{
failed_callback(gid);
}
}
lock.unlock();
}

bool connection_mgr::remove(void *index_ptr)
void safe_mapping::insert(uint64_t gid,
std::pair<socket::socket *, connection *> value,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> succ_callback,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> failed_callback)
{
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res != m_map.end())
lock.lock();
auto iter = gid2pair.find(gid);
if (iter != gid2pair.end())
{
res->second->close_before();
if (is_stream(res->second))
if (failed_callback)
{
stream_app::on_close_connection(*convert_to_stream(res->second));
failed_callback(gid, value);
}
if (is_websocket(res->second))
}
else
{
gid2pair[gid] = value;
if (succ_callback)
{
websocket_app::on_close_connection(*convert_to_websocket(res->second));
succ_callback(gid, value);
}
// connection back to pool
release(res->second);

m_map.erase(res);
return true;
}
return false;
lock.unlock();
}

bool connection_mgr::has(void *index_ptr)
void connection_mgr::if_exist(uint64_t gid,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> succ_callback,
std::function<void(uint64_t)> failed_callback)
{
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res != m_map.end())
{
return true;
}
return false;
uint64_t hash_idx = gid % m_thread_size;
m_safe_mapping[hash_idx].if_exist(gid, succ_callback, failed_callback);
}

void connection_mgr::for_each(std::function<void(connection &conn)> callback)
void connection_mgr::remove(uint64_t gid,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> succ_callback,
std::function<void(uint64_t)> failed_callback)
{
tubekit::thread::auto_lock lock(m_mutex);
if (callback)
{
for (auto p : m_map)
uint64_t hash_idx = gid % m_thread_size;
m_safe_mapping[hash_idx].remove(
gid,
[&succ_callback](uint64_t key, std::pair<socket::socket *, connection *> value)
{
callback(*p.second);
}
}
if (is_stream(value.second))
{
stream_app::on_close_connection(*convert_to_stream(value.second));
}
if (is_websocket(value.second))
{
websocket_app::on_close_connection(*convert_to_websocket(value.second));
}
succ_callback(key, value);
},
failed_callback);
}

connection *connection_mgr::get(void *index_ptr)
void connection_mgr::insert(uint64_t gid,
std::pair<socket::socket *, connection *> value,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> succ_callback,
std::function<void(uint64_t, std::pair<socket::socket *, connection *>)> failed_callback)
{
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res != m_map.end())
{
return res->second;
}
return nullptr;
uint64_t hash_idx = gid % m_thread_size;
m_safe_mapping[hash_idx].insert(gid, value, succ_callback, failed_callback);
}

bool connection_mgr::safe_send(void *index_ptr, const char *buffer, size_t len)
connection_mgr::connection_mgr()
{
if (!buffer || 0 == len || !index_ptr)
{
return false;
}
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res == m_map.end())
{
LOG_ERROR("res == m_map.end()");
return false;
}

if (res->second->get_socket_ptr() != index_ptr)
{
LOG_ERROR("res->second->get_socket_ptr() != index_ptr");
return false;
}

tubekit::socket::socket *psocket = static_cast<tubekit::socket::socket *>(index_ptr);
if (psocket->get_gid() != res->second->get_gid())
{
LOG_ERROR("psocket->get_gid() != res->second->get_gid()");
return false;
}
}

if (is_stream(res->second))
connection_mgr::~connection_mgr()
{
if (m_safe_mapping)
{
stream_connection *stream_conn = convert_to_stream(res->second);
if (!stream_conn)
{
return false;
}
return stream_conn->send(buffer, len);
delete[] m_safe_mapping;
}
}

if (is_websocket(res->second))
{
websocket_connection *websocket_conn = convert_to_websocket(res->second);
if (!websocket_conn)
void connection_mgr::on_new_connection(uint64_t gid)
{
if_exist(
gid,
[](uint64_t key, std::pair<tubekit::socket::socket *, tubekit::connection::connection *> value)
{
return false;
}
return websocket_conn->send(buffer, len);
}
return false;
if (is_stream(value.second))
{
stream_app::on_new_connection(*convert_to_stream(value.second));
}
if (is_websocket(value.second))
{
websocket_app::on_new_connection(*convert_to_websocket(value.second));
}
},
nullptr);
}

bool connection_mgr::mark_close(void *index_ptr)
void connection_mgr::mark_close(uint64_t gid)
{
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res != m_map.end())
{
res->second->mark_close();
return true;
}
return false;
if_exist(
gid,
[](uint64_t key, std::pair<tubekit::socket::socket *, tubekit::connection::connection *> value)
{
value.second->mark_close();
},
nullptr);
}

http_connection *connection_mgr::convert_to_http(connection *conn_ptr)
Expand Down Expand Up @@ -262,9 +260,11 @@ bool connection_mgr::is_websocket(connection *conn_ptr)
return false;
}

int connection_mgr::init(tubekit::task::task_type task_type)
int connection_mgr::init(tubekit::task::task_type task_type, uint32_t thread_size)
{
m_task_type = task_type;
m_thread_size = thread_size;
m_safe_mapping = new safe_mapping[thread_size];
return 0;
}

Expand Down
Loading

0 comments on commit 4e7ea0a

Please sign in to comment.