Skip to content

Commit

Permalink
Merge pull request #80 from neighborhoods/KOJO-143-state-transition-l…
Browse files Browse the repository at this point in the history
…ogger-process-framework

KOJO-143 | Job State Changelog Processor process framework
  • Loading branch information
mucha55 authored Oct 2, 2019
2 parents fde8b7e + 763b654 commit 6ac27c7
Show file tree
Hide file tree
Showing 19 changed files with 247 additions and 28 deletions.
19 changes: 18 additions & 1 deletion src/Process/Collection.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ services:
- [setIterator, ['@process.collection.iterator']]
- [addProcessPrototype, ['@process.listener.command']]
- [addProcessPrototype, ['@process.job']]
- [addProcessPrototype, ['@process.job_state_changelog_processor']]
process.collection:
alias: neighborhoods.kojo.process.collection
neighborhoods.kojo.process.collection-server:
Expand All @@ -24,10 +25,26 @@ services:
- [addProcessPrototype, ['@process.listener.mutex.redis']]
process.collection-job:
alias: neighborhoods.kojo.process.collection-job
neighborhoods.kojo.process.collection-job_state_changelog_processor:
shared: true
class: Neighborhoods\Kojo\Process\Collection
calls:
- [setIterator, ['@process.collection.iterator']]
- [addProcessPrototype, ['@process.listener.mutex.redis']]
process.collection-job_state_changelog_processor:
alias: neighborhoods.kojo.process.collection-job_state_changelog_processor
neighborhoods.kojo.process.collection-singleton_manager:
shared: true
class: Neighborhoods\Kojo\Process\Collection
calls:
- [setIterator, ['@process.collection.iterator']]
- [addProcessPrototype, ['@process.job_state_changelog_processor']]
process.collection-singleton_manager:
alias: neighborhoods.kojo.process.collection-singleton_manager
neighborhoods.kojo.process.collection-empty:
shared: true
class: Neighborhoods\Kojo\Process\Collection
calls:
- [setIterator, ['@process.collection.iterator']]
process.collection-empty:
alias: neighborhoods.kojo.process.collection-empty
alias: neighborhoods.kojo.process.collection-empty
31 changes: 31 additions & 0 deletions src/Process/JobStateChangelogProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php
declare(strict_types=1);

namespace Neighborhoods\Kojo\Process;

use Neighborhoods\Kojo\Semaphore;

class JobStateChangelogProcessor extends Forked implements JobStateChangelogProcessorInterface
{
use Semaphore\Resource\Factory\AwareTrait;

public const TYPE_CODE = 'job_state_changelog_processor';

protected function _run(): Forked
{
$this->_getLogger()->debug('JobStateChangelogProcessor has been instantiated');

if ($this->_getSemaphoreResource('job_state_changelog_processor')->testAndSetLock()) {
$this->_getLogger()->debug('JobStateChangelogProcessor has acquired mutex');

// TODO: replace with business logic
while (true) {
sleep(1);
}
} else {
$this->_getLogger()->debug('JobStateChangelogProcessor failed to acquire mutex');
}

return $this;
}
}
16 changes: 16 additions & 0 deletions src/Process/JobStateChangelogProcessor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
services:
neighborhoods.kojo.process.job_state_changelog_processor:
class: Neighborhoods\Kojo\Process\JobStateChangelogProcessor
public: true
shared: false
parent: process_abstract
calls:
- [setTypeCode, ['job_state_changelog_processor']]
- [setProcessStrategy, ['@process.strategy.process_control']]
- [setTerminationSignalNumber, ['@=constant("SIGTERM")']]
- [setUuidMaximumInteger, [9999999999]]
- [setProcessPoolFactory, ['@process.pool.factory-job_state_changelog_processor']]
- [setTitlePrefix, ['%process.title.prefix%']]
- [addSemaphoreResourceFactory, ['@semaphore.resource.factory-job_state_changelog_processor']]
process.job_state_changelog_processor:
alias: neighborhoods.kojo.process.job_state_changelog_processor
10 changes: 10 additions & 0 deletions src/Process/JobStateChangelogProcessorInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php
declare(strict_types=1);

namespace Neighborhoods\Kojo\Process;

use Neighborhoods\Kojo\ProcessInterface;

