Skip to content

🔌 RoadRunner Jobs (Queue) SDK

License

Notifications You must be signed in to change notification settings

roadrunner-php/jobs

Repository files navigation

RoadRunner Jobs Plugin

PHP Version Require Latest Stable Version phpunit psalm Codecov Total Downloads StyleCI

This repository contains the codebase PHP bridge using RoadRunner Jobs plugin.

Installation

To install application server and Jobs codebase

composer require spiral/roadrunner-jobs

You can use the convenient installer to download the latest available compatible version of RoadRunner assembly:

composer require spiral/roadrunner-cli --dev
vendor/bin/rr get

Configuration

First you need to add at least one jobs adapter to your RoadRunner configuration. For example, such a configuration would be quite feasible to run:

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: php consumer.php
  relay: pipes

jobs:
  consume: [ "local" ]
  pipelines:
    local:
      driver: memory
      config:
        priority: 10
        prefetch: 10000

Note Read more about all available drivers on the documentation page.

After starting the server with this configuration, one driver named local will be available to you.

Usage

Producer

The following code will allow writing and reading an arbitrary value from the RoadRunner server.

<?php

use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\Goridge\RPC\RPC;

require __DIR__ . '/vendor/autoload.php';

// Jobs service
$jobs = new Jobs(RPC::create('tcp://127.0.0.1:6001'));

// Select "local" pipeline from jobs
$queue = $jobs->connect('local');

// Create task prototype with default headers
$task = $queue->create('ping', '{"site": "https://example.com"}') // Create task with "echo" name
    ->withHeader('attempts', 4) // Number of attempts to execute the task
    ->withHeader('retry-delay', 10); // Delay between attempts

// Push "echo" task to the queue
$task = $queue->dispatch($task);

var_dump($task->getId() . ' has been queued');

Consumer

The Consumer processes tasks from RoadRunner server and responds based on the processing outcome:

  • ack - is used for positive acknowledgements.
  • nack - is used for negative acknowledgements.
  • requeue - is used for requeuing the task.

The behavior of the nack method depends on its implementation by the queue driver. It can accept an additional parameter redelivery; if it is passed and set to true, the task will be requeued. However, not all drivers support this functionality. If the redelivery parameter is not passed, set to false, or the queue driver's implementation does not support it, the task will not be requeued.

$task->nack(message: $reason, redelivery: true);

The requeue method is implemented by RoadRunner and does not depend on the queue driver. It allows you to resend the task to the end of the queue and add additional headers to the task.

$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception);

The nack and requeue methods have the ability to specify a delay for requeuing the task. To do this, call the withDelay method and pass the desired value before invoking the nack or requeue methods.

$task->withDelay(10)->requeue($exception);
<?php

use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;

require __DIR__ . '/vendor/autoload.php';

$consumer = new Spiral\RoadRunner\Jobs\Consumer();

/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    try {
        $name = $task->getName(); // "ping"
        $queue = $task->getQueue(); // "local"
        $driver = $task->getDriver(); // "memory"
        $payload = $task->getPayload(); // {"site": "https://example.com"}
    
        // Process task

        $task->ack();
    } catch (\Throwable $e) {
        $task->requeue($e);
    }
}
try Spiral Framework

License

The MIT License (MIT). Please see LICENSE for more information. Maintained by Spiral Scout.