Skip to content

Commit

Permalink
Merge pull request #131 from huangdijia/psubscribe-support
Browse files Browse the repository at this point in the history
Add psubscribe and punsubscribe support
  • Loading branch information
onanying authored Apr 7, 2024
2 parents aea8d65 + 25ab524 commit c19590f
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 4 deletions.
10 changes: 7 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ nbproject
# windows thumbnail cache
Thumbs.db

# composer itself is not needed
# composer
composer.phar
/vendor
/composer.lock

# Mac DS_Store Files
.DS_Store

# phpunit itself is not needed
# phpunit
phpunit.phar
# local phpunit config
/phpunit.xml

# vscode
/.vscode
27 changes: 27 additions & 0 deletions src/redis-subscriber/src/CommandInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,33 @@ public function receive(Connection $connection)
continue;
}

if ($type == 'psubscribe' && count($buffer) == 6) {
$this->resultChannel->push($buffer);
$buffer = null;
continue;
}

if ($type == 'punsubscribe' && count($buffer) == 6) {
$this->resultChannel->push($buffer);
$buffer = null;
continue;
}

if ($type == 'pmessage' && count($buffer) == 9) {
$message = new Message();
$message->pattern = $buffer[4];
$message->channel = $buffer[6];
$message->payload = $buffer[8];
$timerID = Timer::after(30 * 1000, function () use ($message) {
static::error(sprintf('Message channel (%s) is 30 seconds full, disconnected', $message->channel));
$this->interrupt();
});
$this->messageChannel->push($message);
Timer::clear($timerID);
$buffer = null;
continue;
}

if ($type == 'pong' && count($buffer) == 5) {
$this->pingChannel->push('pong');
$buffer = null;
Expand Down
5 changes: 5 additions & 0 deletions src/redis-subscriber/src/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
class Message
{

/**
* @var string
*/
public $pattern;

/**
* @var string
*/
Expand Down
38 changes: 38 additions & 0 deletions src/redis-subscriber/src/Subscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,44 @@ public function unsubscribe(string ...$channels)
}
}

/**
* PSubscribe.
* @throws \Swoole\Exception
* @throws Throwable
*/
public function psubscribe(string ...$channels)
{
$channels = array_map(function ($channel) {
return $this->prefix . $channel;
}, $channels);
$result = $this->commandInvoker->invoke(['psubscribe', ...$channels], count($channels));
foreach ($result as $value) {
if ($value === false) {
$this->commandInvoker->interrupt();
throw new SubscribeException('Psubscribe failed');
}
}
}

/**
* PUnsubscribe.
* @throws \Swoole\Exception
* @throws Throwable
*/
public function punsubscribe(string ...$channels)
{
$channels = array_map(function ($channel) {
return $this->prefix . $channel;
}, $channels);
$result = $this->commandInvoker->invoke(['punsubscribe', ...$channels], count($channels));
foreach ($result as $value) {
if ($value === false) {
$this->commandInvoker->interrupt();
throw new UnsubscribeException('Punsubscribe failed');
}
}
}

/**
* Channel
* @return \Swoole\Coroutine\Channel
Expand Down
38 changes: 37 additions & 1 deletion src/redis-subscriber/tests/MainTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
final class MainTest extends TestCase
{

public function test(): void
public function testSubscribe(): void
{
$func = function () {
$sub = new \Mix\Redis\Subscriber\Subscriber('127.0.0.1', 6379, '', 5);
Expand All @@ -31,6 +31,42 @@ public function test(): void
}
break;
}
$this->assertEquals($data->channel, 'foo1');
$this->assertEquals($data->payload, 'foo1data');
break;
}
$sub->close();
};
run($func);
}

public function testPsubscribe(): void
{
$func = function () {
$sub = new \Mix\Redis\Subscriber\Subscriber('127.0.0.1', 6379, '', 5);
$sub->psubscribe('foo.*', 'bar'); // 订阅失败将抛出异常
$sub->psubscribe('foo1.*', 'bar1');
$sub->punsubscribe('foo.*', 'bar');

go(function () {
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
$redis->publish('foo.1', 'foodata');
$redis->publish('foo1.1', 'foo1data');
});

$chan = $sub->channel();
while (true) {
$data = $chan->pop();
if (empty($data)) { // 手动close与redis异常断开都会导致返回false
if (!$sub->closed) {
// redis异常断开处理
var_dump('Redis connection is disconnected abnormally');
}
break;
}
$this->assertEquals($data->pattern, 'foo1.*');
$this->assertEquals($data->channel, 'foo1.1');
$this->assertEquals($data->payload, 'foo1data');
break;
}
Expand Down

0 comments on commit c19590f

Please sign in to comment.