interface JobStateChangelogProcessorInterface extends ProcessInterface
{
}
17 changes: 16 additions & 1 deletion src/Process/Pool/Factory.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ parameters:
process_pool_strategy-job.max_child_processes: 1
process_pool_strategy-job.child_process_wait_throttle: 0
process_pool_strategy-job.max_alarm_time: 0
process_pool_strategy-job_state_changelog_processor.max_child_processes: 1
process_pool_strategy-job_state_changelog_processor.child_process_wait_throttle: 0
process_pool_strategy-job_state_changelog_processor.max_alarm_time: 0
process_pool_strategy-empty.max_child_processes: 0
process_pool_strategy-empty.child_process_wait_throttle: 0
process_pool_strategy-empty.max_alarm_time: 0
Expand Down Expand Up @@ -48,6 +51,18 @@ services:
- [setProcessCollection, ['@process.collection-job']]
process.pool.factory-job:
alias: neighborhoods.kojo.process.pool.factory-job
neighborhoods.kojo.process.pool.factory-job_state_changelog_processor:
class: Neighborhoods\Kojo\Process\Pool\Factory
shared: false
calls:
- [setMaxChildProcesses, ['%process_pool_strategy-job_state_changelog_processor.max_child_processes%']]
- [setChildProcessWaitThrottle, ['%process_pool_strategy-job_state_changelog_processor.child_process_wait_throttle%']]
- [setMaxAlarmTime, ['%process_pool_strategy-job_state_changelog_processor.max_alarm_time%']]
- [setProcessPool, ['@process.pool']]
- [setProcessPoolStrategy, ['@process.pool.strategy-job_state_changelog_processor']]
- [setProcessCollection, ['@process.collection-job_state_changelog_processor']]
process.pool.factory-job_state_changelog_processor:
alias: neighborhoods.kojo.process.pool.factory-job_state_changelog_processor
neighborhoods.kojo.process.pool.factory-empty:
class: Neighborhoods\Kojo\Process\Pool\Factory
shared: false
Expand All @@ -59,4 +74,4 @@ services:
- [setProcessPoolStrategy, ['@process.pool.strategy-job']]
- [setProcessCollection, ['@process.collection-empty']]
process.pool.factory-empty:
alias: neighborhoods.kojo.process.pool.factory-empty
alias: neighborhoods.kojo.process.pool.factory-empty
3 changes: 3 additions & 0 deletions src/Process/Pool/Strategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Neighborhoods\Kojo\ProcessInterface;
use Neighborhoods\Kojo\Process\JobInterface;
use Neighborhoods\Kojo\Process\ListenerInterface;
use Neighborhoods\Kojo\Process\JobStateChangelogProcessorInterface;

class Strategy extends StrategyAbstract
{
Expand All @@ -19,6 +20,8 @@ public function childProcessExited(ProcessInterface $process): StrategyInterface
$this->_jobProcessExited($process);
} elseif ($process instanceof ListenerInterface) {
$this->_listenerProcessExited($process);
} elseif ($process instanceof JobStateChangelogProcessorInterface) {
// A new STL process will be created by the Root when appropriate
} else {
$className = get_class($process);
throw new \UnexpectedValueException("Unexpected process class[$className].");
Expand Down
9 changes: 9 additions & 0 deletions src/Process/Pool/Strategy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ parameters:
process.pool.strategy.maximum_load_average: 10.0
process.pool.strategy-server.maximum_load_average: 10.0
process.pool.strategy-job.maximum_load_average: 10.0
process.pool.strategy-job_state_changelog_processor.maximum_load_average: 10.0
services:
neighborhoods.kojo.process.pool.strategy:
shared: false
Expand All @@ -28,3 +29,11 @@ services:
- [setMaximumLoadAverage, ['%process.pool.strategy-job.maximum_load_average%']]
process.pool.strategy-job:
alias: neighborhoods.kojo.process.pool.strategy-job
neighborhoods.kojo.process.pool.strategy-job_state_changelog_processor:
shared: false
class: Neighborhoods\Kojo\Process\Pool\Strategy\Worker
calls:
- [setLogger, ['@process.pool.logger']]
- [setMaximumLoadAverage, ['%process.pool.strategy-job_state_changelog_processor.maximum_load_average%']]
process.pool.strategy-job_state_changelog_processor:
alias: neighborhoods.kojo.process.pool.strategy-job_state_changelog_processor
33 changes: 33 additions & 0 deletions src/Process/Root.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@

namespace Neighborhoods\Kojo\Process;

use Neighborhoods\Kojo\Semaphore;

class Root extends Forked
{
public const TYPE_CODE = 'root';
protected const SINGLETON_PROCESSES = [
JobStateChangelogProcessor::TYPE_CODE
];

use Semaphore\Resource\Factory\AwareTrait;
use Collection\AwareTrait;

public function __construct()
{
Expand All @@ -14,11 +22,36 @@ public function __construct()

protected function _run(): Forked
{
$this->_getProcessCollection()->applyProcessPool($this->_getProcessPool());

while (true) {
$this->getProcessSignalDispatcher()->processBufferedSignals();
$this->pollSingletonProcesses();
sleep(1);
}

return $this;
}

protected function pollSingletonProcesses() : Root
{
foreach (self::SINGLETON_PROCESSES as $singletonType) {
$semaphoreResource = $this->_getSemaphoreResource($singletonType);

// soft test the lock to spawn a process
// that process will go on to attempt to actually acquire the global mutex
if ($semaphoreResource->testLock()) {
try {
$process = $this->_getProcessCollection()->getProcessPrototypeClone($singletonType);
$this->_getProcessPool()->addChildProcess($process);
} catch (Forked\Exception $forkedException) {
// this is fine, another execution environment will spawn this process
// TODO: consider breaking here to stop attempting to spawn other singletons
$this->_getLogger()->debug($forkedException->getMessage(), ['exception' => $forkedException]);
}
}
}

return $this;
}
}
4 changes: 3 additions & 1 deletion src/Process/Root.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ services:
- [setTerminationSignalNumber, ['@=constant("SIGTERM")']]
- [setUuidMaximumInteger, [9999999999]]
- [setTitlePrefix, ['%process.title.prefix%']]
- [addSemaphoreResourceFactory, ['@semaphore.resource.factory-job_state_changelog_processor']]
- [setProcessCollection, ['@process.collection-singleton_manager']]
process.root:
alias: neighborhoods.kojo.process.root
public: false
public: false
17 changes: 16 additions & 1 deletion src/Semaphore.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ public function testAndSetLock(ResourceInterface $resource): bool
return $this->hasLock($resource);
}

public function testLock(ResourceInterface $resource) : bool
{
$resourceId = $resource->getResourceId();

if (!$this->_hasResource($resourceId)) {
$this->_resources[$resourceId] = $resource;
}

$isLockAvailable = $this->_getResource($resourceId)->getMutex()->testLock();

$this->_unsetResource($resourceId);

return $isLockAvailable;
}

public function releaseLock(ResourceInterface $resource): SemaphoreInterface
{
$resourceId = $resource->getResourceId();
Expand Down Expand Up @@ -125,4 +140,4 @@ protected function _decrementLockCount(string $resourceId): SemaphoreInterface

return $this;
}
}
}
5 changes: 5 additions & 0 deletions src/Semaphore/Mutex/Flock.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public function testAndSetLock(): bool
return $this->_hasLock;
}

