Skip to content

Commit

Permalink
Merge pull request #72 from neighborhoods/client-defined-signal-delivery
Browse files Browse the repository at this point in the history
Client specified signal buffering per signal
  • Loading branch information
alexberryman committed Jun 12, 2019
2 parents 86facad + d1cb4cd commit 2e084cf
Show file tree
Hide file tree
Showing 28 changed files with 566 additions and 260 deletions.
8 changes: 4 additions & 4 deletions src/Process/Forked.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@

namespace Neighborhoods\Kojo\Process;

use ErrorException;
use Neighborhoods\Kojo\Process\Forked\Exception;
use Neighborhoods\Kojo\ProcessAbstract;
use Neighborhoods\Kojo\ProcessInterface;

abstract class Forked extends ProcessAbstract implements ProcessInterface
abstract class Forked extends ProcessAbstract
{
const FORK_FAILURE_CODE = -1;
const PROP_HAS_FORKED = 'has_forked';
protected const PROP_HAS_FORKED = 'has_forked';

public function start(): ProcessInterface
{
$this->_create(self::PROP_HAS_FORKED, true);
try {
$processId = $this->_getProcessStrategy()->fork();
} /** @noinspection PhpRedundantCatchClauseInspection */
catch (\ErrorException $errorException) {
catch (ErrorException $errorException) {
throw (new Exception())->setCode(Exception::CODE_FORK_FAILED)->setPrevious($errorException);
}

Expand Down
20 changes: 18 additions & 2 deletions src/Process/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
use Neighborhoods\Kojo\Foreman;
use Neighborhoods\Kojo\Maintainer;
use Neighborhoods\Kojo\Process;
use Neighborhoods\Kojo\ProcessInterface;
use Neighborhoods\Kojo\Scheduler;
use Neighborhoods\Kojo\Selector;
use Throwable;

class Job extends Forked implements JobInterface
{
Expand All @@ -20,18 +22,32 @@ class Job extends Forked implements JobInterface
protected function _run(): Forked
{
try {
$this->_getProcessSignal()->setCanBufferSignals(false);
$this->_getSelector()->setProcess($this);
$this->_getMaintainer()->rescheduleCrashedJobs();
$this->_getScheduler()->scheduleStaticJobs();
$this->_getMaintainer()->updatePendingJobs();
$this->_getMaintainer()->deleteCompletedJobs();
$this->_getForeman()->workWorker();
} catch (\Throwable $throwable) {
} catch (Throwable $throwable) {
$this->_getLogger()->critical($throwable->getMessage(), ['exception' => $throwable]);
$this->_setOrReplaceExitCode(255);
}

return $this;
}

protected function _registerSignalHandlers(): ProcessInterface
{
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGCHLD, $this, false);
$this->getProcessSignalDispatcher()->ignoreSignal(SIGALRM);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGTERM, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGINT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGHUP, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGQUIT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGABRT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGUSR1, $this, false);
$this->_getLogger()->debug('Registered signal handlers.');

return $this;
}
}
29 changes: 22 additions & 7 deletions src/Process/Listener/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

use Neighborhoods\Kojo\Process\Forked;
use Neighborhoods\Kojo\Process\Forked\Exception;
use Neighborhoods\Kojo\Process\ListenerInterface;
use Neighborhoods\Kojo\Process\ListenerAbstract;
use Neighborhoods\Kojo\Process\ListenerInterface;
use Neighborhoods\Kojo\ProcessInterface;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;

class Command extends ListenerAbstract implements CommandInterface
{
const PROP_EXPRESSION_LANGUAGE = 'expression_language';
protected const PROP_EXPRESSION_LANGUAGE = 'expression_language';

public function hasMessages(): bool
{
Expand All @@ -21,21 +22,21 @@ public function hasMessages(): bool
public function processMessages(): ListenerInterface
{
$message = $this->_getMessageBroker()->getNextMessage();
if (json_decode($message) !== null) {
if (json_decode($message, true) !== null) {
$this->_getExpressionLanguage()->evaluate(
json_decode($message, true)['command'],
[
'commandProcess' => $this,
]
);
} else {
$this->_getLogger()->warning('The message is not a JSON: "' . $message . '".');
$this->_getLogger()->warning(sprintf('The message is not a JSON: [%s]', $message));
}

return $this;
}

public function addProcess(string $processTypeCode): Command
public function addProcess(string $processTypeCode): CommandInterface
{
$process = $this->_getProcessCollection()->getProcessPrototypeClone($processTypeCode);
try {
Expand All @@ -47,7 +48,7 @@ public function addProcess(string $processTypeCode): Command
return $this;
}

public function setExpressionLanguage(ExpressionLanguage $expressionLanguage): Command
public function setExpressionLanguage(ExpressionLanguage $expressionLanguage): CommandInterface
{
$this->_create(self::PROP_EXPRESSION_LANGUAGE, $expressionLanguage);

Expand All @@ -61,11 +62,25 @@ protected function _getExpressionLanguage(): ExpressionLanguage

protected function _run(): Forked
{
$this->_getProcessSignal()->setCanBufferSignals(false);
if (!$this->_getMessageBroker()->hasMessage()) {
$this->_getMessageBroker()->waitForNewMessage();
}

return $this;
}

protected function _registerSignalHandlers(): ProcessInterface
{
$this->getProcessSignalDispatcher()->ignoreSignal(SIGCHLD);
$this->getProcessSignalDispatcher()->ignoreSignal(SIGALRM);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGTERM, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGINT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGHUP, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGQUIT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGABRT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGUSR1, $this, false);
$this->_getLogger()->debug('Registered signal handlers.');

return $this;
}
}
23 changes: 19 additions & 4 deletions src/Process/Listener/Mutex/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@
use Neighborhoods\Kojo\Process\ListenerInterface;
use Neighborhoods\Kojo\ProcessInterface;
use Neighborhoods\Kojo\Redis\Factory;
use RuntimeException;
use Throwable;

class Redis extends ListenerAbstract implements RedisInterface
{
use Factory\AwareTrait;
const PROP_REDIS = 'redis';
public const PROP_REDIS = 'redis';

protected function _run(): Forked
{
$this->_getProcessSignal()->setCanBufferSignals(false);
try {
$this->_register();
} catch (\Throwable $throwable) {
} catch (Throwable $throwable) {
posix_kill($this->getParentProcessId(), $this->getParentProcessTerminationSignalNumber());
$this->_getLogger()->critical(
'Redis mutex watchdog registration encountered a Throwable.',
Expand All @@ -44,7 +45,7 @@ protected function _register(): ProcessInterface

public function processMessages(): ListenerInterface
{
throw new \RuntimeException('The connection to redis was lost.');
throw new RuntimeException('The connection to redis was lost.');
}

public function hasMessages(): bool
Expand All @@ -60,4 +61,18 @@ protected function _getRedis(): \Redis

return $this->_read(self::PROP_REDIS);
}

protected function _registerSignalHandlers(): ProcessInterface
{
$this->getProcessSignalDispatcher()->ignoreSignal(SIGCHLD);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGTERM, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGINT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGHUP, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGQUIT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGABRT, $this, false);
$this->getProcessSignalDispatcher()->registerSignalHandler(SIGUSR1, $this, false);
$this->_getLogger()->debug('Registered signal handlers.');

return $this;
}
}
4 changes: 2 additions & 2 deletions src/Process/Pool.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ services:
shared: false
calls:
- [setLogger, ['@process.pool.logger']]
- [setProcessSignal, ['@process.signal']]
- [setProcessSignalDispatcher, ['@Neighborhoods\Kojo\Process\Signal\DispatcherInterface']]
process.pool:
alias: neighborhoods.kojo.process.pool
alias: neighborhoods.kojo.process.pool
2 changes: 1 addition & 1 deletion src/Process/Pool/Logger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ services:
- [setProcessPoolLoggerMessageFactory, ['@neighborhoods.kojo.process.pool.logger.message.factory']]
- [setLevelFilterMask, ['%neighborhoods.kojo.process.pool.logger.level_filter_mask%']]
process.pool.logger:
alias: neighborhoods.kojo.process.pool.logger
alias: neighborhoods.kojo.process.pool.logger
8 changes: 4 additions & 4 deletions src/Process/Pool/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@

namespace Neighborhoods\Kojo\Process\Pool;

use Neighborhoods\Kojo\Process;
use Neighborhoods\Kojo\ProcessAbstract;
use Neighborhoods\Kojo\ProcessInterface;
use Neighborhoods\Kojo\Semaphore;
use Neighborhoods\Pylon\Data\Property;
use Neighborhoods\Kojo\Process;

class Server extends ProcessAbstract implements ServerInterface
{
use Process\Pool\Factory\AwareTrait;
use Property\Defensive\AwareTrait;
use Logger\AwareTrait;
use Semaphore\AwareTrait;
use Semaphore\Resource\Factory\AwareTrait;
const SERVER_SEMAPHORE_RESOURCE_NAME = 'server';

public const SERVER_SEMAPHORE_RESOURCE_NAME = 'server';

public function start(): ProcessInterface
{
Expand All @@ -26,7 +26,7 @@ public function start(): ProcessInterface
$this->_getLogger()->debug('Process pool server started.');
$this->_getProcessPool()->start();
while (true) {
$this->_getProcessSignal()->processBufferedSignals();
$this->getProcessSignalDispatcher()->processBufferedSignals();
sleep(1);
}
} else {
Expand Down
21 changes: 11 additions & 10 deletions src/Process/PoolAbstract.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neighborhoods\Kojo\Process;

use LogicException;
use Neighborhoods\Kojo\Process;
use Neighborhoods\Kojo\Process\Signal\InformationInterface;
use Neighborhoods\Kojo\ProcessInterface;
Expand All @@ -14,7 +15,7 @@ abstract class PoolAbstract implements PoolInterface
use Process\Pool\Logger\AwareTrait;
use Process\Pool\Strategy\AwareTrait;
use Process\AwareTrait;
use Process\Signal\AwareTrait;
use Process\Signal\Dispatcher\AwareTrait;

abstract protected function _childExitSignal(InformationInterface $information): PoolInterface;

Expand All @@ -32,9 +33,9 @@ public function hasAlarm(): bool
public function setAlarm(int $seconds): PoolInterface
{
if ($seconds === 0) {
$this->_getLogger()->debug("Disabling any existing alarm.");
}else {
$this->_getLogger()->debug("Setting alarm for $seconds seconds.");
$this->_getLogger()->debug('Disabling any existing alarm.');
} else {
$this->_getLogger()->debug(sprintf('Setting alarm for [%s] seconds.', $seconds));
}
pcntl_alarm($seconds);

Expand Down Expand Up @@ -63,8 +64,8 @@ protected function _initialize(): PoolInterface
return $this;
}

protected function _alarmSignal(InformationInterface $information): PoolInterface
{
protected function _alarmSignal(/** @noinspection PhpUnusedParameterInspection */ InformationInterface $information
): PoolInterface {
$this->_getProcessPoolStrategy()->receivedAlarm();

return $this;
Expand All @@ -74,12 +75,12 @@ protected function _validateAlarm(): PoolInterface
{
$alarmValue = pcntl_alarm(0);
if ($this->isEmpty()) {
if ($alarmValue == 0) {
if ($alarmValue === 0) {
$this->_getLogger()->emergency('Process pool has no alarms and no processes.');
throw new \LogicException('Invalid process pool state.');
}else {
$this->_getLogger()->debug('Process pool only has a set alarm.');
throw new LogicException('Invalid process pool state.');
}

$this->_getLogger()->debug('Process pool only has a set alarm.');
}
pcntl_alarm($alarmValue);

Expand Down
8 changes: 3 additions & 5 deletions src/Process/Root.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@

namespace Neighborhoods\Kojo\Process;

use Neighborhoods\Kojo\ProcessInterface;

class Root extends Forked implements ProcessInterface
class Root extends Forked
{
const TYPE_CODE = 'root';
public const TYPE_CODE = 'root';

public function __construct()
{
Expand All @@ -17,7 +15,7 @@ public function __construct()
protected function _run(): Forked
{
while (true) {
$this->_getProcessSignal()->processBufferedSignals();
$this->getProcessSignalDispatcher()->processBufferedSignals();
sleep(1);
}

Expand Down
Loading

0 comments on commit 2e084cf

Please sign in to comment.