Skip to content

Commit 134e920

Browse files
committed
Mongo db transport
0 parents  commit 134e920

30 files changed

+2022
-0
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

MongodbConnectionFactory.php

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb;
4+
5+
use Interop\Queue\PsrConnectionFactory;
6+
use MongoDB\Client;
7+
8+
class MongodbConnectionFactory implements PsrConnectionFactory
9+
{
10+
/**
11+
* @var array
12+
*/
13+
private $config;
14+
15+
/**
16+
* The config could be an array, string DSN or null. In case of null it will attempt to connect to Mongodb localhost with default credentials.
17+
*
18+
* $config = [
19+
* 'uri' => 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/
20+
* 'dbname' => 'enqueue', - database name.
21+
* 'collection_name' => 'enqueue' - collection name
22+
* 'polling_interval' => '1000', - How often query for new messages (milliseconds)
23+
* ]
24+
*
25+
* or
26+
*
27+
* mongodb://127.0.0.1:27017/dbname/collection_name?polling_interval=1000
28+
*
29+
* @param array|string|null $config
30+
*/
31+
public function __construct($config = 'mongodb:')
32+
{
33+
if (empty($config)) {
34+
$config = $this->parseDsn('mongodb:');
35+
} elseif (is_string($config)) {
36+
$config = $this->parseDsn($config);
37+
} elseif (is_array($config)) {
38+
} else {
39+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
40+
}
41+
$config = array_replace([
42+
'uri' => 'mongodb://127.0.0.1/',
43+
], $config);
44+
45+
$this->config = $config;
46+
}
47+
48+
public function createContext()
49+
{
50+
$client = new Client($this->config['uri']);
51+
52+
return new MongodbContext($client, $this->config);
53+
}
54+
55+
public static function parseDsn($dsn)
56+
{
57+
$parsedUrl = parse_url($dsn);
58+
if (false === $parsedUrl) {
59+
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
60+
}
61+
if (empty($parsedUrl['scheme'])) {
62+
throw new \LogicException('Schema is empty');
63+
}
64+
$supported = [
65+
'mongodb' => true,
66+
];
67+
if (false == isset($parsedUrl['scheme'])) {
68+
throw new \LogicException(sprintf(
69+
'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
70+
$parsedUrl['scheme'],
71+
implode('", "', array_keys($supported))
72+
));
73+
}
74+
if ('mongodb:' === $dsn) {
75+
return [
76+
'uri' => 'mongodb://127.0.0.1/',
77+
];
78+
}
79+
$config['uri'] = $dsn;
80+
if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) {
81+
$pathParts = explode('/', $parsedUrl['path']);
82+
//DB name
83+
if ($pathParts[1]) {
84+
$config['dbname'] = $pathParts[1];
85+
}
86+
}
87+
if (isset($parsedUrl['query'])) {
88+
$queryParts = null;
89+
parse_str($parsedUrl['query'], $queryParts);
90+
//get enqueue attributes values
91+
if (!empty($queryParts['polling_interval'])) {
92+
$config['polling_interval'] = $queryParts['polling_interval'];
93+
}
94+
if (!empty($queryParts['enqueue_collection'])) {
95+
$config['collection_name'] = $queryParts['enqueue_collection'];
96+
}
97+
}
98+
99+
return $config;
100+
}
101+
}

