Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[11.x] Introduce OnQueue and OnConnection attributes for jobs, Mailables, Notifications, queued event listeners #54229

Open
wants to merge 18 commits into
base: 11.x
Choose a base branch
from
Open
17 changes: 14 additions & 3 deletions src/Illuminate/Events/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
use Illuminate\Foundation\Queue\InteractsWithQueueAndConnection;
use Illuminate\Support\Arr;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use Illuminate\Support\Traits\Macroable;
use Illuminate\Support\Traits\ReflectsClosures;
use ReflectionClass;

use function Illuminate\Support\enum_value;

class Dispatcher implements DispatcherContract
{
use Macroable, ReflectsClosures;
use InteractsWithQueueAndConnection, Macroable, ReflectsClosures;

/**
* The IoC container instance.
Expand Down Expand Up @@ -618,13 +621,21 @@ protected function queueHandler($class, $method, $arguments)
{
[$listener, $job] = $this->createListenerAndJob($class, $method, $arguments);

/**
* We determine connection and queue based on the ways the user may specify it for the listener,
* stopping once we find the first value set. In order of precedence: via* methods,
* property, and finally On* attributes.
*/
$connection = $this->resolveQueue()->connection(method_exists($listener, 'viaConnection')
? (isset($arguments[0]) ? $listener->viaConnection($arguments[0]) : $listener->viaConnection())
: $listener->connection ?? null);
: $listener->connection
?? enum_value($this->getConnectionFromOnConnectionAttribute($listenerReflectionClass = new ReflectionClass($listener)))
);

$queue = method_exists($listener, 'viaQueue')
? (isset($arguments[0]) ? $listener->viaQueue($arguments[0]) : $listener->viaQueue())
: $listener->queue ?? null;
: $listener->queue
?? enum_value($this->getQueueFromOnQueueAttribute($listenerReflectionClass ?? new ReflectionClass($listener)));

$delay = method_exists($listener, 'withDelay')
? (isset($arguments[0]) ? $listener->withDelay($arguments[0]) : $listener->withDelay())
Expand Down
33 changes: 33 additions & 0 deletions src/Illuminate/Foundation/Bus/PendingDispatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Cache\Repository as Cache;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Foundation\Queue\InteractsWithQueueAndConnection;
use Illuminate\Foundation\Queue\InteractsWithUniqueJobs;

class PendingDispatch
{
use InteractsWithQueueAndConnection;
use InteractsWithUniqueJobs;

/**
Expand Down Expand Up @@ -189,6 +191,36 @@ public function getJob()
return $this->job;
}

/**
* Set the job's queue and connection values based on the job's OnQueue/OnConnection attributes.
*
* @return void
*
* @throws \ReflectionException
*/
protected function setQueueAndConnectionFromAttributesIfNotSet(): void
{
$hasQueueSet = isset($this->job->queue);
$hasConnectionSet = isset($this->job->connection);

if ($hasQueueSet && $hasConnectionSet) {
return;
}

$reflectionClass = new \ReflectionClass($this->job);
if (! $hasQueueSet) {
if ($queue = $this->getQueueFromOnQueueAttribute($reflectionClass)) {
$this->onQueue($queue);
}
}

if (! $hasConnectionSet) {
if ($connection = $this->getConnectionFromOnConnectionAttribute($reflectionClass)) {
$this->onConnection($connection);
}
}
}

/**
* Dynamically proxy methods to the underlying job.
*
Expand All @@ -211,6 +243,7 @@ public function __call($method, $parameters)
public function __destruct()
{
$this->addUniqueJobInformationToContext($this->job);
$this->setQueueAndConnectionFromAttributesIfNotSet();

if (! $this->shouldDispatch()) {
$this->removeUniqueJobInformationFromContext($this->job);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

namespace Illuminate\Foundation\Queue;

use ReflectionClass;

trait InteractsWithQueueAndConnection
{
/**
* Extract the connection from OnConnection attribute if present.
*
* @param \ReflectionClass $reflectionClass
* @return string|\UnitEnum|null
*/
protected function getConnectionFromOnConnectionAttribute(ReflectionClass $reflectionClass)
{
$onConnection = $reflectionClass->getAttributes(OnConnection::class);
if ($onConnection === []) {
return null;
}

return $onConnection[0]->newInstance()->connection;
}

/**
* Extract the queue from OnQueue attribute if present.
*
* @param \ReflectionClass $reflectionClass
* @return string|\UnitEnum|null
*/
protected function getQueueFromOnQueueAttribute(ReflectionClass $reflectionClass)
{
$onQueue = $reflectionClass->getAttributes(OnQueue::class);
if ($onQueue === []) {
return null;
}

return $onQueue[0]->newInstance()->queue;
}
}
17 changes: 17 additions & 0 deletions src/Illuminate/Foundation/Queue/OnConnection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Illuminate\Foundation\Queue;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS)]
class OnConnection
{
/**
* @param string|\UnitEnum $connection
* @return void
*/
public function __construct(public $connection)
{
}
}
17 changes: 17 additions & 0 deletions src/Illuminate/Foundation/Queue/OnQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Illuminate\Foundation\Queue;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS)]
class OnQueue
{
/**
* @param string|\UnitEnum $queue
* @return void
*/
public function __construct(public $queue)
{
}
}
53 changes: 40 additions & 13 deletions src/Illuminate/Mail/Mailable.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Illuminate\Contracts\Support\Htmlable;
use Illuminate\Contracts\Support\Renderable;
use Illuminate\Contracts\Translation\HasLocalePreference;
use Illuminate\Foundation\Queue\InteractsWithQueueAndConnection;
use Illuminate\Support\Collection;
use Illuminate\Support\HtmlString;
use Illuminate\Support\Str;
Expand All @@ -28,9 +29,11 @@
use Symfony\Component\Mailer\Header\TagHeader;
use Symfony\Component\Mime\Address;

