diff --git a/binding.gyp b/binding.gyp index 3e6f59b429..ebe77a3d22 100644 --- a/binding.gyp +++ b/binding.gyp @@ -12,6 +12,7 @@ 'deps/exokit-bindings/util/src/*.cc', 'deps/exokit-bindings/console/src/*.cc', 'deps/exokit-bindings/cache/src/*.cc', + 'deps/exokit-bindings/threadpool/src/*.cc', 'deps/exokit-bindings/browser/src/*.cpp', 'deps/exokit-bindings/canvas/src/*.cpp', 'deps/exokit-bindings/nanosvg/src/*.cpp', @@ -49,6 +50,7 @@ '<(module_root_dir)/deps/exokit-bindings/util/include', '<(module_root_dir)/deps/exokit-bindings/console/include', '<(module_root_dir)/deps/exokit-bindings/cache/include', + '<(module_root_dir)/deps/exokit-bindings/threadpool/include', '<(module_root_dir)/deps/exokit-bindings/bindings/include', '<(module_root_dir)/deps/exokit-bindings/canvas/include', '<(module_root_dir)/deps/exokit-bindings/browser/include', @@ -135,6 +137,7 @@ ' +#include + #include #include #include + #include #include #include @@ -14,8 +18,7 @@ #include #include #include -#include -#include +#include using namespace v8; using namespace node; @@ -27,7 +30,6 @@ class Image : public ObjectWrap { unsigned int GetHeight(); unsigned int GetNumChannels(); // unsigned char *GetData(); - static void RunInMainThread(uv_async_t *handle); void Load(Local arrayBuffer, size_t byteOffset, size_t byteLength, Local cbFn); // void Set(canvas::Image *image); @@ -45,12 +47,10 @@ class Image : public ObjectWrap { sk_sp image; Nan::Persistent dataArray; - Nan::Persistent arrayBuffer; Nan::Persistent cbFn; bool loading; bool hasCbFn; std::string error; - uv_async_t threadAsyncHandle; uv_sem_t sem; friend class CanvasRenderingContext2D; diff --git a/deps/exokit-bindings/canvascontext/src/image-context.cc b/deps/exokit-bindings/canvascontext/src/image-context.cc index 5794355d80..8805aa5b02 100644 --- a/deps/exokit-bindings/canvascontext/src/image-context.cc +++ b/deps/exokit-bindings/canvascontext/src/image-context.cc @@ -46,47 +46,18 @@ unsigned int Image::GetNumChannels() { } } */ -void Image::RunInMainThread(uv_async_t *handle) { - Nan::HandleScope scope; - - Image *image = (Image *)handle->data; - - Local asyncObject = Nan::New(); - AsyncResource asyncResource(Isolate::GetCurrent(), asyncObject, "Image::RunInMainThread"); - - Local cbFn = Nan::New(image->cbFn); - Local arg0 = Nan::New(image->error).ToLocalChecked(); - Local argv[] = { - arg0, - }; - asyncResource.MakeCallback(cbFn, sizeof(argv)/sizeof(argv[0]), argv); - - image->cbFn.Reset(); - image->arrayBuffer.Reset(); - image->error = ""; - - uv_close((uv_handle_t *)handle, nullptr); -} - void Image::Load(Local arrayBuffer, size_t byteOffset, size_t byteLength, Local cbFn) { if (!this->loading) { unsigned char *buffer = (unsigned char *)arrayBuffer->GetContents().Data() + byteOffset; + sk_sp extendedData = SkData::MakeUninitialized(byteLength + 1); + memcpy((unsigned char *)extendedData->data(), buffer, byteLength); + sk_sp data = SkData::MakeWithoutCopy(buffer, byteLength); - this->arrayBuffer.Reset(arrayBuffer); this->cbFn.Reset(cbFn); this->loading = true; this->hasCbFn = !cbFn.IsEmpty(); - - if (this->hasCbFn) { - uv_loop_t *loop = windowsystembase::GetEventLoop(); - uv_async_init(loop, &this->threadAsyncHandle, RunInMainThread); - this->threadAsyncHandle.data = this; - } else { - uv_sem_init(&this->sem, 0); - } - - std::thread([this, buffer, byteLength]() -> void { - sk_sp data = SkData::MakeWithoutCopy(buffer, byteLength); + + auto loadFn = [this, extendedData, data]() -> void { SkBitmap bitmap; bool ok = DecodeDataToBitmap(data, &bitmap); @@ -94,11 +65,9 @@ void Image::Load(Local arrayBuffer, size_t byteOffset, size_t byteL bitmap.setImmutable(); this->image = SkImage::MakeFromBitmap(bitmap); } else { - unique_ptr svgString(new char[byteLength + 1]); - memcpy(svgString.get(), buffer, byteLength); - svgString[byteLength] = 0; + ((char *)extendedData->data())[extendedData->size() - 1] = 0; // NUL-terminate - NSVGimage *svgImage = nsvgParse(svgString.get(), "px", 96); + NSVGimage *svgImage = nsvgParse((char *)extendedData->data(), "px", 96); if (svgImage != nullptr) { if (svgImage->width > 0 && svgImage->height > 0 && svgImage->shapes != nullptr) { int w = svgImage->width; @@ -134,14 +103,34 @@ void Image::Load(Local arrayBuffer, size_t byteOffset, size_t byteL } } - if (this->hasCbFn) { - uv_async_send(&this->threadAsyncHandle); - } else { + if (!this->hasCbFn) { uv_sem_post(&this->sem); } - }).detach(); + }; + + if (this->hasCbFn) { + threadpool::ThreadPool *threadPool = threadpool::getWindowThreadPool(); + threadPool->queueWork(loadFn, [this]() -> void { + Nan::HandleScope scope; + + Local asyncObject = Nan::New(); + AsyncResource asyncResource(Isolate::GetCurrent(), asyncObject, "Image::Load"); + + Local cbFn = Nan::New(this->cbFn); + Local arg0 = Nan::New(this->error).ToLocalChecked(); + Local argv[] = { + arg0, + }; + asyncResource.MakeCallback(cbFn, sizeof(argv)/sizeof(argv[0]), argv); + + this->cbFn.Reset(); + this->error = ""; + }); + } else { + uv_sem_init(&this->sem, 0); + + loadFn(); - if (!this->hasCbFn) { uv_sem_wait(&this->sem); uv_sem_destroy(&this->sem); diff --git a/deps/exokit-bindings/threadpool/include/threadpool.h b/deps/exokit-bindings/threadpool/include/threadpool.h new file mode 100644 index 0000000000..f5342d890a --- /dev/null +++ b/deps/exokit-bindings/threadpool/include/threadpool.h @@ -0,0 +1,52 @@ +#ifndef _THREADPOOL_H_ +#define _THREADPOOL_H_ + +#include +#include +#include + +#include +#include + +#include + +using namespace v8; + +#define NUM_THREADS 8 + +namespace threadpool { + +class QueueEntry { +public: + QueueEntry(std::function workFn, std::function cbFn); + QueueEntry(); + + std::function workFn; + std::function cbFn; +}; + +class ThreadPool { +public: + ThreadPool(); + ~ThreadPool(); + + void queueWork(std::function workFn = []() -> void {}, std::function cbFn = []() -> void {}); + static void asyncFn(uv_async_t *handle); + +// protected: + std::vector threads; + std::deque reqQueue; + std::deque> resQueue; + uv_async_t *asyncHandle; + std::mutex mutex; + uv_sem_t sem; + bool live; +}; + +extern thread_local ThreadPool *windowThreadPool; +ThreadPool *getWindowThreadPool(); +void destroyWindowThreadPool(); + +}; + +#endif \ No newline at end of file diff --git a/deps/exokit-bindings/threadpool/src/threadpool.cc b/deps/exokit-bindings/threadpool/src/threadpool.cc new file mode 100644 index 0000000000..e7106f7f0d --- /dev/null +++ b/deps/exokit-bindings/threadpool/src/threadpool.cc @@ -0,0 +1,111 @@ +#include + +namespace threadpool { + +QueueEntry::QueueEntry(std::function workFn, std::function cbFn) : workFn(workFn), cbFn(cbFn) {} +QueueEntry::QueueEntry() {} + +ThreadPool::ThreadPool() : live(true) { + asyncHandle = new uv_async_t(); + asyncHandle->data = this; + uv_loop_t *loop = windowsystembase::GetEventLoop(); + uv_async_init(loop, asyncHandle, asyncFn); + uv_sem_init(&sem, 0); + + threads.reserve(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + std::thread *thread = new std::thread([this]() -> void { + for (;;) { + uv_sem_wait(&sem); + + QueueEntry queueEntry; + { + std::lock_guard lock(mutex); + + if (live) { + queueEntry = std::move(reqQueue.front()); + reqQueue.pop_front(); + } + } + + if (queueEntry.workFn) { + queueEntry.workFn(); + + resQueue.push_back(queueEntry.cbFn); + uv_async_send(asyncHandle); + } else { + break; + } + } + }); + threads.push_back(thread); + } +} + +void deleteHandle(uv_handle_t *handle) { + delete (uv_async_t *)handle; +} +ThreadPool::~ThreadPool() { + { + std::lock_guard lock(mutex); + + live = false; + } + + for (int i = 0; i < NUM_THREADS; i++) { + uv_sem_post(&sem); + } + for (int i = 0; i < NUM_THREADS; i++) { + delete threads[i]; + } + uv_close((uv_handle_t *)asyncHandle, deleteHandle); + uv_sem_destroy(&sem); +} + +void ThreadPool::queueWork(std::function workFn, std::function cbFn) { + { + std::lock_guard lock(mutex); + + reqQueue.push_back(QueueEntry(workFn, cbFn)); + + uv_sem_post(&sem); + } +} + +void ThreadPool::asyncFn(uv_async_t *handle) { + ThreadPool *threadPool = (ThreadPool *)handle->data; + + for (;;) { + std::function queueEntry; + { + std::lock_guard lock(threadPool->mutex); + + if (threadPool->resQueue.size() > 0) { + queueEntry = threadPool->resQueue.front(); + threadPool->resQueue.pop_front(); + } + } + + if (queueEntry) { + queueEntry(); + } else { + break; + } + } +} + +thread_local ThreadPool *windowThreadPool = nullptr; +ThreadPool *getWindowThreadPool() { + if (!windowThreadPool) { + windowThreadPool = new ThreadPool(); + } + return windowThreadPool; +} +void destroyWindowThreadPool() { + if (windowThreadPool) { + delete windowThreadPool; + windowThreadPool = nullptr; + } +} + +}; \ No newline at end of file diff --git a/deps/exokit-bindings/webaudiocontext/include/AudioContext.h b/deps/exokit-bindings/webaudiocontext/include/AudioContext.h index dabf62951e..7693db2f69 100644 --- a/deps/exokit-bindings/webaudiocontext/include/AudioContext.h +++ b/deps/exokit-bindings/webaudiocontext/include/AudioContext.h @@ -21,6 +21,7 @@ #include #include #include +#include #include using namespace std; diff --git a/deps/exokit-bindings/webaudiocontext/src/Audio.cpp b/deps/exokit-bindings/webaudiocontext/src/Audio.cpp index d3f70359a3..b9af4eaee5 100644 --- a/deps/exokit-bindings/webaudiocontext/src/Audio.cpp +++ b/deps/exokit-bindings/webaudiocontext/src/Audio.cpp @@ -61,11 +61,12 @@ void Audio::Load(uint8_t *bufferValue, size_t bufferLength, Local cbFn WebAudioAsync *webAudioAsync = getWebAudioAsync(); std::vector buffer(bufferLength); memcpy(buffer.data(), bufferValue, bufferLength); - std::thread([this, webAudioAsync, buffer{std::move(buffer)}]() mutable -> void { + threadpool::ThreadPool *threadPool = threadpool::getWindowThreadPool(); + threadPool->queueWork([this, webAudioAsync, buffer{std::move(buffer)}]() mutable -> void { this->audioBus = lab::MakeBusFromMemory(buffer, false, &this->error); - + webAudioAsync->QueueOnMainThread(std::bind(ProcessLoadInMainThread, this)); - }).detach(); + }); } else { Local arg0 = Nan::New("already loading").ToLocalChecked(); Local argv[] = { diff --git a/deps/exokit-bindings/webaudiocontext/src/AudioBuffer.cpp b/deps/exokit-bindings/webaudiocontext/src/AudioBuffer.cpp index 9bae6d3f91..7948ec9632 100644 --- a/deps/exokit-bindings/webaudiocontext/src/AudioBuffer.cpp +++ b/deps/exokit-bindings/webaudiocontext/src/AudioBuffer.cpp @@ -210,11 +210,13 @@ void AudioBuffer::Load(Local arrayBuffer, size_t byteOffset, size_t WebAudioAsync *webAudioAsync = getWebAudioAsync(); std::vector buffer(byteLength); memcpy(buffer.data(), (unsigned char *)arrayBuffer->GetContents().Data() + byteOffset, byteLength); - std::thread([this, webAudioAsync, buffer{std::move(buffer)}]() mutable -> void { + + threadpool::ThreadPool *threadPool = threadpool::getWindowThreadPool(); + threadPool->queueWork([this, webAudioAsync, buffer{std::move(buffer)}]() mutable -> void { this->audioBus = lab::MakeBusFromMemory(buffer, false, &this->error); webAudioAsync->QueueOnMainThread(std::bind(ProcessLoadInMainThread, this)); - }).detach(); + }); } else { Local arg0 = Nan::New("already loading").ToLocalChecked(); Local argv[] = { diff --git a/deps/exokit-bindings/windowsystem/src/windowsystem.cc b/deps/exokit-bindings/windowsystem/src/windowsystem.cc index 530472220b..85b380eb49 100644 --- a/deps/exokit-bindings/windowsystem/src/windowsystem.cc +++ b/deps/exokit-bindings/windowsystem/src/windowsystem.cc @@ -1,4 +1,5 @@ #include +#include #include @@ -1370,6 +1371,10 @@ NAN_METHOD(ClearFramebuffer) { glClear(GL_COLOR_BUFFER_BIT|GL_DEPTH_BUFFER_BIT|GL_STENCIL_BUFFER_BIT); } +NAN_METHOD(DestroyThreadPool) { + threadpool::destroyWindowThreadPool(); +} + void Decorate(Local target) { Nan::SetMethod(target, "createRenderTarget", CreateRenderTarget); Nan::SetMethod(target, "resizeRenderTarget", ResizeRenderTarget); @@ -1383,6 +1388,7 @@ void Decorate(Local target) { Nan::SetMethod(target, "deleteSync", DeleteSync); Nan::SetMethod(target, "composeLayers", ComposeLayers); Nan::SetMethod(target, "clearFramebuffer", ClearFramebuffer); + Nan::SetMethod(target, "destroyThreadPool", DestroyThreadPool); } } diff --git a/src/Window.js b/src/Window.js index ca0af9ef84..e8ba3188c1 100644 --- a/src/Window.js +++ b/src/Window.js @@ -1371,11 +1371,10 @@ global.onrunasync = req => { error: req.error, result: req.result, }); - - return Promise.resolve(); } else { - return Promise.reject(new Error(`response for unknown window ${method} ${JSON.stringify(windows.map(window => window.id))}`)); + console.warn('ignoring unknown response', req, {windowId}); } + return Promise.resolve(); } } case 'keyEvent': { @@ -1465,5 +1464,6 @@ global.onexit = () => { } AudioContext.Destroy(); + nativeWindow.destroyThreadPool(); }; // global.setImmediate = undefined; // need this for the TLS implementation