Skip to content

Commit

Permalink
Merge pull request #75 from neighborhoods/KOJO-79-more-robust-redis-m…
Browse files Browse the repository at this point in the history
…essage-processing

KOJO-79 | Add robustness around redis command processing
  • Loading branch information
mucha55 committed Sep 5, 2019
2 parents 75e95e2 + b62808d commit 9e086bd
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/Message/Broker/BrokerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public function publishMessage($message): BrokerInterface;

public function hasMessage(): bool;

public function getNextMessage(): string;
public function attemptGetNextMessage(): ?string;

public function getPublishChannelLength(): int;

Expand All @@ -20,4 +20,4 @@ public function getSubscriptionChannelLength(): int;
public function setSubscriptionChannelName(string $channelName): BrokerInterface;

public function setPublishChannelName(string $channelName): BrokerInterface;
}
}
21 changes: 14 additions & 7 deletions src/Message/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,29 @@ public function hasMessage(): bool
throw $throwable;
}

return $publishChannelLength + $subscriptionChannelLength > 0 ? true : false;
return ($publishChannelLength + $subscriptionChannelLength > 0);
}

public function getNextMessage(): string
public function attemptGetNextMessage(): ?string
{
try {
$message = $this->_getRedisClient()->lPop($this->_getSubscriptionChannelName());
if ($message === false) {
$message = $this->_getRedisClient()->rPop($this->_getPublishChannelName());
$subscriptionMessage = $this->_getRedisClient()->lPop($this->_getSubscriptionChannelName());

if ($subscriptionMessage !== false) {
return $subscriptionMessage;
}

$publicationMessage = $this->_getRedisClient()->rPop($this->_getPublishChannelName());

if ($publicationMessage !== false) {
return $publicationMessage;
}

return null;
} catch (\Throwable $throwable) {
$this->_getLogger()->critical($throwable->getMessage(), ['exception' => $throwable]);
throw $throwable;
}

return (string)$message;
}

public function getPublishChannelLength(): int
Expand Down
25 changes: 14 additions & 11 deletions src/Process/Listener/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@ public function hasMessages(): bool
return $this->_getMessageBroker()->hasMessage();
}

public function processMessages(): ListenerInterface
public function processMessage(): ListenerInterface
{
$message = $this->_getMessageBroker()->getNextMessage();
if (json_decode($message, true) !== null) {
$this->_getExpressionLanguage()->evaluate(
json_decode($message, true)['command'],
[
'commandProcess' => $this,
]
);
} else {
$this->_getLogger()->warning(sprintf('The message is not a JSON: [%s]', $message));
$message = $this->_getMessageBroker()->attemptGetNextMessage();

if ($message !== null) {
if (json_decode($message) !== null) {
$this->_getExpressionLanguage()->evaluate(
json_decode($message, true)['command'],
[
'commandProcess' => $this,
]
);
} else {
$this->_getLogger()->warning('The message is not a JSON: "' . $message . '".');
}
}

return $this;
Expand Down
2 changes: 1 addition & 1 deletion src/Process/Listener/Mutex/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected function _register(): ProcessInterface
return $this;
}

public function processMessages(): ListenerInterface
public function processMessage(): ListenerInterface
{
throw new RuntimeException('The connection to redis was lost.');
}
Expand Down
4 changes: 2 additions & 2 deletions src/Process/ListenerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

interface ListenerInterface extends ProcessInterface
{
public function processMessages(): ListenerInterface;
public function processMessage(): ListenerInterface;

public function hasMessages(): bool;
}
}
14 changes: 9 additions & 5 deletions src/Process/Pool/Strategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ protected function _listenerProcessExited(ListenerInterface $listenerProcess): S
$this->_pauseListenerProcess($listenerProcess);
} else {
while (
$listenerProcess->hasMessages()
&& !$this->_getProcessPool()->isFull()
!$this->_getProcessPool()->isFull()
&& $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses()
&& $listenerProcess->hasMessages()
) {
$listenerProcess->processMessages();
$listenerProcess->processMessage();
}

if ($this->_getProcessPool()->isFull()) {
Expand Down Expand Up @@ -171,8 +171,12 @@ protected function _unPauseListenerProcesses(): Strategy
if (!$this->_getProcessPool()->isFull()) {
$typeCode = $listenerProcess->getTypeCode();
$newListenerProcess = $this->_getProcessCollection()->getProcessPrototypeClone($typeCode);
while (!$this->_getProcessPool()->isFull() && $listenerProcess->hasMessages()) {
$listenerProcess->processMessages();
while (
!$this->_getProcessPool()->isFull() &&
$this->_getProcessPool()->canEnvironmentSustainAdditionProcesses() &&
$listenerProcess->hasMessages()
) {
$listenerProcess->processMessage();
}
if (!$this->_getProcessPool()->isFull()) {
try {
Expand Down
3 changes: 1 addition & 2 deletions src/Process/Pool/Strategy/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ public function childProcessExited(ProcessInterface $process): StrategyInterface
protected function _listenerProcessExited(ListenerInterface $listenerProcess): StrategyInterface
{
while ($listenerProcess->hasMessages()) {
$listenerProcess->processMessages();
$listenerProcess->processMessage();
}


return $this;
}

Expand Down

0 comments on commit 9e086bd

Please sign in to comment.