Skip to content

Commit

Permalink
Corrections for implementing proper libuv network related features.
Browse files Browse the repository at this point in the history
- rework `isUvActive()` for only Linux detection, and control whether native PHP `network` events are checked for and executed first
  • Loading branch information
TheTechsTech committed Apr 30, 2020
1 parent 970b070 commit c9c9154
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 37 deletions.
66 changes: 35 additions & 31 deletions Coroutine/Coroutine.php
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ public function __construct()
\spawn_setup($this->uv);
\file_operation(true);

// @codeCoverageIgnoreStart
$this->onEvent = function ($event, $status, $events, $stream) {
if ($status !== 0) {
$this->pollEvent($stream);
Expand All @@ -254,7 +253,6 @@ public function __construct()
$this->updateScheduler('write', $stream);
}
};
// @codeCoverageIgnoreEnd

$this->onTimer = function ($timer) {
$taskTimer = $this->timers[(int) $timer];
Expand All @@ -275,19 +273,15 @@ protected function timestamp()
return (float) ($this->isHighTimer ? \hrtime(true) / 1e+9 : \microtime(true));
}

/**
* @codeCoverageIgnore
*/
protected function addEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
$meta = \stream_get_meta_data($stream);
switch ($meta['stream_type'] ?? '') {
case 'STDIO':
if ($meta['wrapper_type'] == 'plainfile') {

break;
}
case 'TEMP':
$this->events[(int) $stream] = false;
break;
case 'tcp_socket/ssl':
$this->events[(int) $stream] = \uv_poll_init($this->uv, $stream);
break;
Expand All @@ -301,47 +295,46 @@ protected function addEvent($stream)
}
}

/**
* @codeCoverageIgnore
*/
protected function removeReadEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
return;
}

if (isset($this->waitingForRead[(int) $stream])) {
\uv_poll_stop($this->events[(int) $stream]);
\uv_close($this->events[(int) $stream]);
$event = $this->events[(int) $stream];
if ($event instanceof \UVPoll) {
\uv_poll_stop($event);
\uv_close($event);
}

unset($this->events[(int) $stream]);
return;
}

$this->pollEvent($stream);
}

/**
* @codeCoverageIgnore
*/
protected function removeWriteEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
return;
}

if (isset($this->waitingForWrite[(int) $stream])) {
\uv_poll_stop($this->events[(int) $stream]);
\uv_close($this->events[(int) $stream]);
$event = $this->events[(int) $stream];
if ($event instanceof \UVPoll) {
\uv_poll_stop($event);
\uv_close($event);
}

unset($this->events[(int) $stream]);
return;
}

$this->pollEvent($stream);
}

/**
* @codeCoverageIgnore
*/
protected function pollEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
Expand Down Expand Up @@ -439,7 +432,7 @@ public function isUv(): bool

public function isUvActive(): bool
{
return ($this->uv instanceof \UVLoop) && $this->useUv;
return ($this->isUv() && \IS_LINUX);
}

public function isPcntl(): bool
Expand Down Expand Up @@ -629,9 +622,14 @@ public function execute($isReturn = false)

if ($task->isFinished()) {
$this->cancelProgress($task);
$task->setState('completed');
$id = $task->taskId();
$this->completedMap[$id] = $task;
if ($task->isNetwork()) {
$task->close();
} else {
$task->setState('completed');
$this->completedMap[$id] = $task;
}

unset($this->taskMap[$id]);
} else {
$task->setState('rescheduled');
Expand Down Expand Up @@ -677,7 +675,7 @@ protected function runTimers()
}

