From c641305c032a57ff9d1888d270c9888b5a2dd19b Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Mon, 15 Jun 2020 23:04:30 +0800 Subject: [PATCH 01/24] build(copp): add copp bug: can not find headers of copp --- CMakeLists.txt | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f51c2f..38a7e1a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,6 +52,21 @@ packageProject( DEPENDENCIES ) +CPMAddPackage( + NAME copp + GITHUB_REPOSITORY owt5008137/libcopp + GIT_TAG 1.2.1 +) + +packageProject( + NAME copp + VERSION ${PROJECT_VERSION} + BINARY_DIR ${copp_BINARY_DIR} + INCLUDE_DIR ${copp_SOURCE_DIR}/include + INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION} + DEPENDENCIES +) + if (NOT TARGET lua) CPMFindPackage( NAME lua @@ -117,7 +132,7 @@ target_compile_options(Smark PUBLIC "$<$:/permissive->") # Link dependencies (if required) # target_link_libraries(Smark PUBLIC cxxopts) -target_link_libraries(Smark PUBLIC uv_a HttpParser LuaForGlue) +target_link_libraries(Smark PUBLIC uv_a HttpParser LuaForGlue copp) target_include_directories(Smark PUBLIC @@ -134,5 +149,5 @@ packageProject( BINARY_DIR ${PROJECT_BINARY_DIR} INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION} - DEPENDENCIES "uv_a" "HttpParser" "LuaForGlue" + DEPENDENCIES "uv_a" "HttpParser" "LuaForGlue" "copp" ) From ad8637d37fab064feaa1d304ee0c54e816a924f6 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Tue, 16 Jun 2020 12:04:55 +0800 Subject: [PATCH 02/24] fix(copp): fix bug: can not include headers --- CMakeLists.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 38a7e1a..0508aee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -67,6 +67,8 @@ packageProject( DEPENDENCIES ) +target_include_directories(copp PUBLIC ${copp_SOURCE_DIR}/include) + if (NOT TARGET lua) CPMFindPackage( NAME lua @@ -149,5 +151,5 @@ packageProject( BINARY_DIR ${PROJECT_BINARY_DIR} INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION} - DEPENDENCIES "uv_a" "HttpParser" "LuaForGlue" "copp" + DEPENDENCIES ) From 92f13921936bf215601884d30b8e27c4af7de247 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Tue, 16 Jun 2020 22:59:52 +0800 Subject: [PATCH 03/24] feat(task): impl Task, TaskManager --- include/tasks.h | 46 +++++++++++++++++++ source/tasks.cpp | 102 ++++++++++++++++++++++++++++++++++++++++++ test/source/smark.cpp | 11 +---- test/source/tasks.cpp | 25 +++++++++++ test/source/util.h | 9 ++++ 5 files changed, 183 insertions(+), 10 deletions(-) create mode 100644 include/tasks.h create mode 100644 source/tasks.cpp create mode 100644 test/source/tasks.cpp create mode 100644 test/source/util.h diff --git a/include/tasks.h b/include/tasks.h new file mode 100644 index 0000000..4208d89 --- /dev/null +++ b/include/tasks.h @@ -0,0 +1,46 @@ +#pragma once +#include + +#include +#include +#include +#include +#include + +namespace smark::tasks { + typedef std::function)> TaskProc; + class TaskManager; + class Task : std::enable_shared_from_this { + public: + enum State { New, Runable, Dead }; + State state = State::New; + Task(TaskProc proc); + void Start(); + void Yield(); + void Resume(); + void Wait(std::shared_ptr task); + void WaitAll(const std::vector>* task_list); + template void Complete(ResultType* result); + template ResultType* GetResult(); + + private: + cotask::task<>::ptr_t task_ptr_; + void* result_; + }; + class TaskManager { + public: + std::shared_ptr NewTask(TaskProc proc); + void Wait(std::shared_ptr waiter, std::shared_ptr waitting); + void StopTask(std::shared_ptr task); + void RunOnce(); + + private: + std::vector> task_list_; + std::map, std::shared_ptr> waitting_tasks_; + std::queue> starting_tasks_; + }; + std::shared_ptr GetCurrentTask(); + std::shared_ptr async(TaskProc proc); + std::shared_ptr await(std::shared_ptr task); + extern thread_local TaskManager task_mgr; +} // namespace smark::tasks \ No newline at end of file diff --git a/source/tasks.cpp b/source/tasks.cpp new file mode 100644 index 0000000..d168785 --- /dev/null +++ b/source/tasks.cpp @@ -0,0 +1,102 @@ +#include "tasks.h" + +#include + +namespace smark::tasks { + thread_local std::map*, std::shared_ptr> map2task; + + Task::Task(TaskProc proc) { + auto this_ptr = shared_from_this(); + task_ptr_ = cotask::task<>::create([=]() { + map2task[cotask::this_task::get>()] = this_ptr; + + proc(this_ptr); + + // TODO: add to finally + this_ptr->state = State::Dead; + map2task.erase(cotask::this_task::get>); + }); + } + + void Task::Start() { + state = State::Runable; + task_ptr_->start(); + } + + void Task::Yield() { task_ptr_->yield(); } + + void Task::Resume() { task_ptr_->resume(); } + + void Task::Wait(std::shared_ptr task) { task_mgr.Wait(shared_from_this(), task); } + + void Task::WaitAll(const std::vector>* task_list) { + for (auto iter = task_list->begin(); iter != task_list->end(); iter++) { + task_mgr.Wait(shared_from_this(), *iter); + } + } + + template void Task::Complete(ResultType* result) { + result_ = reinterpret_cast(result); + task_mgr.StopTask(shared_from_this()); + } + + template ResultType* Task::GetResult() { + return reinterpret_cast(result_); + } + + std::shared_ptr TaskManager::NewTask(TaskProc proc) { + auto task = std::make_shared(proc); + task_list_.push_back(task); + return task; + } + + void TaskManager::Wait(std::shared_ptr waiter, std::shared_ptr waitting) { + waitting_tasks_[waitting] = waiter; + } + + void TaskManager::StopTask(std::shared_ptr task) { + auto iter = waitting_tasks_.find(task); + if (iter != waitting_tasks_.end()) { + starting_tasks_.push(iter->second); + waitting_tasks_.erase(iter); + } + auto t = std::find(task_list_.begin(), task_list_.end(), task); + if (t != task_list_.end()) { + task_list_.erase(t); + } + } + + void TaskManager::RunOnce() { + auto task = starting_tasks_.front(); + starting_tasks_.pop(); + switch (task->state) { + case Task::State::New: + task->Start() break; + + case Task::State::Runable: + task->Resume(); + break; + + default: + break; + } + } + + std::shared_ptr GetCurrentTask() { + return map2task[cotask::this_task::get>()]; + } + + std::shared_ptr async(TaskProc proc) { + auto task = task_mgr.NewTask(proc); + return task; + } + + std::shared_ptr await(std::shared_ptr task) { + auto current_task = GetCurrentTask(); + task_mgr.Wait(current_task, task); + + current_task->Yield(); + } + + thread_local TaskManager task_mgr; +} // namespace smark::tasks \ No newline at end of file diff --git a/test/source/smark.cpp b/test/source/smark.cpp index 0910098..a1421a2 100644 --- a/test/source/smark.cpp +++ b/test/source/smark.cpp @@ -18,16 +18,7 @@ DISABLE_SOME_WARNINGS #endif #include "testsvr.h" - -#define INIT_TASK int __task_count = __COUNTER__ -#define SUB_TASK(task) \ - (void)__COUNTER__; \ - task++ -#define END_TASK __task_count = __COUNTER__ - __task_count - 1 - -// do not use '==' to compare string -// do not use string.compare: fail on "This is a response" -#define STR_COMPARE(str, value) strcmp(str.c_str(), value) == 0 +#include "util.h" using namespace smark_tests; diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp new file mode 100644 index 0000000..6b89208 --- /dev/null +++ b/test/source/tasks.cpp @@ -0,0 +1,25 @@ +#include "platform.h" +DISABLE_SOME_WARNINGS +#include + +#include "tasks.h" +#include "util.h" + +using namespace smark::tasks; + +TEST_CASE("SimpleTask") { + int task = 0; + INIT_TASK; + Task co_task([](std::shared_ptr this_task) { + SUB_TASK(task); + this_task->Yield(); + SUB_TASK(task); + }); + CHECK(co_task.state == Task::State::New); + co_task.Start(); + CHECK(co_task.state == Task::State::Runable); + co_task.Resume(); + CHECK(co_task.state == Task::State::Dead); + END_TASK; + CHECK(task == __task_count); +} diff --git a/test/source/util.h b/test/source/util.h new file mode 100644 index 0000000..a571bb7 --- /dev/null +++ b/test/source/util.h @@ -0,0 +1,9 @@ +#define INIT_TASK int __task_count = __COUNTER__ +#define SUB_TASK(task) \ + (void)__COUNTER__; \ + task++ +#define END_TASK __task_count = __COUNTER__ - __task_count - 1 + +// do not use '==' to compare string +// do not use string.compare: fail on "This is a response" +#define STR_COMPARE(str, value) strcmp(str.c_str(), value) == 0 \ No newline at end of file From eae8b2a0c75c2b2645c6b5ecbcf4e3e7eb744172 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Tue, 16 Jun 2020 23:05:46 +0800 Subject: [PATCH 04/24] fix(task): fix compile errors --- CMakeLists.txt | 10 ++++++---- include/tasks.h | 5 ++++- source/tasks.cpp | 7 +++++-- test/source/tasks.cpp | 2 +- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0508aee..122af99 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,10 +87,12 @@ if (NOT TARGET lua) add_library(LuaForGlue ${lua_sources} ${lua_headers}) set_target_properties(LuaForGlue PROPERTIES LINKER_LANGUAGE C) - target_include_directories(LuaForGlue - PUBLIC - $ - ) + # target_include_directories(LuaForGlue + # PUBLIC + # $ + # ) + + target_include_directories(LuaForGlue PUBLIC ${lua_SOURCE_DIR}) if(ANDROID) target_compile_definitions(LuaForGlue PRIVATE LUA_USE_POSIX LUA_USE_DLOPEN) diff --git a/include/tasks.h b/include/tasks.h index 4208d89..da91419 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -8,8 +8,11 @@ #include namespace smark::tasks { - typedef std::function)> TaskProc; class TaskManager; + class Task; + + typedef std::function)> TaskProc; + class Task : std::enable_shared_from_this { public: enum State { New, Runable, Dead }; diff --git a/source/tasks.cpp b/source/tasks.cpp index d168785..b68f2be 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -14,7 +14,7 @@ namespace smark::tasks { // TODO: add to finally this_ptr->state = State::Dead; - map2task.erase(cotask::this_task::get>); + map2task.erase(cotask::this_task::get>()); }); } @@ -71,7 +71,8 @@ namespace smark::tasks { starting_tasks_.pop(); switch (task->state) { case Task::State::New: - task->Start() break; + task->Start(); + break; case Task::State::Runable: task->Resume(); @@ -96,6 +97,8 @@ namespace smark::tasks { task_mgr.Wait(current_task, task); current_task->Yield(); + + return task; } thread_local TaskManager task_mgr; diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 6b89208..469a9be 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -10,7 +10,7 @@ using namespace smark::tasks; TEST_CASE("SimpleTask") { int task = 0; INIT_TASK; - Task co_task([](std::shared_ptr this_task) { + Task co_task([&](std::shared_ptr this_task) { SUB_TASK(task); this_task->Yield(); SUB_TASK(task); From 66f857daf7435e3d83f6089fc4437c4bee717eca Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Tue, 16 Jun 2020 23:43:28 +0800 Subject: [PATCH 05/24] build(copp): add copp and cotask --- CMakeLists.txt | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 122af99..9e780b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,7 +53,7 @@ packageProject( ) CPMAddPackage( - NAME copp + NAME libcopp GITHUB_REPOSITORY owt5008137/libcopp GIT_TAG 1.2.1 ) @@ -61,13 +61,23 @@ CPMAddPackage( packageProject( NAME copp VERSION ${PROJECT_VERSION} - BINARY_DIR ${copp_BINARY_DIR} - INCLUDE_DIR ${copp_SOURCE_DIR}/include + BINARY_DIR ${libcopp_BINARY_DIR} + INCLUDE_DIR ${libcopp_SOURCE_DIR}/include + INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION} + DEPENDENCIES +) + +packageProject( + NAME cotask + VERSION ${PROJECT_VERSION} + BINARY_DIR ${libcopp_BINARY_DIR} + INCLUDE_DIR ${libcopp_SOURCE_DIR}/include INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION} DEPENDENCIES ) -target_include_directories(copp PUBLIC ${copp_SOURCE_DIR}/include) +target_include_directories(copp PUBLIC ${libcopp_SOURCE_DIR}/include) +target_include_directories(cotask PUBLIC ${libcopp_SOURCE_DIR}/include) if (NOT TARGET lua) CPMFindPackage( @@ -136,7 +146,7 @@ target_compile_options(Smark PUBLIC "$<$:/permissive->") # Link dependencies (if required) # target_link_libraries(Smark PUBLIC cxxopts) -target_link_libraries(Smark PUBLIC uv_a HttpParser LuaForGlue copp) +target_link_libraries(Smark PUBLIC uv_a HttpParser LuaForGlue copp cotask) target_include_directories(Smark PUBLIC From a19ed887c5f09a0dd308b15535b4d8d5b19d2608 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Tue, 16 Jun 2020 23:44:02 +0800 Subject: [PATCH 06/24] feat(util): enable shared_from_this in constructor --- include/util.h | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/include/util.h b/include/util.h index 8a71ad3..58e177a 100644 --- a/include/util.h +++ b/include/util.h @@ -20,6 +20,39 @@ extern "C" { [&](void*) { X }}; namespace smark::util { + // enabe shared_from_this in construct + // from http://www.cplusplus.com/forum/general/202645/ + template void construct_deleter(T* t) { + t->~T(); + free(t); + } + + template class enable_shared_from_this { + public: + std::shared_ptr* _construct_pself; + std::weak_ptr _construct_self; + + std::shared_ptr shared_from_this() { + if (_construct_pself) { + return *_construct_pself; + } else { + return _construct_self.lock(); + } + } + }; + + template std::shared_ptr make_shared(Params&&... args) { + std::shared_ptr rtn; + T* t = (T*)calloc(1, sizeof(T)); + rtn.reset(t, construct_deleter); + t->_construct_pself = &rtn; + t = new (t) T(std::forward(args)...); + t->_construct_pself = NULL; + t->_construct_self = rtn; + + return rtn; + } + typedef std::function CallbackType; // void(int status) class EventLoop; class IEventObj { From 68b10afca2b269d3cfef4912786cddd0e7e82de0 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Tue, 16 Jun 2020 23:44:29 +0800 Subject: [PATCH 07/24] fix(test): fix and pass test 'SimpleTask' --- include/tasks.h | 4 +++- test/source/tasks.cpp | 12 ++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index da91419..9b3db7f 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -7,13 +7,15 @@ #include #include +#include "util.h" + namespace smark::tasks { class TaskManager; class Task; typedef std::function)> TaskProc; - class Task : std::enable_shared_from_this { + class Task : public smark::util::enable_shared_from_this { public: enum State { New, Runable, Dead }; State state = State::New; diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 469a9be..28c1df9 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -10,16 +10,16 @@ using namespace smark::tasks; TEST_CASE("SimpleTask") { int task = 0; INIT_TASK; - Task co_task([&](std::shared_ptr this_task) { + auto co_task = smark::util::make_shared([&](std::shared_ptr this_task) { SUB_TASK(task); this_task->Yield(); SUB_TASK(task); }); - CHECK(co_task.state == Task::State::New); - co_task.Start(); - CHECK(co_task.state == Task::State::Runable); - co_task.Resume(); - CHECK(co_task.state == Task::State::Dead); + CHECK(co_task->state == Task::State::New); + co_task->Start(); + CHECK(co_task->state == Task::State::Runable); + co_task->Resume(); + CHECK(co_task->state == Task::State::Dead); END_TASK; CHECK(task == __task_count); } From 746053da6c08a0298569a6d7ca2120f1731e6595 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Wed, 17 Jun 2020 22:11:19 +0800 Subject: [PATCH 08/24] (test, Task): add TaskMap2Task --- include/tasks.h | 5 +++++ source/tasks.cpp | 7 ++----- test/source/tasks.cpp | 18 ++++++++++++++++-- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index 9b3db7f..daa74d2 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -7,6 +7,7 @@ #include #include +#include "debug.h" #include "util.h" namespace smark::tasks { @@ -15,6 +16,7 @@ namespace smark::tasks { typedef std::function)> TaskProc; + // TODO: add exception handler class Task : public smark::util::enable_shared_from_this { public: enum State { New, Runable, Dead }; @@ -48,4 +50,7 @@ namespace smark::tasks { std::shared_ptr async(TaskProc proc); std::shared_ptr await(std::shared_ptr task); extern thread_local TaskManager task_mgr; +#ifdef DEBUG + extern thread_local std::map*, std::shared_ptr> map2task; +#endif } // namespace smark::tasks \ No newline at end of file diff --git a/source/tasks.cpp b/source/tasks.cpp index b68f2be..2839a2a 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -8,13 +8,10 @@ namespace smark::tasks { Task::Task(TaskProc proc) { auto this_ptr = shared_from_this(); task_ptr_ = cotask::task<>::create([=]() { + DEFER(this_ptr->state = State::Dead; + map2task.erase(cotask::this_task::get>());) map2task[cotask::this_task::get>()] = this_ptr; - proc(this_ptr); - - // TODO: add to finally - this_ptr->state = State::Dead; - map2task.erase(cotask::this_task::get>()); }); } diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 28c1df9..c2205e8 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -6,11 +6,12 @@ DISABLE_SOME_WARNINGS #include "util.h" using namespace smark::tasks; +using namespace smark::util; -TEST_CASE("SimpleTask") { +TEST_CASE("TaskSimple") { int task = 0; INIT_TASK; - auto co_task = smark::util::make_shared([&](std::shared_ptr this_task) { + auto co_task = make_shared([&](std::shared_ptr this_task) { SUB_TASK(task); this_task->Yield(); SUB_TASK(task); @@ -23,3 +24,16 @@ TEST_CASE("SimpleTask") { END_TASK; CHECK(task == __task_count); } + +TEST_CASE("TaskMap2Task") { + cotask::task<>* t = nullptr; + auto co_task = make_shared([&t](std::shared_ptr this_task) { + t = cotask::this_task::get>(); + this_task->Yield(); + }); + co_task->Start(); + auto r1 = map2task[t]; + CHECK(r1.get() == co_task.get()); + co_task->Resume(); + CHECK(map2task.find(t) == map2task.end()); +} From f4aaf54d2b3face9af4bf3ff330f2ec86d6e12bb Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Thu, 18 Jun 2020 10:01:27 +0800 Subject: [PATCH 09/24] test(task): add TaskAsync test. --- include/tasks.h | 12 +++++++++--- source/tasks.cpp | 20 ++++++++++++-------- test/source/tasks.cpp | 28 ++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 11 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index daa74d2..970f2ea 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -27,19 +27,25 @@ namespace smark::tasks { void Resume(); void Wait(std::shared_ptr task); void WaitAll(const std::vector>* task_list); - template void Complete(ResultType* result); - template ResultType* GetResult(); + template void Complete(ResultType* result) { + _Complete(reinterpret_cast(result)); + } + template ResultType* GetResult() { + return reinterpret_cast(_GetResult()); + } private: cotask::task<>::ptr_t task_ptr_; void* result_; + void _Complete(void* result); + void* _GetResult(); }; class TaskManager { public: std::shared_ptr NewTask(TaskProc proc); void Wait(std::shared_ptr waiter, std::shared_ptr waitting); void StopTask(std::shared_ptr task); - void RunOnce(); + int RunOnce(); private: std::vector> task_list_; diff --git a/source/tasks.cpp b/source/tasks.cpp index 2839a2a..d4c293d 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -32,23 +32,22 @@ namespace smark::tasks { } } - template void Task::Complete(ResultType* result) { - result_ = reinterpret_cast(result); + void Task::_Complete(void* result) { + result_ = result; task_mgr.StopTask(shared_from_this()); } - template ResultType* Task::GetResult() { - return reinterpret_cast(result_); - } + void* Task::_GetResult() { return result_; } std::shared_ptr TaskManager::NewTask(TaskProc proc) { - auto task = std::make_shared(proc); + auto task = smark::util::make_shared(proc); task_list_.push_back(task); return task; } void TaskManager::Wait(std::shared_ptr waiter, std::shared_ptr waitting) { waitting_tasks_[waitting] = waiter; + starting_tasks_.push(waitting); } void TaskManager::StopTask(std::shared_ptr task) { @@ -63,21 +62,26 @@ namespace smark::tasks { } } - void TaskManager::RunOnce() { - auto task = starting_tasks_.front(); + int TaskManager::RunOnce() { + int run_task_count = 0; + if (starting_tasks_.size() == 0) return 0; + auto task = starting_tasks_.front(); // use front before size check is undefined behaivour. starting_tasks_.pop(); switch (task->state) { case Task::State::New: task->Start(); + run_task_count++; break; case Task::State::Runable: task->Resume(); + run_task_count++; break; default: break; } + return run_task_count; } std::shared_ptr GetCurrentTask() { diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index c2205e8..5685d9a 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -11,6 +11,7 @@ using namespace smark::util; TEST_CASE("TaskSimple") { int task = 0; INIT_TASK; + auto co_task = make_shared([&](std::shared_ptr this_task) { SUB_TASK(task); this_task->Yield(); @@ -21,6 +22,7 @@ TEST_CASE("TaskSimple") { CHECK(co_task->state == Task::State::Runable); co_task->Resume(); CHECK(co_task->state == Task::State::Dead); + END_TASK; CHECK(task == __task_count); } @@ -29,7 +31,9 @@ TEST_CASE("TaskMap2Task") { cotask::task<>* t = nullptr; auto co_task = make_shared([&t](std::shared_ptr this_task) { t = cotask::this_task::get>(); + CHECK(GetCurrentTask().get() == this_task.get()); this_task->Yield(); + CHECK(GetCurrentTask().get() == this_task.get()); }); co_task->Start(); auto r1 = map2task[t]; @@ -37,3 +41,27 @@ TEST_CASE("TaskMap2Task") { co_task->Resume(); CHECK(map2task.find(t) == map2task.end()); } + +TEST_CASE("TaskAsync") { + int task = 0; + INIT_TASK; + + auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { + (void)this_task; + auto child_task = async([&](std::shared_ptr this_task) { + SUB_TASK(task); + auto result = new int(1); + this_task->Complete(result); + }); + SUB_TASK(task); + auto result = await(child_task)->GetResult(); + CHECK(*result == 1); + }); + t1->Start(); + + while (task_mgr.RunOnce()) + ; + + END_TASK; + CHECK(task == __task_count); +} From fac03a93b210352954d1f1ee88333d948909ddfb Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Thu, 18 Jun 2020 23:13:49 +0800 Subject: [PATCH 10/24] test(task): support multi thread test --- test/source/tasks.cpp | 81 +++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 5685d9a..8b65b25 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -12,55 +12,68 @@ TEST_CASE("TaskSimple") { int task = 0; INIT_TASK; - auto co_task = make_shared([&](std::shared_ptr this_task) { - SUB_TASK(task); - this_task->Yield(); - SUB_TASK(task); - }); - CHECK(co_task->state == Task::State::New); - co_task->Start(); - CHECK(co_task->state == Task::State::Runable); - co_task->Resume(); - CHECK(co_task->state == Task::State::Dead); + std::thread([&task]() { + auto co_task = make_shared([&](std::shared_ptr this_task) { + SUB_TASK(task); + this_task->Yield(); + SUB_TASK(task); + }); + CHECK(co_task->state == Task::State::New); + co_task->Start(); + CHECK(co_task->state == Task::State::Runable); + co_task->Resume(); + CHECK(co_task->state == Task::State::Dead); + }).join(); END_TASK; CHECK(task == __task_count); } TEST_CASE("TaskMap2Task") { - cotask::task<>* t = nullptr; - auto co_task = make_shared([&t](std::shared_ptr this_task) { - t = cotask::this_task::get>(); - CHECK(GetCurrentTask().get() == this_task.get()); - this_task->Yield(); - CHECK(GetCurrentTask().get() == this_task.get()); - }); - co_task->Start(); - auto r1 = map2task[t]; - CHECK(r1.get() == co_task.get()); - co_task->Resume(); - CHECK(map2task.find(t) == map2task.end()); + int task = 0; + INIT_TASK; + + std::thread([&task]() { + SUB_TASK(task); + cotask::task<>* t = nullptr; + auto co_task = make_shared([&t](std::shared_ptr this_task) { + t = cotask::this_task::get>(); + CHECK(GetCurrentTask().get() == this_task.get()); + this_task->Yield(); + CHECK(GetCurrentTask().get() == this_task.get()); + }); + co_task->Start(); + auto r1 = map2task[t]; + CHECK(r1.get() == co_task.get()); + co_task->Resume(); + CHECK(map2task.find(t) == map2task.end()); + }).join(); + + END_TASK; + CHECK(task == __task_count); } TEST_CASE("TaskAsync") { int task = 0; INIT_TASK; - auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { - (void)this_task; - auto child_task = async([&](std::shared_ptr this_task) { + std::thread([&task]() { + auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { + (void)this_task; + auto child_task = async([&](std::shared_ptr this_task) { + SUB_TASK(task); + auto result = new int(1); + this_task->Complete(result); + }); SUB_TASK(task); - auto result = new int(1); - this_task->Complete(result); + auto result = await(child_task)->GetResult(); + CHECK(*result == 1); }); - SUB_TASK(task); - auto result = await(child_task)->GetResult(); - CHECK(*result == 1); - }); - t1->Start(); + t1->Start(); - while (task_mgr.RunOnce()) - ; + while (task_mgr.RunOnce()) + ; + }).join(); END_TASK; CHECK(task == __task_count); From cd7bab352b7a062178e971931c8694ba36ed1026 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Thu, 18 Jun 2020 23:43:38 +0800 Subject: [PATCH 11/24] test(task): add TaskCompleteFromOutside and pass --- test/source/tasks.cpp | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 8b65b25..e03fd5b 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -78,3 +78,36 @@ TEST_CASE("TaskAsync") { END_TASK; CHECK(task == __task_count); } + +TEST_CASE("TaskCompleteFromOutside") { + int task = 0; + INIT_TASK; + + std::thread([&task]() { + std::function func([]() {}); + + auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { + (void)this_task; + auto child_task = async([&](std::shared_ptr this_task) { + SUB_TASK(task); + func = [this_task]() { + auto result = new int(1); + this_task->Complete(result); + }; + }); + SUB_TASK(task); + auto result = await(child_task)->GetResult(); + CHECK(*result == 1); + }); + t1->Start(); + + task_mgr.RunOnce(); // run child_task + func(); // set result of child_task + task_mgr.RunOnce(); + + CHECK(task_mgr.RunOnce() == 0); // ensure no running task remain. + }).join(); + + END_TASK; + CHECK(task == __task_count); +} From c671934dd164e46156f3d27ef8f39ae46f5a758c Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Thu, 18 Jun 2020 23:47:41 +0800 Subject: [PATCH 12/24] feat(task): add stop to task_mgr --- include/tasks.h | 2 ++ source/tasks.cpp | 3 +++ test/source/tasks.cpp | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/include/tasks.h b/include/tasks.h index 970f2ea..f07590c 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -46,6 +46,8 @@ namespace smark::tasks { void Wait(std::shared_ptr waiter, std::shared_ptr waitting); void StopTask(std::shared_ptr task); int RunOnce(); + void Stop(); + bool is_stopped = false; private: std::vector> task_list_; diff --git a/source/tasks.cpp b/source/tasks.cpp index d4c293d..8b94faf 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -63,6 +63,7 @@ namespace smark::tasks { } int TaskManager::RunOnce() { + if (is_stopped) return 0; int run_task_count = 0; if (starting_tasks_.size() == 0) return 0; auto task = starting_tasks_.front(); // use front before size check is undefined behaivour. @@ -84,6 +85,8 @@ namespace smark::tasks { return run_task_count; } + void TaskManager::Stop() { is_stopped = true; } + std::shared_ptr GetCurrentTask() { return map2task[cotask::this_task::get>()]; } diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index e03fd5b..70eadee 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -103,7 +103,7 @@ TEST_CASE("TaskCompleteFromOutside") { task_mgr.RunOnce(); // run child_task func(); // set result of child_task - task_mgr.RunOnce(); + task_mgr.RunOnce(); // let child_task trigger parent_task CHECK(task_mgr.RunOnce() == 0); // ensure no running task remain. }).join(); From 366b35394deca3cb0c0ab1297e1c1eb270774eec Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Fri, 19 Jun 2020 22:58:09 +0800 Subject: [PATCH 13/24] feat(util-shared_from_this): enable multi inherit --- include/util.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/include/util.h b/include/util.h index 58e177a..331f485 100644 --- a/include/util.h +++ b/include/util.h @@ -20,7 +20,7 @@ extern "C" { [&](void*) { X }}; namespace smark::util { - // enabe shared_from_this in construct + // enabe shared_from_this in constructor // from http://www.cplusplus.com/forum/general/202645/ template void construct_deleter(T* t) { t->~T(); @@ -34,7 +34,7 @@ namespace smark::util { std::shared_ptr shared_from_this() { if (_construct_pself) { - return *_construct_pself; + return *_construct_pself; // in constructor } else { return _construct_self.lock(); } @@ -45,10 +45,10 @@ namespace smark::util { std::shared_ptr rtn; T* t = (T*)calloc(1, sizeof(T)); rtn.reset(t, construct_deleter); - t->_construct_pself = &rtn; + t->enable_shared_from_this::_construct_pself = &rtn; // fix ambiguous t = new (t) T(std::forward(args)...); - t->_construct_pself = NULL; - t->_construct_self = rtn; + t->enable_shared_from_this::_construct_pself = NULL; + t->enable_shared_from_this::_construct_self = rtn; return rtn; } From c68c2b5c2ad2df6fc2b0b6a894513992a8232676 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Fri, 19 Jun 2020 23:15:14 +0800 Subject: [PATCH 14/24] feat(task): add ValueTask and pass test --- include/tasks.h | 58 +++++++++++++++++++++++++++++++++++++++---- source/tasks.cpp | 28 ++++++++++----------- test/source/smark.cpp | 25 +++++++++---------- test/source/tasks.cpp | 21 ++++++++++++++++ 4 files changed, 100 insertions(+), 32 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index f07590c..174396b 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -21,25 +21,71 @@ namespace smark::tasks { public: enum State { New, Runable, Dead }; State state = State::New; - Task(TaskProc proc); + explicit Task(TaskProc proc); + Task() = default; + void SetProc(TaskProc proc); void Start(); void Yield(); void Resume(); void Wait(std::shared_ptr task); void WaitAll(const std::vector>* task_list); + void Stop(); template void Complete(ResultType* result) { - _Complete(reinterpret_cast(result)); + result_ = reinterpret_cast(result); + Stop(); } template ResultType* GetResult() { - return reinterpret_cast(_GetResult()); + return reinterpret_cast(result_); + } + virtual ~Task() = default; + + protected: + template + void SetProcContext_(std::shared_ptr task_ptr, + std::function)> proc) { + task_ptr_ = cotask::task<>::create([&, task_ptr, proc]() { + DEFER(std::dynamic_pointer_cast(task_ptr)->state = State::Dead; + UnregisterTaskFromMap2Task();) + RegisterTaskToMap2Task(std::dynamic_pointer_cast(task_ptr)); + proc(task_ptr); + }); } private: cotask::task<>::ptr_t task_ptr_; void* result_; - void _Complete(void* result); - void* _GetResult(); + void RegisterTaskToMap2Task(std::shared_ptr task_ptr); + void UnregisterTaskFromMap2Task(); + }; + + template class ValueTask : public smark::util::enable_shared_from_this>, + public Task { + public: + typedef std::function>)> ProcType; + explicit ValueTask(ProcType proc) : Task() { SetProc(proc); } + void SetProc(ProcType proc) { + SetProcContext_>(enable_shared_from_this>::shared_from_this(), + proc); + } + inline void Start() { Task::Start(); } + inline void Yield() { Task::Yield(); } + inline void Resume() { Task::Resume(); } + inline void Stop() { Task::Stop(); } + inline void Wait(std::shared_ptr task) { Task::Wait(task); } + inline void WaitAll(const std::vector>* task_list) { + Task::WaitAll(task_list); + } + inline void Complete(std::shared_ptr result) { + result_ = std::static_pointer_cast(result); + Task::Stop(); + } + inline std::shared_ptr GetResult() { return std::dynamic_pointer_cast(result_); } + inline State GetState() { return state; } + + private: + std::shared_ptr result_; }; + class TaskManager { public: std::shared_ptr NewTask(TaskProc proc); @@ -56,6 +102,8 @@ namespace smark::tasks { }; std::shared_ptr GetCurrentTask(); std::shared_ptr async(TaskProc proc); + // template std::shared_ptr> async(ValueTaskProc + // proc); std::shared_ptr await(std::shared_ptr task); extern thread_local TaskManager task_mgr; #ifdef DEBUG diff --git a/source/tasks.cpp b/source/tasks.cpp index 8b94faf..f92f6d1 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -5,14 +5,16 @@ namespace smark::tasks { thread_local std::map*, std::shared_ptr> map2task; - Task::Task(TaskProc proc) { - auto this_ptr = shared_from_this(); - task_ptr_ = cotask::task<>::create([=]() { - DEFER(this_ptr->state = State::Dead; - map2task.erase(cotask::this_task::get>());) - map2task[cotask::this_task::get>()] = this_ptr; - proc(this_ptr); - }); + Task::Task(TaskProc proc) { SetProc(proc); } + + void Task::SetProc(TaskProc proc) { SetProcContext_(shared_from_this(), proc); } + + void Task::RegisterTaskToMap2Task(std::shared_ptr task_ptr) { + map2task[cotask::this_task::get>()] = task_ptr; + } + + void Task::UnregisterTaskFromMap2Task() { + map2task.erase(cotask::this_task::get>()); } void Task::Start() { @@ -32,12 +34,7 @@ namespace smark::tasks { } } - void Task::_Complete(void* result) { - result_ = result; - task_mgr.StopTask(shared_from_this()); - } - - void* Task::_GetResult() { return result_; } + void Task::Stop() { task_mgr.StopTask(shared_from_this()); } std::shared_ptr TaskManager::NewTask(TaskProc proc) { auto task = smark::util::make_shared(proc); @@ -96,6 +93,9 @@ namespace smark::tasks { return task; } + // template std::shared_ptr> async(ValueTaskProc proc) + // {} + std::shared_ptr await(std::shared_ptr task) { auto current_task = GetCurrentTask(); task_mgr.Wait(current_task, task); diff --git a/test/source/smark.cpp b/test/source/smark.cpp index a1421a2..c44699d 100644 --- a/test/source/smark.cpp +++ b/test/source/smark.cpp @@ -18,7 +18,6 @@ DISABLE_SOME_WARNINGS #endif #include "testsvr.h" -#include "util.h" using namespace smark_tests; @@ -45,7 +44,7 @@ TEST_CASE("TCPClient") { int task = 0; INIT_TASK; - util::EventLoop el; + smark::util::EventLoop el; TCPClient cli(&el); const char data[] = "Hello world"; @@ -57,11 +56,11 @@ TEST_CASE("TCPClient") { }; cli.Connect("127.0.0.1", port, [&cli, &task, &data](int status) { if (status) { - ERR("Connect error:" << util::EventLoop::GetErrorStr(status)); + ERR("Connect error:" << smark::util::EventLoop::GetErrorStr(status)); } cli.Write(data, sizeof(data), [](int status) { if (status) { - ERR("Write error:" << util::EventLoop::GetErrorStr(status)); + ERR("Write error:" << smark::util::EventLoop::GetErrorStr(status)); } }); }); @@ -78,12 +77,12 @@ TEST_CASE("FailConnect") { int task = 0; INIT_TASK; - util::EventLoop el; + smark::util::EventLoop el; TCPClient cli(&el); cli.Connect("127.0.0.1", port, [&task](int status) { SUB_TASK(task); if (status) { - DLOG("Test fail connect:" << util::EventLoop::GetErrorStr(status)); + DLOG("Test fail connect:" << smark::util::EventLoop::GetErrorStr(status)); } // Use macro instead of actual status code. @@ -123,18 +122,18 @@ TEST_CASE("HttpClient") { DLOG("Run Http server on port:" << p); INIT_TASK; int task = 0; - auto req = std::make_shared(); + auto req = std::make_shared(); req->method = "Get"; req->request_uri = "/test"; - auto test_header = std::make_shared(); + auto test_header = std::make_shared(); test_header->name = "test-header"; test_header->value = "test_value"; req->headers.push_back(test_header); req->body = "This is a request"; - util::EventLoop el; + smark::util::EventLoop el; HttpClient cli(&el); - cli.on_response = [&task, &el, &cli](auto, std::shared_ptr res) { + cli.on_response = [&task, &el, &cli](auto, std::shared_ptr res) { SUB_TASK(task); CHECK(STR_COMPARE(res->status_code, "OK")); int header_count = res->headers.size(); @@ -147,7 +146,7 @@ TEST_CASE("HttpClient") { }; cli.Connect("127.0.0.1", p, [&cli, &req](int status) { if (status) { - ERR("Connect error:" << util::EventLoop::GetErrorStr(status)); + ERR("Connect error:" << smark::util::EventLoop::GetErrorStr(status)); } cli.Request(req.get()); }); @@ -237,10 +236,10 @@ TEST_CASE("Script_Response") { " pass=true\n" "end"; script.Run(code); - util::HttpResponse res; + smark::util::HttpResponse res; res.body = "content"; res.status_code = "200"; - auto header = std::make_shared(); + auto header = std::make_shared(); header->name = "test"; header->value = "value"; res.headers.push_back(header); diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 70eadee..6f949d0 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -111,3 +111,24 @@ TEST_CASE("TaskCompleteFromOutside") { END_TASK; CHECK(task == __task_count); } + +TEST_CASE("TaskValueTask") { + int task = 0; + INIT_TASK; + + std::thread([&task]() { + auto co_task = make_shared>([&](std::shared_ptr> this_task) { + SUB_TASK(task); + this_task->Yield(); + SUB_TASK(task); + }); + CHECK(co_task->GetState() == Task::State::New); + co_task->Start(); + CHECK(co_task->GetState() == Task::State::Runable); + co_task->Resume(); + CHECK(co_task->GetState() == Task::State::Dead); + }).join(); + + END_TASK; + CHECK(task == __task_count); +} From 33b78645050851163eaa5482556c6cf28d338875 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Fri, 19 Jun 2020 23:26:57 +0800 Subject: [PATCH 15/24] refactor(task): remove Complete and GetResult from Task --- include/tasks.h | 15 --------------- test/source/tasks.cpp | 16 +++++----------- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index 174396b..adb2c04 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -30,13 +30,6 @@ namespace smark::tasks { void Wait(std::shared_ptr task); void WaitAll(const std::vector>* task_list); void Stop(); - template void Complete(ResultType* result) { - result_ = reinterpret_cast(result); - Stop(); - } - template ResultType* GetResult() { - return reinterpret_cast(result_); - } virtual ~Task() = default; protected: @@ -67,14 +60,6 @@ namespace smark::tasks { SetProcContext_>(enable_shared_from_this>::shared_from_this(), proc); } - inline void Start() { Task::Start(); } - inline void Yield() { Task::Yield(); } - inline void Resume() { Task::Resume(); } - inline void Stop() { Task::Stop(); } - inline void Wait(std::shared_ptr task) { Task::Wait(task); } - inline void WaitAll(const std::vector>* task_list) { - Task::WaitAll(task_list); - } inline void Complete(std::shared_ptr result) { result_ = std::static_pointer_cast(result); Task::Stop(); diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 6f949d0..2799653 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -62,12 +62,10 @@ TEST_CASE("TaskAsync") { (void)this_task; auto child_task = async([&](std::shared_ptr this_task) { SUB_TASK(task); - auto result = new int(1); - this_task->Complete(result); + this_task->Stop(); }); SUB_TASK(task); - auto result = await(child_task)->GetResult(); - CHECK(*result == 1); + await(child_task); }); t1->Start(); @@ -79,7 +77,7 @@ TEST_CASE("TaskAsync") { CHECK(task == __task_count); } -TEST_CASE("TaskCompleteFromOutside") { +TEST_CASE("TaskStopFromOutside") { int task = 0; INIT_TASK; @@ -90,14 +88,10 @@ TEST_CASE("TaskCompleteFromOutside") { (void)this_task; auto child_task = async([&](std::shared_ptr this_task) { SUB_TASK(task); - func = [this_task]() { - auto result = new int(1); - this_task->Complete(result); - }; + func = [this_task]() { this_task->Stop(); }; }); SUB_TASK(task); - auto result = await(child_task)->GetResult(); - CHECK(*result == 1); + await(child_task); }); t1->Start(); From 5e4ad31568a3149c677aaabaa215902a6a4cf30a Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sat, 20 Jun 2020 00:19:25 +0800 Subject: [PATCH 16/24] refactor(task): add task_async and vt_async --- include/tasks.h | 19 ++++++++++++++++--- source/tasks.cpp | 10 ++++++---- test/source/tasks.cpp | 6 +++--- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index adb2c04..6d1d08a 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -19,6 +19,7 @@ namespace smark::tasks { // TODO: add exception handler class Task : public smark::util::enable_shared_from_this { public: + typedef std::function)> ProcType; enum State { New, Runable, Dead }; State state = State::New; explicit Task(TaskProc proc); @@ -74,6 +75,11 @@ namespace smark::tasks { class TaskManager { public: std::shared_ptr NewTask(TaskProc proc); + void AddTask(std::shared_ptr task); + template std::shared_ptr AddTask(std::shared_ptr task) { + AddTask(std::dynamic_pointer_cast(task)); + return task; + } void Wait(std::shared_ptr waiter, std::shared_ptr waitting); void StopTask(std::shared_ptr task); int RunOnce(); @@ -86,9 +92,16 @@ namespace smark::tasks { std::queue> starting_tasks_; }; std::shared_ptr GetCurrentTask(); - std::shared_ptr async(TaskProc proc); - // template std::shared_ptr> async(ValueTaskProc - // proc); +// std::shared_ptr async(TaskProc proc); +#define task_async(proc) smark::tasks::task_mgr.NewTask(proc); +#define vt_async(T, proc) \ + smark::tasks::task_mgr.AddTask>( \ + smark::util::make_shared>(proc)) + // template std::shared_ptr async(T::ProcType[T = T] proc) { + // auto task = util::make_shared(proc); + // task_mgr.AddTask(std::dynamic_pointer_cast(T)); + // return task; + // } std::shared_ptr await(std::shared_ptr task); extern thread_local TaskManager task_mgr; #ifdef DEBUG diff --git a/source/tasks.cpp b/source/tasks.cpp index f92f6d1..50538e2 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -42,6 +42,8 @@ namespace smark::tasks { return task; } + void TaskManager::AddTask(std::shared_ptr task) { task_list_.push_back(task); } + void TaskManager::Wait(std::shared_ptr waiter, std::shared_ptr waitting) { waitting_tasks_[waitting] = waiter; starting_tasks_.push(waitting); @@ -88,10 +90,10 @@ namespace smark::tasks { return map2task[cotask::this_task::get>()]; } - std::shared_ptr async(TaskProc proc) { - auto task = task_mgr.NewTask(proc); - return task; - } + // std::shared_ptr async(TaskProc proc) { + // auto task = task_mgr.NewTask(proc); + // return task; + // } // template std::shared_ptr> async(ValueTaskProc proc) // {} diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 2799653..c61c2f4 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -60,7 +60,7 @@ TEST_CASE("TaskAsync") { std::thread([&task]() { auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { (void)this_task; - auto child_task = async([&](std::shared_ptr this_task) { + auto child_task = task_async([&](std::shared_ptr this_task) { SUB_TASK(task); this_task->Stop(); }); @@ -86,7 +86,7 @@ TEST_CASE("TaskStopFromOutside") { auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { (void)this_task; - auto child_task = async([&](std::shared_ptr this_task) { + auto child_task = task_async([&](std::shared_ptr this_task) { SUB_TASK(task); func = [this_task]() { this_task->Stop(); }; }); @@ -111,7 +111,7 @@ TEST_CASE("TaskValueTask") { INIT_TASK; std::thread([&task]() { - auto co_task = make_shared>([&](std::shared_ptr> this_task) { + auto co_task = vt_async(int, [&](std::shared_ptr> this_task) { SUB_TASK(task); this_task->Yield(); SUB_TASK(task); From a585ecf1feb8dc6c6a65ba4f8e7b8e6e4971e43a Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sat, 20 Jun 2020 16:05:36 +0800 Subject: [PATCH 17/24] refactor(task): use macro overload to _async --- include/tasks.h | 1 + include/util.h | 3 +++ test/source/tasks.cpp | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/include/tasks.h b/include/tasks.h index 6d1d08a..abf4b44 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -93,6 +93,7 @@ namespace smark::tasks { }; std::shared_ptr GetCurrentTask(); // std::shared_ptr async(TaskProc proc); +#define _async(...) GET_MACRO_V2(__VA_ARGS__, vt_async, task_async)(__VA_ARGS__) #define task_async(proc) smark::tasks::task_mgr.NewTask(proc); #define vt_async(T, proc) \ smark::tasks::task_mgr.AddTask>( \ diff --git a/include/util.h b/include/util.h index 331f485..fb5b4f5 100644 --- a/include/util.h +++ b/include/util.h @@ -19,6 +19,9 @@ extern "C" { = std::unique_ptr>{reinterpret_cast(1), \ [&](void*) { X }}; +// macro overload util +#define GET_MACRO_V2(_1, _2, NAME, ...) NAME + namespace smark::util { // enabe shared_from_this in constructor // from http://www.cplusplus.com/forum/general/202645/ diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index c61c2f4..8c32c48 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -111,7 +111,7 @@ TEST_CASE("TaskValueTask") { INIT_TASK; std::thread([&task]() { - auto co_task = vt_async(int, [&](std::shared_ptr> this_task) { + auto co_task = _async(int, [&](std::shared_ptr> this_task) { SUB_TASK(task); this_task->Yield(); SUB_TASK(task); From 8924e662dc8c4035960d9bbde34edae03e31ad3a Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sat, 20 Jun 2020 16:14:55 +0800 Subject: [PATCH 18/24] refactor(task): remove task_list_ in TaskManager --- include/tasks.h | 13 ++----------- source/tasks.cpp | 12 ------------ test/source/tasks.cpp | 4 ++-- 3 files changed, 4 insertions(+), 25 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index abf4b44..041ba02 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -74,12 +74,6 @@ namespace smark::tasks { class TaskManager { public: - std::shared_ptr NewTask(TaskProc proc); - void AddTask(std::shared_ptr task); - template std::shared_ptr AddTask(std::shared_ptr task) { - AddTask(std::dynamic_pointer_cast(task)); - return task; - } void Wait(std::shared_ptr waiter, std::shared_ptr waitting); void StopTask(std::shared_ptr task); int RunOnce(); @@ -87,17 +81,14 @@ namespace smark::tasks { bool is_stopped = false; private: - std::vector> task_list_; std::map, std::shared_ptr> waitting_tasks_; std::queue> starting_tasks_; }; std::shared_ptr GetCurrentTask(); // std::shared_ptr async(TaskProc proc); #define _async(...) GET_MACRO_V2(__VA_ARGS__, vt_async, task_async)(__VA_ARGS__) -#define task_async(proc) smark::tasks::task_mgr.NewTask(proc); -#define vt_async(T, proc) \ - smark::tasks::task_mgr.AddTask>( \ - smark::util::make_shared>(proc)) +#define task_async(proc) smark::util::make_shared(proc) +#define vt_async(T, proc) smark::util::make_shared>(proc) // template std::shared_ptr async(T::ProcType[T = T] proc) { // auto task = util::make_shared(proc); // task_mgr.AddTask(std::dynamic_pointer_cast(T)); diff --git a/source/tasks.cpp b/source/tasks.cpp index 50538e2..6092f3b 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -36,14 +36,6 @@ namespace smark::tasks { void Task::Stop() { task_mgr.StopTask(shared_from_this()); } - std::shared_ptr TaskManager::NewTask(TaskProc proc) { - auto task = smark::util::make_shared(proc); - task_list_.push_back(task); - return task; - } - - void TaskManager::AddTask(std::shared_ptr task) { task_list_.push_back(task); } - void TaskManager::Wait(std::shared_ptr waiter, std::shared_ptr waitting) { waitting_tasks_[waitting] = waiter; starting_tasks_.push(waitting); @@ -55,10 +47,6 @@ namespace smark::tasks { starting_tasks_.push(iter->second); waitting_tasks_.erase(iter); } - auto t = std::find(task_list_.begin(), task_list_.end(), task); - if (t != task_list_.end()) { - task_list_.erase(t); - } } int TaskManager::RunOnce() { diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 8c32c48..0248e16 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -58,7 +58,7 @@ TEST_CASE("TaskAsync") { INIT_TASK; std::thread([&task]() { - auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { + auto t1 = _async([&](std::shared_ptr this_task) { (void)this_task; auto child_task = task_async([&](std::shared_ptr this_task) { SUB_TASK(task); @@ -84,7 +84,7 @@ TEST_CASE("TaskStopFromOutside") { std::thread([&task]() { std::function func([]() {}); - auto t1 = task_mgr.NewTask([&](std::shared_ptr this_task) { + auto t1 = _async([&](std::shared_ptr this_task) { (void)this_task; auto child_task = task_async([&](std::shared_ptr this_task) { SUB_TASK(task); From 13dc49d27f9685575d409137d3e16caddcb9952c Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sun, 21 Jun 2020 20:11:06 +0800 Subject: [PATCH 19/24] fix(util-shared): support inherit --- include/util.h | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/include/util.h b/include/util.h index fb5b4f5..4c1ecd0 100644 --- a/include/util.h +++ b/include/util.h @@ -24,36 +24,41 @@ extern "C" { namespace smark::util { // enabe shared_from_this in constructor - // from http://www.cplusplus.com/forum/general/202645/ - template void construct_deleter(T* t) { + template void deleter(PtrT* __ptr) { + auto t = static_cast(__ptr); t->~T(); free(t); } + /** + * Base class allowing use of member function shared_from_this in constructor function,member + * function. + */ template class enable_shared_from_this { public: std::shared_ptr* _construct_pself; std::weak_ptr _construct_self; - std::shared_ptr shared_from_this() { + template std::shared_ptr shared_from_this() { if (_construct_pself) { - return *_construct_pself; // in constructor + return std::static_pointer_cast(*_construct_pself); // in constructor } else { - return _construct_self.lock(); + return std::static_pointer_cast(_construct_self.lock()); } } }; - template std::shared_ptr make_shared(Params&&... args) { - std::shared_ptr rtn; + template + std::shared_ptr make_shared(Params&&... args) { + std::shared_ptr rtn; T* t = (T*)calloc(1, sizeof(T)); - rtn.reset(t, construct_deleter); - t->enable_shared_from_this::_construct_pself = &rtn; // fix ambiguous + rtn.reset(t, deleter); + t->_construct_pself = &rtn; t = new (t) T(std::forward(args)...); - t->enable_shared_from_this::_construct_pself = NULL; - t->enable_shared_from_this::_construct_self = rtn; + t->_construct_pself = NULL; + t->_construct_self = rtn; - return rtn; + return std::static_pointer_cast(rtn); } typedef std::function CallbackType; // void(int status) From 3a46ee5ef966205a8a5e5d973b998a2873412efd Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sat, 27 Jun 2020 17:20:02 +0800 Subject: [PATCH 20/24] fix(task): value task async --- include/tasks.h | 34 +++++++++++++++++++--------------- include/util.h | 5 ++--- source/tasks.cpp | 17 ----------------- test/source/tasks.cpp | 39 ++++++++++++++++++++++++++++++++++----- 4 files changed, 55 insertions(+), 40 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index 041ba02..1823ff6 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -52,24 +52,22 @@ namespace smark::tasks { void UnregisterTaskFromMap2Task(); }; - template class ValueTask : public smark::util::enable_shared_from_this>, - public Task { + template class ValueTask : public Task { public: typedef std::function>)> ProcType; explicit ValueTask(ProcType proc) : Task() { SetProc(proc); } void SetProc(ProcType proc) { - SetProcContext_>(enable_shared_from_this>::shared_from_this(), - proc); + SetProcContext_>(shared_from_this>(), proc); } inline void Complete(std::shared_ptr result) { result_ = std::static_pointer_cast(result); Task::Stop(); } - inline std::shared_ptr GetResult() { return std::dynamic_pointer_cast(result_); } + inline std::shared_ptr GetResult() { return std::static_pointer_cast(result_); } inline State GetState() { return state; } private: - std::shared_ptr result_; + std::shared_ptr result_; }; class TaskManager { @@ -84,18 +82,24 @@ namespace smark::tasks { std::map, std::shared_ptr> waitting_tasks_; std::queue> starting_tasks_; }; + std::shared_ptr GetCurrentTask(); -// std::shared_ptr async(TaskProc proc); + + extern thread_local TaskManager task_mgr; + #define _async(...) GET_MACRO_V2(__VA_ARGS__, vt_async, task_async)(__VA_ARGS__) #define task_async(proc) smark::util::make_shared(proc) -#define vt_async(T, proc) smark::util::make_shared>(proc) - // template std::shared_ptr async(T::ProcType[T = T] proc) { - // auto task = util::make_shared(proc); - // task_mgr.AddTask(std::dynamic_pointer_cast(T)); - // return task; - // } - std::shared_ptr await(std::shared_ptr task); - extern thread_local TaskManager task_mgr; +#define vt_async(T, proc) \ + smark::util::make_shared>(proc) + + template std::shared_ptr await(std::shared_ptr task) { + auto current_task = GetCurrentTask(); + task_mgr.Wait(current_task, std::dynamic_pointer_cast(task)); + + current_task->Yield(); + + return std::dynamic_pointer_cast(task); + } #ifdef DEBUG extern thread_local std::map*, std::shared_ptr> map2task; #endif diff --git a/include/util.h b/include/util.h index 4c1ecd0..284569b 100644 --- a/include/util.h +++ b/include/util.h @@ -31,8 +31,7 @@ namespace smark::util { } /** - * Base class allowing use of member function shared_from_this in constructor function,member - * function. + * Base class allowing use of member function shared_from_this. */ template class enable_shared_from_this { public: @@ -48,7 +47,7 @@ namespace smark::util { } }; - template + template std::shared_ptr make_shared(Params&&... args) { std::shared_ptr rtn; T* t = (T*)calloc(1, sizeof(T)); diff --git a/source/tasks.cpp b/source/tasks.cpp index 6092f3b..d03b099 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -78,22 +78,5 @@ namespace smark::tasks { return map2task[cotask::this_task::get>()]; } - // std::shared_ptr async(TaskProc proc) { - // auto task = task_mgr.NewTask(proc); - // return task; - // } - - // template std::shared_ptr> async(ValueTaskProc proc) - // {} - - std::shared_ptr await(std::shared_ptr task) { - auto current_task = GetCurrentTask(); - task_mgr.Wait(current_task, task); - - current_task->Yield(); - - return task; - } - thread_local TaskManager task_mgr; } // namespace smark::tasks \ No newline at end of file diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 0248e16..3fbcaf2 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -8,7 +8,7 @@ DISABLE_SOME_WARNINGS using namespace smark::tasks; using namespace smark::util; -TEST_CASE("TaskSimple") { +TEST_CASE("Task_Simple") { int task = 0; INIT_TASK; @@ -29,7 +29,7 @@ TEST_CASE("TaskSimple") { CHECK(task == __task_count); } -TEST_CASE("TaskMap2Task") { +TEST_CASE("Task_Map2Task") { int task = 0; INIT_TASK; @@ -53,7 +53,7 @@ TEST_CASE("TaskMap2Task") { CHECK(task == __task_count); } -TEST_CASE("TaskAsync") { +TEST_CASE("Task_Async") { int task = 0; INIT_TASK; @@ -66,6 +66,7 @@ TEST_CASE("TaskAsync") { }); SUB_TASK(task); await(child_task); + SUB_TASK(task); }); t1->Start(); @@ -77,7 +78,7 @@ TEST_CASE("TaskAsync") { CHECK(task == __task_count); } -TEST_CASE("TaskStopFromOutside") { +TEST_CASE("Task_StopFromOutside") { int task = 0; INIT_TASK; @@ -92,6 +93,7 @@ TEST_CASE("TaskStopFromOutside") { }); SUB_TASK(task); await(child_task); + SUB_TASK(task); }); t1->Start(); @@ -106,7 +108,7 @@ TEST_CASE("TaskStopFromOutside") { CHECK(task == __task_count); } -TEST_CASE("TaskValueTask") { +TEST_CASE("Task_ValueTask") { int task = 0; INIT_TASK; @@ -126,3 +128,30 @@ TEST_CASE("TaskValueTask") { END_TASK; CHECK(task == __task_count); } + +TEST_CASE("Task_ValueTaskAsync") { + int task = 0; + INIT_TASK; + + std::thread([&task]() { + auto proc = [&](std::shared_ptr this_task) { + (void)this_task; + auto child_task = _async(int, [&](std::shared_ptr> t) { + SUB_TASK(task); + t->Complete(std::make_shared(10)); + }); + SUB_TASK(task); + CHECK(*(await(child_task)->GetResult()) == 10); + SUB_TASK(task); + }; // TODO: why? lambda-expression in template-argument only available with ‘-std=c++2a’ or + // ‘-std=gnu++2a’ + auto t1 = _async(proc); + t1->Start(); + + while (task_mgr.RunOnce()) + ; + }).join(); + + END_TASK; + CHECK(task == __task_count); +} \ No newline at end of file From a92c4f188f38cd75002d47addb8cdaf83c97c85d Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sat, 27 Jun 2020 17:42:31 +0800 Subject: [PATCH 21/24] refactor(task): change result type to T --- include/tasks.h | 8 ++++---- source/util.cpp | 7 ++++++- test/source/tasks.cpp | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/include/tasks.h b/include/tasks.h index 1823ff6..378cbf3 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -59,15 +59,15 @@ namespace smark::tasks { void SetProc(ProcType proc) { SetProcContext_>(shared_from_this>(), proc); } - inline void Complete(std::shared_ptr result) { - result_ = std::static_pointer_cast(result); + inline void Complete(T result) { + result_ = result; Task::Stop(); } - inline std::shared_ptr GetResult() { return std::static_pointer_cast(result_); } + inline T GetResult() { return result_; } inline State GetState() { return state; } private: - std::shared_ptr result_; + T result_; }; class TaskManager { diff --git a/source/util.cpp b/source/util.cpp index 0b61d2e..276cc5b 100644 --- a/source/util.cpp +++ b/source/util.cpp @@ -5,11 +5,16 @@ #include "debug.h" #include "platform.h" +#include "tasks.h" namespace smark::util { EventLoop::EventLoop() { uv_loop_init(loop_.get()); } - void EventLoop::Wait() { uv_run(loop_.get(), UV_RUN_DEFAULT); } + void EventLoop::Wait() { + while (uv_run(loop_.get(), UV_RUN_ONCE)) { + smark::tasks::task_mgr.RunOnce(); + }; + } void EventLoop::Stop() { uv_stop(loop_.get()); } diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 3fbcaf2..1ee351a 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -138,10 +138,10 @@ TEST_CASE("Task_ValueTaskAsync") { (void)this_task; auto child_task = _async(int, [&](std::shared_ptr> t) { SUB_TASK(task); - t->Complete(std::make_shared(10)); + t->Complete(10); }); SUB_TASK(task); - CHECK(*(await(child_task)->GetResult()) == 10); + CHECK(await(child_task)->GetResult() == 10); SUB_TASK(task); }; // TODO: why? lambda-expression in template-argument only available with ‘-std=c++2a’ or // ‘-std=gnu++2a’ From 799d4f48f17024012dbd1650e577443ffc0c114a Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sat, 27 Jun 2020 18:21:01 +0800 Subject: [PATCH 22/24] fix(task): enable stop task before wait --- include/tasks.h | 2 ++ source/tasks.cpp | 17 ++++++++++++++++- test/source/tasks.cpp | 25 +++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/include/tasks.h b/include/tasks.h index 378cbf3..ace38ec 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "debug.h" @@ -79,6 +80,7 @@ namespace smark::tasks { bool is_stopped = false; private: + std::set> completed_tasks_; std::map, std::shared_ptr> waitting_tasks_; std::queue> starting_tasks_; }; diff --git a/source/tasks.cpp b/source/tasks.cpp index d03b099..5f0b8f7 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -34,9 +34,19 @@ namespace smark::tasks { } } - void Task::Stop() { task_mgr.StopTask(shared_from_this()); } + void Task::Stop() { + state = State::Dead; + task_mgr.StopTask(shared_from_this()); + } void TaskManager::Wait(std::shared_ptr waiter, std::shared_ptr waitting) { + if (waitting->state == Task::State::Dead) { // waitting taks is completed. + completed_tasks_.erase(waitting); + starting_tasks_.push(waiter); // start waiter. + return; + } + + // waitting task is no completed currently. waitting_tasks_[waitting] = waiter; starting_tasks_.push(waitting); } @@ -46,7 +56,12 @@ namespace smark::tasks { if (iter != waitting_tasks_.end()) { starting_tasks_.push(iter->second); waitting_tasks_.erase(iter); + return; } + + // if no waitter is waitting, add to completed_tasks_ to avoid being freed when task is + // complete. + completed_tasks_.insert(task); } int TaskManager::RunOnce() { diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 1ee351a..9fdb787 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -108,6 +108,31 @@ TEST_CASE("Task_StopFromOutside") { CHECK(task == __task_count); } +TEST_CASE("Task_StopBeforeWait") { + int task = 0; + INIT_TASK; + + std::thread([&task]() { + auto proc = [&](std::shared_ptr this_task) { + (void)this_task; + auto child_task = _async([](auto) {}); + child_task->Stop(); + SUB_TASK(task); + await(child_task); + SUB_TASK(task); + }; + auto t1 = _async(proc); + t1->Start(); + + task_mgr.RunOnce(); // restart t1 + + CHECK(task_mgr.RunOnce() == 0); // ensure no running task remain. + }).join(); + + END_TASK; + CHECK(task == __task_count); +} + TEST_CASE("Task_ValueTask") { int task = 0; INIT_TASK; From ddee7912cdec21ec1f992a7c0084d2ec074d9734 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sat, 27 Jun 2020 18:49:52 +0800 Subject: [PATCH 23/24] feat(task): add void task --- include/tasks.h | 5 +++++ include/util.h | 1 + test/source/tasks.cpp | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/include/tasks.h b/include/tasks.h index ace38ec..616ee9d 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -39,6 +39,7 @@ namespace smark::tasks { void SetProcContext_(std::shared_ptr task_ptr, std::function)> proc) { task_ptr_ = cotask::task<>::create([&, task_ptr, proc]() { + // void task will not run to this. DEFER(std::dynamic_pointer_cast(task_ptr)->state = State::Dead; UnregisterTaskFromMap2Task();) RegisterTaskToMap2Task(std::dynamic_pointer_cast(task_ptr)); @@ -57,6 +58,7 @@ namespace smark::tasks { public: typedef std::function>)> ProcType; explicit ValueTask(ProcType proc) : Task() { SetProc(proc); } + ValueTask() = default; void SetProc(ProcType proc) { SetProcContext_>(shared_from_this>(), proc); } @@ -93,6 +95,9 @@ namespace smark::tasks { #define task_async(proc) smark::util::make_shared(proc) #define vt_async(T, proc) \ smark::util::make_shared>(proc) +#define void_task(...) GET_MACRO_V0_1(_0, ##__VA_ARGS__, vt_void_task, task_void_task)(__VA_ARGS__) +#define task_void_task() smark::util::make_shared() +#define vt_void_task(T) smark::util::make_shared>() template std::shared_ptr await(std::shared_ptr task) { auto current_task = GetCurrentTask(); diff --git a/include/util.h b/include/util.h index 284569b..5206d39 100644 --- a/include/util.h +++ b/include/util.h @@ -21,6 +21,7 @@ extern "C" { // macro overload util #define GET_MACRO_V2(_1, _2, NAME, ...) NAME +#define GET_MACRO_V0_1(_0, _1, NAME, ...) NAME namespace smark::util { // enabe shared_from_this in constructor diff --git a/test/source/tasks.cpp b/test/source/tasks.cpp index 9fdb787..cdec92f 100644 --- a/test/source/tasks.cpp +++ b/test/source/tasks.cpp @@ -115,7 +115,7 @@ TEST_CASE("Task_StopBeforeWait") { std::thread([&task]() { auto proc = [&](std::shared_ptr this_task) { (void)this_task; - auto child_task = _async([](auto) {}); + auto child_task = void_task(); child_task->Stop(); SUB_TASK(task); await(child_task); From ceb607610fda17b70add8fd9a9d9e2cd17bc37c9 Mon Sep 17 00:00:00 2001 From: Aaron Robert Date: Sun, 5 Jul 2020 10:58:07 +0800 Subject: [PATCH 24/24] test(HttpAsyncClient): add http async client test and pass test --- include/client.h | 15 +++++++++++-- include/tasks.h | 1 + source/client.cpp | 29 +++++++++++++++++++++++++ source/tasks.cpp | 38 ++++++++++++++++++-------------- source/util.cpp | 8 +++++-- test/source/smark.cpp | 50 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 121 insertions(+), 20 deletions(-) diff --git a/include/client.h b/include/client.h index f312d3d..78c7d0a 100644 --- a/include/client.h +++ b/include/client.h @@ -4,21 +4,32 @@ #include #include +#include "tasks.h" #include "util.h" namespace smark { class TCPClient : public smark::util::Socket { public: - TCPClient(smark::util::EventLoop* el); + explicit TCPClient(smark::util::EventLoop* el); }; class HttpClient : public TCPClient { public: - HttpClient(smark::util::EventLoop* el); + explicit HttpClient(smark::util::EventLoop* el); void Request(util::HttpRequest* request); std::function)> on_response; private: util::HttpReponseParser parser_; }; + + class HttpAsyncClient : protected HttpClient { + // use protected inherit to prevent user changes on_complete during requesting. + public: + explicit HttpAsyncClient(smark::util::EventLoop* el); + std::shared_ptr> ConnectAsync(std::string ip, int16_t port); + std::shared_ptr>> RequestAsync( + util::HttpRequest* request); + void Close(); + }; } // namespace smark \ No newline at end of file diff --git a/include/tasks.h b/include/tasks.h index 616ee9d..a2e3af7 100644 --- a/include/tasks.h +++ b/include/tasks.h @@ -79,6 +79,7 @@ namespace smark::tasks { void StopTask(std::shared_ptr task); int RunOnce(); void Stop(); + bool IsEmpty(); bool is_stopped = false; private: diff --git a/source/client.cpp b/source/client.cpp index 7cbe996..5d776fe 100644 --- a/source/client.cpp +++ b/source/client.cpp @@ -25,4 +25,33 @@ namespace smark { }); } + HttpAsyncClient::HttpAsyncClient(smark::util::EventLoop* el) : HttpClient(el) {} + + std::shared_ptr> HttpAsyncClient::ConnectAsync(std::string ip, + int16_t port) { + auto task = _async(int, [=](std::shared_ptr> this_task) { + DLOG("Try to connect:" << LOG_VALUE(ip) << LOG_VALUE(port)); + Connect(ip, port, [=](int status) { + DLOG("Connected result:" << LOG_VALUE(status)); + this_task->Complete(status); + }); + }); + return task; + } + + std::shared_ptr>> + HttpAsyncClient::RequestAsync(util::HttpRequest* request) { + auto task = _async( + std::shared_ptr, + [=](std::shared_ptr>> this_task) { + on_response = [this_task](auto, std::shared_ptr res) { + this_task->Complete(res); + }; + HttpClient::Request(request); + }); + return task; + } + + void HttpAsyncClient::Close() { Socket::Close(); } + } // namespace smark diff --git a/source/tasks.cpp b/source/tasks.cpp index 5f0b8f7..23c83bf 100644 --- a/source/tasks.cpp +++ b/source/tasks.cpp @@ -67,26 +67,32 @@ namespace smark::tasks { int TaskManager::RunOnce() { if (is_stopped) return 0; int run_task_count = 0; - if (starting_tasks_.size() == 0) return 0; - auto task = starting_tasks_.front(); // use front before size check is undefined behaivour. - starting_tasks_.pop(); - switch (task->state) { - case Task::State::New: - task->Start(); - run_task_count++; - break; - - case Task::State::Runable: - task->Resume(); - run_task_count++; - break; - - default: - break; + while (starting_tasks_.size()) { + auto task = starting_tasks_.front(); // use front before size check is undefined behaivour. + starting_tasks_.pop(); + switch (task->state) { + case Task::State::New: + task->Start(); + run_task_count++; + break; + + case Task::State::Runable: + task->Resume(); + run_task_count++; + break; + + default: + break; + } } return run_task_count; } + bool TaskManager::IsEmpty() { + if (completed_tasks_.size() || waitting_tasks_.size() || starting_tasks_.size()) return false; + return true; + } + void TaskManager::Stop() { is_stopped = true; } std::shared_ptr GetCurrentTask() { diff --git a/source/util.cpp b/source/util.cpp index 276cc5b..0601ceb 100644 --- a/source/util.cpp +++ b/source/util.cpp @@ -11,8 +11,12 @@ namespace smark::util { EventLoop::EventLoop() { uv_loop_init(loop_.get()); } void EventLoop::Wait() { - while (uv_run(loop_.get(), UV_RUN_ONCE)) { - smark::tasks::task_mgr.RunOnce(); + while (true) { + int rtc = smark::tasks::task_mgr.RunOnce(); + int uv_res = uv_run(loop_.get(), UV_RUN_ONCE); + DLOG("Run task count:" << LOG_VALUE(rtc) << LOG_VALUE(uv_res) + << LOG_NV("IsEmpty", smark::tasks::task_mgr.IsEmpty())); + if (smark::tasks::task_mgr.IsEmpty() && !uv_run(loop_.get(), UV_RUN_ONCE)) break; }; } diff --git a/test/source/smark.cpp b/test/source/smark.cpp index c44699d..f0ec2b1 100644 --- a/test/source/smark.cpp +++ b/test/source/smark.cpp @@ -10,6 +10,7 @@ DISABLE_SOME_WARNINGS #include "debug.h" #include "script.h" +#include "tasks.h" #include "util.h" #if defined(_WIN32) || defined(WIN32) @@ -156,6 +157,55 @@ TEST_CASE("HttpClient") { CHECK(task == __task_count); } +TEST_CASE("HttpAsyncClient") { + auto svr = new SimpleHttpServer(); + DEFER(delete svr;) + std::thread* thread = nullptr; + uint16_t p = RunServer(svr, &thread); + // DEFER(delete thread;) + DLOG("Run Http server on port:" << p); + INIT_TASK; + int task = 0; + + std::thread([=, &task]() { + auto req = std::make_shared(); + req->method = "Get"; + req->request_uri = "/test"; + auto test_header = std::make_shared(); + test_header->name = "test-header"; + test_header->value = "test_value"; + req->headers.push_back(test_header); + req->body = "This is a request"; + + smark::util::EventLoop el; + auto proc = [&](auto) { + HttpAsyncClient cli(&el); + auto status = await(cli.ConnectAsync("127.0.0.1", p))->GetResult(); + if (status) { + ERR("Connect error:" << smark::util::EventLoop::GetErrorStr(status)); + } + DLOG("Connected"); + auto res = await(cli.RequestAsync(req.get()))->GetResult(); + DLOG("Get response."); + CHECK(STR_COMPARE(res->status_code, "OK")); + int header_count = res->headers.size(); + CHECK(header_count == 1); + auto test_header = res->headers[0]; + CHECK(STR_COMPARE(test_header->name, "test-header")); + CHECK(STR_COMPARE(test_header->value, "test_value")); + CHECK(STR_COMPARE(res->body, "This is a response")); + cli.Close(); + SUB_TASK(task); + }; + _async(proc)->Start(); + + el.Wait(); + }).join(); + + END_TASK; + CHECK(task == __task_count); +} + TEST_CASE("Script_Setup") { LuaThread thread; Script script;