Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async task #25

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c641305
build(copp): add copp
RobertIndie Jun 15, 2020
ad8637d
fix(copp): fix bug: can not include headers
RobertIndie Jun 16, 2020
92f1392
feat(task): impl Task, TaskManager
RobertIndie Jun 16, 2020
eae8b2a
fix(task): fix compile errors
RobertIndie Jun 16, 2020
66f857d
build(copp): add copp and cotask
RobertIndie Jun 16, 2020
a19ed88
feat(util): enable shared_from_this in constructor
RobertIndie Jun 16, 2020
68b10af
fix(test): fix and pass test 'SimpleTask'
RobertIndie Jun 16, 2020
746053d
(test, Task): add TaskMap2Task
RobertIndie Jun 17, 2020
f4aaf54
test(task): add TaskAsync test.
RobertIndie Jun 18, 2020
fac03a9
test(task): support multi thread test
RobertIndie Jun 18, 2020
cd7bab3
test(task): add TaskCompleteFromOutside and pass
RobertIndie Jun 18, 2020
c671934
feat(task): add stop to task_mgr
RobertIndie Jun 18, 2020
366b353
feat(util-shared_from_this): enable multi inherit
RobertIndie Jun 19, 2020
c68c2b5
feat(task): add ValueTask and pass test
RobertIndie Jun 19, 2020
33b7864
refactor(task): remove Complete and GetResult from Task
RobertIndie Jun 19, 2020
5e4ad31
refactor(task): add task_async and vt_async
RobertIndie Jun 19, 2020
a585ecf
refactor(task): use macro overload to _async
RobertIndie Jun 20, 2020
8924e66
refactor(task): remove task_list_ in TaskManager
RobertIndie Jun 20, 2020
13dc49d
fix(util-shared): support inherit
RobertIndie Jun 21, 2020
3a46ee5
fix(task): value task async
RobertIndie Jun 27, 2020
a92c4f1
refactor(task): change result type to T
RobertIndie Jun 27, 2020
799d4f4
fix(task): enable stop task before wait
RobertIndie Jun 27, 2020
ddee791
feat(task): add void task
RobertIndie Jun 27, 2020
ceb6076
test(HttpAsyncClient): add http async client test and pass test
RobertIndie Jul 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,33 @@ packageProject(
DEPENDENCIES
)

CPMAddPackage(
NAME libcopp
GITHUB_REPOSITORY owt5008137/libcopp
GIT_TAG 1.2.1
)

packageProject(
NAME copp
VERSION ${PROJECT_VERSION}
BINARY_DIR ${libcopp_BINARY_DIR}
INCLUDE_DIR ${libcopp_SOURCE_DIR}/include
INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION}
DEPENDENCIES
)

packageProject(
NAME cotask
VERSION ${PROJECT_VERSION}
BINARY_DIR ${libcopp_BINARY_DIR}
INCLUDE_DIR ${libcopp_SOURCE_DIR}/include
INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION}
DEPENDENCIES
)

target_include_directories(copp PUBLIC ${libcopp_SOURCE_DIR}/include)
target_include_directories(cotask PUBLIC ${libcopp_SOURCE_DIR}/include)

