Skip to content

Commit

Permalink
Add proper command deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
flavioheleno committed Apr 7, 2022
1 parent a90f87b commit d69e591
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 18 deletions.
42 changes: 35 additions & 7 deletions src/Application/Processor/Handler/PackageDiscoveryHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace App\Application\Processor\Handler;

use App\Application\Handler\DependencyUpdatedEvent;
use App\Application\Message\Command\PackageDiscoveryCommand;
use App\Application\Message\Event\Dependency\DependencyCreatedEvent;
use App\Application\Message\Event\Package\PackageUpdatedEvent;
use App\Application\Message\Event\Version\VersionCreatedEvent;
Expand Down Expand Up @@ -51,26 +52,53 @@ public function __construct(
* Retrieves package metadata from packagist.org
* - List of avaialble versions
* - List of required dependencies per version
* - Package statistics
*
* @param array{
* appId?: string,
* correlationId?: string,
* expiration?: string,
* headers?: array<string, mixed>,
* isRedelivery?: bool,
* messageId?: string,
* priority?: \Courier\Message\EnvelopePriorityEnum,
* replyTo?: string,
* timestamp?: \DateTimeImmutable|null,
* type?: string,
* userId?: string
* } $attributes
*/
public function __invoke(CommandInterface $command, array $attributes = []): HandlerResultEnum {
static $lastPackage = '';
static $lastUniqueId = '';
static $lastTimestamp = 0;

if (($command instanceof PackageDiscoveryCommand) === false) {
$this->logger->critical(
sprintf(
'Invalid command argument for PackageDiscoveryHandler: "%s"',
$command::class
)
);

return HandlerResultEnum::Reject;
}

try {
$package = $command->getPackage();

$packageName = $package->getName();

// check for job duplication
$uniqueId = $packageName;
$timestamp = ($attributes['timestamp'] ?? new DateTimeImmutable())->getTimestamp();
if (
$command->forceExecution() === false &&
$lastPackage === $packageName &&
time() - $lastTimestamp < 10
$lastUniqueId === $uniqueId &&
$timestamp - $lastTimestamp < 10
) {
$this->logger->debug(
'Package discovery handler: Skipping duplicated job',
[
'package' => $packageName,
'lastUniqueId' => $lastUniqueId,
'lastTimestamp' => (new DateTimeImmutable)->setTimestamp($lastTimestamp)->format(DateTimeInterface::ATOM)
]
);
Expand Down Expand Up @@ -261,8 +289,8 @@ static function (string $key): bool {
}

// update deduplication guards
$lastPackage = $packageName;
$lastTimestamp = ($attributes['timestamp'] ?? new DateTimeImmutable())->getTimestamp();
$lastUniqueId = $uniqueId;
$lastTimestamp = $timestamp;

return HandlerResultEnum::Accept;
} catch (Exception $exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace App\Application\Processor\Handler;

use App\Application\Message\Command\UpdateDependencyStatusCommand;
use App\Application\Message\Event\Dependency\DependencyUpdatedEvent;
use App\Domain\Dependency\DependencyRepositoryInterface;
use App\Domain\Dependency\DependencyStatusEnum;
Expand All @@ -13,6 +14,7 @@
use Courier\Processor\Handler\InvokeHandlerInterface;
use DateTimeImmutable;
use DateTimeInterface;
use Exception;
use Psr\Log\LoggerInterface;

class UpdateDependencyStatusHandler implements InvokeHandlerInterface {
Expand All @@ -32,25 +34,58 @@ public function __construct(

/**
* Updates all dependency references that "require" or "require-dev" $package
*
* @param array{
* appId?: string,
* correlationId?: string,
* expiration?: string,
* headers?: array<string, mixed>,
* isRedelivery?: bool,
* messageId?: string,
* priority?: \Courier\Message\EnvelopePriorityEnum,
* replyTo?: string,
* timestamp?: \DateTimeImmutable|null,
* type?: string,
* userId?: string
* } $attributes
*/
public function __invoke(CommandInterface $command, array $attributes = []): HandlerResultEnum {
static $lastPackage = '';
static $lastUniqueId = '';
static $lastTimestamp = 0;

if (($command instanceof UpdateDependencyStatusCommand) === false) {
$this->logger->critical(
sprintf(
'Invalid command argument for UpdateDependencyStatusHandler: "%s"',
$command::class
)
);

return HandlerResultEnum::Reject;
}

try {
$package = $command->getPackage();

$packageName = $package->getName();

// check for job duplication
$uniqueId = sprintf(
'%s%s',
$packageName,
$package->getLatestVersion()
);
$timestamp = ($attributes['timestamp'] ?? new DateTimeImmutable())->getTimestamp();
if (
$command->forceExecution() === false &&
$lastPackage === $packageName &&
time() - $lastTimestamp < 10
$lastUniqueId === $uniqueId &&
$timestamp - $lastTimestamp < 10
) {
$this->logger->debug(
'Update dependency status handler: Skipping duplicated job',
[
'package' => $packageName,
'uniqueId' => $uniqueId,
'lastTimestamp' => (new DateTimeImmutable)->setTimestamp($lastTimestamp)->format(DateTimeInterface::ATOM)
]
);
Expand Down Expand Up @@ -98,8 +133,8 @@ public function __invoke(CommandInterface $command, array $attributes = []): Han
}

// update deduplication guards
$lastPackage = $packageName;
$lastTimestamp = ($attributes['timestamp'] ?? new DateTimeImmutable())->getTimestamp();
$lastUniqueId = $uniqueId;
$lastTimestamp = $timestamp;

return HandlerResultEnum::Accept;
} catch (Exception $exception) {
Expand Down
46 changes: 40 additions & 6 deletions src/Application/Processor/Handler/UpdateVersionStatusHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace App\Application\Processor\Handler;

use App\Application\Message\Command\UpdateVersionStatusCommand;
use App\Application\Message\Event\Version\VersionUpdatedEvent;
use App\Domain\Dependency\Dependency;
use App\Domain\Dependency\DependencyRepositoryInterface;
Expand All @@ -15,6 +16,7 @@
use Courier\Processor\Handler\InvokeHandlerInterface;
use DateTimeImmutable;
use DateTimeInterface;
use Exception;
use Psr\Log\LoggerInterface;

final class UpdateVersionStatusHandler implements InvokeHandlerInterface {
Expand All @@ -37,26 +39,58 @@ public function __construct(

/**
* Updates the version status that requires $dependency
*
* @param array{
* appId?: string,
* correlationId?: string,
* expiration?: string,
* headers?: array<string, mixed>,
* isRedelivery?: bool,
* messageId?: string,
* priority?: \Courier\Message\EnvelopePriorityEnum,
* replyTo?: string,
* timestamp?: \DateTimeImmutable|null,
* type?: string,
* userId?: string
* } $attributes
*/
public function __invoke(CommandInterface $command, array $attributes = []): HandlerResultEnum {
static $lastDependency = '';
static $lastUniqueId = '';
static $lastTimestamp = 0;
try {

if (($command instanceof UpdateVersionStatusCommand) === false) {
$this->logger->critical(
sprintf(
'Invalid command argument for UpdateVersionStatusHandler: "%s"',
$command::class
)
);

return HandlerResultEnum::Reject;
}

try {
$dependency = $command->getDependency();

$dependencyName = $dependency->getName();

// check for job duplication
$uniqueId = sprintf(
'%s%d',
$dependencyName,
$dependency->getVersionId()
);
$timestamp = ($attributes['timestamp'] ?? new DateTimeImmutable())->getTimestamp();
if (
$command->forceExecution() === false &&
$lastDependency === $dependencyName &&
time() - $lastTimestamp < 10
$lastUniqueId === $uniqueId &&
$timestamp - $lastTimestamp < 10
) {
$this->logger->debug(
'Update version status handler: Skipping duplicated job',
[
'dependency' => $dependencyName,
'lastUniqueId' => $lastUniqueId,
'lastTimestamp' => (new DateTimeImmutable)->setTimestamp($lastTimestamp)->format(DateTimeInterface::ATOM)
]
);
Expand Down Expand Up @@ -105,8 +139,8 @@ static function (Dependency $dependency): DependencyStatusEnum {
}

// update deduplication guards
$lastDependency = $dependencyName;
$lastTimestamp = ($attributes['timestamp'] ?? new DateTimeImmutable())->getTimestamp();
$lastUniqueId = $uniqueId;
$lastTimestamp = $timestamp;

return HandlerResultEnum::Accept;
} catch (Exception $exception) {
Expand Down

0 comments on commit d69e591

Please sign in to comment.