Skip to content

Commit

Permalink
Per-second scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
mabar committed Dec 11, 2023
1 parent 2705cd4 commit b3cd4da
Show file tree
Hide file tree
Showing 31 changed files with 830 additions and 199 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- `Scheduler`
- `runPromise()` - allows `scheduler:run` and `scheduler:work` commands to output job result as soon as it is
finished
- `getScheduledJobs()` - added `repeatAfterSeconds` parameter into the returned array
- `SimpleScheduler`
- `addJob()` accepts parameter `repeatAfterSeconds`
- `JobInfo`
- `getSecond()`- returns for which second within a minute was job scheduled
- `JobManager`
- `getScheduledJob()` - added `repeatAfterSeconds` parameter into the returned array
- `getScheduledJobs()` - added `repeatAfterSeconds` parameter into the returned array
- `SimpleJobManager`
- `addJob()` accepts parameter `repeatAfterSeconds`
- `CallbackJobManager`
- `addJob()` accepts parameter `repeatAfterSeconds`

### Changed

Expand All @@ -24,3 +36,5 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- `getJobs()` -> `getJobSummaries()`
- `JobExecutor`
- `runJobs()` returns `Generator<int, JobSummary, void, RunSummary>` instead of `RunSummary`
- `ProcessJobExecutor`
- constructor requires `JobManager` as first parameter
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"require": {
"php": ">=7.4.0 <8.3.0",
"dragonmantank/cron-expression": "^3.3",
"orisai/clock": "^1.1.1",
"orisai/clock": "^1.2.0",
"orisai/exceptions": "^1.1.0",
"symfony/console": "^5.3.0|^6.0.0",
"symfony/lock": "^5.3.0|^6.0.0",
Expand Down
65 changes: 59 additions & 6 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Cron job scheduler - with locks, parallelism and more
- [Why do you need it?](#why-do-you-need-it)
- [Quick start](#quick-start)
- [Execution time](#execution-time)
- [Cron expression - minutes and above](#cron-expression---minutes-and-above)
- [Seconds](#seconds)
- [Events](#events)
- [Handling errors](#handling-errors)
- [Locks and job overlapping](#locks-and-job-overlapping)
Expand Down Expand Up @@ -50,6 +52,7 @@ Orisai Scheduler solves all of these problems.
On top of that you get:

- [locking](#locks-and-job-overlapping) - each job should run only once at a time, without overlapping
- [per-second scheduling](#seconds) - run jobs multiple times in a minute
- [before/after job events](#events) for accessing job status
- [overview of all jobs](#list-command), including estimated time of next run
- running jobs either [once](#run-command) or [periodically](#worker-command) during development
Expand Down Expand Up @@ -90,7 +93,22 @@ Good to go!

## Execution time

Cron execution time is expressed via `CronExpression`, using crontab syntax
Execution time is determined by [cron expression](#cron-expression---minutes-and-above) which allows you to schedule
jobs from anywhere between once a year and once every minute and [seconds], allowing you tu run job several times in a
minute.

In ideal situation, jobs are executed just in time, but it may not be always the case. Crontab can execute jobs several
seconds late, serial jobs execution may take way over a minute and long jobs may overlap. To prevent any issues, we
implement multiple measures:

- jobs [repeated after seconds](#seconds) take in account crontab may run late and delay each execution accordingly to
minimize unwanted gaps between executions (e.g. if crontab starts 10 seconds late, all jobs also run 10 seconds late)
- [parallel execution](#parallelization-and-process-isolation) can be used instead of the serial
- [locks](#locks-and-job-overlapping) should be used to prevent overlapping of long-running jobs

### Cron expression - minutes and above

Main job execution time is expressed via `CronExpression`, using crontab syntax

```php
use Cron\CronExpression;
Expand Down Expand Up @@ -131,6 +149,37 @@ You can also use macro instead of an expression:
- `@daily`, `@midnight` - Run once a day, midnight - `0 0 * * *`
- `@hourly` - Run once an hour, first minute - `0 * * * *`

### Seconds

Run a job every n seconds within a minute.

```php
use Cron\CronExpression;

$scheduler->addJob(
/* ... */,
new CronExpression('* * * * *'),
/* ... */,
1, // every second, 60 times a minute
);
```

```php
use Cron\CronExpression;

$scheduler->addJob(
/* ... */,
new CronExpression('* * * * *'),
/* ... */,
30, // every 30 seconds, 2 times a minute
);
```

With default, synchronous job executor, all jobs scheduled for current second are executed and just after it is
finished, jobs for the next second are executed. With [parallel](#parallelization-and-process-isolation) executor it is
different - all jobs are executed as soon as it is their time. Therefore, it is strongly recommended to
use [locking](#locks-and-job-overlapping) to prevent overlapping.

## Events

Run callbacks before and after job to collect statistics, etc.
Expand Down Expand Up @@ -178,6 +227,7 @@ $errorHandler = function(Throwable $throwable, JobInfo $info, JobResult $result)
'exception' => $throwable,
'name' => $info->getName(),
'expression' => $info->getExpression(),
'second' => $info->getSecond(),
'start' => $info->getStart()->format(DateTimeInterface::ATOM),
'end' => $result->getEnd()->format(DateTimeInterface::ATOM),
]);
Expand Down Expand Up @@ -248,19 +298,21 @@ have [proc_*](https://www.php.net/manual/en/ref.exec.php) functions enabled. Als
used [run-job command](#run-job-command), so you need to have [console](#cli-commands) set up as well.

```php
use Orisai\Scheduler\SimpleScheduler;
use Orisai\Scheduler\Executor\ProcessJobExecutor;
use Orisai\Scheduler\ManagedScheduler;
use Orisai\Scheduler\Manager\SimpleJobManager;

$executor = new ProcessJobExecutor();
$scheduler = new SimpleScheduler(null, null, $executor);
$jobManager = new SimpleJobManager();
$executor = new ProcessJobExecutor($jobManager);
$scheduler = new ManagedScheduler($jobManager, null, null, $executor);
```

If your executable script is not `bin/console` or if you are using multiple scheduler setups, specify the executable:

```php
use Orisai\Scheduler\Executor\ProcessJobExecutor;

$executor = new ProcessJobExecutor();
$executor = new ProcessJobExecutor($jobManager);
$executor->setExecutable('bin/console', 'scheduler:run-job');
```

Expand Down Expand Up @@ -328,6 +380,7 @@ Info:
$id = $info->getId(); // string|int
$name = $info->getName(); // string
$expression = $info->getExpression(); // string, e.g. '* * * * *'
$second = $info->getSecond();
$start = $info->getStart(); // DateTimeImmutable
```

Expand Down Expand Up @@ -435,7 +488,7 @@ Run single job, ignoring scheduled time

### List command

List all scheduled jobs (in `expression [id] name... next-due` format)
List all scheduled jobs (in `expression / second [id] name... next-due` format)

`bin/console scheduler:list`

Expand Down
1 change: 1 addition & 0 deletions src/Command/BaseRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ protected function jobToArray(JobSummary $summary): array
'id' => $info->getId(),
'name' => $info->getName(),
'expression' => $info->getExpression(),
'second' => $info->getSecond(),
'start' => $info->getStart()->format('U.v'),
],
'result' => [
Expand Down
111 changes: 97 additions & 14 deletions src/Command/ListCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
use Symfony\Component\Console\Terminal;
use function abs;
use function array_splice;
use function floor;
use function max;
use function mb_strlen;
use function preg_match;
use function sprintf;
use function str_pad;
use function str_repeat;
use function strlen;
use function strnatcmp;
use function uasort;
use const STR_PAD_LEFT;
Expand Down Expand Up @@ -73,14 +75,19 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$terminalWidth = $this->getTerminalWidth();
$expressionSpacing = $this->getCronExpressionSpacing($jobs);
$repeatAfterSecondsSpacing = $this->getRepeatAfterSecondsSpacing($jobs);

foreach ($this->sortJobs($jobs, $nextOption) as $key => [$job, $expression]) {
foreach ($this->sortJobs($jobs, $nextOption) as $key => [$job, $expression, $repeatAfterSeconds]) {
$expressionString = $this->formatCronExpression($expression, $expressionSpacing);
$repeatAfterSecondsString = $this->formatRepeatAfterSeconds(
$repeatAfterSeconds,
$repeatAfterSecondsSpacing,
);

$name = $job->getName();

$nextDueDateLabel = 'Next Due:';
$nextDueDate = $this->getNextDueDateForEvent($expression);
$nextDueDate = $this->getNextDueDate($expression, $repeatAfterSeconds);
$nextDueDate = $output->isVerbose()
? $nextDueDate->format('Y-m-d H:i:s P')
: $this->getRelativeTime($nextDueDate);
Expand All @@ -89,14 +96,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
'.',
max(
/* @infection-ignore-all */
$terminalWidth - mb_strlen($expressionString . $key . $name . $nextDueDateLabel . $nextDueDate) - 9,
$terminalWidth - mb_strlen(
$expressionString . $repeatAfterSecondsString . $key . $name . $nextDueDateLabel . $nextDueDate,
) - 9,
0,
),
);

$output->writeln(sprintf(
' <fg=yellow>%s</> [%s] %s<fg=#6C7280>%s %s %s</>',
' <fg=yellow>%s</><fg=#6C7280>%s</> [%s] %s<fg=#6C7280>%s %s %s</>',
$expressionString,
$repeatAfterSecondsString,
$key,
$name,
$dots,
Expand Down Expand Up @@ -138,18 +148,18 @@ private function validateOptionNext(InputInterface $input)
}

/**
* @param array<int|string, array{Job, CronExpression}> $jobs
* @param bool|int<1, max> $next
* @return array<int|string, array{Job, CronExpression}>
* @param array<int|string, array{Job, CronExpression, int<0, 30>}> $jobs
* @param bool|int<1, max> $next
* @return array<int|string, array{Job, CronExpression, int<0, 30>}>
*/
private function sortJobs(array $jobs, $next): array
{
if ($next !== false) {
/** @infection-ignore-all */
uasort($jobs, function ($a, $b): int {
$nextDueDateA = $this->getNextDueDateForEvent($a[1])
$nextDueDateA = $this->getNextDueDate($a[1], $a[2])
->setTimezone(new DateTimeZone('UTC'));
$nextDueDateB = $this->getNextDueDateForEvent($b[1])
$nextDueDateB = $this->getNextDueDate($b[1], $b[2])
->setTimezone(new DateTimeZone('UTC'));

if (
Expand Down Expand Up @@ -182,11 +192,49 @@ private function sortJobs(array $jobs, $next): array
return $jobs;
}

private function getNextDueDateForEvent(CronExpression $expression): DateTimeImmutable
private function getNextDueDate(CronExpression $expression, int $repeatAfterSeconds): DateTimeImmutable
{
$now = $this->clock->now();
$nextDueDate = DateTimeImmutable::createFromMutable(
$expression->getNextRunDate($now),
);

if ($repeatAfterSeconds === 0) {
return $nextDueDate;
}

$previousDueDate = DateTimeImmutable::createFromMutable(
$expression->getPreviousRunDate($now, 0, true),
);

if (!$this->wasPreviousDueDateInCurrentMinute($now, $previousDueDate)) {
return $nextDueDate;
}

$currentSecond = (int) $now->format('s');
$runTimes = (int) floor($currentSecond / $repeatAfterSeconds);
$nextRunSecond = ($runTimes + 1) * $repeatAfterSeconds;

// Don't abuse seconds overlap
if ($nextRunSecond > 59) {
return $nextDueDate;
}

return $now->setTime(
(int) $now->format('H'),
(int) $now->format('i'),
$nextRunSecond,
);
}

private function wasPreviousDueDateInCurrentMinute(DateTimeImmutable $now, DateTimeImmutable $previousDueDate): bool
{
return DateTimeImmutable::createFromMutable(
$expression->getNextRunDate($this->clock->now()),
$currentMinute = $now->setTime(
(int) $now->format('H'),
(int) $now->format('i'),
);

return $currentMinute->getTimestamp() === $previousDueDate->getTimestamp();
}

/**
Expand Down Expand Up @@ -225,14 +273,14 @@ private function getRelativeTime(DateTimeImmutable $time): string
}

/**
* @param array<int|string, array{Job, CronExpression}> $jobs
* @param array<int|string, array{Job, CronExpression, int<0, 30>}> $jobs
*
* @infection-ignore-all
*/
private function getCronExpressionSpacing(array $jobs): int
{
$max = 0;
foreach ($jobs as [$job, $expression]) {
foreach ($jobs as [$job, $expression, $repeatAfterSeconds]) {
$length = mb_strlen($expression->getExpression());
if ($length > $max) {
$max = $length;
Expand All @@ -247,6 +295,41 @@ private function formatCronExpression(CronExpression $expression, int $spacing):
return str_pad($expression->getExpression(), $spacing, ' ', STR_PAD_LEFT);
}

/**
* @param array<int|string, array{Job, CronExpression, int<0, 30>}|null> $jobs
*
* @infection-ignore-all
*/
private function getRepeatAfterSecondsSpacing(array $jobs): int
{
$max = 0;
foreach ($jobs as [$job, $expression, $repeatAfterSeconds]) {
if ($repeatAfterSeconds === 0) {
continue;
}

$length = strlen((string) $repeatAfterSeconds);
if ($length > $max) {
$max = $length;
}
}

if ($max !== 0) {
$max += 3;
}

return $max;
}

private function formatRepeatAfterSeconds(int $repeatAfterSeconds, int $spacing): string
{
if ($repeatAfterSeconds === 0) {
return str_pad('', $spacing);
}

return str_pad(" / $repeatAfterSeconds", $spacing);
}

private function getTerminalWidth(): int
{
return (new Terminal())->getWidth();
Expand Down
7 changes: 7 additions & 0 deletions src/Command/RunJobCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
use Orisai\Scheduler\Scheduler;
use Orisai\Scheduler\Status\JobResultState;
use Orisai\Scheduler\Status\JobSummary;
use Orisai\Scheduler\Status\RunParameters;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use function json_decode;
use function json_encode;
use const JSON_PRETTY_PRINT;
use const JSON_THROW_ON_ERROR;
Expand Down Expand Up @@ -44,14 +46,19 @@ protected function configure(): void
'Don\'t force job to run and respect due time instead',
);
$this->addOption('json', null, InputOption::VALUE_NONE, 'Output in json format');
$this->addOption('parameters', null, InputOption::VALUE_REQUIRED, '[Internal]');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$json = $input->getOption('json');
$params = $input->getOption('parameters');
$summary = $this->scheduler->runJob(
$input->getArgument('id'),
!$input->getOption('no-force'),
$params === null
? null
: RunParameters::fromArray(json_decode($params, true, 512, JSON_THROW_ON_ERROR)),
);

if ($summary === null) {
Expand Down
Loading

0 comments on commit b3cd4da

Please sign in to comment.