use function Illuminate\Support\enum_value;

class Mailable implements MailableContract, Renderable
{
use Conditionable, ForwardsCalls, Localizable, Tappable, Macroable {
use Conditionable, ForwardsCalls, Localizable, Tappable, InteractsWithQueueAndConnection, Macroable {
__call as macroCall;
}

Expand Down Expand Up @@ -227,12 +230,8 @@ public function queue(Queue $queue)
return $this->later($this->delay, $queue);
}

$connection = property_exists($this, 'connection') ? $this->connection : null;

$queueName = property_exists($this, 'queue') ? $this->queue : null;

return $queue->connection($connection)->pushOn(
$queueName ?: null, $this->newQueuedJob()
return $queue->connection($this->getConnection())->pushOn(
$this->getQueue(), $this->newQueuedJob()
);
}

Expand All @@ -245,12 +244,8 @@ public function queue(Queue $queue)
*/
public function later($delay, Queue $queue)
{
$connection = property_exists($this, 'connection') ? $this->connection : null;

$queueName = property_exists($this, 'queue') ? $this->queue : null;

return $queue->connection($connection)->laterOn(
$queueName ?: null, $delay, $this->newQueuedJob()
return $queue->connection($this->getConnection())->laterOn(
$this->getQueue(), $delay, $this->newQueuedJob()
);
}

Expand Down Expand Up @@ -467,6 +462,38 @@ protected function buildSubject($message)
return $this;
}

/**
* Get the queue specified on the class or from the OnQueue attribute.
*
* @return string|null
*/
protected function getQueue()
{
$queue = property_exists($this, 'queue') ? $this->queue : null;

if ($queue === null) {
$queue = $this->getQueueFromOnQueueAttribute(new ReflectionClass($this));
}

return $queue !== null ? enum_value($queue) : null;
}

/**
* Get the connection specified on the class or from the OnConnection attribute.
*
* @return string|null
*/
protected function getConnection()
{
$connection = property_exists($this, 'connection') ? $this->connection : null;

if ($connection === null) {
$connection = $this->getConnectionFromOnConnectionAttribute(new ReflectionClass($this));
}

return $connection !== null ? enum_value($connection) : null;
}

/**
* Add all of the attachments to the message.
*
Expand Down
19 changes: 16 additions & 3 deletions src/Illuminate/Notifications/NotificationSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
use Illuminate\Contracts\Translation\HasLocalePreference;
use Illuminate\Database\Eloquent\Collection as EloquentCollection;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Foundation\Queue\InteractsWithQueueAndConnection;
use Illuminate\Notifications\Events\NotificationSending;
use Illuminate\Notifications\Events\NotificationSent;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use Illuminate\Support\Traits\Localizable;

use function Illuminate\Support\enum_value;

class NotificationSender
{
use Localizable;
use InteractsWithQueueAndConnection, Localizable;

/**
* The notification manager instance.
Expand Down Expand Up @@ -199,13 +202,23 @@ protected function queueNotification($notifiables, $notification)
$notification->locale = $this->locale;
}

$connection = $notification->connection;
$connection = enum_value(
$notification->connection
?? $this->getConnectionFromOnConnectionAttribute(
$notifiableReflectionClass = new \ReflectionClass($original)
)
);

if (method_exists($notification, 'viaConnections')) {
$connection = $notification->viaConnections()[$channel] ?? null;
}

$queue = $notification->queue;
$queue = enum_value(
$notification->queue
?? $this->getQueueFromOnQueueAttribute(
$notifiableReflectionClass ?? new \ReflectionClass($original)
)
);

if (method_exists($notification, 'viaQueues')) {
$queue = $notification->viaQueues()[$channel] ?? null;
Expand Down
Loading