This repository has been archived by the owner on Apr 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
target refactor #42
Comments
#include <iostream>
#include <vector>
#include <string>
#include <thread>
#include <chrono>
#include <unordered_set>
#include <unordered_map>
using namespace std;
class Buffer
{
public:
Buffer() : data1(10),
data2(10),
reader_used_len(10),
writer_used_len(10),
reader_idx(0),
writer_idx(0),
data_reader(&data1),
data_writer(&data2)
{
}
void CacheSwap()
{
// 判断是否读完了
if (reader_idx != reader_used_len)
{
return;
}
// 没有其他数据
if (writer_used_len == 0)
{
return;
}
reader_idx = 0;
reader_used_len = writer_used_len;
writer_used_len = 0;
writer_idx = 0;
// swap cache
std::vector<std::string> *data_ptr = data_reader;
data_reader = data_writer;
data_writer = data_ptr;
read_out_idx = 0;
}
std::string &Push()
{
if (writer_idx == writer_used_len)
{
if (writer_used_len == data_writer->size()) // full
{
data_writer->push_back("");
}
writer_used_len++;
}
std::string &res = data_writer->at(writer_idx);
writer_idx++;
res.clear();
return res;
}
const std::string &First()
{
if (reader_idx == reader_used_len) // empty
{
if (data_reader->empty())
{
data_reader->push_back("");
return data_reader->at(0);
}
else
{
data_reader->at(0).clear();
}
return data_reader->at(0);
}
return data_reader->at(reader_idx);
}
void Pop()
{
if (reader_idx <= reader_used_len)
{
data_reader->at(reader_idx).clear();
data_reader->at(reader_idx).resize(0);
reader_idx++;
read_out_idx++;
}
}
bool Empty()
{
return (reader_idx >= reader_used_len);
}
public:
std::vector<std::string> data1;
std::vector<std::string> data2;
size_t reader_used_len;
size_t writer_used_len;
size_t reader_idx;
size_t writer_idx;
std::vector<std::string> *data_reader;
std::vector<std::string> *data_writer;
size_t read_out_idx;
};
class Conn
{
public:
Buffer recvBuffer;
Buffer sendBuffer;
// sock
// recvFirstByteIdx
// sendFirstByteIdx
};
int main(int argc, const char **argv)
{
std::vector<Conn> connArray(100);
std::unordered_set<int> connUsing;
std::unordered_set<int> connNotUse;
for (std::size_t index = 0; index < connArray.size(); index++)
{
connNotUse.insert(index);
}
std::unordered_map<int, int> sockfd2Conn;
std::unordered_map<int, int> gid2sockfd;
// constexpr int bufferCount = 20000;
// Buffer bufferArray[bufferCount];
// int bufferArrayIdx = 0;
// size_t allBytes = 0;
// do
// {
// if (bufferArrayIdx + 1 >= bufferCount)
// {
// bufferArrayIdx = 0;
// }
// else
// {
// bufferArrayIdx++;
// }
// Buffer &buffer = bufferArray[bufferArrayIdx];
// for (int loop = 0; loop < 100; loop++)
// {
// // 写内容
// buffer.CacheSwap();
// for (int i = 0; i < 11; i++)
// {
// std::string &Item = buffer.Push();
// Item = std::string(10240, 't');
// }
// // 读内容
// buffer.CacheSwap();
// // 读内容
// while (!buffer.Empty())
// {
// const std::string &Item = buffer.First();
// allBytes += Item.size();
// buffer.Pop();
// }
// }
// std::cout << buffer.data_reader->size() << " " << buffer.data_writer->size() << std::endl;
// std::cout << allBytes / (1024 * 1024 * 1024) << "GB" << std::endl;
// // std::this_thread::sleep_for(std::chrono::milliseconds(4));
// } while (true);
return 0;
} |
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
搭建进程
appid
封装socket
搭建线程框架 main线程 worker线程
程序停止处理 封装标志位结构便于线程间传递
封装epoll
epoll装入各个线程
封装sockpair
封装conn
停起 tick hook
sockpair绑定到各个线程 main-work main-other
protobuf线程通信协议制定,能进行线程通信
server listen
clintfd正确处理
client事件循环
client最大连接控制
HTTP接入
STREAMTCP接入
WEBSOCKET接入
设计好用的app接口
other listen
other client
rpc逻辑设计
一个主线程 main 多个worker1 worker2 worker3 worker4 worker5
为线程编号 5 0 1 2 3 4
其中线程之间两两都能进行通信采用 socketpair
如上面的例子直接创建 main-worker1 main-worker2 main-worker3 main-worker4 main-worker5
main与worker之间通信则直接用二者之间的专属UNIX域套接字
如果两个worker之间通信则 worker1->main->worker2 worker2->main->worker1
主线程负责listen socket 由epoll进行IO多路复用 epoll负责 (listensock)、(main-worker...)、(main-other)的收发读写
对于线程之间的消息收发缓存队列直接用std::vectorstd::string好了可以采用read-write形式,其中std::string和std::vector采用read-write都能得到空间复用,而且不需要加锁了
但是需要为std::vectorstd::string包装一层 成为Buffer内部具有读指针写指针
每个worker有自己的事件循环、有自己epoll,epoll需要管理worker-main
worker也是需要有任务类型一说的 在worker创建时需要提供类型 HTTP TCPSTREAM WEBSOCKET
worker需要为每个套接字连接维护事件循环、fd、 一个socket对象、一个connection对象
其中需要为每个connection对应设计sendbuffer与recvbuffer 当然这就不用为buffer加锁了,因为操作connection只会在worker内部,不存在多个线程同时操作同一个连接
要操作也是通过协议收发,进行线程之间的通信
对于buffer的设计则是根据不同的业务 HTTP TCPSTREAM WEBSOCKET单独设计
例如HTTP仍旧采用callback形式
TCPSTREAM WEBSOCKET 则需要设计来自不同线程的协议包,包括自身线程
接收到来自线程的数据 写进connection,能写进去则写,空间不够则应该在connection拉链,可以直接采用std::vectorstd::string方式,和上面一样,总之就是线程之间的通信buffer组件了
同样设计read-write 注意这里的read-write不需要加锁了 直接指针交换即可 当有数据时,需要为相应连接触发下事件
像其他的业务处理都比较相似了,解包也不是差很多,包括嵌入SSL等等
关于进程通信还是不要内置提供了,毕竟需要提供一定的自由选择
(main worker worker worker)<----->center<------>(main worker worker worker)
由main进行RPC就好了 main<--RPC-->main,需要提供的对外接口就是, main<---->调用接口 RPC的fd需要mainepoll监听处理,对于RPC直接提供一个RPClistenFD,与server listen fd同等地位,只不过RPC listenfd accept到的连接交给mainepoll处理,RPC提供的连接 断开收发触发也是交给mainepoll
需要对外提供一个写数据到main线程的方式,所以对外提供一个socketpair就好了,一端提供给外部
一端留给main epoll自己
main 留一个回调,当有数据从外部写给main时,就触发,在回调内可以线程安全的将其发送给任意的worker
几乎是比较完美的actor模型了,差不多一切都是异步,即worker-other ,我们根本不知道other是谁,只接受来自other的消息就好了
关于主线程listen socker accept到的fd也是 通过消息传递给worker的
对于最大连接数的控制,直接采用std::atomic就好了,在不同线程间进行增减与大小判断,其实足够了,能够在指定大小范围内控制即可
main accept一个则进行+1 worker断开一个则进行-1
进而即可控制,当超过一定数量accept直接拒接close即可
对于为每个连接生成gid,采用原来方式即可,事件戳+轮询uint32基本就满足了,不仅要为连接生成gid,只要是sock都应生成gid sockpair也需要
对于程序完美停止 采用信号主线程处理 worker应屏蔽信号
主线程收到信号会通过标志位+忙等sleep worker已关闭标志位 双方通过标志位奔赴 标志位采用std::atomic即可
worker关闭时需要提供hook,client连接关闭时也需要提供hook
还应当提供停止接收新连接 + 踢掉所有已经存在的连接的方法
停止接收新连接通过other-main即可
踢掉所有已经存在则是other-main 由 main到所有worker,worker接收到直接进行踢出所有client连接就好了
最后程序收尾时 需要提供hook,同时也应对许多sockpair进行收尾处理
这样一来性能也比较好 各处也比较灵活
围绕main worker,每个线程提供消息来了 消息发出去的思想搞,每个线程都是一个Actor
每个线程只发自己的消息 只处理自己的消息,之间没有数据直接访问操作,除了部分的std::atomic
The text was updated successfully, but these errors were encountered: