Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
feat: One stream connection actively sends data to other stream conne…
Browse files Browse the repository at this point in the history
…ctions
  • Loading branch information
gaowanlu committed Sep 26, 2023
1 parent f62d21b commit 3bb856e
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 54 deletions.
21 changes: 21 additions & 0 deletions external/tubekit-buffer/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,25 @@ void buffer::clear()
std::lock_guard<std::mutex> guard(m_mutex);
m_read_ptr = m_buffer;
m_write_ptr = m_buffer;
}

char *buffer::get_read_ptr()
{
return m_read_ptr;
}

u_int64_t buffer::copy_all(char *out, u_int64_t out_len)
{
if (!out)
{
return 0;
}
std::lock_guard<std::mutex> guard(m_mutex);
uint64_t all_bytes = can_readable_size();
if (out_len < all_bytes)
{
return 0;
}
memcpy(out, get_read_ptr(), all_bytes);
return all_bytes;
}
2 changes: 2 additions & 0 deletions external/tubekit-buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace tubekit
void set_limit_max(u_int64_t limit_max);
u_int64_t get_limit_max();
void clear();
u_int64_t copy_all(char *out, u_int64_t out_len);

private:
u_int64_t m_limit_max;
Expand All @@ -37,6 +38,7 @@ namespace tubekit
bool check_and_write(const char *source, u_int64_t size);
void move_to_before();
u_int64_t after_size();
char *get_read_ptr();
};
}
}
58 changes: 37 additions & 21 deletions src/app/stream_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,57 @@
#include "proto_res/proto_message_head.pb.h"
#include <tubekit-log/logger.h>
#include <string>
#include <set>
#include "thread/mutex.h"
#include "utility/singleton.h"
#include "connection/connection_mgr.h"

using tubekit::app::stream_app;
using tubekit::connection::connection_mgr;
using tubekit::connection::stream_connection;
using tubekit::utility::singleton;

namespace tubekit::app
{
std::set<void *> global_player;
tubekit::thread::mutex global_player_mutex;
}

void stream_app::process_connection(tubekit::connection::stream_connection &m_stream_connection)
{
using tubekit::app::global_player;
using tubekit::app::global_player_mutex;
uint64_t data_len = m_stream_connection.m_recv_buffer.can_readable_size();
char *tmp_buffer = new char[data_len];
m_stream_connection.m_recv_buffer.copy_all(tmp_buffer, data_len);
m_stream_connection.m_recv_buffer.clear();

// ProtoMessageHead protoMessageHead;
// protoMessageHead.set_cmd(0);

// std::string buffer;

// ProtoExampleReq protoExampleReq;
// protoExampleReq.set_testcontext("hello world");

// protoMessageHead.set_bodylen(buffer.size());

// protoMessageHead.SerializeToString(&buffer);
// m_stream_connection.send(buffer.data(), buffer.size());
// buffer.clear();

// protoExampleReq.SerializeToString(&buffer);

// m_stream_connection.send(buffer.data(), buffer.size());
// buffer.clear();
global_player_mutex.lock();
for (auto socket_ptr : global_player)
{
if (socket_ptr != m_stream_connection.get_socket_ptr())
{
bool b_ret = singleton<connection_mgr>::instance()->safe_send(socket_ptr, tmp_buffer, data_len);
}
}
global_player_mutex.unlock();
delete tmp_buffer;
// m_stream_connection.mark_close();
}

void stream_app::on_close_connection(tubekit::connection::stream_connection &m_stream_connection)
{
// LOG_ERROR("stream connection close addr=[%p]", &m_stream_connection);
using tubekit::app::global_player;
using tubekit::app::global_player_mutex;
global_player_mutex.lock();
global_player.erase(m_stream_connection.get_socket_ptr());
global_player_mutex.unlock();
}

void stream_app::on_new_connection(tubekit::connection::stream_connection &m_stream_connection)
{
// LOG_ERROR("stream connection new conn addr=[%p]", &m_stream_connection);
using tubekit::app::global_player;
using tubekit::app::global_player_mutex;
global_player_mutex.lock();
global_player.insert(m_stream_connection.get_socket_ptr());
global_player_mutex.unlock();
}
36 changes: 18 additions & 18 deletions src/app/tick.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ using namespace tubekit::connection;

void tick::run()
{
static time_t t = 0;
time_t now = time(NULL);
if (0 == t)
{
t = now;
}
if (now - t >= 1)
{
t = now;
singleton<connection_mgr>::instance()->for_each([](connection::connection &conn)
{
if (connection_mgr::is_stream(&conn))
{
connection_mgr::convert_to_stream(&conn)->send("tick\n",5);
//conn.mark_close();
}
return; });
}
// static time_t t = 0;
// time_t now = time(NULL);
// if (0 == t)
// {
// t = now;
// }
// if (now - t >= 1)
// {
// t = now;
// singleton<connection_mgr>::instance()->for_each([](connection::connection &conn)
// {
// if (connection_mgr::is_stream(&conn))
// {
// connection_mgr::convert_to_stream(&conn)->send("tick\n",5);
// //conn.mark_close();
// }
// return; });
// }
}
15 changes: 12 additions & 3 deletions src/connection/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

using tubekit::connection::connection;

connection::connection() : close_flag(false)
connection::connection(tubekit::socket::socket *socket_ptr) : close_flag(false),
socket_ptr(socket_ptr)
{
}

Expand All @@ -16,11 +17,19 @@ void connection::close_before()

void connection::mark_close()
{
close_flag = true;
on_mark_close();
if (close_flag == false)
{
close_flag = true;
on_mark_close();
}
}

bool connection::is_close()
{
return close_flag;
}

