diff --git a/CHANGELOG.md b/CHANGELOG.md index 6891515..b827b28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [2.0.0] - 2018-09-20 + +### Added +- interface hx.concurrent.lock.Acquirable +- class hx.concurrent.CountDownLatch +- class hx.concurrent.lock.RWLock (upgradeable Read Write Lock) + +### Changed +- renamed methid hx.concurrent.thread.Threads#wait() to hx.concurrent.thread.Threads#await() +- changed hx.concurrent.lock.Semaphore from abstract to class + +### Fixed +- define "threads" is now set correctly for targets supporting real threading + + ## [1.2.0] - 2018-08-22 ### Added @@ -25,8 +40,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [1.1.0] - 2018-04-15 ### Added -- hx.concurrent.Service.ServiceState.STARTING -- hx.concurrent.Service.Service#start() +- field hx.concurrent.Service.ServiceState.STARTING +- method hx.concurrent.Service.Service#start() ### Changed - replaced license header by "SPDX-License-Identifier: Apache-2.0" diff --git a/haxelib.json b/haxelib.json index 2e496ab..0f5c970 100644 --- a/haxelib.json +++ b/haxelib.json @@ -9,6 +9,6 @@ "vegardit" ], "releasenote": "See https://github.com/vegardit/haxe-concurrent/blob/master/CHANGELOG.md", - "version": "1.2.0", + "version": "2.0.0", "dependencies": { } } diff --git a/src/hx/concurrent/CountDownLatch.hx b/src/hx/concurrent/CountDownLatch.hx new file mode 100644 index 0000000..ccde383 --- /dev/null +++ b/src/hx/concurrent/CountDownLatch.hx @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2016-2018 Vegard IT GmbH, https://vegardit.com + * SPDX-License-Identifier: Apache-2.0 + */ +package hx.concurrent; + +import hx.concurrent.atomic.AtomicInt; +import hx.concurrent.lock.RLock; +import hx.concurrent.thread.Threads; + +/** + * @author Sebastian Thomschke, Vegard IT GmbH + */ +#if java +abstract CountDownLatch(java.util.concurrent.CountDownLatch) { + + public var count(get, never):Int; + inline function get_count():Int return haxe.Int64.toInt(this.getCount()); + + inline public function new(count:Int) this = new java.util.concurrent.CountDownLatch(count); + inline public function countDown():Void this.countDown(); + inline public function await():Void this.await(); + inline public function tryAwait(timeoutMS:Int):Bool return this.await(timeoutMS, java.util.concurrent.TimeUnit.MILLISECONDS); +} + +#elseif threads +class CountDownLatch { + + public var count(get, null):Int; + inline function get_count():Int return _count; + + var _count:AtomicInt; + + + inline + public function new(count:Int) { + _count = new AtomicInt(count); + } + + + inline + public function countDown():Void { + _count--; + } + + + public function await():Void { + while(_count > 0) + Threads.sleep(10); + } + + + public function tryAwait(timeoutMS:Int):Bool { + return Threads.await(function() return count < 1, timeoutMS); + } +} +#end \ No newline at end of file diff --git a/src/hx/concurrent/collection/Queue.hx b/src/hx/concurrent/collection/Queue.hx index c340995..0b16604 100644 --- a/src/hx/concurrent/collection/Queue.hx +++ b/src/hx/concurrent/collection/Queue.hx @@ -68,7 +68,7 @@ class Queue { _queueLock.release(); #end } else { - Threads.wait(function() { + Threads.await(function() { #if (cpp||hl||neko) msg = _queue.pop(false); #elseif java diff --git a/src/hx/concurrent/executor/Executor.hx b/src/hx/concurrent/executor/Executor.hx index 09b72cd..3a62e8d 100644 --- a/src/hx/concurrent/executor/Executor.hx +++ b/src/hx/concurrent/executor/Executor.hx @@ -27,7 +27,7 @@ class Executor extends ServiceBase { */ public static function create(maxConcurrent:Int = 1, autostart = true):Executor { #if threads - if (Threads.isSupported()) + if (Threads.isSupported) return new ThreadPoolExecutor(maxConcurrent, autostart); #end return new TimerExecutor(autostart); @@ -132,12 +132,12 @@ class TaskFutureBase extends FutureBase implements TaskFuture { #if threads public function waitAndGet(timeoutMS:Int):FutureResult { - Threads.wait(function() { + Threads.await(function() { return switch(this.result) { case NONE(_): false; default: true; }; - }, timeoutMS, ThreadPoolExecutor.SCHEDULER_RESOLUTION_SEC); + }, timeoutMS, ThreadPoolExecutor.SCHEDULER_RESOLUTION_MS); return this.result; } diff --git a/src/hx/concurrent/executor/ThreadPoolExecutor.hx b/src/hx/concurrent/executor/ThreadPoolExecutor.hx index 987cb80..174a136 100644 --- a/src/hx/concurrent/executor/ThreadPoolExecutor.hx +++ b/src/hx/concurrent/executor/ThreadPoolExecutor.hx @@ -24,7 +24,7 @@ import hx.concurrent.thread.Threads; #if threads class ThreadPoolExecutor extends Executor { - public inline static var SCHEDULER_RESOLUTION_MS = 5; + public inline static var SCHEDULER_RESOLUTION_MS = 10; public inline static var SCHEDULER_RESOLUTION_SEC = SCHEDULER_RESOLUTION_MS / 1000; var _threadPool:ThreadPool; @@ -110,7 +110,7 @@ class ThreadPoolExecutor extends Executor { t.cancel(); } - Threads.wait(function() { + Threads.await(function() { return _threadPool.state == STOPPED; }, -1); state = STOPPED; diff --git a/src/hx/concurrent/internal/Ints.hx b/src/hx/concurrent/internal/Ints.hx new file mode 100644 index 0000000..0c1eb72 --- /dev/null +++ b/src/hx/concurrent/internal/Ints.hx @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2016-2018 Vegard IT GmbH, https://vegardit.com + * SPDX-License-Identifier: Apache-2.0 + */ +package hx.concurrent.internal; + +/** + * @author Sebastian Thomschke, Vegard IT GmbH + */ +class Ints { + + /** + * Maximum value Int type can hold depending on target platform + */ + public static var MAX_VALUE(default, never):Int = { + #if cs + untyped __cs__("int.MaxValue"); + #elseif flash + untyped __global__["int"].MAX_VALUE; + #elseif java + untyped __java__("Integer.MAX_VALUE"); + #elseif js + untyped __js__("Number.MAX_SAFE_INTEGER"); + #elseif php + untyped __php__("PHP_INT_MAX"); + #elseif python + python.Syntax.pythonCode("import sys"); + python.Syntax.pythonCode("sys.maxsize"); + #else // neko, cpp, lua, etc. + Std.int(Math.pow(2,31)-1); + #end + } + + + /** + * Maximum negative value Int type can hold depending on target platform + */ + public static var MIN_VALUE(default, never):Int = { + #if cs + untyped __cs__("int.MinValue"); + #elseif flash + untyped __global__["int"].MIN_VALUE; + #elseif java + untyped __java__("Integer.MIN_VALUE"); + #elseif js + untyped __js__("Number.MIN_SAFE_INTEGER"); + #elseif php + untyped __php__("PHP_INT_MIN"); + #elseif python + python.Syntax.pythonCode("import sys"); + -python.Syntax.pythonCode("sys.maxsize") -1; + #else // neko, cpp, lua, etc. + -Std.int(Math.pow(2,31)); + #end + } + +} diff --git a/src/hx/concurrent/lock/Acquirable.hx b/src/hx/concurrent/lock/Acquirable.hx new file mode 100644 index 0000000..1df5eeb --- /dev/null +++ b/src/hx/concurrent/lock/Acquirable.hx @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2016-2018 Vegard IT GmbH, https://vegardit.com + * SPDX-License-Identifier: Apache-2.0 + */ +package hx.concurrent.lock; + +/** + * @author Sebastian Thomschke, Vegard IT GmbH + */ +interface Acquirable { + + public var availablePermits(get, never):Int; + + /** + * Blocks until a permit can be acquired. + */ + public function acquire():Void; + + /** + * By default this call is non-blocking, meaning if the object cannot be aqcuired currently `false` is returned immediately. + * + * If timeoutMS is set to value > 0, results in blocking for the given time to aqcuire the object. + * If timeoutMS is set to value lower than 0, results in an exception. + * + * @return `false` if lock could not be acquired + */ + public function tryAcquire(timeoutMS:Int = 0):Bool; + + /** + * Releases one permit. + * + * Depending on the implementation this method may throw an exception if the current thread doesn't hold the permit. + */ + public function release():Void; + +} \ No newline at end of file diff --git a/src/hx/concurrent/lock/RLock.hx b/src/hx/concurrent/lock/RLock.hx index 84a5958..e1d0891 100644 --- a/src/hx/concurrent/lock/RLock.hx +++ b/src/hx/concurrent/lock/RLock.hx @@ -15,13 +15,13 @@ import hx.concurrent.thread.Threads; * * @author Sebastian Thomschke, Vegard IT GmbH */ -class RLock { +class RLock implements Acquirable { /** * Indicates if this class will have any effect on the current target. - * Currently: CPP, CS, Flash, Java, Neko, Python. + * Currently: CPP, CS, Flash, HashLink, Java, Neko, Python. */ - public static inline var isSupported = #if (cpp||cs||flash||java||neko||python) true #else false #end; + public static inline var isSupported = #if (cpp||cs||flash||hl||java||neko||python) true #else false #end; #if cpp var _rlock = new cpp.vm.Mutex(); @@ -30,7 +30,7 @@ class RLock { #elseif flash // flash.concurrent.Mutex requries swf-version >= 11.4 // flash.concurrent.Condition requries swf-version >= 11.5 - var cond = new flash.concurrent.Condition(new flash.concurrent.Mutex()); + var _cond = new flash.concurrent.Condition(new flash.concurrent.Mutex()); #elseif hl var _rlock = new hl.vm.Mutex(); #elseif java @@ -42,6 +42,47 @@ class RLock { #end + var _holder:Dynamic = null; + var _holderEntranceCount = 0; + + + public var availablePermits(get, never):Int; + function get_availablePermits():Int return isAcquiredByAnyThread ? 0 : 1; + + + /** + * Indicates if the lock is acquired by any thread + */ + public var isAcquiredByAnyThread(get, null):Bool; + inline function get_isAcquiredByAnyThread():Bool { + #if java + return _rlock.isLocked(); + #else + return _holder != null; + #end + } + + + /** + * Indicates if the lock is acquired by the current thread + */ + public var isAcquiredByCurrentThread(get, null):Bool; + inline function get_isAcquiredByCurrentThread():Bool { + #if java + return _rlock.isHeldByCurrentThread(); + #else + return _holder == Threads.current; + #end + } + + + /** + * Indicates if the lock is acquired by any other thread + */ + public var isAcquiredByOtherThread(get, null):Bool; + inline function get_isAcquiredByOtherThread():Bool return isAcquiredByAnyThread && !isAcquiredByCurrentThread; + + inline public function new() { } @@ -50,7 +91,7 @@ class RLock { /** * Executes the given function while the lock is acquired. */ - public function execute(func:Void->T, swallowExceptions:Bool = false):T { + public function execute(func:Void->T, swallowExceptions = false):T { var ex:ConcurrentException = null; var result:T = null; @@ -71,7 +112,6 @@ class RLock { /** * Blocks until lock can be acquired. */ - inline public function acquire():Void { #if cs cs.system.threading.Monitor.Enter(this); @@ -80,9 +120,14 @@ class RLock { #elseif java _rlock.lock(); #elseif flash - cond.mutex.lock(); + _cond.mutex.lock(); #else - // no concurrency support + // single-threaded targets: js,lua,php + #end + + #if !java + _holder = Threads.current; + _holderEntranceCount++; #end } @@ -95,33 +140,45 @@ class RLock { * * @return `false` if lock could not be acquired */ - public function tryAcquire(timeoutMS:Int = 0):Bool { - + public function tryAcquire(timeoutMS = 0):Bool { if (timeoutMS < 0) throw "[timeoutMS] must be >= 0"; + if (tryAcquireInternal(timeoutMS)) { + _holder = Threads.current; + _holderEntranceCount++; + return true; + } + + return false; + } + + + #if !flash inline #end + private function tryAcquireInternal(timeoutMS = 0):Bool { #if cs return cs.system.threading.Monitor.TryEnter(this, timeoutMS); #elseif (cpp||hl||neko) - return Threads.wait(function() return _rlock.tryAcquire(), timeoutMS); + return Threads.await(function() return _rlock.tryAcquire(), timeoutMS); #elseif java return _rlock.tryLock(timeoutMS, java.util.concurrent.TimeUnit.MILLISECONDS); #elseif python - return Threads.wait(function() return _rlock.acquire(false), timeoutMS); + return Threads.await(function() return _rlock.acquire(false), timeoutMS); #elseif flash var startAt = Dates.now(); while (true) { - if (cond.mutex.tryLock()) + if (_cond.mutex.tryLock()) return true; var elapsedMS = Dates.now() - startAt; if (elapsedMS >= timeoutMS) return false; - cond.wait(timeoutMS - elapsedMS); + // wait for mutex to be released by other thread + _cond.wait(timeoutMS - elapsedMS); } #else - // no concurrency support - return true; + // single-threaded targets: js,lua,php + return _holder == null || _holder == Threads.current; #end } @@ -131,8 +188,19 @@ class RLock { * * @throws an exception if the lock was not acquired by the current thread */ - inline public function release():Void { + if (isAcquiredByCurrentThread) { + #if !java + _holderEntranceCount--; + if (_holderEntranceCount == 0) + _holder = null; + #end + } else if (isAcquiredByOtherThread) { + throw "Lock was aquired by another thread!"; + } + else + throw "Lock was not aquired by any thread!"; + #if cs cs.system.threading.Monitor.Exit(this); #elseif (cpp||hl||neko||python) @@ -140,10 +208,10 @@ class RLock { #elseif java _rlock.unlock(); #elseif flash - cond.notify(); - cond.mutex.unlock(); + _cond.notify(); + _cond.mutex.unlock(); #else - // no concurrency support + // single-threaded targets: js,lua,php #end } } diff --git a/src/hx/concurrent/lock/RWLock.hx b/src/hx/concurrent/lock/RWLock.hx new file mode 100644 index 0000000..385e2c5 --- /dev/null +++ b/src/hx/concurrent/lock/RWLock.hx @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2016-2018 Vegard IT GmbH, https://vegardit.com + * SPDX-License-Identifier: Apache-2.0 + */ +package hx.concurrent.lock; + +import hx.concurrent.atomic.AtomicBool; +import hx.concurrent.internal.Dates; +import hx.concurrent.internal.Ints; +import hx.concurrent.ConcurrentException; +import hx.concurrent.thread.Threads; + +/** + * A upgradeable re-entrant read-write lock where locks can only be released by the same thread that acquired them. + * + * A thread can acquire a write-lock (upgrade) while holding a read-lock if no other thread holds a read-lock at the same time. + * + * https://stackoverflow.com/questions/2332765/lock-mutex-semaphore-whats-the-difference + * + * @author Sebastian Thomschke, Vegard IT GmbH + */ +@:allow(hx.concurrent.lock.ReadLock) +@:allow(hx.concurrent.lock.WriteLock) +class RWLock { + + public var readLock(default, null):ReadLock; + public var writeLock(default, null):WriteLock; + + var sync(default, null) = new RLock(); + + inline + public function new() { + readLock = new ReadLock(this); + writeLock = new WriteLock(this); + } +} + + +@:allow(hx.concurrent.lock.RWLock) +@:allow(hx.concurrent.lock.WriteLock) +class ReadLock implements Acquirable { + + public var availablePermits(get, never):Int; + function get_availablePermits():Int { + if (rwLock.writeLock.isAcquiredByOtherThread) + return 0; + + return rwLock.sync.execute(function() { + return Ints.MAX_VALUE - holders.length; + }); + } + + + /** + * Indicates if the lock is acquired by any thread + */ + public var isAcquiredByAnyThread(get, null):Bool; + inline function get_isAcquiredByAnyThread():Bool { + return rwLock.sync.execute(function() { + return holders.length > 0; + }); + } + + + /** + * Indicates if the lock is acquired by the current thread + */ + public var isAcquiredByCurrentThread(get, null):Bool; + inline function get_isAcquiredByCurrentThread():Bool { + return rwLock.sync.execute(function() { + return holders.indexOf(Threads.current) > -1; + }); + } + + + /** + * Indicates if the lock is acquired by any other thread + */ + public var isAcquiredByOtherThread(get, null):Bool; + inline function get_isAcquiredByOtherThread():Bool { + var requestor = Threads.current; + return rwLock.sync.execute(function() { + if (holders.length == 0) + return false; + + for (holder in holders) + if (holder != requestor) + return true; + return false; + }); + } + + + var rwLock:RWLock; + var holders = new Array(); + + + inline function new(rwLock:RWLock) { + this.rwLock = rwLock; + } + + + public function acquire():Void { + while (true) { + if (tryAcquire(Ints.MAX_VALUE)) + return; + } + } + + + public function tryAcquire(timeoutMS:Int = 0):Bool { + var requestor = Threads.current; + + var startAt = Dates.now(); + while (true) { + if (rwLock.sync.execute(function() { + if(rwLock.writeLock.isAcquiredByOtherThread) + return false; + + holders.push(requestor); + return true; + })) + return true; + + var elapsedMS = Dates.now() - startAt; + if (elapsedMS >= timeoutMS) + return false; + } + } + + + public function release():Void { + rwLock.sync.execute(function() { + if (!holders.remove(Threads.current)) + throw "This lock was not acquired by the current thread!"; + }); + } +} + + +@:allow(hx.concurrent.lock.RWLock) +@:allow(hx.concurrent.lock.ReadLock) +class WriteLock extends RLock { + + var rwLock:RWLock; + + inline function new(rwLock:RWLock) { + super(); + this.rwLock = rwLock; + } + + override + function get_availablePermits():Int { + if (isAcquiredByAnyThread) + return 0; + + return rwLock.sync.execute(function() { + var readLockHolders = rwLock.readLock.holders; + + // no read locks? + if (readLockHolders.length == 0) + return 1; + + // read locks held by other threads? + var requestor = Threads.current; + for (holder in readLockHolders) + if (holder != requestor) + return 0; + return 1; + }); + } + + + override + public function acquire():Void { + while (true) { + if (tryAcquire(Ints.MAX_VALUE)) + return; + } + } + + + override + public function tryAcquire(timeoutMS = 0):Bool { + var requestor = Threads.current; + var readLockHolders = rwLock.readLock.holders; + + #if (flash||sys) + return Threads.await(function() { + return rwLock.sync.execute(function() { + #end + if (readLockHolders.length > 0) { + // read locks held by other threads? + for (holder in readLockHolders) { + if (holder != requestor) + return false; + } + } + return super_tryAcquire(50); + #if (flash||sys) + }); + }, timeoutMS); + #end + } + + + override + public function release():Void { + rwLock.sync.execute(function() { + super_release(); + }); + } + + + function super_tryAcquire(timeoutMS = 0):Bool return super.tryAcquire(timeoutMS); + function super_release():Void return super.release(); +} diff --git a/src/hx/concurrent/lock/Semaphore.hx b/src/hx/concurrent/lock/Semaphore.hx index 0484d4c..045a279 100644 --- a/src/hx/concurrent/lock/Semaphore.hx +++ b/src/hx/concurrent/lock/Semaphore.hx @@ -10,104 +10,59 @@ import hx.concurrent.lock.RLock; import hx.concurrent.thread.Threads; /** + * See: + * - https://en.wikipedia.org/wiki/Semaphore_(programming) + * - https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphore + * - http://devdocs.io/openjdk~8/java/util/concurrent/semaphore + * * @author Sebastian Thomschke, Vegard IT GmbH */ -abstract Semaphore(SemaphoreImpl) from SemaphoreImpl to SemaphoreImpl { +class Semaphore implements Acquirable { public var availablePermits(get, never):Int; - inline function get_availablePermits():Int return this.availablePermits; - - - inline - public function new(initialPermits:Int) { - this = new SemaphoreImpl(initialPermits); - } - - - #if threads - /** - * Blocks until a permit can be acquired. - */ - inline - public function acquire():Void this.acquire(); - #end - - - /** - * By default this call is non-blocking, meaning if no permit can be acquired `false` is returned immediately. - * - * If timeoutMS is set to value > 0, results in blocking for the given time to aqcuire a permit. - * If timeoutMS is set to value lower than 0, results in an exception. - * - * @return `false` if lock could not be acquired - */ - inline - public function tryAcquire(timeoutMS:Int = 0):Bool return this.tryAcquire(timeoutMS); - - - /** - * Increases availablePermits by one. - */ - inline - public function release():Void this.release(); - -} #if java -private abstract SemaphoreImpl(java.util.concurrent.Semaphore) from java.util.concurrent.Semaphore to java.util.concurrent.Semaphore { - public var availablePermits(get, never):Int; - inline function get_availablePermits():Int return this.availablePermits(); + var _sem:java.util.concurrent.Semaphore; + inline function get_availablePermits():Int return _sem.availablePermits(); inline public function new(initialPermits:Int) { - this = new java.util.concurrent.Semaphore(initialPermits); + _sem = new java.util.concurrent.Semaphore(initialPermits); } - inline - public function acquire():Void { - this.acquire(); - } - - - inline - public function tryAcquire(timeoutMS:Int = 0):Bool { - return this.tryAcquire(timeoutMS, java.util.concurrent.TimeUnit.MILLISECONDS); - } - + inline public function acquire():Void _sem.acquire(); + inline public function tryAcquire(timeoutMS = 0):Bool return _sem.tryAcquire(timeoutMS, java.util.concurrent.TimeUnit.MILLISECONDS); + /** + * Increases availablePermits by one. + */ + inline public function release():Void _sem.release(); - inline - public function release():Void this.release(); -} #else -private class SemaphoreImpl { - - public var availablePermits(default, null):Int; + var _availablePermits:Int; + inline function get_availablePermits():Int return _availablePermits; var permitLock = new RLock(); + inline public function new(initialPermits:Int) { - this.availablePermits = initialPermits; + _availablePermits = initialPermits; } - - #if threads inline public function acquire():Void { - while (tryAcquire(1000) == false) { }; + while (tryAcquire(500) == false) { }; } - #end - inline private function tryAcquireInternal():Bool { return permitLock.execute(function() { - if (availablePermits > 0) { - availablePermits--; + if (_availablePermits > 0) { + _availablePermits--; return true; } return false; @@ -115,20 +70,24 @@ private class SemaphoreImpl { } - public function tryAcquire(timeoutMS:Int = 0):Bool { + inline + public function tryAcquire(timeoutMS = 0):Bool { #if threads - return Threads.wait(tryAcquireInternal, timeoutMS); + return Threads.await(tryAcquireInternal, timeoutMS); #else return tryAcquireInternal(); #end } - inline + /** + * Increases availablePermits by one. + */ public function release():Void { permitLock.acquire(); - availablePermits++; + _availablePermits++; permitLock.release(); } -} #end +} + diff --git a/src/hx/concurrent/thread/Threads.hx b/src/hx/concurrent/thread/Threads.hx index 095eb84..3fcd844 100644 --- a/src/hx/concurrent/thread/Threads.hx +++ b/src/hx/concurrent/thread/Threads.hx @@ -9,22 +9,48 @@ import hx.concurrent.internal.Dates; /** * @author Sebastian Thomschke, Vegard IT GmbH */ -#if sys class Threads { + /** - * Puts the current thread in sleep for the given amount of milli seconds. + *

