Skip to content

Commit

Permalink
Merge pull request #13 from spiral/feature/broadcasting
Browse files Browse the repository at this point in the history
Adds Broadcasting plugin support
  • Loading branch information
butschster authored Apr 29, 2022
2 parents 9b069f9 + 9223fdc commit 2fce041
Show file tree
Hide file tree
Showing 9 changed files with 566 additions and 3 deletions.
151 changes: 149 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Make sure that your server is configured with following PHP version and extensions:

- PHP 7.4+
- Spiral framework 2.9+
- Spiral framework 2.13+

## Installation

Expand All @@ -26,11 +26,13 @@ After package install you need to add bootloaders from the package in your appli

```php
use Spiral\RoadRunnerBridge\Bootloader as RoadRunnerBridge;

protected const LOAD = [
RoadRunnerBridge\HttpBootloader::class,
RoadRunnerBridge\QueueBootloader::class,
RoadRunnerBridge\CacheBootloader::class,
RoadRunnerBridge\GRPCBootloader::class,
RoadRunnerBridge\BroadcastingBootloader::class,
RoadRunnerBridge\CommandBootloader::class,
RoadRunnerBridge\TcpBootloader::class, // Optional, if need to work with TCP
// ...
Expand Down Expand Up @@ -485,7 +487,8 @@ class MyService {

### Domain specific queues

Domain specific queues are an important part of an application. You can create aliases for exists connections and use them
Domain specific queues are an important part of an application. You can create aliases for exists connections and use
them
instead of real names. When you decide to switch queue connection for alias, you can do it in one place.

```php
Expand Down Expand Up @@ -627,6 +630,7 @@ RoadRunner includes TCP server and can be used to replace classic TCP setup with
#### Bootloader

Add `Spiral\RoadRunnerBridge\Bootloader\TcpBootloader` to application bootloaders list:

```php
use Spiral\RoadRunnerBridge\Bootloader as RoadRunnerBridge;

Expand Down Expand Up @@ -726,6 +730,149 @@ class TestService implements ServiceInterface
}
```

### Broadcasting

#### Configuration

You can create config file `app/config/broadcasting.php` if you want to configure Broadcasting drivers.


```php
<?php
declare(strict_types=1);
use Psr\Log\LogLevel;
use Spiral\Broadcasting\Driver\LogBroadcast;
use Spiral\Broadcasting\Driver\NullBroadcast;
use Spiral\Core\Container\Autowire;
use Spiral\RoadRunnerBridge\Broadcasting\RoadRunnerBroadcast;
use Spiral\RoadRunnerBridge\Broadcasting\RoadRunnerGuard;
return [
'default' => env('BROADCAST_CONNECTION', 'null'),
'authorize' => [
'path' => env('BROADCAST_AUTHORIZE_PATH'),
'topics' => [
// 'topic' => static fn (ServerRequestInterface $request): bool => $request->getHeader('SECRET')[0] == 'secret',
// 'user.{id}' => static fn ($id, Actor $actor): bool => $actor->getId() === $id
],
],
'connections' => [
'null' => [
'driver' => 'null',
],
'log' => [
'driver' => 'log',
'level' => LogLevel::INFO,
],
'roadrunner' => [
'driver' => 'roadrunner',
'guard' => Autowire::wire(RoadRunnerGuard::class),
]
],
'driverAliases' => [
'null' => NullBroadcast::class,
'log' => LogBroadcast::class,
'roadrunner' => RoadRunnerBroadcast::class,
],
];
```

Configure `broadcasting` section in the RoadRunner yaml config:

```yaml
http:
address: 0.0.0.0:8000
middleware: [ "static", "gzip", "websockets", "headers" ]
static:
dir: "public"
forbid: [ ".php" ]
pool:
num_workers: 2
headers:
cors:
allowed_origin: "*"
allowed_headers: "*"
allowed_methods: "GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS"
websockets:
broker: default
path: "/ws"
broadcast:
default:
driver: memory
config: { }
```

#### Working with default driver

```php
use Spiral\Broadcasting\BroadcastInterface;
final class SendVerificationLink
{
private BroadcastInterface $broadcast;
private UserReposiory $users;
private VerificationLinkGenerator $linkGenerator;
public function __construct(
BroadcastInterface $broadcast,
UserReposiory $users,
VerificationLinkGenerator $linkGenerator
) {
$this->broadcast = $broadcast;
$this->users = $users;
$this->linkGenerator = $linkGenerator;
}
public function handle(int $userId): void
{
$user = $this->users->findByPK($userId);
// ...
$this->broadcast->publish(
'user.{$user->id}',
\sprintf('Your verification link is: %s', $this->linkGenerator->getLink($user))
);
}
}
```

