From bb37636ebf4414846fa12f03f8cb3148be0c83d4 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Apr 2024 10:50:29 +0800 Subject: [PATCH 1/4] Add psubscribe and punsubscribe support --- src/redis-subscriber/src/CommandInvoker.php | 27 +++++++++++++++ src/redis-subscriber/src/Message.php | 5 +++ src/redis-subscriber/src/Subscriber.php | 38 +++++++++++++++++++++ 3 files changed, 70 insertions(+) diff --git a/src/redis-subscriber/src/CommandInvoker.php b/src/redis-subscriber/src/CommandInvoker.php index 111690fa..eff94aa1 100644 --- a/src/redis-subscriber/src/CommandInvoker.php +++ b/src/redis-subscriber/src/CommandInvoker.php @@ -113,6 +113,33 @@ public function receive(Connection $connection) continue; } + if ($type == 'psubscribe' && count($buffer) == 6) { + $this->resultChannel->push($buffer); + $buffer = null; + continue; + } + + if ($type == 'punsubscribe' && count($buffer) == 6) { + $this->resultChannel->push($buffer); + $buffer = null; + continue; + } + + if ($type == 'pmessage' && count($buffer) == 9) { + $message = new Message(); + $message->pattern = $buffer[4]; + $message->channel = $buffer[6]; + $message->payload = $buffer[8]; + $timerID = Timer::after(30 * 1000, function () use ($message) { + static::error(sprintf('Message channel (%s) is 30 seconds full, disconnected', $message->channel)); + $this->interrupt(); + }); + $this->messageChannel->push($message); + Timer::clear($timerID); + $buffer = null; + continue; + } + if ($type == 'pong' && count($buffer) == 5) { $this->pingChannel->push('pong'); $buffer = null; diff --git a/src/redis-subscriber/src/Message.php b/src/redis-subscriber/src/Message.php index 9295de6e..207c6104 100644 --- a/src/redis-subscriber/src/Message.php +++ b/src/redis-subscriber/src/Message.php @@ -9,6 +9,11 @@ class Message { + /** + * @var string + */ + public $pattern; + /** * @var string */ diff --git a/src/redis-subscriber/src/Subscriber.php b/src/redis-subscriber/src/Subscriber.php index 1a0f6b15..a9db3399 100644 --- a/src/redis-subscriber/src/Subscriber.php +++ b/src/redis-subscriber/src/Subscriber.php @@ -125,6 +125,44 @@ public function unsubscribe(string ...$channels) } } + /** + * PSubscribe. + * @throws \Swoole\Exception + * @throws Throwable + */ + public function psubscribe(string ...$channels) + { + $channels = array_map(function ($channel) { + return $this->prefix . $channel; + }, $channels); + $result = $this->commandInvoker->invoke(['psubscribe', ...$channels], count($channels)); + foreach ($result as $value) { + if ($value === false) { + $this->commandInvoker->interrupt(); + throw new SubscribeException('Psubscribe failed'); + } + } + } + + /** + * PUnsubscribe. + * @throws \Swoole\Exception + * @throws Throwable + */ + public function punsubscribe(string ...$channels) + { + $channels = array_map(function ($channel) { + return $this->prefix . $channel; + }, $channels); + $result = $this->commandInvoker->invoke(['punsubscribe', ...$channels], count($channels)); + foreach ($result as $value) { + if ($value === false) { + $this->commandInvoker->interrupt(); + throw new UnsubscribeException('Punsubscribe failed'); + } + } + } + /** * Channel * @return \Swoole\Coroutine\Channel From 64f69c973eb35ff37826c203760c104096acbb08 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Apr 2024 11:01:06 +0800 Subject: [PATCH 2/4] Add test case --- src/redis-subscriber/tests/MainTest.php | 38 ++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/src/redis-subscriber/tests/MainTest.php b/src/redis-subscriber/tests/MainTest.php index ef849b6e..cabb5ea8 100644 --- a/src/redis-subscriber/tests/MainTest.php +++ b/src/redis-subscriber/tests/MainTest.php @@ -6,7 +6,7 @@ final class MainTest extends TestCase { - public function test(): void + public function testSubscribe(): void { $func = function () { $sub = new \Mix\Redis\Subscriber\Subscriber('127.0.0.1', 6379, '', 5); @@ -31,6 +31,42 @@ public function test(): void } break; } + $this->assertEquals($data->channel, 'foo1'); + $this->assertEquals($data->payload, 'foo1data'); + break; + } + $sub->close(); + }; + run($func); + } + + public function testPsubscribe(): void + { + $func = function () { + $sub = new \Mix\Redis\Subscriber\Subscriber('127.0.0.1', 6379, '', 5); + $sub->psubscribe('foo.*', 'bar'); // 订阅失败将抛出异常 + $sub->psubscribe('foo1.*', 'bar1'); + $sub->punsubscribe('foo.*', 'bar'); + + go(function () { + $redis = new \Redis(); + $redis->connect('127.0.0.1', 6379); + $redis->publish('foo.1', 'foodata'); + $redis->publish('foo1.1', 'foo1data'); + }); + + $chan = $sub->channel(); + while (true) { + $data = $chan->pop(); + if (empty($data)) { // 手动close与redis异常断开都会导致返回false + if (!$sub->closed) { + // redis异常断开处理 + var_dump('Redis connection is disconnected abnormally'); + } + break; + } + $this->assertEquals($data->pattern, 'foo1.*'); + $this->assertEquals($data->channel, 'foo1.1'); $this->assertEquals($data->payload, 'foo1data'); break; } From 83a27273317c5579b17842fa4d2a703e5d4043ce Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Apr 2024 15:04:29 +0800 Subject: [PATCH 3/4] Updated .gitignore file to exclude vendor, composer.lock, .vscode, and .bashrc files --- .gitignore | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.gitignore b/.gitignore index 9b1fe01a..90751202 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,13 @@ composer.phar phpunit.phar # local phpunit config /phpunit.xml + +# composer +/vendor +/composer.lock + +# vscode +/.vscode + +# other +/.bashrc \ No newline at end of file From 25ab5249c9b0d6cc49ce375dee4b149d155697b8 Mon Sep 17 00:00:00 2001 From: LIU JIAN Date: Sun, 7 Apr 2024 10:29:26 +0800 Subject: [PATCH 4/4] Update .gitignore --- .gitignore | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 90751202..ceebd889 100644 --- a/.gitignore +++ b/.gitignore @@ -12,23 +12,17 @@ nbproject # windows thumbnail cache Thumbs.db -# composer itself is not needed +# composer composer.phar +/vendor +/composer.lock # Mac DS_Store Files .DS_Store -# phpunit itself is not needed +# phpunit phpunit.phar -# local phpunit config /phpunit.xml -# composer -/vendor -/composer.lock - # vscode /.vscode - -# other -/.bashrc \ No newline at end of file