Skip to content

Commit

Permalink
Merge pull request #14 from spiral/bugfix/serializer
Browse files Browse the repository at this point in the history
Adding integration with SF serializer
  • Loading branch information
butschster authored Jun 21, 2022
2 parents 056f872 + 8f8faf0 commit b8705b7
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 22 deletions.
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
"spiral/roadrunner-kv": "^2.2",
"spiral/roadrunner-broadcast": "^2.0",
"spiral/roadrunner-tcp": "^2.0",
"spiral/framework": "^3.0.x-dev"
"spiral/framework": "^3.0.x-dev",
"spiral/serializer": "^3.0"
},
"require-dev": {
"spiral/testing": "^2.0.x-dev",
"vimeo/psalm": "^4.22"
"vimeo/psalm": "^4.22",
"spiral/nyholm-bridge": "^1.2"
},
"autoload": {
"psr-4": {
Expand Down
33 changes: 25 additions & 8 deletions src/Bootloader/QueueBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
use Spiral\Core\Container;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\Queue\Bootloader\QueueBootloader as BaseQueueBootloader;
use Spiral\Queue\SerializerInterface;
use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\Queue\Config\QueueConfig;
use Spiral\RoadRunnerBridge\Queue\Consumer;
use Spiral\Serializer\Bootloader\SerializerBootloader;
use Spiral\RoadRunner\Jobs\ConsumerInterface;
use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\RoadRunner\Jobs\JobsInterface;
Expand All @@ -21,11 +22,13 @@
use Spiral\RoadRunnerBridge\Queue\PipelineRegistryInterface;
use Spiral\RoadRunnerBridge\Queue\Queue;
use Spiral\RoadRunnerBridge\Queue\RPCPipelineRegistry;
use Spiral\Serializer\SerializerManager;

final class QueueBootloader extends Bootloader
{
protected const DEPENDENCIES = [
RoadRunnerBootloader::class,
SerializerBootloader::class,
];

public function init(
Expand All @@ -46,17 +49,26 @@ public function init(
private function registerJobsSerializer(Container $container): void
{
$container->bindSingleton(
RRSerializerInterface::class,
static fn (SerializerInterface $serializer) => new JobsAdapterSerializer($serializer)
JobsAdapterSerializer::class,
static fn (SerializerManager $manager, QueueConfig $config) => new JobsAdapterSerializer(
$manager,
$config->getConnections('roadrunner')['roadrunner']['serializerFormat'] ?? null
)
);

$container->bindSingleton(RRSerializerInterface::class, JobsAdapterSerializer::class);
}

private function registerConsumer(Container $container): void
{
$container->bindSingleton(
ConsumerInterface::class,
static fn (WorkerInterface $worker, RRSerializerInterface $serializer): Consumer =>
new Consumer($worker, $serializer)
static fn (JobsAdapterSerializer $serializer, WorkerInterface $worker, QueueConfig $config): Consumer =>
new Consumer(
$serializer,
$worker,
$config->getConnections('roadrunner')['roadrunner']['pipelines'] ?? []
)
);
}

Expand All @@ -72,8 +84,13 @@ private function registerPipelineRegistry(Container $container)
{
$container->bind(
PipelineRegistryInterface::class,
static fn (JobsInterface $jobs, array $pipelines, array $aliases): PipelineRegistryInterface =>
new RPCPipelineRegistry($jobs, $pipelines, $aliases)
static fn (
JobsInterface $jobs,
JobsAdapterSerializer $serializer,
array $pipelines,
array $aliases
): PipelineRegistryInterface =>
new RPCPipelineRegistry($jobs, $serializer, $pipelines, $aliases)
);
}
}
56 changes: 56 additions & 0 deletions src/Queue/Consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunnerBridge\Queue;

use Spiral\RoadRunner\Jobs\ConsumerInterface;
use Spiral\RoadRunner\Jobs\Task\ReceivedTask;
use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\Worker;
use Spiral\RoadRunner\WorkerInterface;
use Spiral\Serializer\Exception\UnserializeException;

final class Consumer implements ConsumerInterface
{
private WorkerInterface $worker;

public function __construct(
private readonly JobsAdapterSerializer $serializer,
WorkerInterface $worker = null,
private readonly array $pipelines = [],
) {
$this->worker = $worker ?? Worker::create();
}

public function waitTask(): ?ReceivedTaskInterface
{
$payload = $this->worker->waitPayload();

if ($payload === null) {
return null;
}

$header = $this->serializer->withFormat('json')->deserialize($payload->header);

return new ReceivedTask(
$this->worker,
$header['id'],
$header['pipeline'],
$header['job'],
$this->getPayload($payload, $header['pipeline']),
(array) $header['headers']
);
}

/**
* @throws UnserializeException
*/
private function getPayload(Payload $payload, string $pipeline): array
{
return $this->serializer
->withFormat($this->pipelines[$pipeline]['serializerFormat'] ?? null)
->deserialize($payload->body);
}
}
21 changes: 17 additions & 4 deletions src/Queue/JobsAdapterSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,38 @@
namespace Spiral\RoadRunnerBridge\Queue;

use Spiral\RoadRunner\Jobs\Serializer\SerializerInterface;
use Spiral\Queue\SerializerInterface as QueueSerializerInterface;
use Spiral\Serializer\SerializerManager;

/**
* @internal
*/
final class JobsAdapterSerializer implements SerializerInterface
{
public function __construct(
private readonly QueueSerializerInterface $serializer
private readonly SerializerManager $manager,
private ?string $format = null
) {
}

public function withFormat(string $format = null): self
{
if ($format === null) {
return $this;
}

$serializer = clone $this;
$serializer->format = $format;

return $serializer;
}

public function serialize(array $payload): string
{
return $this->serializer->serialize($payload);
return (string) $this->manager->getSerializer($this->format)->serialize($payload);
}

public function deserialize(string $payload): array
{
return $this->serializer->deserialize($payload);
return $this->manager->getSerializer($this->format)->unserialize($payload);
}
}
10 changes: 9 additions & 1 deletion src/Queue/RPCPipelineRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Spiral\RoadRunner\Jobs\JobsInterface;
use Spiral\RoadRunner\Jobs\Queue\CreateInfoInterface;
use Spiral\RoadRunner\Jobs\QueueInterface;
use Spiral\RoadRunner\Jobs\Serializer\SerializerAwareInterface;

/**
* @internal
Expand All @@ -23,7 +24,8 @@ final class RPCPipelineRegistry implements PipelineRegistryInterface
* @param int $ttl Time to cache existing RoadRunner pipelines
*/
public function __construct(
private readonly JobsInterface $jobs,
private JobsInterface $jobs,
private readonly JobsAdapterSerializer $serializer,
private readonly array $pipelines,
private readonly array $aliases,
private readonly int $ttl = 60
Expand Down Expand Up @@ -54,6 +56,12 @@ public function getPipeline(string $name): QueueInterface
);
}

if ($this->jobs instanceof SerializerAwareInterface && !empty($this->pipelines[$name]['serializerFormat'])) {
$this->jobs = $this->jobs->withSerializer(
$this->serializer->withFormat($this->pipelines[$name]['serializerFormat'])
);
}

/** @var CreateInfoInterface $connector */
$connector = $this->pipelines[$name]['connector'];
$consume = (bool)($this->pipelines[$name]['consume'] ?? true);
Expand Down
5 changes: 5 additions & 0 deletions tests/app/config/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
// php app.php queue:pause local
'consume' => true, // Optional
],
'withSerializer' => [
'connector' => new MemoryCreateInfo('local'),
'serializerFormat' => 'serializer',
'consume' => true,
]
// 'amqp' => [
// 'connector' => new AMQPCreateInfo('bus', ...),
// // Don't consume jobs for this pipeline on start
Expand Down
35 changes: 32 additions & 3 deletions tests/src/Bootloader/QueueBootloaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
use Mockery as m;
use Spiral\Core\ConfigsInterface;
use Spiral\Exceptions\ExceptionReporterInterface;
use Spiral\Queue\Config\QueueConfig;
use Spiral\Queue\HandlerRegistryInterface;
use Spiral\Queue\QueueInterface;
use Spiral\Queue\SerializerInterface;
use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\Serializer\SerializerInterface;
use Spiral\RoadRunnerBridge\Queue\Consumer;
use Spiral\RoadRunner\Jobs\ConsumerInterface;
use Spiral\RoadRunner\Jobs\Serializer\SerializerInterface as RRSerializerInterface;
use Spiral\RoadRunner\WorkerInterface;
use Spiral\RoadRunnerBridge\Queue\Dispatcher;
use Spiral\RoadRunnerBridge\Queue\JobsAdapterSerializer;
use Spiral\RoadRunnerBridge\Queue\PipelineRegistryInterface;
use Spiral\RoadRunnerBridge\Queue\RPCPipelineRegistry;
use Spiral\Serializer\SerializerManager;
use Spiral\Tests\TestCase;

final class QueueBootloaderTest extends TestCase
Expand Down Expand Up @@ -89,7 +91,7 @@ public function testGetsSerializerInterface(): void
{
$this->assertContainerBoundAsSingleton(
SerializerInterface::class,
\Spiral\Queue\DefaultSerializer::class
SerializerManager::class
);
}

Expand Down Expand Up @@ -128,4 +130,31 @@ public function testConfigShouldBeDefined(): void

$this->assertIsArray($config);
}

public function testFormatIsNull(): void
{
$serializer = $this->getContainer()->get(JobsAdapterSerializer::class);

$ref = new \ReflectionObject($serializer);

$this->assertNull($ref->getProperty('format')->getValue($serializer));
}

public function testFormatIsDefined(): void
{
$this->getContainer()->bind(QueueConfig::class, new QueueConfig([
'connections' => [
'roadrunner' => [
'driver' => 'roadrunner',
'serializerFormat' => 'defined'
]
]
]));

$serializer = $this->getContainer()->get(JobsAdapterSerializer::class);

$ref = new \ReflectionObject($serializer);

$this->assertSame('defined', $ref->getProperty('format')->getValue($serializer));
}
}
46 changes: 46 additions & 0 deletions tests/src/Queue/ConsumerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue;