#### Working with broadcasting manager

```php
use Spiral\Broadcasting\BroadcastManagerInterface;
final class SendVerificationLink
{
private BroadcastManagerInterface $broadcastManager;
private UserReposiory $users;
private VerificationLinkGenerator $linkGenerator;
public function __construct(
BroadcastManagerInterface $broadcastManager,
UserReposiory $users,
VerificationLinkGenerator $linkGenerator
) {
$this->broadcastManager = $broadcastManager;
$this->users = $users;
$this->linkGenerator = $linkGenerator;
}
public function handle(int $userId): void
{
$broadcast = $this->broadcastManager->connection('log');
// ...
}
}
```

### GRPC

The GRPC protocol provides an extremely efficient way of cross-service communication for distributed applications. The
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
"spiral/roadrunner-grpc": "^2.0",
"spiral/roadrunner-jobs": "^2.0",
"spiral/roadrunner-kv": "^2.0",
"spiral/roadrunner-broadcast": "^2.0",
"spiral/roadrunner-tcp": "^2.0",
"spiral/framework": "^2.9"
"spiral/framework": "^2.13"
},
"require-dev": {
"phpunit/phpunit": "^8.5|^9.0",
Expand Down
64 changes: 64 additions & 0 deletions src/Bootloader/BroadcastingBootloader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunnerBridge\Bootloader;

use Psr\Http\Message\ResponseFactoryInterface;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Broadcasting\Bootloader\BroadcastingBootloader as BaseBroadcastingBootloader;
use Spiral\Broadcasting\Bootloader\WebsocketsBootloader;
use Spiral\Broadcasting\Config\BroadcastConfig;
use Spiral\Broadcasting\TopicRegistryInterface;
use Spiral\Core\InvokerInterface;
use Spiral\Core\ScopeInterface;
use Spiral\RoadRunner\Broadcast\Broadcast;
use Spiral\RoadRunner\Broadcast\BroadcastInterface;
use Spiral\RoadRunnerBridge\Broadcasting\RoadRunnerBroadcast;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunnerBridge\Broadcasting\RoadRunnerGuard;

final class BroadcastingBootloader extends Bootloader
{
protected const DEPENDENCIES = [
RoadRunnerBootloader::class,
BaseBroadcastingBootloader::class,
WebsocketsBootloader::class,
];

protected const SINGLETONS = [
BroadcastInterface::class => [self::class, 'initBroadcast'],
RoadRunnerGuard::class => [self::class, 'initRoadRunnerGuard'],
];

public function boot(BaseBroadcastingBootloader $broadcastingBootloader): void
{
$broadcastingBootloader->registerDriverAlias('roadrunner', RoadRunnerBroadcast::class);
}

private function initBroadcast(RPCInterface $rpc): BroadcastInterface
{
$broadcast = new Broadcast($rpc);

if (!$broadcast->isAvailable()) {
throw new \LogicException('The [broadcast] plugin not available');
}

return $broadcast;
}

private function initRoadRunnerGuard(
ResponseFactoryInterface $responseFactory,
InvokerInterface $invoker,
ScopeInterface $scope,
TopicRegistryInterface $registry,
BroadcastConfig $config
): RoadRunnerGuard {
return new RoadRunnerGuard(
$invoker,
$scope,
$registry,
$config['authorize']['serverCallback'] ?? null
);
}
}
47 changes: 47 additions & 0 deletions src/Broadcasting/RoadRunnerBroadcast.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunnerBridge\Broadcasting;

use Psr\Http\Message\ServerRequestInterface;
use Spiral\Broadcasting\AuthorizationStatus;
use Spiral\Broadcasting\Driver\AbstractBroadcast;
use Spiral\Broadcasting\GuardInterface;
use Spiral\RoadRunner\Broadcast\BroadcastInterface;
use Spiral\RoadRunner\Broadcast\TopicInterface;

final class RoadRunnerBroadcast extends AbstractBroadcast implements GuardInterface
{
private BroadcastInterface $broadcast;
private GuardInterface $guard;

public function __construct(
BroadcastInterface $broadcast,
GuardInterface $guard
) {
$this->broadcast = $broadcast;
$this->guard = $guard;
}

/**
* @param non-empty-list<string> $topics
* @param non-empty-list<string> $messages
*
* @throws \Spiral\RoadRunner\Broadcast\Exception\BroadcastException
*/
public function publish($topics, $messages): void
{
$this->broadcast->publish($topics, $messages);
}

public function join($topics): TopicInterface
{
return $this->broadcast->join($topics);
}

public function authorize(ServerRequestInterface $request): AuthorizationStatus
{
return $this->guard->authorize($request);
}
}
Loading

0 comments on commit 2fce041

Please sign in to comment.