if (NOT TARGET lua)
CPMFindPackage(
NAME lua
Expand All @@ -70,10 +97,12 @@ if (NOT TARGET lua)
add_library(LuaForGlue ${lua_sources} ${lua_headers})
set_target_properties(LuaForGlue PROPERTIES LINKER_LANGUAGE C)

target_include_directories(LuaForGlue
PUBLIC
$<BUILD_INTERFACE:${lua_SOURCE_DIR}>
)
# target_include_directories(LuaForGlue
# PUBLIC
# $<BUILD_INTERFACE:${lua_SOURCE_DIR}>
# )

target_include_directories(LuaForGlue PUBLIC ${lua_SOURCE_DIR})

if(ANDROID)
target_compile_definitions(LuaForGlue PRIVATE LUA_USE_POSIX LUA_USE_DLOPEN)
Expand Down Expand Up @@ -117,7 +146,7 @@ target_compile_options(Smark PUBLIC "$<$<BOOL:${MSVC}>:/permissive->")

# Link dependencies (if required)
# target_link_libraries(Smark PUBLIC cxxopts)
target_link_libraries(Smark PUBLIC uv_a HttpParser LuaForGlue)
target_link_libraries(Smark PUBLIC uv_a HttpParser LuaForGlue copp cotask)

target_include_directories(Smark
PUBLIC
Expand All @@ -134,5 +163,5 @@ packageProject(
BINARY_DIR ${PROJECT_BINARY_DIR}
INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include
INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION}
DEPENDENCIES "uv_a" "HttpParser" "LuaForGlue"
DEPENDENCIES
)
15 changes: 13 additions & 2 deletions include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,32 @@
#include <queue>
#include <vector>

#include "tasks.h"
#include "util.h"

namespace smark {
class TCPClient : public smark::util::Socket {
public:
TCPClient(smark::util::EventLoop* el);
explicit TCPClient(smark::util::EventLoop* el);
};

class HttpClient : public TCPClient {
public:
HttpClient(smark::util::EventLoop* el);
explicit HttpClient(smark::util::EventLoop* el);
void Request(util::HttpRequest* request);
std::function<void(HttpClient*, std::shared_ptr<util::HttpResponse>)> on_response;

private:
util::HttpReponseParser parser_;
};

class HttpAsyncClient : protected HttpClient {
// use protected inherit to prevent user changes on_complete during requesting.
public:
explicit HttpAsyncClient(smark::util::EventLoop* el);
std::shared_ptr<tasks::ValueTask<int>> ConnectAsync(std::string ip, int16_t port);
std::shared_ptr<tasks::ValueTask<std::shared_ptr<util::HttpResponse>>> RequestAsync(
util::HttpRequest* request);
void Close();
};
} // namespace smark
114 changes: 114 additions & 0 deletions include/tasks.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#pragma once
#include <libcotask/task.h>

#include <functional>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <vector>

#include "debug.h"
#include "util.h"

namespace smark::tasks {
class TaskManager;
class Task;

typedef std::function<void(std::shared_ptr<Task>)> TaskProc;

// TODO: add exception handler
class Task : public smark::util::enable_shared_from_this<Task> {
public:
typedef std::function<void(std::shared_ptr<Task>)> ProcType;
enum State { New, Runable, Dead };
State state = State::New;
explicit Task(TaskProc proc);
Task() = default;
void SetProc(TaskProc proc);
void Start();
void Yield();
void Resume();
void Wait(std::shared_ptr<Task> task);
void WaitAll(const std::vector<std::shared_ptr<Task>>* task_list);
void Stop();
virtual ~Task() = default;

protected:
template <typename TaskType>
void SetProcContext_(std::shared_ptr<TaskType> task_ptr,
std::function<void(std::shared_ptr<TaskType>)> proc) {
task_ptr_ = cotask::task<>::create([&, task_ptr, proc]() {
// void task will not run to this.
DEFER(std::dynamic_pointer_cast<Task>(task_ptr)->state = State::Dead;
UnregisterTaskFromMap2Task();)
RegisterTaskToMap2Task(std::dynamic_pointer_cast<Task>(task_ptr));
proc(task_ptr);
});
}

private:
cotask::task<>::ptr_t task_ptr_;
void* result_;
void RegisterTaskToMap2Task(std::shared_ptr<Task> task_ptr);
void UnregisterTaskFromMap2Task();
};

template <typename T> class ValueTask : public Task {
public:
typedef std::function<void(std::shared_ptr<ValueTask<T>>)> ProcType;
explicit ValueTask(ProcType proc) : Task() { SetProc(proc); }
ValueTask() = default;
void SetProc(ProcType proc) {
SetProcContext_<ValueTask<T>>(shared_from_this<ValueTask<T>>(), proc);
}
inline void Complete(T result) {
result_ = result;
Task::Stop();
}
inline T GetResult() { return result_; }
inline State GetState() { return state; }

private:
T result_;
};

class TaskManager {
public:
void Wait(std::shared_ptr<Task> waiter, std::shared_ptr<Task> waitting);
void StopTask(std::shared_ptr<Task> task);
int RunOnce();
void Stop();
bool IsEmpty();
bool is_stopped = false;

private:
std::set<std::shared_ptr<Task>> completed_tasks_;
std::map<std::shared_ptr<Task>, std::shared_ptr<Task>> waitting_tasks_;
std::queue<std::shared_ptr<Task>> starting_tasks_;
};

std::shared_ptr<Task> GetCurrentTask();

extern thread_local TaskManager task_mgr;

#define _async(...) GET_MACRO_V2(__VA_ARGS__, vt_async, task_async)(__VA_ARGS__)
#define task_async(proc) smark::util::make_shared<smark::tasks::Task>(proc)
#define vt_async(T, proc) \
smark::util::make_shared<smark::tasks::Task, smark::tasks::ValueTask<T>>(proc)
#define void_task(...) GET_MACRO_V0_1(_0, ##__VA_ARGS__, vt_void_task, task_void_task)(__VA_ARGS__)
#define task_void_task() smark::util::make_shared<smark::tasks::Task>()
#define vt_void_task(T) smark::util::make_shared<smark::tasks::Task, smark::tasks::ValueTask<T>>()

template <typename T> std::shared_ptr<T> await(std::shared_ptr<T> task) {
auto current_task = GetCurrentTask();
task_mgr.Wait(current_task, std::dynamic_pointer_cast<Task>(task));

current_task->Yield();

return std::dynamic_pointer_cast<T>(task);
}
#ifdef DEBUG
extern thread_local std::map<cotask::task<>*, std::shared_ptr<Task>> map2task;
#endif
} // namespace smark::tasks
41 changes: 41 additions & 0 deletions include/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,48 @@ extern "C" {
= std::unique_ptr<void, std::function<void(void*)>>{reinterpret_cast<void*>(1), \
[&](void*) { X }};

// macro overload util
#define GET_MACRO_V2(_1, _2, NAME, ...) NAME
#define GET_MACRO_V0_1(_0, _1, NAME, ...) NAME

namespace smark::util {
// enabe shared_from_this in constructor
template <class T, class PtrT> void deleter(PtrT* __ptr) {
auto t = static_cast<T*>(__ptr);
t->~T();
free(t);
}

/**
* Base class allowing use of member function shared_from_this.
*/
template <class T> class enable_shared_from_this {
public:
std::shared_ptr<T>* _construct_pself;
std::weak_ptr<T> _construct_self;

template <class RT = T> std::shared_ptr<RT> shared_from_this() {
if (_construct_pself) {
return std::static_pointer_cast<RT>(*_construct_pself); // in constructor
} else {
return std::static_pointer_cast<RT>(_construct_self.lock());
}
}
};

template <class BaseT, class T = BaseT, typename... Params>
std::shared_ptr<T> make_shared(Params&&... args) {
std::shared_ptr<BaseT> rtn;
T* t = (T*)calloc(1, sizeof(T));
rtn.reset(t, deleter<BaseT, T>);
t->_construct_pself = &rtn;
t = new (t) T(std::forward<Params>(args)...);
t->_construct_pself = NULL;
t->_construct_self = rtn;

return std::static_pointer_cast<T>(rtn);
}

typedef std::function<void(int)> CallbackType; // void(int status)
class EventLoop;
class IEventObj {
Expand Down
29 changes: 29 additions & 0 deletions source/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,33 @@ namespace smark {
});
}

HttpAsyncClient::HttpAsyncClient(smark::util::EventLoop* el) : HttpClient(el) {}

std::shared_ptr<tasks::ValueTask<int>> HttpAsyncClient::ConnectAsync(std::string ip,
int16_t port) {
auto task = _async(int, [=](std::shared_ptr<tasks::ValueTask<int>> this_task) {
DLOG("Try to connect:" << LOG_VALUE(ip) << LOG_VALUE(port));
Connect(ip, port, [=](int status) {
DLOG("Connected result:" << LOG_VALUE(status));
this_task->Complete(status);
});
});
return task;
}

std::shared_ptr<tasks::ValueTask<std::shared_ptr<util::HttpResponse>>>
HttpAsyncClient::RequestAsync(util::HttpRequest* request) {
auto task = _async(
std::shared_ptr<util::HttpResponse>,
[=](std::shared_ptr<tasks::ValueTask<std::shared_ptr<util::HttpResponse>>> this_task) {
on_response = [this_task](auto, std::shared_ptr<util::HttpResponse> res) {
this_task->Complete(res);
};
HttpClient::Request(request);
});
return task;
}

void HttpAsyncClient::Close() { Socket::Close(); }

} // namespace smark
Loading