Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mabar committed Nov 27, 2023
1 parent 2705cd4 commit 9e9ff76
Show file tree
Hide file tree
Showing 27 changed files with 559 additions and 177 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"require": {
"php": ">=7.4.0 <8.3.0",
"dragonmantank/cron-expression": "^3.3",
"orisai/clock": "^1.1.1",
"orisai/clock": "^1.2.0",
"orisai/exceptions": "^1.1.0",
"symfony/console": "^5.3.0|^6.0.0",
"symfony/lock": "^5.3.0|^6.0.0",
Expand Down
1 change: 1 addition & 0 deletions src/Command/BaseRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ protected function jobToArray(JobSummary $summary): array
'id' => $info->getId(),
'name' => $info->getName(),
'expression' => $info->getExpression(),
'second' => $info->getSecond(),
'start' => $info->getStart()->format('U.v'),
],
'result' => [
Expand Down
84 changes: 72 additions & 12 deletions src/Command/ListCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use function sprintf;
use function str_pad;
use function str_repeat;
use function strlen;
use function strnatcmp;
use function uasort;
use const STR_PAD_LEFT;
Expand Down Expand Up @@ -74,13 +75,13 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$terminalWidth = $this->getTerminalWidth();
$expressionSpacing = $this->getCronExpressionSpacing($jobs);

foreach ($this->sortJobs($jobs, $nextOption) as $key => [$job, $expression]) {
foreach ($this->sortJobs($jobs, $nextOption) as $key => [$job, $expression, $repeatAfterSeconds]) {
$expressionString = $this->formatCronExpression($expression, $expressionSpacing);

$name = $job->getName();

$nextDueDateLabel = 'Next Due:';
$nextDueDate = $this->getNextDueDateForEvent($expression);
$nextDueDate = $this->getNextDueDate($expression, $repeatAfterSeconds);
$nextDueDate = $output->isVerbose()
? $nextDueDate->format('Y-m-d H:i:s P')
: $this->getRelativeTime($nextDueDate);
Expand Down Expand Up @@ -138,18 +139,18 @@ private function validateOptionNext(InputInterface $input)
}

/**
* @param array<int|string, array{Job, CronExpression}> $jobs
* @param bool|int<1, max> $next
* @return array<int|string, array{Job, CronExpression}>
* @param array<int|string, array{Job, CronExpression, int<0, 30>}> $jobs
* @param bool|int<1, max> $next
* @return array<int|string, array{Job, CronExpression, int<0, 30>}>
*/
private function sortJobs(array $jobs, $next): array
{
if ($next !== false) {
/** @infection-ignore-all */
uasort($jobs, function ($a, $b): int {
$nextDueDateA = $this->getNextDueDateForEvent($a[1])
$nextDueDateA = $this->getNextDueDate($a[1], $a[2])
->setTimezone(new DateTimeZone('UTC'));
$nextDueDateB = $this->getNextDueDateForEvent($b[1])
$nextDueDateB = $this->getNextDueDate($b[1], $b[2])
->setTimezone(new DateTimeZone('UTC'));

if (
Expand Down Expand Up @@ -182,10 +183,38 @@ private function sortJobs(array $jobs, $next): array
return $jobs;
}

private function getNextDueDateForEvent(CronExpression $expression): DateTimeImmutable
private function getNextDueDate(CronExpression $expression, int $repeatAfterSeconds): DateTimeImmutable
{
return DateTimeImmutable::createFromMutable(
$expression->getNextRunDate($this->clock->now()),
$now = $this->clock->now();
$nextDueDate = DateTimeImmutable::createFromMutable(
$expression->getNextRunDate($now),
);

if ($repeatAfterSeconds === 0) {
return $nextDueDate;
}

$previousDueDate = DateTimeImmutable::createFromMutable(
$expression->getPreviousRunDate($now, 0, true),
);

$currentMinute = $now->setTime(
(int) $now->format('H'),
(int) $now->format('i'),
);
// TODO
// pokud poslední run nebyl v téhle minutě (v téhle minutě se cron vůbec nespustil),
// tak vrátit přístí čas
if ($currentMinute->getTimestamp() !== $previousDueDate->getTimestamp()) {
return $nextDueDate;
}

// TODO - vypočítat z aktuálního času další reálný čas
// TODO - pokud je další reálný čas v další minutě, tak vracíme příští run date
return $now->setTime(
(int) $now->format('H'),
(int) $now->format('i'),
(int) $now->format('s') + $repeatAfterSeconds,
);
}

Expand Down Expand Up @@ -225,14 +254,14 @@ private function getRelativeTime(DateTimeImmutable $time): string
}

/**
* @param array<int|string, array{Job, CronExpression}> $jobs
* @param array<int|string, array{Job, CronExpression, int<0, 30>}> $jobs
*
* @infection-ignore-all
*/
private function getCronExpressionSpacing(array $jobs): int
{
$max = 0;
foreach ($jobs as [$job, $expression]) {
foreach ($jobs as [$job, $expression, $repeatAfterSeconds]) {
$length = mb_strlen($expression->getExpression());
if ($length > $max) {
$max = $length;
Expand All @@ -247,6 +276,37 @@ private function formatCronExpression(CronExpression $expression, int $spacing):
return str_pad($expression->getExpression(), $spacing, ' ', STR_PAD_LEFT);
}

/**
* @param array<int|string, array{Job, CronExpression, int<0, 30>}|null> $jobs
*
* @infection-ignore-all
*/
private function getRepeatAfterSecondsSpacing(array $jobs): int
{
$max = 0;
foreach ($jobs as [$job, $expression, $repeatAfterSeconds]) {
if ($repeatAfterSeconds === 0) {
continue;
}

$length = strlen((string) $repeatAfterSeconds);
if ($length > $max) {
$max = $length;
}
}

return $max;
}

private function formatRepeatAfterSeconds(int $repeatAfterSeconds, int $spacing): string
{
if ($repeatAfterSeconds === 0) {
return str_pad('', $spacing);
}

return str_pad((string) $repeatAfterSeconds, $spacing);
}

private function getTerminalWidth(): int
{
return (new Terminal())->getWidth();
Expand Down
7 changes: 7 additions & 0 deletions src/Command/RunJobCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
use Orisai\Scheduler\Scheduler;
use Orisai\Scheduler\Status\JobResultState;
use Orisai\Scheduler\Status\JobSummary;
use Orisai\Scheduler\Status\RunParameters;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use function json_decode;
use function json_encode;
use const JSON_PRETTY_PRINT;
use const JSON_THROW_ON_ERROR;
Expand Down Expand Up @@ -44,14 +46,19 @@ protected function configure(): void
'Don\'t force job to run and respect due time instead',
);
$this->addOption('json', null, InputOption::VALUE_NONE, 'Output in json format');
$this->addOption('parameters', null, InputOption::VALUE_REQUIRED, '[Internal]');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$json = $input->getOption('json');
$params = $input->getOption('parameters');
$summary = $this->scheduler->runJob(
$input->getArgument('id'),
!$input->getOption('no-force'),
$params === null
? null
: RunParameters::fromArray(json_decode($params, true, 512, JSON_THROW_ON_ERROR)),
);

if ($summary === null) {
Expand Down
100 changes: 84 additions & 16 deletions src/Executor/BasicJobExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,34 @@
use Cron\CronExpression;
use DateTimeImmutable;
use Generator;
use Orisai\Clock\Clock;
use Orisai\Scheduler\Exception\RunFailure;
use Orisai\Scheduler\Job\Job;
use Orisai\Scheduler\Manager\JobManager;
use Orisai\Scheduler\Status\JobSummary;
use Orisai\Scheduler\Status\RunSummary;
use Psr\Clock\ClockInterface;
use Throwable;
use function array_keys;
use function assert;
use function max;

/**
* @internal
*/
final class BasicJobExecutor implements JobExecutor
{

private ClockInterface $clock;
private Clock $clock;

private JobManager $jobManager;

/** @var Closure(string|int, Job, CronExpression): array{JobSummary, Throwable|null} */
/** @var Closure(string|int, Job, CronExpression, int): array{JobSummary, Throwable|null} */
private Closure $runCb;

/**
* @param Closure(string|int, Job, CronExpression): array{JobSummary, Throwable|null} $runCb
* @param Closure(string|int, Job, CronExpression, int): array{JobSummary, Throwable|null} $runCb
*/
public function __construct(ClockInterface $clock, JobManager $jobManager, Closure $runCb)
public function __construct(Clock $clock, JobManager $jobManager, Closure $runCb)
{
$this->clock = $clock;
$this->jobManager = $jobManager;
Expand All @@ -40,29 +42,95 @@ public function __construct(ClockInterface $clock, JobManager $jobManager, Closu

public function runJobs(array $ids, DateTimeImmutable $runStart): Generator
{
if ($ids === []) {
return new RunSummary($runStart, $runStart, []);
}

$scheduledJobsBySecond = $this->getScheduledJobsBySecond($ids);
$lastSecond = max(array_keys($scheduledJobsBySecond));

$jobSummaries = [];
$suppressed = [];
foreach ($ids as $id) {
$scheduledJob = $this->jobManager->getScheduledJob($id);
assert($scheduledJob !== null);
[$job, $expression] = $scheduledJob;
$suppressedExceptions = [];
for ($second = 0; $second <= $lastSecond; $second++) {
$secondInitiatedAt = $this->clock->now();

[$jobSummary, $throwable] = ($this->runCb)($id, $job, $expression);
foreach ($scheduledJobsBySecond[$second] ?? [] as [$id, $job, $expression]) {
[$jobSummary, $throwable] = ($this->runCb)($id, $job, $expression, $second);

yield $jobSummaries[] = $jobSummary;
yield $jobSummaries[] = $jobSummary;

if ($throwable !== null) {
$suppressed[] = $throwable;
if ($throwable !== null) {
$suppressedExceptions[] = $throwable;
}
}

$this->sleepTillNextSecond($second, $lastSecond, $secondInitiatedAt);
}

$summary = new RunSummary($runStart, $this->clock->now(), $jobSummaries);

if ($suppressed !== []) {
throw RunFailure::create($summary, $suppressed);
if ($suppressedExceptions !== []) {
throw RunFailure::create($summary, $suppressedExceptions);
}

return $summary;
}

/**
* @param non-empty-list<int|string> $ids
* @return non-empty-array<int, list<array{int|string, Job, CronExpression}>>
*/
private function getScheduledJobsBySecond(array $ids): array
{
$scheduledJobsBySecond = [];
foreach ($ids as $id) {
$scheduledJob = $this->jobManager->getScheduledJob($id);
assert($scheduledJob !== null);
[$job, $expression, $repeatAfterSeconds] = $scheduledJob;

if ($repeatAfterSeconds === 0) {
$scheduledJobsBySecond[0][] = [$id, $job, $expression];
} else {
for ($second = 0; $second <= 59; $second += $repeatAfterSeconds) {
$scheduledJobsBySecond[$second][] = [$id, $job, $expression];
}
}
}

// $ids are not empty and for cycle is always run at least once
assert($scheduledJobsBySecond !== []);

return $scheduledJobsBySecond;
}

/**
* More accurate than (float) $dateTime->format('U.u')
*/
private function getMicroTimestamp(DateTimeImmutable $dateTime): float
{
$seconds = (float) $dateTime->format('U');
$microseconds = (float) $dateTime->format('u') / 1e6;

return $seconds + $microseconds;
}

private function sleepTillNextSecond(int $second, int $lastSecond, DateTimeImmutable $secondInitiatedAt): void
{
$sleepTime = $this->getTimeTillNextSecond($second, $lastSecond, $secondInitiatedAt);
$this->clock->sleep(0, 0, (int) ($sleepTime * 1e6));
}

private function getTimeTillNextSecond(int $second, int $lastSecond, DateTimeImmutable $secondInitiatedAt): float
{
if ($second === $lastSecond) {
return 0;
}

$startOfSecond = $this->getMicroTimestamp($secondInitiatedAt);
$endOfSecond = $this->getMicroTimestamp($this->clock->now());
$timeElapsed = $endOfSecond - $startOfSecond;

return 1 - $timeElapsed;
}

}
Loading

0 comments on commit 9e9ff76

Please sign in to comment.