Skip to content

Commit

Permalink
Optimized codes of redis-subscriber (#690)
Browse files Browse the repository at this point in the history
Co-authored-by: Deeka Wong <[email protected]>
  • Loading branch information
huangdijia and huangdijia committed Jul 21, 2024
1 parent 8dd5d07 commit 41da346
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 20 deletions.
7 changes: 4 additions & 3 deletions src/CommandInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public function interrupt(): bool
return true;
}

public function ping(int $timeout = 1): string|bool
public function ping(float $timeout = 1): string|bool
{
$this->connection->send(CommandBuilder::build('ping'));
return $this->pingChannel->pop($timeout);
Expand All @@ -84,6 +84,7 @@ public function ping(int $timeout = 1): string|bool
*/
protected function receive(Connection $connection): void
{
/** @var array|null $buffer */
$buffer = null;

while (true) {
Expand Down Expand Up @@ -129,7 +130,7 @@ protected function receive(Connection $connection): void
$message = new Message();
$message->channel = $buffer[4];
$message->payload = $buffer[6];
$timerID = $this->timer->after(30, function () use ($message) {
$timerID = $this->timer->after(30.0, function () use ($message) {
$this->logger?->error(sprintf('Message channel (%s) is 30 seconds full, disconnected', $message->channel));
$this->interrupt();
});
Expand All @@ -156,7 +157,7 @@ protected function receive(Connection $connection): void
$message->pattern = $buffer[4];
$message->channel = $buffer[6];
$message->payload = $buffer[8];
$timerID = $this->timer->after(30, function () use ($message) {
$timerID = $this->timer->after(30.0, function () use ($message) {
$this->logger?->error(sprintf('Message channel (%s) is 30 seconds full, disconnected', $message->channel));
$this->interrupt();
});
Expand Down
1 change: 1 addition & 0 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public function send(string $data): bool
if ($size === false) {
throw new SocketException('Failed to send data to the socket.');
}

if ($len !== $size) {
throw new SocketException('The sending data is incomplete, it may be that the socket has been closed by the peer.');
}
Expand Down
22 changes: 5 additions & 17 deletions src/Subscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ public function __construct(
*/
public function subscribe(string ...$channels): void
{
$channels = array_map(function ($channel) {
return $this->prefix . $channel;
}, $channels);

$channels = array_map(fn ($channel) => $this->prefix . $channel, $channels);
$result = $this->commandInvoker->invoke(['subscribe', ...$channels], count($channels));

foreach ($result as $value) {
Expand All @@ -62,10 +59,7 @@ public function subscribe(string ...$channels): void
*/
public function unsubscribe(string ...$channels): void
{
$channels = array_map(function ($channel) {
return $this->prefix . $channel;
}, $channels);

$channels = array_map(fn ($channel) => $this->prefix . $channel, $channels);
$result = $this->commandInvoker->invoke(['unsubscribe', ...$channels], count($channels));

foreach ($result as $value) {
Expand All @@ -83,10 +77,7 @@ public function unsubscribe(string ...$channels): void
*/
public function psubscribe(string ...$channels): void
{
$channels = array_map(function ($channel) {
return $this->prefix . $channel;
}, $channels);

$channels = array_map(fn ($channel) => $this->prefix . $channel, $channels);
$result = $this->commandInvoker->invoke(['psubscribe', ...$channels], count($channels));

foreach ($result as $value) {
Expand All @@ -104,10 +95,7 @@ public function psubscribe(string ...$channels): void
*/
public function punsubscribe(string ...$channels): void
{
$channels = array_map(function ($channel) {
return $this->prefix . $channel;
}, $channels);

$channels = array_map(fn ($channel) => $this->prefix . $channel, $channels);
$result = $this->commandInvoker->invoke(['punsubscribe', ...$channels], count($channels));

foreach ($result as $value) {
Expand Down Expand Up @@ -138,7 +126,7 @@ public function close(): void
/**
* @throws SocketException
*/
public function ping(int $timeout = 1): string|bool
public function ping(float $timeout = 1): string|bool
{
return $this->commandInvoker->ping($timeout);
}
Expand Down

0 comments on commit 41da346

Please sign in to comment.