diff --git a/.gitignore b/.gitignore
index cd69c526f19c..97e44879bed8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,3 +60,4 @@ python/setup.py
 
 # Xmake
 .xmake/
+CMakePresets.json
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 800bf47ca9f7..a4ed26be1565 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -81,7 +81,7 @@ option(NCNN_SIMPLEVK "minimal in-house vulkan loader" ON)
 option(NCNN_SYSTEM_GLSLANG "use system glslang library" OFF)
 option(NCNN_RUNTIME_CPU "runtime dispatch cpu routines" ON)
 option(NCNN_DISABLE_PIC "disable position-independent code" OFF)
-option(NCNN_BUILD_TESTS "build tests" OFF)
+option(NCNN_BUILD_TESTS "build tests" ON)
 option(NCNN_COVERAGE "build for coverage" OFF)
 option(NCNN_ASAN "build for address sanitizer" OFF)
 option(NCNN_BUILD_BENCHMARK "build benchmark" ON)
@@ -89,6 +89,7 @@ option(NCNN_PYTHON "build python api" OFF)
 option(NCNN_INT8 "int8 inference" ON)
 option(NCNN_BF16 "bf16 inference" ON)
 option(NCNN_FORCE_INLINE "force inline some function" ON)
+option(NCNN_MUTITHREAD "enable multi-thread beta" ON)
 
 if(ANDROID OR IOS OR NCNN_SIMPLESTL)
     option(NCNN_DISABLE_RTTI "disable rtti" ON)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 261221104a34..a704c0b554c4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -48,6 +48,13 @@ if(ANDROID)
     list(APPEND ncnn_SRCS mat_pixel_android.cpp)
 endif()
 
+if(NCNN_MUTITHREAD)
+    list(APPEND ncnn_SRCS thread.cpp)
+    if(WIN32)
+        list(APPEND ncnn_SRCS TheadInfo.cpp)
+    endif()
+endif()
+
 ncnn_src_group(ncnn_SRCS "sources")
 
 include_directories("${CMAKE_CURRENT_SOURCE_DIR}/layer/${NCNN_TARGET_ARCH}")
@@ -266,6 +273,11 @@ if(NCNN_THREADS)
         target_link_libraries(ncnn PUBLIC pthread)
     endif()
 endif()
+if(NCNN_MUTITHREAD)
+    if(NOT WIN32 AND (NOT NCNN_SIMPLEOMP) AND (NOT NCNN_SIMPLESTL))
+        target_link_libraries(ncnn PUBLIC -pthread)
+    endif()
+endif()
 
 if(NCNN_VULKAN)
     if(NCNN_SIMPLEVK)
diff --git a/src/TheadInfo.cpp b/src/TheadInfo.cpp
new file mode 100644
index 000000000000..f49f6eb8ad96
--- /dev/null
+++ b/src/TheadInfo.cpp
@@ -0,0 +1,69 @@
+#ifdef NCNN_MUTITHREAD
+#ifdef _WIN32
+
+#include "TheadInfo.h"
+
+namespace ncnn {
+
+// initialize the static singleton instance
+ThreadInfo* ThreadInfo::thread_info = nullptr;
+
+ThreadInfo::ThreadInfo(/* args */)
+{
+    // enumerate every logical processor across all processor groups
+    int groupCount = GetActiveProcessorGroupCount();
+    for (WORD group = 0; group < groupCount; group++)
+    {
+        DWORD processorsInGroup = GetActiveProcessorCount(group);
+        for (int i = 0; i < static_cast<int>(processorsInGroup); i++)
+        {
+            CoreInfo info;
+            info.group = group;
+            info.id = i + (int)core_infos.size();
+            info.affinity = (static_cast<DWORD_PTR>(1) << i);
+            core_infos.push_back(info);
+        }
+    }
+}
+
+ThreadInfo* ThreadInfo::get()
+{
+    static Mutex lock;
+    AutoLock guard(lock);
+
+    if (!thread_info)
+    {
+        thread_info = new ThreadInfo();
+    }
+    return thread_info;
+}
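+
+// Usage sketch (illustrative, not part of the patch): enumerate the cores the
+// singleton discovered and find the one the caller is running on. Assumes a
+// Windows build with NCNN_MUTITHREAD enabled.
+//
+//   std::vector<ncnn::CoreInfo> cores;
+//   ncnn::ThreadInfo::get()->getAllCore(cores);
+//   ncnn::CoreInfo cur = ncnn::ThreadInfo::get()->getCurrentCore();
+//   // cur.id == -1 means the current core could not be matched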
+
+CoreInfo ThreadInfo::getCurrentCore()
+{
+    // query which CPU core the current thread runs on (multi-processor-group aware)
+    DWORD_PTR process_affinity, system_affinity;
+    GetProcessAffinityMask(GetCurrentProcess(), &process_affinity, &system_affinity);
+
+    // the extended API also reports the processor group number
+    PROCESSOR_NUMBER proc_num;
+    GetCurrentProcessorNumberEx(&proc_num);
+
+    for (const auto& core : core_infos)
+    {
+        // match the group number and the in-group core index
+        if (core.group == proc_num.Group && (core.affinity & (1ULL << proc_num.Number)))
+        {
+            return core;
+        }
+    }
+
+    // return a sentinel value when no core matches
+    return {-1, -1, 0};
+}
+
+void ThreadInfo::getAllCore(std::vector<CoreInfo>& out)
+{
+    out = core_infos;
+}
+
+} // namespace ncnn
+
+#endif
+#endif
diff --git a/src/TheadInfo.h b/src/TheadInfo.h
new file mode 100644
index 000000000000..7ab03b697e09
--- /dev/null
+++ b/src/TheadInfo.h
@@ -0,0 +1,30 @@
+#ifndef THREAD_INFO_H
+#define THREAD_INFO_H
+#ifdef NCNN_MUTITHREAD
+#if defined _WIN32
+
+#include "cpu.h"
+
+#include <vector>
+
+namespace ncnn {
+
+struct CoreInfo
+{
+public:
+    int id;
+    int group;
+    DWORD_PTR affinity;
+};
+
+class ThreadInfo
+{
+private:
+    static ThreadInfo* thread_info;
+    std::vector<CoreInfo> core_infos;
+    ThreadInfo(/* args */);
+
+public:
+    static ThreadInfo* get();
+    CoreInfo getCurrentCore();
+    void getAllCore(std::vector<CoreInfo>& out);
+};
+
+} // namespace ncnn
+
+#endif
+#endif
+#endif
\ No newline at end of file
diff --git a/src/cpu.cpp b/src/cpu.cpp
index 9f91812b90cf..f623f3b1aff3 100644
--- a/src/cpu.cpp
+++ b/src/cpu.cpp
@@ -1424,12 +1424,21 @@ static std::vector<int> get_max_freq_mhz()
 
 static int set_sched_affinity(const ncnn::CpuSet& thread_affinity_mask)
 {
+#ifdef _WIN32
+    // processor-group aware affinity, needed once there are more than 64 logical cores
+    GROUP_AFFINITY groupAffinity;
+    ZeroMemory(&groupAffinity, sizeof(groupAffinity));
+    groupAffinity.Group = static_cast<WORD>(thread_affinity_mask.cpu_group);
+    groupAffinity.Mask = thread_affinity_mask.mask;
+
+    SetThreadGroupAffinity(GetCurrentThread(), &groupAffinity, NULL);
+#else
     DWORD_PTR prev_mask = SetThreadAffinityMask(GetCurrentThread(), thread_affinity_mask.mask);
     if (prev_mask == 0)
     {
         NCNN_LOGE("SetThreadAffinityMask failed %d", GetLastError());
         return -1;
     }
+#endif
 
     return 0;
 }
@@ -2266,22 +2275,27 @@ CpuSet::CpuSet()
 
 void CpuSet::enable(int cpu)
 {
-    mask |= ((ULONG_PTR)1 << cpu);
+    cpu_group = cpu / 64;
+    mask |= ((ULONG_PTR)1 << (cpu - cpu_group * 64));
 }
 
 void CpuSet::disable(int cpu)
 {
-    mask &= ~((ULONG_PTR)1 << cpu);
+    cpu_group = cpu / 64;
+    mask &= ~((ULONG_PTR)1 << (cpu - cpu_group * 64));
 }
 
 void CpuSet::disable_all()
 {
+    cpu_group = 0;
     mask = 0;
 }
 
 bool CpuSet::is_enabled(int cpu) const
 {
-    return mask & ((ULONG_PTR)1 << cpu);
+    if (cpu_group != cpu / 64)
+        return false;
+    return mask & ((ULONG_PTR)1 << (cpu - cpu_group * 64));
 }
 
 int CpuSet::num_enabled() const
@@ -3266,4 +3280,38 @@ int set_flush_denormals(int flush_denormals)
 #endif
 }
 
+int get_multi_thread_batch()
+{
+#if NCNN_MUTITHREAD
+#if defined _WIN32
+    DWORD length = 0;
+    GetLogicalProcessorInformation(NULL, &length);
+    if (GetLastError() != ERROR_INSUFFICIENT_BUFFER)
+        return 0;
+
+    PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION)malloc(length);
+
+    int count = 0;
+    if (GetLogicalProcessorInformation(buffer, &length))
+    {
+        // walk the flat info array and count physical cores
+        PSYSTEM_LOGICAL_PROCESSOR_INFORMATION cursor = buffer;
+        DWORD offset = 0;
+        while (offset < length)
+        {
+            if (cursor->Relationship == RelationProcessorCore)
+                count++;
+
+            offset += sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION);
+            cursor++;
+        }
+    }
+    // free the original pointer, not the advanced cursor
+    free(buffer);
+    return count;
+#else
+    return get_cpu_count();
+#endif
+#else
+    return get_cpu_count();
+#endif
+}
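+
+// Illustrative caller sketch (not part of the patch): a batch runner could
+// size its worker pool from the physical core count; the fallbacks above
+// already degrade to get_cpu_count() when the query is unavailable.
+//
+//   int batch = ncnn::get_multi_thread_batch();
+//   if (batch < 1) batch = 1; // GetLogicalProcessorInformation may fail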
+
 } // namespace ncnn
diff --git a/src/cpu.h b/src/cpu.h
index cbf417111f6d..0c8761a53a37 100644
--- a/src/cpu.h
+++ b/src/cpu.h
@@ -8,6 +8,7 @@
 
 #if defined _WIN32
 #define WIN32_LEAN_AND_MEAN
+#define _WIN32_WINNT 0x0601 // Windows 7+, for processor group APIs
 #include <windows.h>
 #endif
 #if defined __ANDROID__ || defined __linux__
@@ -30,6 +31,7 @@ class NCNN_EXPORT CpuSet
 public:
 #if defined _WIN32
+    int cpu_group;
     ULONG_PTR mask;
 #endif
 #if defined __ANDROID__ || defined __linux__
@@ -172,6 +174,9 @@ NCNN_EXPORT void set_kmp_blocktime(int time_ms);
 NCNN_EXPORT int get_flush_denormals();
 NCNN_EXPORT int set_flush_denormals(int flush_denormals);
 
+// multi thread batch inference
+NCNN_EXPORT int get_multi_thread_batch();
+
 } // namespace ncnn
 
 #endif // NCNN_CPU_H
diff --git a/src/layer.cpp b/src/layer.cpp
index f1b849dad255..4792c7231138 100644
--- a/src/layer.cpp
+++ b/src/layer.cpp
@@ -98,6 +98,11 @@ int Layer::forward_inplace(Mat& /*bottom_top_blob*/, const Option& /*opt*/) const
     return -1;
 }
 
+int Layer::forward_thread(void* /*info*/)
+{
+    return -1;
+}
+
 #if NCNN_VULKAN
 int Layer::upload_model(VkTransfer& /*cmd*/, const Option& /*opt*/)
 {
diff --git a/src/layer.h b/src/layer.h
index 5351de1c0dd1..5bfd58742e5e 100644
--- a/src/layer.h
+++ b/src/layer.h
@@ -94,6 +94,10 @@ class NCNN_EXPORT Layer
     // return 0 if success
     virtual int forward_inplace(std::vector<Mat>& bottom_top_blobs, const Option& opt) const;
     virtual int forward_inplace(Mat& bottom_top_blob, const Option& opt) const;
+    /// @brief multithread worker function
+    /// @param workspace thread information (a ThreadInfoExc*)
+    /// @return 0 if success
+    virtual int forward_thread(void* workspace);
 
 #if NCNN_VULKAN
 public:
@@ -139,6 +143,7 @@ class NCNN_EXPORT Layer
 // layer factory function
 typedef Layer* (*layer_creator_func)(void*);
 typedef void (*layer_destroyer_func)(Layer*, void*);
+typedef int (*layer_work_func)(Layer*, void*);
 
 struct layer_registry_entry
 {
diff --git a/src/layer/absval.cpp b/src/layer/absval.cpp
index 2f38d3520f2c..61e355dda915 100644
--- a/src/layer/absval.cpp
+++ b/src/layer/absval.cpp
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: BSD-3-Clause
 
 #include "absval.h"
+#include "thread.h"
 
 namespace ncnn {
 
@@ -17,6 +18,16 @@ int AbsVal::forward_inplace(Mat& bottom_top_blob, const Option& opt) const
     int h = bottom_top_blob.h;
     int channels = bottom_top_blob.c;
    int size = w * h;
+    if (opt.num_threads > 64)
+    {
+        // beyond 64 threads a single affinity mask no longer covers every
+        // core, so hand the work to the MutilThread dispatcher instead
+        ThreadWorkspace workspace;
+        workspace.layer = (Layer*)this;
+        MutilThread thread(workspace, opt);
+        std::vector<Mat> workspace_blobs;
+        workspace_blobs.push_back(bottom_top_blob);
+        thread.join(workspace_blobs);
+        return 0;
+    }
 
     #pragma omp parallel for num_threads(opt.num_threads)
     for (int q = 0; q < channels; q++)
@@ -33,4 +44,47 @@ int AbsVal::forward_inplace(Mat& bottom_top_blob, const Option& opt) const
     return 0;
 }
 
+int AbsVal::forward_thread(void* workspace)
+{
+    ThreadInfoExc* info = (ThreadInfoExc*)workspace;
+    Mat& bottom_top_blob = info->mats->at(0);
+    if (bottom_top_blob.elemsize == 1)
+    {
+        // int8 storage: negate negative values in place
+        int8_t* ptr = (int8_t*)bottom_top_blob.data;
+        for (size_t i = info->start_index; i < info->end_index; i++)
+        {
+            if (ptr[i] < 0)
+            {
+                ptr[i] = -ptr[i];
+            }
+        }
+    }
+    else if (bottom_top_blob.elemsize == 2)
+    {
+        // fp16/bf16 storage: abs() is simply clearing the sign bit
+        uint16_t* ptr = (uint16_t*)bottom_top_blob.data;
+        for (size_t i = info->start_index; i < info->end_index; i++)
+        {
+            ptr[i] &= 0x7fff;
+        }
+    }
+    else
+    {
+        float* ptr = (float*)bottom_top_blob.data;
+        for (size_t i = info->start_index; i < info->end_index; i++)
+        {
+            if (ptr[i] < 0)
+            {
+                ptr[i] = -ptr[i];
+            }
+        }
+    }
+
+    return 0;
+}
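+
+// Adoption sketch for other layers (illustrative): a layer opts into the
+// MutilThread path by overriding forward_thread() and processing only its
+// [start_index, end_index) element slice, mirroring the guard shown in
+// forward_inplace above:
+//
+//   virtual int forward_thread(void* workspace)
+//   {
+//       ThreadInfoExc* info = (ThreadInfoExc*)workspace;
+//       Mat& blob = info->mats->at(0);
+//       float* ptr = (float*)blob.data;
+//       for (size_t i = info->start_index; i < info->end_index; i++)
+//           ptr[i] = /* per-element op */ ptr[i];
+//       return 0;
+//   }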
+
 } // namespace ncnn
diff --git a/src/layer/absval.h b/src/layer/absval.h
index deb9540d0e16..619cfeb6480a 100644
--- a/src/layer/absval.h
+++ b/src/layer/absval.h
@@ -14,6 +14,7 @@ class AbsVal : public Layer
     AbsVal();
 
     virtual int forward_inplace(Mat& bottom_top_blob, const Option& opt) const;
+    virtual int forward_thread(void* workspace);
 };
 
 } // namespace ncnn
diff --git a/src/layer/batchnorm.h b/src/layer/batchnorm.h
index 6043d0e413a1..0deedba46d1f 100644
--- a/src/layer/batchnorm.h
+++ b/src/layer/batchnorm.h
@@ -19,6 +19,8 @@ class BatchNorm : public Layer
 
     virtual int forward_inplace(Mat& bottom_top_blob, const Option& opt) const;
 
+    virtual int forward_thread(void* workspace);
+
 public:
     // param
     int channels;
diff --git a/src/platform.h.in b/src/platform.h.in
index 8b7357eec5b9..79a79db042ee 100644
--- a/src/platform.h.in
+++ b/src/platform.h.in
@@ -57,6 +57,7 @@
 #cmakedefine01 NCNN_INT8
 #cmakedefine01 NCNN_BF16
 #cmakedefine01 NCNN_FORCE_INLINE
+#cmakedefine01 NCNN_MUTITHREAD
 
 #cmakedefine NCNN_VERSION_STRING "@NCNN_VERSION_STRING@"
diff --git a/src/thread.cpp b/src/thread.cpp
new file mode 100644
index 000000000000..779f46c87257
--- /dev/null
+++ b/src/thread.cpp
@@ -0,0 +1,164 @@
+#include "thread.h"
+
+#include "cpu.h"
+
+#if defined __ANDROID__ || defined __linux__
+#include <sched.h>
+#endif
+
+#if defined _WIN32
+DWORD WINAPI winWorker(LPVOID lpParam)
+{
+    ncnn::ThreadInfoExc* info = (ncnn::ThreadInfoExc*)lpParam;
+    if (info->coreinfo->group >= 0 && info->coreinfo->affinity != 0)
+    {
+        // pin the worker to its assigned core (processor-group aware)
+        GROUP_AFFINITY groupAffinity;
+        ZeroMemory(&groupAffinity, sizeof(groupAffinity));
+        groupAffinity.Group = static_cast<WORD>(info->coreinfo->group);
+        groupAffinity.Mask = info->coreinfo->affinity;
+
+        SetThreadGroupAffinity(GetCurrentThread(), &groupAffinity, NULL);
+    }
+    info->workspace->layer->forward_thread(info);
+    info->manager->threadsComplete[info->threadid] = true;
+    delete info;
+    return 0;
+}
+#else
+void* pthreadWorker(void* lpParam)
+{
+    ncnn::ThreadInfoExc* info = (ncnn::ThreadInfoExc*)lpParam;
+#if defined __ANDROID__ || defined __linux__
+    // bind the worker to its assigned core
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    CPU_SET(info->threadid, &cpuset);
+    pthread_t current_thread = pthread_self();
+    pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
+#endif
+    info->workspace->layer->forward_thread(info);
+    info->manager->threadsComplete[info->threadid] = true;
+    delete info;
+    return nullptr;
+}
+#endif
+
+namespace ncnn {
+
+MutilThread::MutilThread(ThreadWorkspace _workspace, const Option& opt)
+{
+    workspace = _workspace;
+    m_opt = opt;
+    helpid = 0;
+    threadsComplete.resize(opt.num_threads);
+    for (int i = 0; i < opt.num_threads; i++)
+    {
+        threadsComplete[i] = false;
+    }
+}
+
+MutilThread::~MutilThread()
+{
+    threadsComplete.clear();
+}
+
+void MutilThread::join(std::vector<Mat>& mats)
+{
+#if defined _WIN32
+    Mat mat = mats[0];
+    CoreInfo cur = ThreadInfo::get()->getCurrentCore();
+    std::vector<CoreInfo> cores;
+    ThreadInfo::get()->getAllCore(cores);
+    std::vector<HANDLE> handles;
+    ThreadInfoExc* curinfo = nullptr;
+    // total element count of the first blob and the per-thread slice size
+    size_t matlen = (size_t)mat.w * mat.h * mat.d * mat.c;
+    size_t workersize = matlen / m_opt.num_threads + 1;
+    // assumes opt.num_threads does not exceed the number of discovered cores
+    for (int i = 0; i < m_opt.num_threads; i++)
+    {
+        ThreadInfoExc* info = new ThreadInfoExc();
+        info->threadid = i;
+        info->start_index = i * workersize;
+        info->end_index = (i + 1) * workersize;
+        if (info->end_index > matlen)
+        {
+            info->end_index = matlen;
+        }
+        info->workspace = &workspace;
+        info->mats = &mats;
+        info->opt = &m_opt;
+        info->coreinfo = &cores[i];
+        threadsComplete[i] = false;
+        info->manager = this;
+        if (cur.id == cores[i].id)
+        {
+            // run this slice on the calling thread instead of spawning a worker
+            helpid = i;
+            threadsComplete[i] = true;
+            handles.push_back(nullptr);
+            curinfo = info;
+            continue;
+        }
+        handles.push_back(CreateThread(nullptr, 0, winWorker, info, 0, nullptr));
+    }
+    if (curinfo)
+    {
+        workspace.layer->forward_thread(curinfo);
+        delete curinfo;
+    }
+    // spin until every worker has flagged completion
+    bool check = true;
+    do
+    {
+        check = false;
+        for (int i = 0; i < m_opt.num_threads; i++)
+        {
+            if (threadsComplete[i] == false)
+            {
+                check = true;
+                break;
+            }
+        }
+    } while (check);
+    for (size_t i = 0; i < handles.size(); i++)
+    {
+        if (handles[i] != nullptr)
+        {
+            CloseHandle(handles[i]);
+        }
+    }
+    handles.clear();
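+
+    // Design note: this branch busy-waits on threadsComplete rather than using
+    // WaitForMultipleObjects, which cannot wait on more than 64 handles in one
+    // call; the POSIX branch below can simply pthread_join each worker.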
+#else
+    Mat mat = mats[0];
+    int curid = -1;
+#if defined __ANDROID__ || defined __linux__
+    curid = sched_getcpu();
+#endif
+
+    std::vector<pthread_t> pthread_handles;
+    ThreadInfoExc* curinfo = nullptr;
+    // total element count of the first blob and the per-thread slice size
+    size_t matlen = (size_t)mat.w * mat.h * mat.d * mat.c;
+    size_t workersize = matlen / m_opt.num_threads + 1;
+    for (int i = 0; i < m_opt.num_threads; i++)
+    {
+        ThreadInfoExc* info = new ThreadInfoExc();
+        info->threadid = i;
+        info->start_index = i * workersize;
+        info->end_index = (i + 1) * workersize;
+        if (info->end_index > matlen)
+        {
+            info->end_index = matlen;
+        }
+        info->workspace = &workspace;
+        info->mats = &mats;
+        info->opt = &m_opt;
+        threadsComplete[i] = false;
+        info->manager = this;
+        if (curid == i && curid > -1)
+        {
+            // run this slice on the calling thread instead of spawning a worker
+            helpid = i;
+            threadsComplete[i] = true;
+            curinfo = info;
+            continue;
+        }
+        pthread_t handle;
+        pthread_create(&handle, nullptr, pthreadWorker, info);
+        pthread_handles.push_back(handle);
+    }
+    if (curinfo)
+    {
+        workspace.layer->forward_thread(curinfo);
+        delete curinfo;
+    }
+    for (size_t i = 0; i < pthread_handles.size(); i++)
+    {
+        pthread_join(pthread_handles[i], nullptr);
+    }
+#endif
+}
+
+} // namespace ncnn
\ No newline at end of file
diff --git a/src/thread.h b/src/thread.h
new file mode 100644
index 000000000000..9f12c71d4c27
--- /dev/null
+++ b/src/thread.h
@@ -0,0 +1,42 @@
+#ifndef THREAD_H
+#define THREAD_H
+
+#include "layer.h"
+#include "TheadInfo.h"
+
+#if defined __ANDROID__ || defined __linux__ || defined __APPLE__
+#include <pthread.h>
+#endif
+
+namespace ncnn {
+
+class MutilThread;
+
+struct ThreadWorkspace
+{
+    Layer* layer;
+};
+
+struct ThreadInfoExc
+{
+    int threadid;
+    // element range [start_index, end_index) this thread should process
+    size_t start_index;
+    size_t end_index;
+    ThreadWorkspace* workspace;
+    std::vector<Mat>* mats;
+    Option* opt;
+    MutilThread* manager;
+#if defined _WIN32
+    CoreInfo* coreinfo;
+#endif
+};
+
+class MutilThread
+{
+private:
+    Option m_opt;
+    volatile int helpid;
+    ThreadWorkspace workspace;
+
+public:
+    MutilThread(ThreadWorkspace _workspace, const Option& opt);
+    void join(std::vector<Mat>& mats);
+    // completion flags; char rather than bool so each worker writes its own byte
+    std::vector<char> threadsComplete;
+    ~MutilThread();
+};
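+
+// Usage sketch (illustrative): a layer builds a ThreadWorkspace pointing at
+// itself, wraps it in a MutilThread sized by opt.num_threads, and calls
+// join() with the blobs to process; join() splits the first blob's elements
+// across workers and runs one slice on the calling thread.
+//
+//   ThreadWorkspace ws;
+//   ws.layer = some_layer; // hypothetical Layer* overriding forward_thread
+//   MutilThread mt(ws, opt);
+//   std::vector<Mat> blobs;
+//   blobs.push_back(blob);
+//   mt.join(blobs);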
+
+} // namespace ncnn
+#endif
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 9d5b6517e643..25c92367c445 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -62,6 +62,7 @@ ncnn_add_test(c_api)
 ncnn_add_test(cpu)
 ncnn_add_test(expression)
 ncnn_add_test(paramdict)
+ncnn_add_test(thread)
 
 if(NCNN_VULKAN)
     ncnn_add_test(command)
diff --git a/tests/test_thread.cpp b/tests/test_thread.cpp
new file mode 100644
index 000000000000..883d1e56b36a
--- /dev/null
+++ b/tests/test_thread.cpp
@@ -0,0 +1,115 @@
+#include "testutil.h"
+
+#include "thread.h"
+
+class TestLayer : public ncnn::Layer
+{
+public:
+    virtual int forward_inplace(ncnn::Mat& bottom_top_blob, const ncnn::Option& opt) const
+    {
+        ncnn::ThreadWorkspace workspace;
+        workspace.layer = (ncnn::Layer*)this;
+        ncnn::MutilThread thread(workspace, opt);
+        std::vector<ncnn::Mat> workspace_blobs;
+        workspace_blobs.push_back(bottom_top_blob);
+        thread.join(workspace_blobs);
+        return 0;
+    }
+
+    virtual int forward_thread(void* workspace)
+    {
+        ncnn::ThreadInfoExc* info = (ncnn::ThreadInfoExc*)workspace;
+        ncnn::Mat& bottom_top_blob = info->mats->at(0);
+        if (bottom_top_blob.elemsize == 1)
+        {
+            // int8 storage: negate negative values in place
+            int8_t* ptr = (int8_t*)bottom_top_blob.data;
+            for (size_t i = info->start_index; i < info->end_index; i++)
+            {
+                if (ptr[i] < 0)
+                {
+                    ptr[i] = -ptr[i];
+                }
+            }
+        }
+        else if (bottom_top_blob.elemsize == 2)
+        {
+            // fp16/bf16 storage: abs() is simply clearing the sign bit
+            uint16_t* ptr = (uint16_t*)bottom_top_blob.data;
+            for (size_t i = info->start_index; i < info->end_index; i++)
+            {
+                ptr[i] &= 0x7fff;
+            }
+        }
+        else
+        {
+            float* ptr = (float*)bottom_top_blob.data;
+            for (size_t i = info->start_index; i < info->end_index; i++)
+            {
+                if (ptr[i] < 0)
+                {
+                    ptr[i] = -ptr[i];
+                }
+            }
+        }
+
+        return 0;
+    }
+};
+
+static int test_thread(const ncnn::Mat& a)
+{
+    ncnn::ParamDict pd;
+
+    std::vector<ncnn::Mat> weights(0);
+
+    int ret = test_layer("TestLayer", pd, weights, a);
+    if (ret != 0)
+    {
+        fprintf(stderr, "test_thread failed a.dims=%d a=(%d %d %d %d)\n", a.dims, a.w, a.h, a.d, a.c);
+    }
+
+    return ret;
+}
+
+static int test_thread_0()
+{
+    return 0
+           || test_thread(RandomMat(5, 6, 7, 24))
+           || test_thread(RandomMat(5, 6, 7, 12))
+           || test_thread(RandomMat(5, 6, 7, 13));
+}
+
+static int test_thread_1()
+{
+    return 0
+           || test_thread(RandomMat(5, 7, 24))
+           || test_thread(RandomMat(5, 6, 24))
+           || test_thread(RandomMat(7, 9, 24));
+}
+
+static int test_thread_2()
+{
+    return 0
+           || test_thread(RandomMat(7, 12))
+           || test_thread(RandomMat(5, 12))
+           || test_thread(RandomMat(9, 12));
+}
+
+static int test_thread_3()
+{
+    return 0
+           || test_thread(RandomMat(7))
+           || test_thread(RandomMat(128))
+           || test_thread(RandomMat(256));
+}
+
+int main()
+{
+    return 0
+           || test_thread_0()
+           || test_thread_1()
+           || test_thread_2()
+           || test_thread_3();
+}
\ No newline at end of file