public function testLock(): bool
{
throw new \LogicException('Mutex::testLock() is unimplemented for filesystem locks');
}

public function releaseLock(): MutexInterface
{
if ($this->_hasLock === true) {
Expand Down
63 changes: 45 additions & 18 deletions src/Semaphore/Mutex/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function testAndSetLock(): bool

// If the mutex resource ID is set, then check if the owning client is connected.
$mutexKeyValue = $this->_getRedisClient()->get($key);
if (!empty($mutexKeyValue)) {
if ($mutexKeyValue !== false) {
$mutexClientIsConnected = false;

// Get a list of connected clients.
Expand All @@ -44,40 +44,67 @@ public function testAndSetLock(): bool
// If the client that registered for the mutex resource ID is connected, the mutex is held by another client.
if ($mutexClientIsConnected === false) {
// If not, try to obtain the lock by registering on the mutex resource ID.
$this->_getRedisClient()->multi();
$this->_getRedisClient()->set($key, $processUUID);
$reply = $this->_getRedisClient()->exec();

// If the mutex resource ID was not set by another client, the mutex is obtained by this client.
if ($reply[0] === true) {
if ($this->setLockKey($key, $processUUID)) {
$this->_hasLock = true;
}
}
}else {
} else {
// If the mutex resource ID is not set, try to obtain the mutex.
$this->_getRedisClient()->multi();
$this->_getRedisClient()->set($key, $processUUID);
$reply = $this->_getRedisClient()->exec();
if (is_array($reply) && $reply[0] === true) {
if ($this->setLockKey($key, $processUUID)) {
$this->_hasLock = true;
}elseif ($reply !== false) {
$type = gettype($reply);
throw new \UnexpectedValueException("Reply is of type [$type]");
}
}
}else {
} else {
throw new \LogicException('The mutex already has obtained a lock.');
}

return $this->_hasLock;
}

protected function setLockKey(string $key, string $processUUID) : bool
{
$this->_getRedisClient()->multi();
$this->_getRedisClient()->set($key, $processUUID);

$reply = $this->_getRedisClient()->exec();

if (is_array($reply) && $reply[0] === true) {
return true;
}

if ($reply === false) {
return false;
}

$type = gettype($reply);
throw new \UnexpectedValueException("Reply is of type [$type]");
}

public function testLock() : bool
{
$keyValue = $this->_getRedisClient()->get($this->_getKey());

// not likely since the entire key is deleted normally when the lock is released
if ($keyValue === false) {
return true;
}

$clients = $this->_getRedisClient()->client('LIST');
foreach ($clients as $client) {
if ($client['name'] === $keyValue) {
return false;
}
}

return true;
}

public function releaseLock(): MutexInterface
{
if ($this->_hasLock === true) {
$this->_getRedisClient()->del($this->_getKey());
$this->_hasLock = false;
}else {
} else {
throw new \LogicException('The mutex has not obtained a lock.');
}

Expand Down Expand Up @@ -114,4 +141,4 @@ protected function _getRedisClient(): \Redis

return $this->_read(self::PROP_REDIS);
}
}
}
11 changes: 10 additions & 1 deletion src/Semaphore/MutexInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,18 @@ public function setIsBlocking(bool $isBlocking): MutexInterface;

public function testAndSetLock(): bool;

/**
* Soft check for whether a mutex is available
*
* DOES NOT ACQUIRE THE MUTEX
*
* @return bool
*/
public function testLock(): bool;

public function releaseLock(): MutexInterface;

public function setResource(ResourceInterface $resource): MutexInterface;

public function hasLock(): bool;
}
}
Loading

0 comments on commit 6ac27c7

Please sign in to comment.