Skip to content

Commit 88cbf3b

Browse files
committed
swoole server, queue consumer with redis, ready for testing and discussions
1 parent 145de45 commit 88cbf3b

File tree

8 files changed

+164
-23
lines changed

8 files changed

+164
-23
lines changed

bin/messenger.php

-21
This file was deleted.

config/config.php

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
\Netglue\PsrContainer\Messenger\ConfigProvider::class,
2323

2424
// Default App module config
25+
\Queue\App\ConfigProvider::class,
2526
\Queue\Swoole\ConfigProvider::class,
2627

2728
// Load application config in a pre-defined order in such a way that local settings

src/App/ConfigProvider.php

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
3+
namespace Queue\App;
4+
5+
use Laminas\ServiceManager\Factory\InvokableFactory;
6+
use Netglue\PsrContainer\Messenger\Container\MessageBusStaticFactory;
7+
use Netglue\PsrContainer\Messenger\Container\Middleware\BusNameStampMiddlewareStaticFactory;
8+
use Netglue\PsrContainer\Messenger\Container\Middleware\MessageHandlerMiddlewareStaticFactory;
9+
use Netglue\PsrContainer\Messenger\Container\Middleware\MessageSenderMiddlewareStaticFactory;
10+
use Netglue\PsrContainer\Messenger\HandlerLocator\OneToManyFqcnContainerHandlerLocator;
11+
use Queue\App\Message\ExampleMessage;
12+
use Queue\App\Message\ExampleMessageHandler;
13+
use Queue\App\Message\ExampleMessageHandlerFactory;
14+
use Symfony\Component\Messenger\MessageBusInterface;
15+
16+
class ConfigProvider
17+
{
18+
public function __invoke()
19+
{
20+
return [
21+
"dependencies" => $this->getDependencies(),
22+
'symfony' => [
23+
'messenger' => [
24+
'buses' => $this->busConfig(),
25+
],
26+
],
27+
];
28+
}
29+
30+
31+
private function getDependencies()
32+
{
33+
return [
34+
"factories" => [
35+
"message_bus" => [MessageBusStaticFactory::class, "message_bus"],
36+
"message_bus_stamp_middleware" => [BusNameStampMiddlewareStaticFactory::class, "message_bus"],
37+
"message_bus_sender_middleware" => [MessageSenderMiddlewareStaticFactory::class, "message_bus"],
38+
"message_bus_handler_middleware" => [MessageHandlerMiddlewareStaticFactory::class, "message_bus"],
39+
ExampleMessageHandler::class => ExampleMessageHandlerFactory::class
40+
],
41+
"aliases" => [
42+
MessageBusInterface::class => "message_bus"
43+
]
44+
];
45+
}
46+
47+
private function busConfig()
48+
{
49+
return [
50+
"message_bus" => [
51+
'allows_zero_handlers' => false, // Means that it's an error if no handlers are defined for a given message
52+
53+
/**
54+
* Each bus needs middleware to do anything useful.
55+
*
56+
* Below is a minimal configuration to handle messages
57+
*/
58+
'middleware' => [
59+
// … Middleware that inspects the message before it has been sent to a transport would go here.
60+
"message_bus_stamp_middleware",
61+
'message_bus_sender_middleware', // Sends messages via a transport if configured.
62+
'message_bus_handler_middleware', // Executes the handlers configured for the message
63+
],
64+
65+
/**
66+
* Map messages to one or more handlers:
67+
*
68+
* Two locators are shipped, 1 message type to 1 handler and 1 message type to many handlers.
69+
* Both locators operate on the basis that handlers are available in the container.
70+
*
71+
*/
72+
'handler_locator' => OneToManyFqcnContainerHandlerLocator::class,
73+
'handlers' => [
74+
ExampleMessage::class => [ExampleMessageHandler::class],
75+
],
76+
77+
/**
78+
* Routes define which transport(s) that messages dispatched on this bus should be sent with.
79+
*
80+
* The * wildcard applies to all messages.
81+
* The transport for each route must be an array of one or more transport identifiers. Each transport
82+
* is retrieved from the DI container by this value.
83+
*
84+
* An empty routes definition would mean that messages would be handled immediately and synchronously,
85+
* i.e. no transport would be used.
86+
*
87+
* Route specific messages to specific transports by using the message name as the key.
88+
*/
89+
'routes' => [
90+
ExampleMessage::class => ["redis"],
91+
],
92+
]
93+
];
94+
}
95+
}

src/App/Message/ExampleMessage.php

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Queue\App\Message;
4+
5+
class ExampleMessage
6+
{
7+
public function __construct(
8+
private array $payload,
9+
)
10+
{
11+
}
12+
13+
public function getPayload(): array
14+
{
15+
return $this->payload;
16+
}
17+
18+
}
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
namespace Queue\App\Message;
4+
5+
use Dot\Log\Logger;
6+
use Psr\Container\ContainerInterface;
7+
class ExampleMessageHandler
8+
{
9+
public function __construct(private readonly ContainerInterface $container)
10+
{
11+
}
12+
13+
public function __invoke(ExampleMessage $message)
14+
{
15+
16+
/** @var Logger $logger */
17+
$logger = $this->container->get("dot-log.queue-log");
18+
19+
$logger->info("message: " . $message->getPayload()['foo'] ?? null);
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace Queue\App\Message;
4+
5+
use Psr\Container\ContainerInterface;
6+
7+
class ExampleMessageHandlerFactory
8+
{
9+
public function __invoke(ContainerInterface $container)
10+
{
11+
return new ExampleMessageHandler($container);
12+
}
13+
14+
}

src/Swoole/Command/StartCommand.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
7070

7171
$pidManager = $this->pidManager;
7272

73-
$server->on('start', function () use ($server, $pidManager, $processName) {
73+
$server->on('start', function () use ($server, $pidManager, $processName, $output) {
74+
$output->writeln('<info>Server started...</info>');
7475
$pidManager->write($server->master_pid, $server->manager_pid);
7576

7677
swoole_set_process_name(sprintf('%s-master', $processName));

src/Swoole/Delegators/TCPServerDelegator.php

+13-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
use Psr\Container\ContainerInterface;
66

7+
use Queue\App\Message\ExampleMessage;
78
use Swoole\Server as TCPSwooleServer;
9+
use Symfony\Component\Messenger\MessageBusInterface;
10+
use Symfony\Component\Messenger\Stamp\DelayStamp;
811

912
class TCPServerDelegator
1013
{
@@ -13,14 +16,23 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal
1316
/** @var TCPSwooleServer $server */
1417
$server = $callback();
1518

19+
/** @var MessageBusInterface $bus */
20+
$bus = $container->get(MessageBusInterface::class);
21+
1622
$logger = $container->get("dot-log.queue-log");
1723

1824
$server->on('Connect', function ($server, $fd) {
1925
echo "Client: Connect.\n";
2026
});
2127

2228
// Register the function for the event `receive`
23-
$server->on('receive', function ($server, $fd, $from_id, $data) use ($logger) {
29+
$server->on('receive', function ($server, $fd, $from_id, $data) use ($logger, $bus) {
30+
31+
$bus->dispatch(new ExampleMessage(["foo" => $data]));
32+
$bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [
33+
new DelayStamp(5000)
34+
]);
35+
2436
$server->send($fd, "Server: {$data}");
2537
$logger->notice("Request received on receive", [
2638
'fd' => $fd,

0 commit comments

Comments
 (0)