use Spiral\RoadRunner\Jobs\ConsumerInterface;
use Spiral\RoadRunner\Payload;
use Spiral\Tests\TestCase;

final class ConsumerTest extends TestCase
{
public function testGetPayloadWithDefaultSerializer(): void
{
$consumer = $this->getContainer()->get(ConsumerInterface::class);

$ref = new \ReflectionMethod($consumer, 'getPayload');

// default json serializer
$payload = new Payload(\json_encode([
'test' => 'test',
'other' => 'data'
]));

$result = $ref->invoke($consumer, $payload, 'memory');

$this->assertSame(['test' => 'test', 'other' => 'data'], $result);
}

public function testGetPayloadWithConfiguredSerializer(): void
{
$consumer = $this->getContainer()->get(ConsumerInterface::class);

$ref = new \ReflectionMethod($consumer, 'getPayload');

// php serialize from config
$payload = new Payload(\serialize([
'test' => 'test',
'other' => 'data'
]));

$result = $ref->invoke($consumer, $payload, 'withSerializer');

$this->assertSame(['test' => 'test', 'other' => 'data'], $result);
}
}
2 changes: 2 additions & 0 deletions tests/src/Queue/RPCPipelineRegistryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Spiral\RoadRunner\Jobs\JobsInterface;
use Spiral\RoadRunner\Jobs\Queue\CreateInfoInterface;
use Spiral\RoadRunner\Jobs\QueueInterface;
use Spiral\RoadRunnerBridge\Queue\JobsAdapterSerializer;
use Spiral\RoadRunnerBridge\Queue\RPCPipelineRegistry;
use Spiral\Tests\TestCase;

