Skip to content

Commit

Permalink
updates, added ability to cancel a thread
Browse files Browse the repository at this point in the history
- this really just stops `then` callbacks, once thread starts no way to actually cancel thread execution.
  • Loading branch information
TheTechsTech committed Apr 20, 2022
1 parent 65619d6 commit 1eafcff
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 78 deletions.
136 changes: 94 additions & 42 deletions Spawn/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ final class Thread
protected $isYield = false;
protected $isClosed = false;
protected $releaseQueue = false;
protected $tid = 0;

protected static $uv = null;

Expand All @@ -56,7 +57,16 @@ public function __destruct()
if (!$this->isClosed)
$this->close();

$this->releaseQueue = true;
if (!$this->hasLoop && self::$uv instanceof \UVLoop) {
@\uv_stop(self::$uv);
@\uv_run(self::$uv);
@\uv_loop_delete(self::$uv);
self::$uv = null;
}

$this->successCallbacks = [];
$this->errorCallbacks = [];
$this->threads = null;
}

public function close()
Expand All @@ -65,12 +75,10 @@ public function close()
$this->success = null;
$this->failed = null;
$this->loop = null;
$this->successCallbacks = [];
$this->errorCallbacks = [];
$this->isYield = false;
$this->isClosed = true;

$this->hasLoop = null;
$this->releaseQueue = true;
\gc_collect_cycles();
}
}

