Skip to content

Commit

Permalink
Made it possible to set the next interval between jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
waltertamboer committed Jan 31, 2020
1 parent 3ef2787 commit 9356185
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 9 deletions.
32 changes: 23 additions & 9 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,18 @@ public function run(): int

do {
try {
$this->processNextJob();
$nextInterval = $this->processNextJob();
} catch (Throwable $e) {
$exitCode = $this->handleException($e);
break;
}

if ($nextInterval === null) {
$nextInterval = $this->getOptions()->getInterval();
}

// Allow the server to breath...
usleep($this->getOptions()->getInterval());
usleep($nextInterval);
} while ($this->shouldKeepRunning($startTime, memory_get_usage(true)));

return $exitCode;
Expand All @@ -145,23 +149,25 @@ private function shouldKeepRunning(int $startTime, int $memoryUsage): bool
return !$runningTimeExpired && !$memoryLimitReached;
}

private function processNextJob(): void
private function processNextJob(): ?int
{
$job = $this->storage->retrieveJob();
assert($job instanceof StoredJobInterface || $job === null);

if (!$job) {
return;
return null;
}

// @todo Replace this with PSR-14 so we can move the logging into an event listener.
$this->processJob($job);
return $this->processJob($job);
}

private function processJob(StoredJobInterface $storedJob): void
private function processJob(StoredJobInterface $storedJob): ?int
{
$nextInterval = null;

try {
$this->executeJob($storedJob);
$nextInterval = $this->executeJob($storedJob);
} catch (RecoverableException $exception) {
$this->handleException($exception);

Expand All @@ -177,13 +183,15 @@ private function processJob(StoredJobInterface $storedJob): void

$this->handleException($exception);
}

return $nextInterval;
}

/**
* @throws UnknownWorkerException Thrown when the worker type is unknown.
* @throws ContainerExceptionInterface Thrown when the worker cannot be retrieved from the service container.
*/
private function executeJob(StoredJobInterface $storedJob): void
private function executeJob(StoredJobInterface $storedJob): int
{
$job = $storedJob->createJobRepresentation();
assert($job instanceof JobInterface);
Expand All @@ -201,14 +209,20 @@ private function executeJob(StoredJobInterface $storedJob): void

$worker = $this->workers->get($job->getWorkerName());
assert($worker instanceof WorkerInterface);
$worker->run(new Context($this->storage, $this->logger, $storedJob));

$context = new Context($this->storage, $this->logger, $storedJob);
$context->setNextInterval($this->getOptions()->getInterval());

$worker->run($context);

$this->storage->deleteJob($storedJob);

$this->logger->info(sprintf(
'[#%d] Finished and successfully deleted job',
$storedJob->getId()
));

return $context->getNextInterval();
}

private function rescheduleJob(
Expand Down
22 changes: 22 additions & 0 deletions src/Context/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ final class Context implements ContextInterface
*/
private $stats;

/**
* @var int
*/
private $nextInterval;

/**
* Initializes a new instance of this class.
*/
Expand All @@ -66,6 +71,23 @@ public function __construct(StorageInterface $storage, LoggerInterface $logger,
$this->storedJob = $storedJob;
$this->params = $storedJob->createJobRepresentation()->getWorkerParams();
$this->stats = $storedJob->getStats();
$this->nextInterval = 1;
}

/**
* Gets the time in microseconds to wait before the next job can be executed.
*/
public function getNextInterval(): int
{
return $this->nextInterval;
}

/**
* Sets the time in microseconds to wait before the next job can be executed.
*/
public function setNextInterval(int $nextInterval): void
{
$this->nextInterval = $nextInterval;
}

/**
Expand Down
10 changes: 10 additions & 0 deletions src/Context/ContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
*/
interface ContextInterface // phpcs:ignore
{
/**
* Gets the time in microseconds to wait before the next job can be executed.
*/
public function getNextInterval(): int;

/**
* Sets the time in microseconds to wait before the next job can be executed.
*/
public function setNextInterval(int $nextInterval): void;

/**
* Gets the logger which can be used to write to the output stream.
*/
Expand Down

0 comments on commit 9356185

Please sign in to comment.