Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

target refactor #42

Open
8 of 23 tasks
gaowanlu opened this issue Apr 18, 2024 · 1 comment
Open
8 of 23 tasks

target refactor #42

gaowanlu opened this issue Apr 18, 2024 · 1 comment
Assignees

Comments

@gaowanlu
Copy link
Member

gaowanlu commented Apr 18, 2024

代码规则:线程直接用C++11、C++版本允许使用C++17
配置提供AppID进程唯一标识
  • 搭建进程

  • appid

  • 封装socket

  • 搭建线程框架 main线程 worker线程

  • 程序停止处理 封装标志位结构便于线程间传递

  • 封装epoll

  • epoll装入各个线程

  • 封装sockpair

  • 封装conn

  • 停起 tick hook

  • sockpair绑定到各个线程 main-work main-other

  • protobuf线程通信协议制定,能进行线程通信

  • server listen

  • clintfd正确处理

  • client事件循环

  • client最大连接控制

  • HTTP接入

  • STREAMTCP接入

  • WEBSOCKET接入

  • 设计好用的app接口

  • other listen

  • other client

  • rpc逻辑设计

    一个主线程 main 多个worker1 worker2 worker3 worker4 worker5
    为线程编号 5 0 1 2 3 4
    其中线程之间两两都能进行通信采用 socketpair
    如上面的例子直接创建 main-worker1 main-worker2 main-worker3 main-worker4 main-worker5
    main与worker之间通信则直接用二者之间的专属UNIX域套接字
    如果两个worker之间通信则 worker1->main->worker2 worker2->main->worker1

    主线程负责listen socket 由epoll进行IO多路复用 epoll负责 (listensock)、(main-worker...)、(main-other)的收发读写
    对于线程之间的消息收发缓存队列直接用std::vectorstd::string好了可以采用read-write形式,其中std::string和std::vector采用read-write都能得到空间复用,而且不需要加锁了
    但是需要为std::vectorstd::string包装一层 成为Buffer内部具有读指针写指针
    每个worker有自己的事件循环、有自己epoll,epoll需要管理worker-main
    worker也是需要有任务类型一说的 在worker创建时需要提供类型 HTTP TCPSTREAM WEBSOCKET
    worker需要为每个套接字连接维护事件循环、fd、 一个socket对象、一个connection对象
    其中需要为每个connection对应设计sendbuffer与recvbuffer 当然这就不用为buffer加锁了,因为操作connection只会在worker内部,不存在多个线程同时操作同一个连接
    要操作也是通过协议收发,进行线程之间的通信

    对于buffer的设计则是根据不同的业务 HTTP TCPSTREAM WEBSOCKET单独设计
    例如HTTP仍旧采用callback形式
    TCPSTREAM WEBSOCKET 则需要设计来自不同线程的协议包,包括自身线程
    接收到来自线程的数据 写进connection,能写进去则写,空间不够则应该在connection拉链,可以直接采用std::vectorstd::string方式,和上面一样,总之就是线程之间的通信buffer组件了
    同样设计read-write 注意这里的read-write不需要加锁了 直接指针交换即可 当有数据时,需要为相应连接触发下事件

    像其他的业务处理都比较相似了,解包也不是差很多,包括嵌入SSL等等

    关于进程通信还是不要内置提供了,毕竟需要提供一定的自由选择
    (main worker worker worker)<----->center<------>(main worker worker worker)
    由main进行RPC就好了 main<--RPC-->main,需要提供的对外接口就是, main<---->调用接口 RPC的fd需要mainepoll监听处理,对于RPC直接提供一个RPClistenFD,与server listen fd同等地位,只不过RPC listenfd accept到的连接交给mainepoll处理,RPC提供的连接 断开收发触发也是交给mainepoll
    需要对外提供一个写数据到main线程的方式,所以对外提供一个socketpair就好了,一端提供给外部
    一端留给main epoll自己
    main 留一个回调,当有数据从外部写给main时,就触发,在回调内可以线程安全的将其发送给任意的worker
    几乎是比较完美的actor模型了,差不多一切都是异步,即worker-other ,我们根本不知道other是谁,只接受来自other的消息就好了

    关于主线程listen socker accept到的fd也是 通过消息传递给worker的
    对于最大连接数的控制,直接采用std::atomic就好了,在不同线程间进行增减与大小判断,其实足够了,能够在指定大小范围内控制即可
    main accept一个则进行+1 worker断开一个则进行-1
    进而即可控制,当超过一定数量accept直接拒接close即可
    对于为每个连接生成gid,采用原来方式即可,事件戳+轮询uint32基本就满足了,不仅要为连接生成gid,只要是sock都应生成gid sockpair也需要

    对于程序完美停止 采用信号主线程处理 worker应屏蔽信号
    主线程收到信号会通过标志位+忙等sleep worker已关闭标志位 双方通过标志位奔赴 标志位采用std::atomic即可
    worker关闭时需要提供hook,client连接关闭时也需要提供hook
    还应当提供停止接收新连接 + 踢掉所有已经存在的连接的方法
    停止接收新连接通过other-main即可
    踢掉所有已经存在则是other-main 由 main到所有worker,worker接收到直接进行踢出所有client连接就好了

    最后程序收尾时 需要提供hook,同时也应对许多sockpair进行收尾处理
    这样一来性能也比较好 各处也比较灵活
    围绕main worker,每个线程提供消息来了 消息发出去的思想搞,每个线程都是一个Actor
    每个线程只发自己的消息 只处理自己的消息,之间没有数据直接访问操作,除了部分的std::atomic