Expand All @@ -26,6 +27,7 @@ protected function setUp(): void

$this->registry = (new RPCPipelineRegistry(
$this->jobs = m::mock(JobsInterface::class),
$this->getContainer()->get(JobsAdapterSerializer::class),
[
'memory' => [
'connector' => $this->memoryConnector = m::mock(CreateInfoInterface::class),
Expand Down
6 changes: 2 additions & 4 deletions tests/src/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
namespace Spiral\Tests;

use Spiral\Boot\EnvironmentInterface;
use Spiral\Config\Patch\Set;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\ConfigsInterface;
use Spiral\Bootloader as Framework;
use Spiral\Http\Bootloader\DiactorosBootloader;
use Spiral\Nyholm\Bootloader\NyholmBootloader;
use Spiral\RoadRunnerBridge\Bootloader as RoadRunnerBridge;

abstract class TestCase extends \Spiral\Testing\TestCase
Expand All @@ -30,7 +28,7 @@ public function defineBootloaders(): array
Framework\CommandBootloader::class,
Framework\SnapshotsBootloader::class,
Framework\Http\HttpBootloader::class,
DiactorosBootloader::class,
NyholmBootloader::class,

\Spiral\SendIt\Bootloader\MailerBootloader::class,

Expand Down

0 comments on commit b8705b7

Please sign in to comment.