Skip to content

Commit 3f00436

Browse files
committed
client
1 parent 134e920 commit 3f00436

File tree

5 files changed

+547
-3
lines changed

5 files changed

+547
-3
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
/phpunit.xml
55
/vendor/
66
/.idea/
7+
/examples/

Client/MongodbDriver.php

+186
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb\Client;
4+
5+
use Enqueue\Client\Config;
6+
use Enqueue\Client\DriverInterface;
7+
use Enqueue\Client\Message;
8+
use Enqueue\Client\MessagePriority;
9+
use Enqueue\Client\Meta\QueueMetaRegistry;
10+
use Enqueue\Mongodb\MongodbContext;
11+
use Enqueue\Mongodb\MongodbMessage;
12+
use Interop\Queue\PsrMessage;
13+
use Psr\Log\LoggerInterface;
14+
use Psr\Log\NullLogger;
15+
16+
class MongodbDriver implements DriverInterface
17+
{
18+
/**
19+
* @var MongodbContext
20+
*/
21+
private $context;
22+
23+
/**
24+
* @var Config
25+
*/
26+
private $config;
27+
28+
/**
29+
* @var QueueMetaRegistry
30+
*/
31+
private $queueMetaRegistry;
32+
33+
/**
34+
* @var array
35+
*/
36+
private static $priorityMap = [
37+
MessagePriority::VERY_LOW => 0,
38+
MessagePriority::LOW => 1,
39+
MessagePriority::NORMAL => 2,
40+
MessagePriority::HIGH => 3,
41+
MessagePriority::VERY_HIGH => 4,
42+
];
43+
44+
/**
45+
* @param MongodbContext $context
46+
* @param Config $config
47+
* @param QueueMetaRegistry $queueMetaRegistry
48+
*/
49+
public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
50+
{
51+
$this->context = $context;
52+
$this->config = $config;
53+
$this->queueMetaRegistry = $queueMetaRegistry;
54+
}
55+
56+
/**
57+
* {@inheritdoc}
58+
*
59+
* @return MongodbMessage
60+
*/
61+
public function createTransportMessage(Message $message)
62+
{
63+
$properties = $message->getProperties();
64+
65+
$headers = $message->getHeaders();
66+
$headers['content_type'] = $message->getContentType();
67+
68+
$transportMessage = $this->context->createMessage();
69+
$transportMessage->setBody($message->getBody());
70+
$transportMessage->setHeaders($headers);
71+
$transportMessage->setProperties($properties);
72+
$transportMessage->setMessageId($message->getMessageId());
73+
$transportMessage->setTimestamp($message->getTimestamp());
74+
$transportMessage->setDeliveryDelay($message->getDelay());
75+
$transportMessage->setReplyTo($message->getReplyTo());
76+
$transportMessage->setCorrelationId($message->getCorrelationId());
77+
if (array_key_exists($message->getPriority(), self::$priorityMap)) {
78+
$transportMessage->setPriority(self::$priorityMap[$message->getPriority()]);
79+
}
80+
81+
return $transportMessage;
82+
}
83+
84+
/**
85+
* @param MongodbMessage $message
86+
*
87+
* {@inheritdoc}
88+
*/
89+
public function createClientMessage(PsrMessage $message)
90+
{
91+
$clientMessage = new Message();
92+
93+
$clientMessage->setBody($message->getBody());
94+
$clientMessage->setHeaders($message->getHeaders());
95+
$clientMessage->setProperties($message->getProperties());
96+
97+
$clientMessage->setContentType($message->getHeader('content_type'));
98+
$clientMessage->setMessageId($message->getMessageId());
99+
$clientMessage->setTimestamp($message->getTimestamp());
100+
$clientMessage->setDelay($message->getDeliveryDelay());
101+
$clientMessage->setReplyTo($message->getReplyTo());
102+
$clientMessage->setCorrelationId($message->getCorrelationId());
103+
104+
$priorityMap = array_flip(self::$priorityMap);
105+
$priority = array_key_exists($message->getPriority(), $priorityMap) ?
106+
$priorityMap[$message->getPriority()] :
107+
MessagePriority::NORMAL;
108+
$clientMessage->setPriority($priority);
109+
110+
return $clientMessage;
111+
}
112+
113+
/**
114+
* {@inheritdoc}
115+
*/
116+
public function sendToRouter(Message $message)
117+
{
118+
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
119+
throw new \LogicException('Topic name parameter is required but is not set');
120+
}
121+
122+
$queue = $this->createQueue($this->config->getRouterQueueName());
123+
$transportMessage = $this->createTransportMessage($message);
124+
125+
$this->context->createProducer()->send($queue, $transportMessage);
126+
}
127+
128+
/**
129+
* {@inheritdoc}
130+
*/
131+
public function sendToProcessor(Message $message)
132+
{
133+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
134+
throw new \LogicException('Processor name parameter is required but is not set');
135+
}
136+
137+
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
138+
throw new \LogicException('Queue name parameter is required but is not set');
139+
}
140+
141+
$transportMessage = $this->createTransportMessage($message);
142+
$destination = $this->createQueue($queueName);
143+
144+
$this->context->createProducer()->send($destination, $transportMessage);
145+
}
146+
147+
/**
148+
* {@inheritdoc}
149+
*/
150+
public function createQueue($queueName)
151+
{
152+
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();
153+
154+
return $this->context->createQueue($transportName);
155+
}
156+
157+
/**
158+
* {@inheritdoc}
159+
*/
160+
public function setupBroker(LoggerInterface $logger = null)
161+
{
162+
$logger = $logger ?: new NullLogger();
163+
$log = function ($text, ...$args) use ($logger) {
164+
$logger->debug(sprintf('[MongodbDriver] ' . $text, ...$args));
165+
};
166+
$contextConfig = $this->context->getConfig();
167+
$log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']);
168+
$this->context->createCollection();
169+
}
170+
171+
/**
172+
* {@inheritdoc}
173+
*/
174+
public function getConfig()
175+
{
176+
return $this->config;
177+
}
178+
179+
/**
180+
* @return array
181+
*/
182+
public static function getPriorityMap()
183+
{
184+
return self::$priorityMap;
185+
}
186+
}

MongodbConnectionFactory.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class MongodbConnectionFactory implements PsrConnectionFactory
2424
*
2525
* or
2626
*
27-
* mongodb://127.0.0.1:27017/dbname/collection_name?polling_interval=1000
27+
* mongodb://127.0.0.1:27017/dbname?polling_interval=1000&enqueue_collection=enqueue
2828
*
2929
* @param array|string|null $config
3030
*/

MongodbContext.php

+8-2
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ public function getCollection()
8080
{
8181
return $this->client
8282
->selectDatabase($this->config['dbname'])
83-
->selectCollection($this->config['collection_name'])
84-
;
83+
->selectCollection($this->config['collection_name']);
8584
}
8685

8786
/**
@@ -99,4 +98,11 @@ public function getConfig()
9998
{
10099
return $this->config;
101100
}
101+
102+
public function createCollection()
103+
{
104+
$collection = $this->getCollection();
105+
$collection->createIndex(['priority' => -1, 'published_at' => 1], ['name' => 'enqueue_priority']);
106+
$collection->createIndex(['delayed_until' => 1], ['name' => 'enqueue_delayed']);
107+
}
102108
}

0 commit comments

Comments
 (0)