@gaowanlu gaowanlu self-assigned this Apr 18, 2024
@gaowanlu
Copy link
Member Author

#include <iostream>
#include <vector>
#include <string>
#include <thread>
#include <chrono>
#include <unordered_set>
#include <unordered_map>
using namespace std;

class Buffer
{
public:
    Buffer() : data1(10),
               data2(10),
               reader_used_len(10),
               writer_used_len(10),
               reader_idx(0),
               writer_idx(0),
               data_reader(&data1),
               data_writer(&data2)
    {
    }
    void CacheSwap()
    {
        // 判断是否读完了
        if (reader_idx != reader_used_len)
        {
            return;
        }
        // 没有其他数据
        if (writer_used_len == 0)
        {
            return;
        }

        reader_idx = 0;
        reader_used_len = writer_used_len;
        writer_used_len = 0;
        writer_idx = 0;
        // swap cache
        std::vector<std::string> *data_ptr = data_reader;
        data_reader = data_writer;
        data_writer = data_ptr;
        read_out_idx = 0;
    }

    std::string &Push()
    {
        if (writer_idx == writer_used_len)
        {
            if (writer_used_len == data_writer->size()) // full
            {
                data_writer->push_back("");
            }
            writer_used_len++;
        }

        std::string &res = data_writer->at(writer_idx);
        writer_idx++;
        res.clear();
        return res;
    }

    const std::string &First()
    {
        if (reader_idx == reader_used_len) // empty
        {
            if (data_reader->empty())
            {
                data_reader->push_back("");
                return data_reader->at(0);
            }
            else
            {
                data_reader->at(0).clear();
            }
            return data_reader->at(0);
        }
        return data_reader->at(reader_idx);
    }

    void Pop()
    {
        if (reader_idx <= reader_used_len)
        {
            data_reader->at(reader_idx).clear();
            data_reader->at(reader_idx).resize(0);
            reader_idx++;
            read_out_idx++;
        }
    }

    bool Empty()
    {
        return (reader_idx >= reader_used_len);
    }

public:
    std::vector<std::string> data1;
    std::vector<std::string> data2;
    size_t reader_used_len;
    size_t writer_used_len;
    size_t reader_idx;
    size_t writer_idx;
    std::vector<std::string> *data_reader;
    std::vector<std::string> *data_writer;

    size_t read_out_idx;
};

class Conn
{
public:
    Buffer recvBuffer;
    Buffer sendBuffer;
    // sock
    // recvFirstByteIdx
    // sendFirstByteIdx
};

int main(int argc, const char **argv)
{
    std::vector<Conn> connArray(100);
    std::unordered_set<int> connUsing;
    std::unordered_set<int> connNotUse;
    for (std::size_t index = 0; index < connArray.size(); index++)
    {
        connNotUse.insert(index);
    }

    std::unordered_map<int, int> sockfd2Conn;
    std::unordered_map<int, int> gid2sockfd;

    // constexpr int bufferCount = 20000;
    // Buffer bufferArray[bufferCount];
    // int bufferArrayIdx = 0;

    // size_t allBytes = 0;
    // do
    // {
    //     if (bufferArrayIdx + 1 >= bufferCount)
    //     {
    //         bufferArrayIdx = 0;
    //     }
    //     else
    //     {
    //         bufferArrayIdx++;
    //     }
    //     Buffer &buffer = bufferArray[bufferArrayIdx];
    //     for (int loop = 0; loop < 100; loop++)
    //     {
    //         // 写内容
    //         buffer.CacheSwap();
    //         for (int i = 0; i < 11; i++)
    //         {
    //             std::string &Item = buffer.Push();
    //             Item = std::string(10240, 't');
    //         }
    //         // 读内容
    //         buffer.CacheSwap();
    //         // 读内容
    //         while (!buffer.Empty())
    //         {
    //             const std::string &Item = buffer.First();
    //             allBytes += Item.size();
    //             buffer.Pop();
    //         }
    //     }
    //     std::cout << buffer.data_reader->size() << " " << buffer.data_writer->size() << std::endl;
    //     std::cout << allBytes / (1024 * 1024 * 1024) << "GB" << std::endl;
    //     // std::this_thread::sleep_for(std::chrono::milliseconds(4));
    // } while (true);
    return 0;
}

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
Status: In Progress
Development

No branches or pull requests

1 participant