+     * >>> Threads.current == Threads.current
+     * 
+ * + * @return a target-specific object or ID representing the current thread. */ - inline - public static function sleep(timeMS:Int) { - Sys.sleep(timeMS/1000); + public static var current(get, never):Dynamic; + static function get_current():Dynamic { + #if cpp + return cpp.vm.Thread.current().handle; + #elseif cs + return cs.system.threading.Thread.CurrentThread; + #elseif flash + var worker = flash.system.Worker.current; + return worker == null ? "MainThread" : worker; + #elseif hl + return Std.string(hl.vm.Thread.current()); + #elseif java + return java.vm.Thread.current(); + #elseif neko + return neko.vm.Thread.current(); + #elseif python + python.Syntax.pythonCode("import threading"); + return python.Syntax.pythonCode("threading.current_thread()"); + #else // javascript, lua + return "MainThread"; + #end } + /** + * @return true if spawning threads is supported by current target + */ + public static var isSupported(get, never):Bool; #if !python inline #end - public static function isSupported():Bool { + static function get_isSupported():Bool { #if threads #if python try { @@ -40,34 +66,7 @@ class Threads { #end } - - #if threads - /** - * Spawns a new deamon thread (i.e. terminates with the main thread) to execute the given function. - */ - inline - public static function spawn(func:Void->Void) { - #if cpp - cpp.vm.Thread.create(func); - #elseif cs - new cs.system.threading.Thread(cs.system.threading.ThreadStart.FromHaxeFunction(func)).Start(); - #elseif hl - hl.vm.Thread.create(func); - #elseif java - java.vm.Thread.create(func); - #elseif neko - neko.vm.Thread.create(func); - #elseif python - var t = new python.lib.threading.Thread({target: func}); - t.daemon = true; - t.start(); - #else - throw "Unsupported operation."; - #end - } - #end - - + #if (flash||sys) /** * Blocks the current thread until `condition` returns `true`. * @@ -76,13 +75,19 @@ class Threads { * If timeoutMS is set to `-1`, the function waits indefinitely until a new message is available. * If timeoutMS is set to value lower than -1, results in an exception. */ - public static function wait(condition:Void->Bool, timeoutMS:Int, sleepSecs = 0.001):Bool { + public static function await(condition:Void->Bool, timeoutMS:Int, waitLoopSleepMS = 10):Bool { if (timeoutMS < -1) throw "[timeoutMS] must be >= -1"; if (timeoutMS == 0) return condition(); + #if flash + var cond = new flash.concurrent.Condition(new flash.concurrent.Mutex()); + #else + var waitLoopSleepSecs = waitLoopSleepMS / 1000.0; + #end + var startAt = Dates.now(); while (!condition()) { if (timeoutMS > 0) { @@ -91,10 +96,58 @@ class Threads { return false; } // wait 1ms - Sys.sleep(sleepSecs); + #if flash + cond.wait(waitLoopSleepMS); + #else + Sys.sleep(waitLoopSleepSecs); + + #end } return true; } + + + /** + * Puts the current thread to sleep for the given milliseconds. + */ + inline + public static function sleep(timeMS:Int):Void { + #if flash + var cond = new flash.concurrent.Condition(new flash.concurrent.Mutex()); + cond.wait(timeMS); + #else + Sys.sleep(timeMS / 1000); + #end + } + #end + + + #if threads + /** + * Spawns a new deamon thread (i.e. terminates with the main thread) to execute the given function. + */ + inline + public static function spawn(func:Void->Void):Void { + #if cpp + cpp.vm.Thread.create(func); + #elseif cs + new cs.system.threading.Thread(cs.system.threading.ThreadStart.FromHaxeFunction(func)).Start(); + #elseif hl + hl.vm.Thread.create(func); + #elseif java + java.vm.Thread.create(func); + #elseif neko + neko.vm.Thread.create(func); + #elseif python + var t = new python.lib.threading.Thread({target: func}); + t.daemon = true; + t.start(); + #else // flash, javascript, lua + throw "Unsupported operation."; + #end + } + #end + } -#end + diff --git a/test/hx/concurrent/TestRunner.hx b/test/hx/concurrent/TestRunner.hx index 9dab5e8..d01242c 100644 --- a/test/hx/concurrent/TestRunner.hx +++ b/test/hx/concurrent/TestRunner.hx @@ -15,10 +15,12 @@ import hx.concurrent.executor.Executor; import hx.concurrent.executor.Schedule; import hx.concurrent.internal.Dates; import hx.concurrent.lock.RLock; +import hx.concurrent.lock.RWLock; import hx.concurrent.lock.Semaphore; import hx.concurrent.thread.ThreadPool; import hx.concurrent.thread.Threads; + /** * @author Sebastian Thomschke, Vegard IT GmbH */ @@ -145,6 +147,28 @@ class TestRunner extends hx.doctest.DocTestRunner { } + #if threads + function testCountDownLatch() { + var signal = new CountDownLatch(1); + assertEquals(1, signal.count); + assertFalse(signal.tryAwait(10)); + + signal.countDown(); + assertEquals(0, signal.count); + assertTrue(signal.tryAwait(10)); + + signal.await(); + + var signal = new CountDownLatch(1); + Threads.spawn(function() { + signal.countDown(); + }); + + signal.await(); + } + #end + + function testQueue() { var q = new Queue(); assertEquals(null, q.pop()); @@ -191,6 +215,10 @@ class TestRunner extends hx.doctest.DocTestRunner { function testRLock() { var lock = new RLock(); + assertFalse(lock.isAcquiredByCurrentThread); + assertFalse(lock.isAcquiredByOtherThread); + assertFalse(lock.isAcquiredByAnyThread); + assertEquals(1, lock.availablePermits); #if threads Threads.spawn(function() { @@ -199,8 +227,19 @@ class TestRunner extends hx.doctest.DocTestRunner { lock.release(); }); Threads.sleep(100); - assertEquals(false, lock.tryAcquire(100)); - assertEquals(true, lock.tryAcquire(3000)); + assertFalse(lock.tryAcquire(100)); + + assertFalse(lock.isAcquiredByCurrentThread); + assertTrue(lock.isAcquiredByOtherThread); + assertTrue(lock.isAcquiredByAnyThread); + assertEquals(0, lock.availablePermits); + + assertTrue(lock.tryAcquire(3000)); + + assertTrue(lock.isAcquiredByCurrentThread); + assertFalse(lock.isAcquiredByOtherThread); + assertTrue(lock.isAcquiredByAnyThread); + assertEquals(0, lock.availablePermits); #end var flag = new AtomicBool(false); @@ -209,7 +248,119 @@ class TestRunner extends hx.doctest.DocTestRunner { assertTrue(lock.execute(function():Bool { flag.value = true; return true; } )); assertTrue(flag.value); lock.release(); + } + + function testRWLock() { + var lock = new RWLock(); + + try { + lock.readLock.release(); + fail("Exception expected!"); + } catch (ex:Dynamic) { /* expected */ } + + try { + lock.writeLock.release(); + fail("Exception expected!"); + } catch (ex:Dynamic) { /* expected */ } + + assertTrue(lock.readLock.availablePermits > 0); + assertTrue(lock.writeLock.availablePermits == 1); + assertFalse(lock.readLock.isAcquiredByAnyThread); + assertFalse(lock.readLock.isAcquiredByCurrentThread); + assertFalse(lock.readLock.isAcquiredByOtherThread); + assertFalse(lock.writeLock.isAcquiredByAnyThread); + assertFalse(lock.writeLock.isAcquiredByCurrentThread); + assertFalse(lock.writeLock.isAcquiredByOtherThread); + + lock.readLock.acquire(); + + assertTrue(lock.readLock.availablePermits > 0); + assertTrue(lock.writeLock.availablePermits == 1); + assertTrue(lock.readLock.isAcquiredByAnyThread); + assertTrue(lock.readLock.isAcquiredByCurrentThread); + assertFalse(lock.readLock.isAcquiredByOtherThread); + assertFalse(lock.writeLock.isAcquiredByAnyThread); + assertFalse(lock.writeLock.isAcquiredByCurrentThread); + assertFalse(lock.writeLock.isAcquiredByOtherThread); + + lock.writeLock.acquire(); // upgrading read to write lock + + assertTrue(lock.readLock.availablePermits > 0); + assertTrue(lock.writeLock.availablePermits == 0); + assertTrue(lock.readLock.isAcquiredByAnyThread); + assertTrue(lock.readLock.isAcquiredByCurrentThread); + assertFalse(lock.readLock.isAcquiredByOtherThread); + assertTrue(lock.writeLock.isAcquiredByAnyThread); + assertTrue(lock.writeLock.isAcquiredByCurrentThread); + assertFalse(lock.writeLock.isAcquiredByOtherThread); + + var oldPermits = lock.readLock.availablePermits; + lock.readLock.acquire(); // read lock reentrance + assertEquals(oldPermits -1, lock.readLock.availablePermits); + + lock.readLock.release(); + assertEquals(oldPermits, lock.readLock.availablePermits); + + assertTrue(lock.readLock.isAcquiredByAnyThread); + assertTrue(lock.readLock.isAcquiredByCurrentThread); + assertFalse(lock.readLock.isAcquiredByOtherThread); + + lock.readLock.release(); + assertFalse(lock.readLock.isAcquiredByAnyThread); + assertFalse(lock.readLock.isAcquiredByCurrentThread); + assertFalse(lock.readLock.isAcquiredByOtherThread); + + lock.writeLock.acquire(); // write lock reentrance + lock.writeLock.release(); + assertTrue(lock.writeLock.availablePermits == 0); + assertTrue(lock.writeLock.isAcquiredByAnyThread); + assertTrue(lock.writeLock.isAcquiredByCurrentThread); + assertFalse(lock.writeLock.isAcquiredByOtherThread); + + lock.writeLock.release(); + assertTrue(lock.writeLock.availablePermits == 1); + assertFalse(lock.writeLock.isAcquiredByAnyThread); + assertFalse(lock.writeLock.isAcquiredByCurrentThread); + assertFalse(lock.writeLock.isAcquiredByOtherThread); + + #if threads + lock.writeLock.acquire(); + + var signal = new CountDownLatch(1); + Threads.spawn(function() { + assertFalse(lock.readLock.tryAcquire()); + assertFalse(lock.writeLock.tryAcquire()); + + assertTrue(lock.writeLock.isAcquiredByAnyThread); + assertFalse(lock.writeLock.isAcquiredByCurrentThread); + assertTrue(lock.writeLock.isAcquiredByOtherThread); + signal.countDown(); + }); + signal.await(); + + lock.writeLock.release(); + lock.readLock.acquire(); + + var signal = new CountDownLatch(1); + Threads.spawn(function() { + assertTrue(lock.readLock.isAcquiredByAnyThread); + assertFalse(lock.readLock.isAcquiredByCurrentThread); + assertTrue(lock.readLock.isAcquiredByOtherThread); + + assertTrue(lock.readLock.tryAcquire()); + + assertTrue(lock.readLock.isAcquiredByAnyThread); + assertTrue(lock.readLock.isAcquiredByCurrentThread); + assertTrue(lock.readLock.isAcquiredByOtherThread); + lock.readLock.release(); + + assertFalse(lock.writeLock.tryAcquire()); + signal.countDown(); + }); + signal.await(); + lock.readLock.release(); + #end } @@ -218,12 +369,12 @@ class TestRunner extends hx.doctest.DocTestRunner { assertEquals(2, sem.availablePermits); - assertEquals(true, sem.tryAcquire()); - assertEquals(true, sem.tryAcquire()); + assertTrue(sem.tryAcquire()); + assertTrue(sem.tryAcquire()); assertEquals(0, sem.availablePermits); - assertEquals(false, sem.tryAcquire()); + assertFalse(sem.tryAcquire()); sem.release(); - assertEquals(true, sem.tryAcquire()); + assertTrue(sem.tryAcquire()); sem.release(); sem.release(); sem.release(); @@ -231,14 +382,20 @@ class TestRunner extends hx.doctest.DocTestRunner { } - #if threads function testThreads() { - var i = new AtomicInt(0); - for (j in 0...10) - Threads.spawn(function() i.increment()); - assertTrue(Threads.wait(function() { return i.value == 10; }, 200)); + #if threads + assertTrue(Threads.isSupported); + var i = new AtomicInt(0); + for (j in 0...10) + Threads.spawn(function() i.increment()); + assertTrue(Threads.await(function() return i.value == 10, 200)); + #else + assertFalse(Threads.isSupported); + #end } + + #if threads function testThreadPool() { var pool = new ThreadPool(2); var ids = [-1, -1]; @@ -247,7 +404,7 @@ class TestRunner extends hx.doctest.DocTestRunner { Threads.sleep(50); ids[j] = ctx.id; }); - assertTrue(Threads.wait(function() { return ids[0] != -1 && ids[1] != -1; }, 200)); + assertTrue(Threads.await(function() { return ids[0] != -1 && ids[1] != -1; }, 200)); pool.stop(); assertNotEquals(ids[0], ids[1]); } @@ -371,9 +528,9 @@ class TestRunner extends hx.doctest.DocTestRunner { var flag3 = new AtomicBool(false); var startAt = Dates.now(); var future1 = executor.submit(function():Void flag1.negate(), ONCE(0)); - var future2 = executor.submit(function():Void flag2.negate(), ONCE(100)); - var future3 = executor.submit(function():Void flag3.negate(), ONCE(100)); - _later(40, function() { + var future2 = executor.submit(function():Void flag2.negate(), ONCE(140)); + var future3 = executor.submit(function():Void flag3.negate(), ONCE(140)); + _later(30, function() { assertTrue(flag1.value); assertTrue(future1.isStopped); @@ -386,7 +543,7 @@ class TestRunner extends hx.doctest.DocTestRunner { assertFalse(flag3.value); assertTrue(future3.isStopped); }); - _later(140, function() { + _later(200, function() { assertTrue(flag2.value); assertTrue(future2.isStopped);