Expand Down Expand Up @@ -112,34 +120,59 @@ public function __construct($loop = null, ?\UVLoop $uv = null, bool $yielding =
public function create($tid, callable $task, ...$args): self
{
$tid = \is_scalar($tid) ? $tid : (int) $tid;
$this->tid = $tid;
$this->status[$tid] = 'queued';
$async = $this;
if (!isset($this->threads[$tid]))
$this->threads[$tid] = \uv_async_init(self::$uv, function () use ($async, $tid) {
$async->handlers($tid);
});

\uv_queue_work(self::$uv, function () use (&$async, $task, $tid, &$args) {
\uv_queue_work(self::$uv, function () use ($async, &$task, $tid, &$args) {
include 'vendor/autoload.php';
try {
$result = $task(...$args);
$async->setResult($tid, $result);
if (!$async->isCancelled($tid))
$result = $task(...$args);

if (!$async->isCancelled($tid))
$async->setResult($tid, $result);
} catch (\Throwable $exception) {
$async->setException($tid, $exception);
}

if (isset($async->threads[$tid]) && $async->threads[$tid] instanceof \UVAsync) {
if (isset($async->threads[$tid]) && $async->threads[$tid] instanceof \UVAsync && \uv_is_active($async->threads[$tid])) {
\uv_async_send($async->threads[$tid]);
do {
\usleep($async->count() * 70000);
} while (!$async->releaseQueue);
}

\gc_collect_cycles();
}, function () {
});

return $this;
}

/**
* This method will sends a cancellation request to the thread.
*
* @param string|int $tid Thread ID
* @return void
*/
public function cancel($tid = null): void
{
$lock = \mutex_lock();
if (isset($this->status[$tid])) {
$this->status[$tid] = ['cancelled'];
$this->exception[$tid] = new \RuntimeException(\sprintf('Thread %s cancelled!', (string)$tid));
if (isset($this->threads[$tid]) && $this->threads[$tid] instanceof \UVAsync && \uv_is_active($this->threads[$tid]))
\uv_async_send($this->threads[$tid]);
}

\mutex_unlock($lock);
}

/**
* This method will join the spawned `tid` or `all` threads.
* - It will first wait for that thread's internal task queue to finish.
Expand All @@ -150,21 +183,24 @@ public function create($tid, callable $task, ...$args): self
public function join($tid = null): void
{
$isCoroutine = $this->hasLoop && \is_object($this->loop) && \method_exists($this->loop, 'futureOn') && \method_exists($this->loop, 'futureOff');
while (!empty($tid) ? $this->isRunning($tid) : !$this->isEmpty()) {
$isCancelling = !empty($tid) && $this->isCancelled($tid) && !$this->isEmpty() && \uv_is_active($this->threads[$tid]);
while (!empty($tid) ? ($this->isRunning($tid) || $this->isCancelled($tid)) : !$this->isEmpty()) {
if ($isCoroutine) { // @codeCoverageIgnoreStart
$this->loop->futureOn();
$this->loop->run();
$this->loop->futureOff();
} elseif ($this->hasLoop) {
$this->loop->run();
} else { // @codeCoverageIgnoreEnd
$this->loop->run(); // @codeCoverageIgnoreEnd
} else {
\uv_run(self::$uv, !empty($tid) ? \UV::RUN_ONCE : \UV::RUN_NOWAIT);
}

if ($isCancelling)
break;
}

if (!empty($tid)) {
if (!empty($tid))
$this->releaseQueue = true;
}
}

/**
Expand All @@ -178,18 +214,18 @@ protected function handlers($tid): void
if ($this->isRunning($tid)) {
} elseif ($this->isSuccessful($tid)) {
$this->remove($tid);
if ($this->hasLoop)
if ($this->hasLoop) // @codeCoverageIgnoreStart
$this->loop->executeTask($this->success, $tid);
elseif ($this->isYield)
$this->yieldSuccess($tid);
$this->yieldAsFinished($tid); // @codeCoverageIgnoreEnd
else
$this->triggerSuccess($tid);
} elseif ($this->isTerminated($tid)) {
} elseif ($this->isTerminated($tid) || $this->isCancelled($tid)) {
$this->remove($tid);
if ($this->hasLoop)
if ($this->hasLoop) // @codeCoverageIgnoreStart
$this->loop->executeTask($this->failed, $tid);
elseif ($this->isYield)
$this->yieldAsFailed($tid);
$this->yieldAsFailed($tid); // @codeCoverageIgnoreEnd
else
$this->triggerError($tid);
}
Expand All @@ -210,6 +246,17 @@ public function isEmpty(): bool
return empty($this->threads);
}

/**
* Tell if the referenced `tid` is cancelled.
*
* @param string|int $tid Thread ID
* @return bool
*/
public function isCancelled($tid): bool
{
return isset($this->status[$tid]) && \is_array($this->status[$tid]);
}

/**
* Tell if the referenced `tid` is executing.
*
Expand Down Expand Up @@ -306,7 +353,7 @@ public function getFailed(): array
*/
public function then(callable $thenCallback, callable $failCallback = null): self
{
$this->successCallbacks[] = $thenCallback;
$this->successCallbacks[$this->tid][] = $thenCallback;
if ($failCallback !== null) {
$this->catch($failCallback);
}
Expand All @@ -322,7 +369,7 @@ public function then(callable $thenCallback, callable $failCallback = null): sel
*/
public function catch(callable $callback): self
{
$this->errorCallbacks[] = $callback;
$this->errorCallbacks[$this->tid][] = $callback;

return $this;
}
Expand All @@ -335,14 +382,16 @@ public function catch(callable $callback): self
*/
public function triggerSuccess($tid)
{
$result = $this->result[$tid];
if ($this->isYield)
return $this->yieldSuccess($result);
if (isset($this->result[$tid])) {
$result = $this->result[$tid];
if ($this->isYield)
return $this->yieldSuccess($result, $tid);

foreach ($this->successCallbacks as $callback)
$callback($result);
foreach ($this->successCallbacks[$tid] as $callback)
$callback($result);

return $result;
return $result;
}
}

/**
Expand All @@ -353,32 +402,31 @@ public function triggerSuccess($tid)
*/
public function triggerError($tid)
{
$exception = $this->exception[$tid];
if ($this->isYield)
return $this->yieldError($exception);
if (isset($this->exception[$tid])) {
$exception = $this->exception[$tid];
if ($this->isYield)
return $this->yieldError($exception, $tid);

foreach ($this->errorCallbacks as $callback)
$callback($exception);
foreach ($this->errorCallbacks[$tid] as $callback)
$callback($exception);

if (!$this->errorCallbacks) {
throw $exception;
if (!$this->errorCallbacks)
throw $exception;
}
}

protected function yieldSuccess($output)
protected function yieldSuccess($output, $tid)
{
foreach ($this->successCallbacks as $callback) {
foreach ($this->successCallbacks[$tid] as $callback)
yield $callback($output);
}

return $output;
}

protected function yieldError($exception)
protected function yieldError($exception, $tid)
{
foreach ($this->errorCallbacks as $callback) {
foreach ($this->errorCallbacks[$tid] as $callback)
yield $callback($exception);
}

if (!$this->errorCallbacks) {
throw $exception;
Expand All @@ -395,8 +443,12 @@ protected function remove($tid): void
if (isset($this->threads[$tid]) && $this->threads[$tid] instanceof \UVAsync)
\uv_close($this->threads[$tid]);

if (isset($this->status[$tid]))
unset($this->threads[$tid], $this->status[$tid]);
if (isset($this->status[$tid])) {
if (\is_array($this->status[$tid]))
unset($this->threads[$tid]);
else
unset($this->threads[$tid], $this->status[$tid]);
}

\mutex_unlock($lock);
}
Expand Down
79 changes: 43 additions & 36 deletions tests/ZThreadTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,44 @@ public function testIt_can_handle_multi()
{
$this->markTestSkipped('Test skipped "uv_loop_new" and "PHP ZTS" missing. currently buggy - zend_mm_heap corrupted');
$thread = new Thread();

$counter = 0;

$thread->create(5, function () {
return 2;
})->then(function (int $output) use (&$counter) {
$counter += $output;
});

$thread->create(6, function () {
return 2;
})->then(function (int $output) use (&$counter) {
$counter += $output;
});

$thread->create(7, function () {
usleep(20000);
return 2;
})->then(function (int $output) use (&$counter) {
$counter += $output;
});

$thread->join(6);
$this->assertCount(2, $thread->getSuccess());
$this->assertEquals(2, $thread->getResult(6));
$this->assertEquals(6, $counter);

$thread->join();
$this->assertCount(3, $thread->getSuccess());
$this->assertEquals(18, $counter);

$thread->close();
try {
$thread->create(5, function () {
usleep(50000);
return 2;
})->then(function (int $output) use (&$counter) {
$counter += $output;
})->catch(function (\Throwable $e) {
var_dump($e->getMessage());
});

$thread->create(6, function () {
usleep(50);
return 4;
})->then(function (int $output) use (&$counter) {
$counter += $output;
})->catch(function (\Throwable $e) {
var_dump($e->getMessage());
});

$thread->create(7, function () {
usleep(5000000);
})->then(function (int $output) use (&$counter) {
$counter += $output;
})->catch(function (\Throwable $exception) {
$this->assertEquals('Thread 7 cancelled!', $exception->getMessage());
});

$thread->join(6);
$this->assertCount(1, $thread->getSuccess());
$this->assertEquals(4, $thread->getResult(6));

$thread->cancel(7);
$thread->join(1);
$this->assertCount(1, $thread->getFailed());
} catch (\Throwable $th) {
var_dump($th->getMessage());
}
}

public function testIt_can_create_and_return_results()
Expand Down Expand Up @@ -99,15 +105,16 @@ public function testIt_can_handle_exceptions_via_catch_callback()
$thread = new Thread();

$thread->create(44, function () {
throw new \Exception('test');
})->catch(function (\Exception $e) {
$this->assertEquals('test', $e->getMessage());
sleep(100);
})->catch(function (\Throwable $e) {
$this->assertEquals('Thread 44 cancelled!', $e->getMessage());
});

$thread->cancel(44);
$thread->join(44);
$thread->close();
$this->assertInstanceOf(\RuntimeException::class, $thread->getException(44));

$this->assertInstanceOf(\Exception::class, $thread->getException(44));
$thread->close();
$this->assertCount(1, $thread->getFailed());
}
}

0 comments on commit 1eafcff

Please sign in to comment.