MongodbConsumer.php

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb;
4+
5+
use Interop\Queue\InvalidMessageException;
6+
use Interop\Queue\PsrConsumer;
7+
use Interop\Queue\PsrMessage;
8+
9+
class MongodbConsumer implements PsrConsumer
10+
{
11+
/**
12+
* @var MongodbContext
13+
*/
14+
private $context;
15+
16+
/**
17+
* @var MongodbDestination
18+
*/
19+
private $queue;
20+
21+
/**
22+
* @var int microseconds
23+
*/
24+
private $pollingInterval = 1000000;
25+
26+
/**
27+
* @param MongodbContext $context
28+
* @param MongodbDestination $queue
29+
*/
30+
public function __construct(MongodbContext $context, MongodbDestination $queue)
31+
{
32+
$this->context = $context;
33+
$this->queue = $queue;
34+
}
35+
36+
/**
37+
* Set polling interval in milliseconds.
38+
*
39+
* @param int $msec
40+
*/
41+
public function setPollingInterval($msec)
42+
{
43+
$this->pollingInterval = $msec * 1000;
44+
}
45+
46+
/**
47+
* Get polling interval in milliseconds.
48+
*
49+
* @return int
50+
*/
51+
public function getPollingInterval()
52+
{
53+
return (int) $this->pollingInterval / 1000;
54+
}
55+
56+
/**
57+
* {@inheritdoc}
58+
*
59+
* @return MongodbDestination
60+
*/
61+
public function getQueue()
62+
{
63+
return $this->queue;
64+
}
65+
66+
/**
67+
* {@inheritdoc}
68+
*
69+
* @return MongodbMessage|null
70+
*/
71+
public function receive($timeout = 0)
72+
{
73+
$timeout /= 1000;
74+
$startAt = microtime(true);
75+
76+
while (true) {
77+
$message = $this->receiveMessage();
78+
79+
if ($message) {
80+
return $message;
81+
}
82+
83+
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
84+
return;
85+
}
86+
87+
usleep($this->pollingInterval);
88+
89+
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
90+
return;
91+
}
92+
}
93+
}
94+
95+
/**
96+
* {@inheritdoc}
97+
*
98+
* @return MongodbMessage|null
99+
*/
100+
public function receiveNoWait()
101+
{
102+
return $this->receiveMessage();
103+
}
104+
105+
/**
106+
* {@inheritdoc}
107+
*
108+
* @param MongodbMessage $message
109+
*/
110+
public function acknowledge(PsrMessage $message)
111+
{
112+
// does nothing
113+
}
114+
115+
/**
116+
* {@inheritdoc}
117+
*
118+
* @param MongodbMessage $message
119+
*/
120+
public function reject(PsrMessage $message, $requeue = false)
121+
{
122+
InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);
123+
124+
if ($requeue) {
125+
$this->context->createProducer()->send($this->queue, $message);
126+
127+
return;
128+
}
129+
}
130+
131+
/**
132+
* @return MongodbMessage|null
133+
*/
134+
protected function receiveMessage()
135+
{
136+
$now = time();
137+
$collection = $this->context->getCollection();
138+
$message = $collection->findOneAndDelete(
139+
[
140+
'$or' => [
141+
['delayed_until' => ['$exists' => false]],
142+
['delayed_until' => ['$lte' => $now]],
143+
],
144+
],
145+
[
146+
'sort' => ['priority' => -1, 'published_at' => 1],
147+
'typeMap' => ['root' => 'array', 'document' => 'array'],
148+
]
149+
);
150+
151+
if (!$message) {
152+
return null;
153+
}
154+
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
155+
return $this->convertMessage($message);
156+
}
157+
}
158+
159+
/**
160+
* @param array $dbalMessage
161+
*
162+
* @return MongodbMessage
163+
*/
164+
protected function convertMessage(array $mongodbMessage)
165+
{
166+
$message = $this->context->createMessage($mongodbMessage['body'], $mongodbMessage['properties'], $mongodbMessage['headers']);
167+
$message->setId((string) $mongodbMessage['_id']);
168+
$message->setPriority((int) $mongodbMessage['priority']);
169+
$message->setRedelivered((bool) $mongodbMessage['redelivered']);
170+
$message->setPublishedAt((int) $mongodbMessage['published_at']);
171+
172+
return $message;
173+
}
174+
}

MongodbContext.php

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb;
4+
5+
use Interop\Queue\InvalidDestinationException;
6+
use Interop\Queue\PsrContext;
7+
use Interop\Queue\PsrDestination;
8+
use MongoDB\Client;
9+
10+
class MongodbContext implements PsrContext
11+
{
12+
/**
13+
* @var array
14+
*/
15+
private $config;
16+
/**
17+
* @var Client
18+
*/
19+
private $client;
20+
21+
public function __construct($client, array $config = [])
22+
{
23+
$this->config = array_replace([
24+
'dbname' => 'enqueue',
25+
'collection_name' => 'enqueue',
26+
'polling_interval' => null,
27+
], $config);
28+
$this->client = $client;
29+
}
30+
31+
public function createMessage($body = '', array $properties = [], array $headers = [])
32+
{
33+
$message = new MongodbMessage();
34+
$message->setBody($body);
35+
$message->setProperties($properties);
36+
$message->setHeaders($headers);
37+
38+
return $message;
39+
}
40+
41+
public function createTopic($name)
42+
{
43+
return new MongodbDestination($name);
44+
}
45+
46+
public function createQueue($queueName)
47+
{
48+
return new MongodbDestination($queueName);
49+
}
50+
51+
public function createTemporaryQueue()
52+
{
53+
throw new \BadMethodCallException('Mongodb transport does not support temporary queues');
54+
}
55+
56+
public function createProducer()
57+
{
58+
return new MongodbProducer($this);
59+
}
60+
61+
public function createConsumer(PsrDestination $destination)
62+
{
63+
InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class);
64+
65+
$consumer = new MongodbConsumer($this, $destination);
66+
67+
if (isset($this->config['polling_interval'])) {
68+
$consumer->setPollingInterval($this->config['polling_interval']);
69+
}
70+
71+
return $consumer;
72+
}
73+
74+
public function close()
75+
{
76+
// TODO: Implement close() method.
77+
}
78+
79+
public function getCollection()
80+
{
81+
return $this->client
82+
->selectDatabase($this->config['dbname'])
83+
->selectCollection($this->config['collection_name'])
84+
;
85+
}
86+
87+
/**
88+
* @return Client
89+
*/
90+
public function getClient()
91+
{
92+
return $this->client;
93+
}
94+
95+
/**
96+
* @return array
97+
*/
98+
public function getConfig()
99+
{
100+
return $this->config;
101+
}
102+
}

0 commit comments

Comments
 (0)