Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Jul 2, 2024
1 parent 16118db commit 5e70e03
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 176 deletions.
8 changes: 4 additions & 4 deletions src/Events/ClientEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,24 @@ public function response(TcpConnection $connection, array $request): void
return;
}
// 当前链接没有订阅这个channel
if (!isset(PushServer::_getConnectionProperty($connection, 'channels')[$channel])) {
if (!isset(PushServer::getConnectionProperty($connection, 'channels')[$channel])) {
PushServer::error($connection, null, 'Client event rejected - you didn\'t subscribe this channel');
return;
}
// 客户端触发事件必须是private 或者 presence的channel
$channelType = PushServer::_getChannelType($channel);
$channelType = PushServer::getChannelType($channel);
if ($channelType !== CHANNEL_TYPE_PRIVATE and $channelType !== CHANNEL_TYPE_PRESENCE) {
PushServer::error($connection, null, 'Client event rejected - only supported on private and presence channels');
return;
}
try {
// 广播 客户端消息
PushServer::publish(PushServer::$publishTypeClient, [
'appKey' => PushServer::_getConnectionProperty($connection,'appKey'),
'appKey' => PushServer::getConnectionProperty($connection,'appKey'),
'channel' => $channel,
'event' => $this->getEvent(),
'data' => $data,
'socketId' => PushServer::_getConnectionProperty($connection,'socketId')
'socketId' => PushServer::getConnectionProperty($connection,'socketId')
]);
} catch (RedisException $exception) {
Log::channel('plugin.workbunny.webman-push-server.error')
Expand Down
24 changes: 12 additions & 12 deletions src/Events/Subscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public function response(TcpConnection $connection, array $request): void
$channelData = $request['data']['channel_data'] ?? [];
$clientAuth = $request['data']['auth'] ?? '';
$auth = self::auth(
$appKey = PushServer::_getConnectionProperty($connection, 'appKey'),
$appKey = PushServer::getConnectionProperty($connection, 'appKey'),
PushServer::getConfig('apps_query')($appKey)['app_secret'],
PushServer::_getConnectionProperty($connection, 'socketId'),
PushServer::getConnectionProperty($connection, 'socketId'),
$channel,
$channelData
);
// private- 和 presence- 开头的channel需要验证
switch ($channelType = PushServer::_getChannelType($channel)){
switch ($channelType = PushServer::getChannelType($channel)){
case CHANNEL_TYPE_PRESENCE:
if (!$channelData) {
PushServer::error($connection, null, 'Empty channel_data');
Expand Down Expand Up @@ -122,17 +122,17 @@ public static function auth(string $appKey, string $appSecret, string $socketId,
public static function subscribeChannel(TcpConnection $connection, string $channel, string $type, string ...$params): void
{
try {
$appKey = PushServer::_getConnectionProperty($connection, 'appKey');
$socketId = PushServer::_getConnectionProperty($connection, 'socketId');
$channels = PushServer::_getConnectionProperty($connection, 'channels');
$appKey = PushServer::getConnectionProperty($connection, 'appKey');
$socketId = PushServer::getConnectionProperty($connection, 'socketId');
$channels = PushServer::getConnectionProperty($connection, 'channels');
$userId = $params[0] ?? 'unknown';
$userInfo = $params[1] ?? '{}';
// 为当前进程增加订阅的通道
PushServer::_setChannel($appKey, $channel, $socketId);
PushServer::setChannel($appKey, $channel, $socketId);

$storage = PushServer::getStorageClient();
// 通道是否已经被建立
$channelExists = $storage->exists($key = PushServer::_getChannelStorageKey($appKey, $channel));
$channelExists = $storage->exists($key = PushServer::getChannelStorageKey($appKey, $channel));
if (!$channelExists) {
/** @see PushServer::$_storage */
$storage->hSet($key, 'type', $type);
Expand All @@ -153,15 +153,15 @@ public static function subscribeChannel(TcpConnection $connection, string $chann
$type = $channels[$channel] ?? null;
if (!$type) {
$channels[$channel] = $type;
PushServer::_setConnectionProperty($connection, 'channels', $channels);
PushServer::_setConnection($appKey, $socketId, $socketId);
PushServer::setConnectionProperty($connection, 'channels', $channels);
PushServer::setConnection($appKey, $socketId, $socketId);
// 递增订阅数
/** @see PushServer::$_storage */
$storage->hIncrBy($key,'subscription_count', 1);
}
// 如果是presence通道
if ($isPresence = ($type === CHANNEL_TYPE_PRESENCE)) {
if (!$storage->exists($userKey = PushServer::_getUserStorageKey($appKey, $channel, $userId))) {
if (!$storage->exists($userKey = PushServer::getUserStorageKey($appKey, $channel, $userId))) {
$storage->hIncrBy($key ,'user_count', 1);
$storage->hMSet($userKey, [
'user_id' => $userId,
Expand Down Expand Up @@ -199,7 +199,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann
$channel,
EVENT_SUBSCRIPTION_SUCCEEDED,
$isPresence ?
PushServer::_getPresenceChannelDataForSubscribe($appKey, $channel) :
PushServer::getPresenceChannelDataForSubscribe($appKey, $channel) :
'{}'
);
} catch (RedisException $exception){
Expand Down
18 changes: 9 additions & 9 deletions src/Events/Unsubscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Unsubscribe extends AbstractEvent
public function response(TcpConnection $connection, array $request): void
{
$channel = $request['data']['channel'] ?? '';
switch ($channelType = PushServer::_getChannelType($channel)) {
switch ($channelType = PushServer::getChannelType($channel)) {
case CHANNEL_TYPE_PUBLIC:
case CHANNEL_TYPE_PRIVATE:
self::unsubscribeChannel($connection, $channel, $channelType);
Expand Down Expand Up @@ -64,16 +64,16 @@ public function response(TcpConnection $connection, array $request): void
public static function unsubscribeChannel(TcpConnection $connection, string $channel, ?string $uid = null, bool $send = true): void
{
try {
$appKey = PushServer::_getConnectionProperty($connection, 'appKey');
$socketId = PushServer::_getConnectionProperty($connection, 'socketId');
$channels = PushServer::_getConnectionProperty($connection, 'channels');
$appKey = PushServer::getConnectionProperty($connection, 'appKey');
$socketId = PushServer::getConnectionProperty($connection, 'socketId');
$channels = PushServer::getConnectionProperty($connection, 'channels');

if ($type = $channels[$channel] ?? null) {
$storage = PushServer::getStorageClient();
// presence通道
if ($type === CHANNEL_TYPE_PRESENCE) {
if ($users = $storage->keys(PushServer::_getUserStorageKey($appKey, $channel, $uid))) {
$userCount = $storage->hIncrBy(PushServer::_getChannelStorageKey($appKey, $channel), 'user_count', -count($users));
if ($users = $storage->keys(PushServer::getUserStorageKey($appKey, $channel, $uid))) {
$userCount = $storage->hIncrBy(PushServer::getChannelStorageKey($appKey, $channel), 'user_count', -count($users));
if ($userCount <= 0) {
$storage->del(...$users);
}
Expand All @@ -95,7 +95,7 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha
}
}
// 查询通道订阅数量
$subCount = $storage->hIncrBy($key = PushServer::_getChannelStorageKey($appKey, $channel), 'subscription_count', -1);
$subCount = $storage->hIncrBy($key = PushServer::getChannelStorageKey($appKey, $channel), 'subscription_count', -1);
if ($subCount <= 0) {
$storage->del($key);
// 内部事件广播 通道被移除事件
Expand All @@ -113,8 +113,8 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha
}
// 移除通道
unset($channels[$channel]);
PushServer::_setConnectionProperty($connection, 'channels', $channels);
PushServer::_unsetChannels($appKey, $channel, $socketId);
PushServer::setConnectionProperty($connection, 'channels', $channels);
PushServer::unsetChannels($appKey, $channel, $socketId);
if ($send) {
/**
* 发送退订成功事件消息
Expand Down
Loading

0 comments on commit 5e70e03

Please sign in to comment.