/**
* Check and return `true` for any pending I/O events, signals, subprocess,
* Check and return `true` for `no` pending I/O events, signals, subprocess,
* streams/sockets/fd activity, timers or tasks.
*/
protected function hasCoroutines(): bool
Expand Down Expand Up @@ -719,9 +717,14 @@ protected function ioWaiting()
$this->process->processing();
$nextTimeout = $this->runTimers();
$streamWait = $this->waitTime($nextTimeout);
$this->ioSocketStream($streamWait);
if ($this->isUv()) {
\uv_run($this->uv, ($this->waitTime($nextTimeout) ? \UV::RUN_ONCE : \UV::RUN_NOWAIT));
if ($this->isUvActive()) {
\uv_run($this->uv, ($streamWait ? \UV::RUN_ONCE : \UV::RUN_NOWAIT));
$this->ioSocketStream($this->waitTime($nextTimeout));
} else {
$this->ioSocketStream($streamWait);
if ($this->isUv()) {
\uv_run($this->uv, ($this->waitTime($nextTimeout) ? \UV::RUN_ONCE : \UV::RUN_NOWAIT));
}
}

yield;
Expand Down Expand Up @@ -837,7 +840,7 @@ public function addSignal($signal, $listener)
} elseif ($this->isUvActive() || $this->isUvSignal) {
if (!isset($this->signals[$signal])) {
$signals = $this->signaler;
$this->signals[$signal] = \uv_signal_init($this->isUvActive() ? $this->uv : \uv_default_loop());
$this->signals[$signal] = \uv_signal_init($this->uv);
\uv_signal_start($this->signals[$signal], function ($signal, $signalInt) use ($signals) {
$signals->execute($signalInt);
}, $signal);
Expand All @@ -856,7 +859,8 @@ public function removeSignal($signal, $listener)
\pcntl_signal($signal, \SIG_DFL);
} elseif ($this->isUvActive() || $this->isUvSignal) {
if (isset($this->signals[$signal]) && $this->signaler->count($signal) === 0) {
//\uv_signal_stop($this->signals[$signal]);
if (\uv_is_active($this->signals[$signal]))
@\uv_signal_stop($this->signals[$signal]);
unset($this->signals[$signal]);
}
}
Expand Down
4 changes: 3 additions & 1 deletion Coroutine/CoroutineInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ public function getProcess(
public function getParallel(): ParallelInterface;

/**
* Check if **UV** event loop `libuv` engine is available, and turned `on` for native asynchronous handling.
* Is `libuv` features available and the system is **Linux**.
*
* `Note:` Network related 'libuv` features are currently broken on **Windows**.
*
* @return bool
*/
Expand Down
14 changes: 10 additions & 4 deletions Coroutine/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ final class Task implements TaskInterface
/**
* Task type indicator.
*
* Currently using types of either `paralleled`, `awaited`, or `monitored`.
* Currently using types of either `paralleled`, `awaited`, `networked`, or `monitored`.
*
* @var string
*/
Expand Down Expand Up @@ -210,6 +210,11 @@ public function isParallel(): bool
return ($this->taskType == 'paralleled');
}

public function isNetwork(): bool
{
return ($this->taskType == 'networked');
}

public function isProcess(): bool
{
return ($this->state == 'process');
Expand Down Expand Up @@ -291,17 +296,18 @@ public function run()
? $this->coroutine->throw($this->exception)
: $this->exception;

$this->error = $this->exception;
if (!$this->isNetwork())
$this->error = $this->exception;

$this->exception = null;
return $value;
} else {
$value = ($this->coroutine instanceof \Generator)
? $this->coroutine->send($this->sendValue)
: $this->sendValue;

if (!empty($value)) {
if (!empty($value) && !$this->isNetwork())
$this->result = $value;
}

$this->sendValue = null;
return $value;
Expand Down
50 changes: 49 additions & 1 deletion Coroutine/TaskInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,50 @@ interface TaskInterface
{
public function taskId(): ?int;

/**
* Task type indicator, currently either `paralleled`, `awaited`,
* `networked`, or `monitored`.
*
* @param string $type
*
* @return void
*
* @internal
*/
public function taskType(string $type);

/**
* @param mixed $sendValue
*
* @return void
*
* @internal
*/
public function sendValue($sendValue);

/**
* @param string $status
*
* @return void
*
* @internal
*/
public function setState(string $status);

/**
* Return task current status state.
*
* @return string
*
* @internal
*/
public function getState(): string;

/**
* Start the execution of the callers code, passing any `value` or `exception` back in forth.
*
* @return mixed
*/
public function run();

/**
Expand Down Expand Up @@ -59,6 +95,14 @@ public function isCustomState($state): bool;
*/
public function isParallel(): bool;

/**
* A flag that indicates the task is socket/stream related and nothing will be stored.
* - All memory is freed, not in completed task list, and no results retained.
*
* @return bool
*/
public function isNetwork(): bool;

/**
* A flag that indicates whether or not the sub process task has started.
*
Expand Down Expand Up @@ -122,7 +166,11 @@ public function result();
/**
* Mark the task as done and set an exception.
*
* @param \Exception
* @param \Exception $exception
*
* @return void
*
* @internal
*/
public function setException($exception);

Expand Down

0 comments on commit c9c9154

Please sign in to comment.