-
Notifications
You must be signed in to change notification settings - Fork 8
/
EnqueueMessageProducerTest.php
131 lines (101 loc) · 4.91 KB
/
EnqueueMessageProducerTest.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
<?php
/**
* This file is part of prooph/psb-enqueue-producer.
* (c) 2017-2021 Alexander Miertsch <[email protected]>
* (c) 2017-2021 Sascha-Oliver Prolic <[email protected]>
* (c) 2017-2021 Formapro <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
declare(strict_types=1);
namespace ProophTest\ServiceBus\Enqueue\Functional;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\SimpleClient\SimpleClient;
use PHPUnit\Framework\TestCase;
use Prooph\Common\Messaging\FQCNMessageFactory;
use Prooph\Common\Messaging\NoOpMessageConverter;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Message\Enqueue\EnqueueMessageProcessor;
use Prooph\ServiceBus\Message\Enqueue\EnqueueMessageProducer;
use Prooph\ServiceBus\Message\Enqueue\EnqueueSerializer;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use Prooph\ServiceBus\QueryBus;
use ProophTest\ServiceBus\Mock\DoSomething;
use ProophTest\ServiceBus\Mock\MessageHandler;
use ProophTest\ServiceBus\Mock\SomethingDone;
use Symfony\Component\Filesystem\Filesystem;
class EnqueueMessageProducerTest extends TestCase
{
/**
* @var SimpleClient
*/
private $client;
/**
* @var EnqueueSerializer
*/
private $serializer;
protected function setUp(): void
{
(new Filesystem())->remove(__DIR__.'/queues/');
$this->client = new SimpleClient('file://'.__DIR__.'/queues');
$this->client->getQueueConsumer()->setReceiveTimeout(1);
$this->serializer = new EnqueueSerializer(new FQCNMessageFactory(), new NoOpMessageConverter());
}
/**
* @test
*/
public function it_sends_a_command_to_queue_pulls_it_with_consumer_and_forwards_it_to_command_bus(): void
{
$command = new DoSomething(['data' => 'test command']);
//The message dispatcher works with a ready-to-use enqueue producer and one queue
$messageProducer = new EnqueueMessageProducer($this->client->getProducer(), $this->serializer, 'prooph_bus', 2000);
//Set up command bus which will receive the command message from the enqueue consumer
$consumerCommandBus = new CommandBus();
$doSomethingHandler = new MessageHandler();
$router = new CommandRouter();
$router->route($command->messageName())->to($doSomethingHandler);
$router->attachToMessageBus($consumerCommandBus);
$enqueueProcessor = new EnqueueMessageProcessor($consumerCommandBus, new EventBus(), new QueryBus(), $this->serializer);
$this->client->bindCommand('prooph_bus', $enqueueProcessor);
//Normally you would send the command on a command bus. We skip this step here cause we are only
//interested in the function of the message dispatcher
$messageProducer($command);
$this->client->consume(new ChainExtension([
new LimitConsumedMessagesExtension(2),
new LimitConsumptionTimeExtension(new \DateTime('now + 1 seconds')),
]));
$this->assertNotNull($doSomethingHandler->getLastMessage());
$this->assertEquals($command->payload(), $doSomethingHandler->getLastMessage()->payload());
}
/**
* @test
*/
public function it_sends_an_event_to_queue_pulls_it_with_consumer_and_forwards_it_to_event_bus(): void
{
$event = new SomethingDone(['data' => 'test event']);
//The message dispatcher works with a ready-to-use enqueue producer and one queue
$messageProducer = new EnqueueMessageProducer($this->client->getProducer(), $this->serializer, 'prooph_bus', 2000);
//Set up event bus which will receive the event message from the enqueue consumer
$consumerEventBus = new EventBus();
$somethingDoneListener = new MessageHandler();
$router = new EventRouter();
$router->route($event->messageName())->to($somethingDoneListener);
$router->attachToMessageBus($consumerEventBus);
$enqueueProcessor = new EnqueueMessageProcessor(new CommandBus(), $consumerEventBus, new QueryBus(), $this->serializer);
$this->client->bindCommand('prooph_bus', $enqueueProcessor);
//Normally you would send the event on a event bus. We skip this step here cause we are only
//interested in the function of the message dispatcher
$messageProducer($event);
$this->client->consume(new ChainExtension([
new LimitConsumedMessagesExtension(2),
new LimitConsumptionTimeExtension(new \DateTime('now + 1 seconds')),
]));
$this->assertNotNull($somethingDoneListener->getLastMessage());
$this->assertEquals($event->payload(), $somethingDoneListener->getLastMessage()->payload());
}
}