Skip to content

Commit

Permalink
Added support for Pheanstalk 4
Browse files Browse the repository at this point in the history
  • Loading branch information
waltertamboer committed Jan 31, 2020
1 parent c46269f commit 3ef2787
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 49 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
},
"require-dev": {
"chesszebra/coding-standards": "dev-master",
"pda/pheanstalk": "^3.1",
"pda/pheanstalk": "^4.0",
"phpunit/phpunit": "^6.3",
"squizlabs/php_codesniffer": "^3.0"
},
Expand Down
31 changes: 17 additions & 14 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions src/Storage/Pheanstalk.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

namespace ChessZebra\JobSystem\Storage;

use ArrayObject;
use ChessZebra\JobSystem\Job\JobInterface;
use ChessZebra\JobSystem\Storage\Pheanstalk\StoredJob;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job;
use Pheanstalk\PheanstalkInterface;
use function assert;
use function json_encode;
use function microtime;
Expand Down Expand Up @@ -73,7 +74,12 @@ public function addJob(JobInterface $job): void
$delay = $job->getDelay() ?: PheanstalkInterface::DEFAULT_DELAY;
$ttr = $job->getTimeToRun() ?: PheanstalkInterface::DEFAULT_TTR;

$this->connection->putInTube($tube, $data, $priority, $delay, $ttr);
$tubeBackup = $this->connection->listTubeUsed();

$this->connection->useTube($tube);
$this->connection->put($data, $priority, $delay, $ttr);

$this->connection->useTube($tubeBackup);
}

/**
Expand Down Expand Up @@ -124,14 +130,16 @@ public function rescheduleJob(StoredJobInterface $storedJob, ?int $delay, ?int $
*/
public function retrieveJob(): ?StoredJobInterface
{
$job = $this->connection->reserve($this->getReserveTimeout());
$job = $this->connection->reserve();
assert($job instanceof Job || $job === null);

if (!$job) {
if ($job === null) {
return null;
}

$stats = $this->connection->statsJob($job)->getArrayCopy();
$statsResponse = $this->connection->statsJob($job);

$stats = (new ArrayObject($statsResponse))->getArrayCopy();

return new StoredJob($job, $stats);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Storage/Pheanstalk/StoredJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
use ChessZebra\JobSystem\Job\Job;
use ChessZebra\JobSystem\Job\JobInterface;
use ChessZebra\JobSystem\Storage\StoredJobInterface;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job as PheanstalkJob;
use Pheanstalk\PheanstalkInterface;
use RuntimeException;
use function array_key_exists;
use function json_decode;
Expand Down
2 changes: 1 addition & 1 deletion tests/Storage/Pheanstalk/StoredJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
namespace ChessZebra\JobSystem\Storage\Pheanstalk;

use ChessZebra\JobSystem\Job\JobInterface;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job;
use Pheanstalk\PheanstalkInterface;
use PHPUnit\Framework\TestCase;
use RuntimeException;
use function json_encode;
Expand Down
54 changes: 27 additions & 27 deletions tests/Storage/PheanstalkTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
use ArrayObject;
use ChessZebra\JobSystem\Job\Job;
use ChessZebra\JobSystem\Storage\Pheanstalk\StoredJob;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Contract\ResponseInterface;
use Pheanstalk\Job as PheanstalkJob;
use Pheanstalk\PheanstalkInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use PHPUnit_Framework_MockObject_MockObject;
use function json_encode;

final class PheanstalkTest extends TestCase
{
/**
* The connection used to create tests.
*
* @var PHPUnit_Framework_MockObject_MockObject
* @var MockObject|PheanstalkInterface
*/
private $connection;

Expand Down Expand Up @@ -82,8 +83,9 @@ public function testSetGetReserveTimeout(): void
public function testAddJobWithNullQueue(): void
{
// Arrange
$this->connection->expects($this->once())->method('putInTube')->with(
$this->equalTo('default')
$this->connection->expects(static::exactly(2))->method('useTube')->withConsecutive(
[static::equalTo('default')],
[static::equalTo('')]
);

$storage = new Pheanstalk($this->connection);
Expand All @@ -105,8 +107,9 @@ public function testAddJobWithNullQueue(): void
public function testAddJobWithQueueName(): void
{
// Arrange
$this->connection->expects($this->once())->method('putInTube')->with(
$this->equalTo('awesome')
$this->connection->expects(static::exactly(2))->method('useTube')->withConsecutive(
[static::equalTo('awesome')],
[static::equalTo('')]
);

$storage = new Pheanstalk($this->connection);
Expand All @@ -130,8 +133,8 @@ public function testDeleteJob(): void
// Arrange
$job = new PheanstalkJob(123, '');

$this->connection->expects($this->once())->method('delete')->with(
$this->equalTo($job)
$this->connection->expects(static::once())->method('delete')->with(
static::equalTo($job)
);

$storage = new Pheanstalk($this->connection);
Expand All @@ -155,8 +158,8 @@ public function testFailJob(): void
// Arrange
$job = new PheanstalkJob(123, '');

$this->connection->expects($this->once())->method('bury')->with(
$this->equalTo($job)
$this->connection->expects(static::once())->method('bury')->with(
static::equalTo($job)
);

$storage = new Pheanstalk($this->connection);
Expand All @@ -183,8 +186,8 @@ public function testRescheduleJob(): void
'data' => [],
]));

$this->connection->expects($this->once())->method('release')->with(
$this->equalTo($job)
$this->connection->expects(static::once())->method('release')->with(
static::equalTo($job)
);

$storage = new Pheanstalk($this->connection);
Expand All @@ -211,11 +214,8 @@ public function testRescheduleJob(): void
public function testRetrieveJobWithoutJobPresent(): void
{
// Arrange
$this->connection->expects($this->once())->method('reserve')->with(
$this->equalTo(1)
);

$this->connection->expects($this->never())->method('statsJob');
$this->connection->expects(static::once())->method('reserve');
$this->connection->expects(static::never())->method('statsJob');

$storage = new Pheanstalk($this->connection);

Expand All @@ -236,13 +236,13 @@ public function testRetrieveJobWithJobPresent(): void
// Arrange
$job = new PheanstalkJob(123, '');

$this->connection->expects($this->once())->method('reserve')->with(
$this->equalTo(1)
)->willReturn($job);
$this->connection->expects(static::once())->method('reserve')->willReturn($job);

$this->connection->expects($this->once())->method('statsJob')->with(
$this->equalTo($job)
)->willReturn(new ArrayObject(['ttr' => 0]));
$statsResponse = $this->getMockForAbstractClass(ResponseInterface::class);

$this->connection->expects(static::once())->method('statsJob')->with(
static::equalTo($job)
)->willReturn($statsResponse);

$storage = new Pheanstalk($this->connection);

Expand All @@ -264,9 +264,9 @@ public function testPingJob(): void
$job = new PheanstalkJob(123, '');
$storedJob = new StoredJob($job, []);

$this->connection->expects($this->once())->method('touch')->with(
$this->equalTo($job)
)->willReturn($job);
$this->connection->expects(static::once())->method('touch')->with(
static::equalTo($job)
);

$storage = new Pheanstalk($this->connection);

Expand Down

0 comments on commit 3ef2787

Please sign in to comment.