From b62808de0b615947f1075b62146f4d8f5eed075d Mon Sep 17 00:00:00 2001 From: mucha55 Date: Tue, 19 Mar 2019 13:05:59 -0500 Subject: [PATCH] KOJO-79 | Add robustness around redis command processing --- src/Message/Broker/BrokerInterface.php | 4 ++-- src/Message/Broker/Redis.php | 21 ++++++++++++++------- src/Process/Listener/Command.php | 25 ++++++++++++++----------- src/Process/Listener/Mutex/Redis.php | 2 +- src/Process/ListenerInterface.php | 4 ++-- src/Process/Pool/Strategy.php | 14 +++++++++----- src/Process/Pool/Strategy/Worker.php | 3 +-- 7 files changed, 43 insertions(+), 30 deletions(-) diff --git a/src/Message/Broker/BrokerInterface.php b/src/Message/Broker/BrokerInterface.php index 3c57ad80..a7bda6a6 100644 --- a/src/Message/Broker/BrokerInterface.php +++ b/src/Message/Broker/BrokerInterface.php @@ -11,7 +11,7 @@ public function publishMessage($message): BrokerInterface; public function hasMessage(): bool; - public function getNextMessage(): string; + public function attemptGetNextMessage(): ?string; public function getPublishChannelLength(): int; @@ -20,4 +20,4 @@ public function getSubscriptionChannelLength(): int; public function setSubscriptionChannelName(string $channelName): BrokerInterface; public function setPublishChannelName(string $channelName): BrokerInterface; -} \ No newline at end of file +} diff --git a/src/Message/Broker/Redis.php b/src/Message/Broker/Redis.php index c4e5681b..fb5f17ed 100644 --- a/src/Message/Broker/Redis.php +++ b/src/Message/Broker/Redis.php @@ -47,22 +47,29 @@ public function hasMessage(): bool throw $throwable; } - return $publishChannelLength + $subscriptionChannelLength > 0 ? true : false; + return ($publishChannelLength + $subscriptionChannelLength > 0); } - public function getNextMessage(): string + public function attemptGetNextMessage(): ?string { try { - $message = $this->_getRedisClient()->lPop($this->_getSubscriptionChannelName()); - if ($message === false) { - $message = $this->_getRedisClient()->rPop($this->_getPublishChannelName()); + $subscriptionMessage = $this->_getRedisClient()->lPop($this->_getSubscriptionChannelName()); + + if ($subscriptionMessage !== false) { + return $subscriptionMessage; + } + + $publicationMessage = $this->_getRedisClient()->rPop($this->_getPublishChannelName()); + + if ($publicationMessage !== false) { + return $publicationMessage; } + + return null; } catch (\Throwable $throwable) { $this->_getLogger()->critical($throwable->getMessage(), ['exception' => $throwable]); throw $throwable; } - - return (string)$message; } public function getPublishChannelLength(): int diff --git a/src/Process/Listener/Command.php b/src/Process/Listener/Command.php index abf92a84..f8511cf3 100644 --- a/src/Process/Listener/Command.php +++ b/src/Process/Listener/Command.php @@ -19,18 +19,21 @@ public function hasMessages(): bool return $this->_getMessageBroker()->hasMessage(); } - public function processMessages(): ListenerInterface + public function processMessage(): ListenerInterface { - $message = $this->_getMessageBroker()->getNextMessage(); - if (json_decode($message, true) !== null) { - $this->_getExpressionLanguage()->evaluate( - json_decode($message, true)['command'], - [ - 'commandProcess' => $this, - ] - ); - } else { - $this->_getLogger()->warning(sprintf('The message is not a JSON: [%s]', $message)); + $message = $this->_getMessageBroker()->attemptGetNextMessage(); + + if ($message !== null) { + if (json_decode($message) !== null) { + $this->_getExpressionLanguage()->evaluate( + json_decode($message, true)['command'], + [ + 'commandProcess' => $this, + ] + ); + } else { + $this->_getLogger()->warning('The message is not a JSON: "' . $message . '".'); + } } return $this; diff --git a/src/Process/Listener/Mutex/Redis.php b/src/Process/Listener/Mutex/Redis.php index 21135dc8..c329bd66 100644 --- a/src/Process/Listener/Mutex/Redis.php +++ b/src/Process/Listener/Mutex/Redis.php @@ -43,7 +43,7 @@ protected function _register(): ProcessInterface return $this; } - public function processMessages(): ListenerInterface + public function processMessage(): ListenerInterface { throw new RuntimeException('The connection to redis was lost.'); } diff --git a/src/Process/ListenerInterface.php b/src/Process/ListenerInterface.php index 1bdf8625..5ca1bcb1 100644 --- a/src/Process/ListenerInterface.php +++ b/src/Process/ListenerInterface.php @@ -7,7 +7,7 @@ interface ListenerInterface extends ProcessInterface { - public function processMessages(): ListenerInterface; + public function processMessage(): ListenerInterface; public function hasMessages(): bool; -} \ No newline at end of file +} diff --git a/src/Process/Pool/Strategy.php b/src/Process/Pool/Strategy.php index 0b1d88cc..c0073110 100644 --- a/src/Process/Pool/Strategy.php +++ b/src/Process/Pool/Strategy.php @@ -33,11 +33,11 @@ protected function _listenerProcessExited(ListenerInterface $listenerProcess): S $this->_pauseListenerProcess($listenerProcess); } else { while ( - $listenerProcess->hasMessages() - && !$this->_getProcessPool()->isFull() + !$this->_getProcessPool()->isFull() && $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses() + && $listenerProcess->hasMessages() ) { - $listenerProcess->processMessages(); + $listenerProcess->processMessage(); } if ($this->_getProcessPool()->isFull()) { @@ -171,8 +171,12 @@ protected function _unPauseListenerProcesses(): Strategy if (!$this->_getProcessPool()->isFull()) { $typeCode = $listenerProcess->getTypeCode(); $newListenerProcess = $this->_getProcessCollection()->getProcessPrototypeClone($typeCode); - while (!$this->_getProcessPool()->isFull() && $listenerProcess->hasMessages()) { - $listenerProcess->processMessages(); + while ( + !$this->_getProcessPool()->isFull() && + $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses() && + $listenerProcess->hasMessages() + ) { + $listenerProcess->processMessage(); } if (!$this->_getProcessPool()->isFull()) { try { diff --git a/src/Process/Pool/Strategy/Worker.php b/src/Process/Pool/Strategy/Worker.php index 25e597b2..33792968 100644 --- a/src/Process/Pool/Strategy/Worker.php +++ b/src/Process/Pool/Strategy/Worker.php @@ -28,10 +28,9 @@ public function childProcessExited(ProcessInterface $process): StrategyInterface protected function _listenerProcessExited(ListenerInterface $listenerProcess): StrategyInterface { while ($listenerProcess->hasMessages()) { - $listenerProcess->processMessages(); + $listenerProcess->processMessage(); } - return $this; }