Skip to content

Commit ee254c3

Browse files
committed
Added support for Beanstalkd
1 parent ab8f60e commit ee254c3

File tree

15 files changed

+354
-14
lines changed

15 files changed

+354
-14
lines changed

.travis.yml

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ services:
1010
- rabbitmq
1111

1212
before_script:
13+
- sudo apt-get install -y beanstalkd
14+
- sudo beanstalkd -l 127.0.0.1 -p 11300 &
1315
- composer require satooshi/php-coveralls:dev-master --dev --no-progress --prefer-source
1416
- mkdir -p build/logs
1517

Vagrantfile

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ Vagrant.configure("2") do |config|
77
v.customize ["modifyvm", :id, "--memory", 1024]
88
end
99

10+
config.vm.network "private_network", ip: "192.168.50.4"
11+
config.vm.synced_folder ".", "/vagrant", type: "nfs"
12+
1013
config.vm.provision :shell, :path => "vagrant/bootstrap.sh"
1114

1215
end

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
"php": ">=5.4.0"
1616
},
1717
"require-dev": {
18-
"videlalvaro/php-amqplib": "~2.0"
18+
"videlalvaro/php-amqplib": "~2.0",
19+
"pda/pheanstalk": "~2.0"
1920
},
2021
"suggest": {
2122
"videlalvaro/php-amqplib": "Driver for RabbitMQ"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace MyCLabs\Work\Adapter\Beanstalkd;
4+
5+
use MyCLabs\Work\Dispatcher\WorkDispatcher;
6+
use MyCLabs\Work\Dispatcher\WorkDispatcherEventTrait;
7+
use MyCLabs\Work\Task\Task;
8+
use Pheanstalk_PheanstalkInterface;
9+
10+
/**
11+
* Beanstalkd implementation.
12+
*
13+
* @author Matthieu Napoli <[email protected]>
14+
*/
15+
class BeanstalkdWorkDispatcher implements WorkDispatcher
16+
{
17+
use WorkDispatcherEventTrait;
18+
19+
/**
20+
* @var Pheanstalk_PheanstalkInterface
21+
*/
22+
private $connection;
23+
24+
/**
25+
* @var string
26+
*/
27+
private $tube;
28+
29+
/**
30+
* @param Pheanstalk_PheanstalkInterface $connection
31+
* @param string $tube
32+
*/
33+
public function __construct(Pheanstalk_PheanstalkInterface $connection, $tube)
34+
{
35+
$this->connection = $connection;
36+
$this->tube = $tube;
37+
}
38+
39+
/**
40+
* {@inheritdoc}
41+
*/
42+
public function run(Task $task)
43+
{
44+
// Event: before dispatching the task
45+
$this->triggerEvent(self::EVENT_BEFORE_TASK_DISPATCHED, [$task]);
46+
47+
// Event: before serialization
48+
$this->triggerEvent(self::EVENT_BEFORE_TASK_SERIALIZATION, [$task]);
49+
50+
$this->connection->putInTube($this->tube, serialize($task));
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
namespace MyCLabs\Work\Adapter\Beanstalkd;
4+
5+
use Exception;
6+
use MyCLabs\Work\Worker\WorkerEventTrait;
7+
use MyCLabs\Work\Worker\WorkerTaskExecutorTrait;
8+
use MyCLabs\Work\Task\Task;
9+
use MyCLabs\Work\Worker\Worker;
10+
use Pheanstalk_Job;
11+
use Pheanstalk_PheanstalkInterface;
12+
13+
/**
14+
* Beanstalkd implementation.
15+
*
16+
* @author Matthieu Napoli <[email protected]>
17+
*/
18+
class BeanstalkdWorker implements Worker
19+
{
20+
use WorkerEventTrait;
21+
use WorkerTaskExecutorTrait;
22+
23+
/**
24+
* @var Pheanstalk_PheanstalkInterface
25+
*/
26+
private $connection;
27+
28+
/**
29+
* @var string
30+
*/
31+
private $tube;
32+
33+
/**
34+
* @param Pheanstalk_PheanstalkInterface $connection
35+
* @param string $tube
36+
*/
37+
public function __construct(Pheanstalk_PheanstalkInterface $connection, $tube)
38+
{
39+
$this->connection = $connection;
40+
$this->tube = $tube;
41+
}
42+
43+
/**
44+
* {@inheritdoc}
45+
*/
46+
public function work($count = null)
47+
{
48+
while (is_null($count) || ($count > 0)) {
49+
/** @var Pheanstalk_Job $job */
50+
$job = $this->connection->reserveFromTube($this->tube);
51+
52+
$task = unserialize($job->getData());
53+
54+
if ($task instanceof Task) {
55+
$this->execute($task);
56+
} else {
57+
// This is not a task, we bury the job
58+
$this->connection->bury($job);
59+
}
60+
61+
if (! is_null($count)) {
62+
$count--;
63+
}
64+
}
65+
}
66+
67+
private function execute(Task $task)
68+
{
69+
try {
70+
$this->triggerEvent(self::EVENT_AFTER_TASK_UNSERIALIZATION, [$task]);
71+
$this->triggerEvent(self::EVENT_BEFORE_TASK_EXECUTION, [$task]);
72+
73+
// Execute the task
74+
$this->getExecutor($task)->execute($task);
75+
76+
$this->triggerEvent(self::EVENT_BEFORE_TASK_FINISHED, [$task]);
77+
$this->triggerEvent(self::EVENT_ON_TASK_SUCCESS, [$task, false]);
78+
} catch (Exception $e) {
79+
$this->triggerEvent(self::EVENT_ON_TASK_ERROR, [$task, $e, false]);
80+
}
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
worker.log
2+
dispatch-task.log
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
<?php
2+
3+
namespace Test\MyCLabs\Work\FunctionalTest\Beanstalkd;
4+
5+
use MyCLabs\Work\Adapter\Beanstalkd\BeanstalkdWorkDispatcher;
6+
use MyCLabs\Work\Adapter\Beanstalkd\BeanstalkdWorker;
7+
use Pheanstalk_Pheanstalk;
8+
use PHPUnit_Framework_TestCase;
9+
use Test\MyCLabs\Work\FunctionalTest\FakeTask;
10+
11+
/**
12+
* Test executing tasks through Beanstalkd
13+
*/
14+
class BeanstalkdTest extends PHPUnit_Framework_TestCase
15+
{
16+
const QUEUE_PREFIX = 'myclabs_work_test';
17+
18+
/**
19+
* @var Pheanstalk_Pheanstalk
20+
*/
21+
private $connection;
22+
23+
/**
24+
* @var string
25+
*/
26+
private $tube;
27+
28+
public function setUp()
29+
{
30+
$this->connection = new Pheanstalk_Pheanstalk('127.0.0.1');
31+
if (! $this->connection->getConnection()->isServiceListening()) {
32+
// Beanstalkd not installed, mark test skipped
33+
$this->markTestSkipped('Beanstalkd is not installed or was not found');
34+
return;
35+
}
36+
$this->tube = self::QUEUE_PREFIX . '_' . rand();
37+
}
38+
39+
public function testSimpleRun()
40+
{
41+
$dispatcher = new BeanstalkdWorkDispatcher($this->connection, $this->tube);
42+
43+
// Pile up a task to execute
44+
$task = new FakeTask();
45+
$dispatcher->run($task);
46+
47+
// Run the worker to execute the task
48+
$worker = new BeanstalkdWorker($this->connection, $this->tube);
49+
50+
// Check that event methods are called
51+
$listener = $this->getMock('MyCLabs\Work\Worker\Event\WorkerEventListener');
52+
$listener->expects($this->once())
53+
->method('beforeTaskExecution');
54+
$listener->expects($this->once())
55+
->method('onTaskSuccess');
56+
$worker->registerEventListener($listener);
57+
58+
// Fake task executor
59+
$taskExecutor = $this->getMockForAbstractClass('MyCLabs\Work\TaskExecutor\TaskExecutor');
60+
$taskExecutor->expects($this->once())
61+
->method('execute')
62+
->with($task);
63+
$worker->registerTaskExecutor(get_class($task), $taskExecutor);
64+
65+
// Work
66+
$worker->work(1);
67+
}
68+
69+
public function testRunWithException()
70+
{
71+
$workDispatcher = new BeanstalkdWorkDispatcher($this->connection, $this->tube);
72+
73+
// Pile up a task to execute
74+
$task = new FakeTask();
75+
$workDispatcher->run($task);
76+
77+
// Run the worker to execute the task
78+
$worker = new BeanstalkdWorker($this->connection, $this->tube);
79+
80+
// Check that event methods are called
81+
$listener = $this->getMock('MyCLabs\Work\Worker\Event\WorkerEventListener');
82+
$listener->expects($this->once())
83+
->method('beforeTaskExecution');
84+
$listener->expects($this->once())
85+
->method('onTaskError');
86+
$worker->registerEventListener($listener);
87+
88+
// Fake task executor
89+
$taskExecutor = $this->getMockForAbstractClass('MyCLabs\Work\TaskExecutor\TaskExecutor');
90+
$taskExecutor->expects($this->once())
91+
->method('execute')
92+
->with($task)
93+
->will($this->throwException(new \Exception()));
94+
$worker->registerTaskExecutor(get_class($task), $taskExecutor);
95+
96+
// Work
97+
$worker->work(1);
98+
}
99+
100+
/**
101+
* Test a scenario where tasks are piling up in Beanstalkd
102+
*/
103+
public function testTaskQueuing()
104+
{
105+
$workDispatcher = new BeanstalkdWorkDispatcher($this->connection, $this->tube);
106+
107+
// Queue 2 tasks
108+
$workDispatcher->run(new FakeTask(), 0.1);
109+
$workDispatcher->run(new FakeTask(), 0.1);
110+
111+
$file = __DIR__ . '/worker.php';
112+
113+
// Process first task
114+
$status = shell_exec("php $file {$this->tube} 0 2>&1");
115+
$this->assertSame("ok", trim($status));
116+
117+
// Process second task
118+
$status = shell_exec("php $file {$this->tube} 0 2>&1");
119+
$this->assertSame("ok", trim($status));
120+
}
121+
}
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
use MyCLabs\Work\Adapter\Beanstalkd\BeanstalkdWorker;
4+
use MyCLabs\Work\Task\Task;
5+
use MyCLabs\Work\TaskExecutor\TaskExecutor;
6+
7+
ini_set('error_reporting', E_ALL);
8+
ini_set('display_errors', true);
9+
10+
require_once __DIR__ . '/../../../vendor/autoload.php';
11+
12+
$tube = $argv[1];
13+
$error = $argv[2];
14+
15+
class FakeTaskExecutor implements TaskExecutor
16+
{
17+
public function execute(Task $task)
18+
{
19+
global $error;
20+
if ($error) {
21+
throw new \Exception('foo');
22+
}
23+
echo "ok";
24+
}
25+
}
26+
27+
$worker = new BeanstalkdWorker(new Pheanstalk_Pheanstalk('127.0.0.1'), $tube);
28+
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\FakeTask', new FakeTaskExecutor());
29+
30+
// Execute 1 task
31+
$worker->work(1);

tests/FunctionalTest/RabbitMQ/FakeTask.php renamed to tests/FunctionalTest/FakeTask.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Test\MyCLabs\Work\FunctionalTest\RabbitMQ;
3+
namespace Test\MyCLabs\Work\FunctionalTest;
44

55
use MyCLabs\Work\Task\Task;
66

tests/FunctionalTest/RabbitMQ/RabbitMQTest.php

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use PhpAmqpLib\Connection\AMQPConnection;
1010
use PhpAmqpLib\Exception\AMQPRuntimeException;
1111
use PHPUnit_Framework_TestCase;
12+
use Test\MyCLabs\Work\FunctionalTest\FakeTask;
1213

1314
/**
1415
* Test executing tasks through RabbitMQ
@@ -209,7 +210,7 @@ public function testWorkWithWait()
209210
$worker = new RabbitMQWorker($this->channel, $this->queue);
210211
/** @var TaskExecutor $taskExecutor */
211212
$taskExecutor = $this->getMockForAbstractClass('MyCLabs\Work\TaskExecutor\TaskExecutor');
212-
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\RabbitMQ\FakeTask', $taskExecutor);
213+
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\FakeTask', $taskExecutor);
213214

214215
// Run the task dispatcher as background task (it will emit 1 task and wait for it)
215216
$file = __DIR__ . '/dispatch-task.php';
@@ -254,7 +255,7 @@ public function testWorkWithWaitDispatcherTimeout()
254255
// The task executes in 500ms
255256
usleep(500000);
256257
}));
257-
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\RabbitMQ\FakeTask', $taskExecutor);
258+
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\FakeTask', $taskExecutor);
258259

259260
// Run the task dispatcher as background task (it will emit 1 task and wait for it)
260261
$file = __DIR__ . '/dispatch-task.php';
@@ -290,7 +291,7 @@ public function testWorkerStartAfterDispatcherTimeout()
290291
{
291292
$worker = new RabbitMQWorker($this->channel, $this->queue);
292293
$taskExecutor = $this->getMockForAbstractClass('MyCLabs\Work\TaskExecutor\TaskExecutor');
293-
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\RabbitMQ\FakeTask', $taskExecutor);
294+
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\FakeTask', $taskExecutor);
294295

295296
// Run the task dispatcher and wait for it to timeout and finish
296297
$file = __DIR__ . '/dispatch-task.php';

tests/FunctionalTest/RabbitMQ/dispatch-task.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
use MyCLabs\Work\Adapter\RabbitMQ\RabbitMQWorkDispatcher;
44
use PhpAmqpLib\Connection\AMQPConnection;
5-
use Test\MyCLabs\Work\FunctionalTest\RabbitMQ\FakeTask;
5+
use Test\MyCLabs\Work\FunctionalTest\FakeTask;
66

77
ini_set('error_reporting', E_ALL);
88
ini_set('display_errors', true);

tests/FunctionalTest/RabbitMQ/worker.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public function execute(Task $task)
3030
$channel = $connection->channel();
3131

3232
$worker = new RabbitMQWorker($channel, $queue);
33-
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\RabbitMQ\FakeTask', new FakeTaskExecutor());
33+
$worker->registerTaskExecutor('Test\MyCLabs\Work\FunctionalTest\FakeTask', new FakeTaskExecutor());
3434

3535
// Execute 1 task
3636
$worker->work(1);

0 commit comments

Comments
 (0)