tubekit::socket::socket *connection::get_socket_ptr()
{
return socket_ptr;
}
7 changes: 6 additions & 1 deletion src/connection/connection.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include "socket/socket.h"

namespace tubekit
{
Expand All @@ -7,7 +8,7 @@ namespace tubekit
class connection
{
public:
connection();
connection(tubekit::socket::socket *socket_ptr);
virtual ~connection();
virtual void close_before();

Expand All @@ -21,9 +22,13 @@ namespace tubekit
*/
void mark_close();
bool is_close();
tubekit::socket::socket *get_socket_ptr();

private:
bool close_flag;

protected:
tubekit::socket::socket *socket_ptr;
};
}
}
24 changes: 24 additions & 0 deletions src/connection/connection_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,30 @@ connection *connection_mgr::get(void *index_ptr)
return nullptr;
}

bool connection_mgr::safe_send(void *index_ptr, const char *buffer, size_t len)
{
if (!buffer || 0 == len)
{
return false;
}
tubekit::thread::auto_lock lock(m_mutex);
auto res = m_map.find(index_ptr);
if (res == m_map.end())
{
return false;
}
if (!is_stream(res->second))
{
return false;
}
stream_connection *stream_conn = convert_to_stream(res->second);
if (!stream_conn)
{
return false;
}
return stream_conn->send(buffer, len);
}

bool connection_mgr::mark_close(void *index_ptr)
{
tubekit::thread::auto_lock lock(m_mutex);
Expand Down
14 changes: 14 additions & 0 deletions src/connection/connection_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ namespace tubekit::connection
bool mark_close(void *index_ptr);
void for_each(std::function<void(connection &conn)> callback);

/**
* @brief Sending data to a stream connection can prevent the problem
* of using a null pointer in the connection.
* In addition to processing the allocated connection
* within the worker thread, safe_send should be used
*
* @param index_ptr
* @param buffer
* @param len
* @return true
* @return false
*/
bool safe_send(void *index_ptr, const char *buffer, size_t len);

public:
static http_connection *convert_to_http(connection *conn_ptr);
static stream_connection *convert_to_stream(connection *conn_ptr);
Expand Down
4 changes: 2 additions & 2 deletions src/connection/http_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
using namespace std;
using namespace tubekit::connection;

http_connection::http_connection(tubekit::socket::socket *socket_ptr) : m_buffer(2048),
socket_ptr(socket_ptr),
http_connection::http_connection(tubekit::socket::socket *socket_ptr) : connection(socket_ptr),
m_buffer(2048),
recv_end(false),
process_end(false),
buffer_used_len(0),
Expand Down
1 change: 0 additions & 1 deletion src/connection/http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ namespace tubekit
bool process_end;
bool response_end;
bool everything_end;
tubekit::socket::socket *socket_ptr;
};
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/connection/stream_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using tubekit::connection::stream_connection;
using tubekit::socket::socket_handler;
using tubekit::utility::singleton;

stream_connection::stream_connection(tubekit::socket::socket *socket_ptr) : socket_ptr(socket_ptr),
stream_connection::stream_connection(tubekit::socket::socket *socket_ptr) : connection(socket_ptr),
m_send_buffer(2048),
m_recv_buffer(2048),
m_wating_send_pack(2048)
Expand Down Expand Up @@ -151,7 +151,7 @@ bool stream_connection::buf2sock()
return false;
}

bool stream_connection::send(char *buffer, size_t buffer_size)
bool stream_connection::send(const char *buffer, size_t buffer_size)
{
if (buffer == nullptr)
{
Expand Down
14 changes: 10 additions & 4 deletions src/connection/stream_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,16 @@ namespace tubekit
bool buf2sock();

public:
bool send(char *buffer, size_t buffer_size);
/**
* @brief Only when processing stream_connection, the worker thread of the connection uses its own send,
* and if sending to other connections, the connection mgr's safe_send needs to be used.
*
* @param buffer
* @param buffer_size
* @return true
* @return false
*/
bool send(const char *buffer, size_t buffer_size);

public:
virtual void on_mark_close() override;
Expand All @@ -33,9 +42,6 @@ namespace tubekit
buffer::buffer m_send_buffer;
buffer::buffer m_recv_buffer;
buffer::buffer m_wating_send_pack;

private:
tubekit::socket::socket *socket_ptr;
};
}
}
1 change: 1 addition & 0 deletions src/socket/even_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void event_poller::create(int max_connections)
* EPOLLLT表示默认epoll工作模式
* @param op 操作选项 EPOLL_CTL_ADD,EPOLL_CTL_MOD,EPOLL_CTL_DEL
*/

int event_poller::ctrl(int fd, void *ptr, __uint32_t events, int op)
{
struct ::epoll_event ev;
Expand Down
1 change: 1 addition & 0 deletions src/socket/socket_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ int socket_handler::detach(socket *m_socket)

int socket_handler::remove(socket *m_socket)
{
auto_lock lock(m_mutex);
int iret = detach(m_socket);
if (0 != iret)
{
Expand Down
5 changes: 4 additions & 1 deletion src/task/stream_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ void stream_task::run()
// recv data
{
// read data from socket to connection layer buffer
t_stream_connection->sock2buf();
if (false == t_stream_connection->sock2buf())
{
singleton<connection_mgr>::instance()->mark_close(socket_ptr);
}
}

// process data
Expand Down
2 changes: 1 addition & 1 deletion src/thread/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace tubekit::thread
{
if (ptr && ptr->compare(task))
{
if(task)
if (task)
{
delete task;
}
Expand Down

0 comments on commit 3bb856e

Please sign in to comment.