Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mabar committed Nov 24, 2023
1 parent c86275b commit 387c95c
Show file tree
Hide file tree
Showing 12 changed files with 347 additions and 112 deletions.
64 changes: 38 additions & 26 deletions src/Executor/BasicJobExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use function array_keys;
use function assert;
use function max;
use function microtime;
use function usleep;

/**
* @internal
Expand All @@ -29,11 +29,11 @@ final class BasicJobExecutor implements JobExecutor

private JobManager $jobManager;

/** @var Closure(string|int, Job, CronExpression): array{JobSummary, Throwable|null} */
/** @var Closure(string|int, Job, CronExpression, int): array{JobSummary, Throwable|null} */
private Closure $runCb;

/**
* @param Closure(string|int, Job, CronExpression): array{JobSummary, Throwable|null} $runCb
* @param Closure(string|int, Job, CronExpression, int): array{JobSummary, Throwable|null} $runCb
*/
public function __construct(ClockInterface $clock, JobManager $jobManager, Closure $runCb)
{
Expand All @@ -49,41 +49,30 @@ public function runJobs(array $ids, DateTimeImmutable $runStart): Generator
}

$scheduledJobsBySecond = $this->getScheduledJobsBySecond($ids);
$lastExecutionSecond = max(array_keys($scheduledJobsBySecond));
$lastSecond = max(array_keys($scheduledJobsBySecond));

$jobSummaries = [];
$suppressed = [];
for ($second = 0; $second <= $lastExecutionSecond; $second++) {
$startOfSecond = $this->getMicroTimestamp($this->clock->now());
$suppressedExceptions = [];
for ($second = 0; $second <= $lastSecond; $second++) {
$secondInitiatedAt = $this->clock->now();

if (isset($scheduledJobsBySecond[$second])) {
foreach ($scheduledJobsBySecond[$second] as [$id, $job, $expression]) {
//TODO - předat přes callback do JobResult sekundu (a její pořadové číslo?)
[$jobSummary, $throwable] = ($this->runCb)($id, $job, $expression);
foreach ($scheduledJobsBySecond[$second] ?? [] as [$id, $job, $expression]) {
[$jobSummary, $throwable] = ($this->runCb)($id, $job, $expression, $second);

yield $jobSummaries[] = $jobSummary;
yield $jobSummaries[] = $jobSummary;

if ($throwable !== null) {
$suppressed[] = $throwable;
}
if ($throwable !== null) {
$suppressedExceptions[] = $throwable;
}
}

$endOfSecond = $this->getMicroTimestamp($this->clock->now());
$timeElapsed = $endOfSecond - $startOfSecond;
$sleepTime = 1 - $timeElapsed;

if ($sleepTime > 0) {
$this->clock instanceof FrozenClock
? $this->clock->move($sleepTime)
: usleep((int) ($sleepTime * 1e6));
}
$this->sleepTillNextSecond($second, $lastSecond, $secondInitiatedAt);
}

$summary = new RunSummary($runStart, $this->clock->now(), $jobSummaries);

if ($suppressed !== []) {
throw RunFailure::create($summary, $suppressed);
if ($suppressedExceptions !== []) {
throw RunFailure::create($summary, $suppressedExceptions);
}

return $summary;
Expand Down Expand Up @@ -127,4 +116,27 @@ private function getMicroTimestamp(DateTimeImmutable $dateTime): float
return $seconds + $microseconds;
}

private function sleepTillNextSecond(int $second, int $lastSecond, DateTimeImmutable $secondInitiatedAt): void
{
$sleepTime = $this->getTimeTillNextSecond($second, $lastSecond, $secondInitiatedAt);

//TODO - nové hodiny
$this->clock instanceof FrozenClock
? $this->clock->move($sleepTime)
: usleep((int) ($sleepTime * 1e6));
}

private function getTimeTillNextSecond(int $second, int $lastSecond, DateTimeImmutable $secondInitiatedAt): float
{
if ($second === $lastSecond) {
return 0;
}

$startOfSecond = $this->getMicroTimestamp($secondInitiatedAt);
$endOfSecond = $this->getMicroTimestamp($this->clock->now());
$timeElapsed = $endOfSecond - $startOfSecond;

return 1 - $timeElapsed;
}

}
219 changes: 170 additions & 49 deletions src/Executor/ProcessJobExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@
use DateTimeImmutable;
use Generator;
use JsonException;
use Orisai\Clock\FrozenClock;
use Orisai\Clock\SystemClock;
use Orisai\Scheduler\Exception\JobProcessFailure;
use Orisai\Scheduler\Exception\RunFailure;
use Orisai\Scheduler\Job\Job;
use Orisai\Scheduler\Manager\JobManager;
use Orisai\Scheduler\Status\JobInfo;
use Orisai\Scheduler\Status\JobResult;
use Orisai\Scheduler\Status\JobResultState;
use Orisai\Scheduler\Status\JobSummary;
use Orisai\Scheduler\Status\RunSummary;
use Psr\Clock\ClockInterface;
use Symfony\Component\Process\Process;
use function array_map;
use function assert;
use function escapeshellarg;
use function implode;
use function is_array;
use function json_decode;
use function PHPUnit\Framework\assertNull;
use function usleep;
use const JSON_THROW_ON_ERROR;
use const PHP_BINARY;
Expand All @@ -32,14 +33,17 @@
final class ProcessJobExecutor implements JobExecutor
{

private JobManager $jobManager;

private ClockInterface $clock;

private string $script = 'bin/console';

private string $command = 'scheduler:run-job';

public function __construct(?ClockInterface $clock = null)
public function __construct(JobManager $jobManager, ?ClockInterface $clock = null)
{
$this->jobManager = $jobManager;
$this->clock = $clock ?? new SystemClock();
}

Expand All @@ -51,66 +55,183 @@ public function setExecutable(string $script, string $command = 'scheduler:run-j

public function runJobs(array $ids, DateTimeImmutable $runStart): Generator
{
$executions = [];
foreach ($ids as $id) {
$command = implode(' ', array_map(static fn (string $arg) => escapeshellarg($arg), [
PHP_BINARY,
$this->script,
$this->command,
$id,
'--json',
]));

//TODO - repeat
$executions[$id] = $execution = Process::fromShellCommandline($command);
$execution->start();
if ($ids === []) {
return new RunSummary($runStart, $runStart, []);
}

$scheduledJobsBySecond = $this->getScheduledJobsBySecond($ids);

$jobExecutions = [];
$jobSummaries = [];
$suppressed = [];
while ($executions !== []) {
foreach ($executions as $key => $execution) {
if ($execution->isRunning()) {
continue;
$suppressedExceptions = [];

$lastExecutedSecond = -1;
$currentSecond = 0;
$secondInitiatedAt = $this->clock->now();
while ($jobExecutions !== [] || $scheduledJobsBySecond !== []) {
// If we have scheduled jobs and are at right second, execute them
if ($scheduledJobsBySecond !== []) {
$shouldRunSecond = $this->clock->now()->getTimestamp() - $runStart->getTimestamp();

while ($lastExecutedSecond < $shouldRunSecond) {
$currentSecond = $lastExecutedSecond + 1;
if (isset($scheduledJobsBySecond[$currentSecond])) {
$secondInitiatedAt = $this->clock->now();
$jobExecutions = $this->startJobs($scheduledJobsBySecond[$currentSecond], $jobExecutions);
unset($scheduledJobsBySecond[$currentSecond]);
}

$lastExecutedSecond = $currentSecond;
}
}

// Wait for next second if no jobs are running
if ($jobExecutions === []) {
//TODO - prospané sekundy by měly být vždy pozitivní
//TODO - sleep till next sec (můžeme hledat první index v $scheduledJobsBySecond, nejen +1)
$this->sleepTillNextSecond($currentSecond, $lastExecutedSecond, $secondInitiatedAt);
//TODO - sleep time nefunguje a díky continue pak nenaskočí ani sleep na konci cyklu
//continue;
}

// Check running jobs
foreach ($jobExecutions as $i => $execution) {
if (!$execution->isRunning()) {
unset($jobExecutions[$i]);

unset($executions[$key]);

$output = $execution->getOutput() . $execution->getErrorOutput();

try {
$decoded = json_decode($output, true, 512, JSON_THROW_ON_ERROR);
assert(is_array($decoded));

yield $jobSummaries[] = new JobSummary(
new JobInfo(
$decoded['info']['id'],
$decoded['info']['name'],
$decoded['info']['expression'],
DateTimeImmutable::createFromFormat('U.v', $decoded['info']['start']),
),
new JobResult(
new CronExpression($decoded['info']['expression']),
DateTimeImmutable::createFromFormat('U.v', $decoded['result']['end']),
JobResultState::from($decoded['result']['state']),
),
);
} catch (JsonException $e) {
$suppressed[] = JobProcessFailure::create()
->withMessage("Job subprocess failed with following output:\n$output");
$output = $execution->getOutput() . $execution->getErrorOutput();

try {
$decoded = json_decode($output, true, 512, JSON_THROW_ON_ERROR);
assert(is_array($decoded));

yield $jobSummaries[] = $this->createSummary($decoded);
} catch (JsonException $e) {
$suppressedExceptions[] = JobProcessFailure::create()
->withMessage("Job subprocess failed with following output:\n$output");
}
}
}

usleep(1_000);
// Nothing to do, wait
//TODO - nové hodiny
$this->clock instanceof FrozenClock
? $this->clock->move(0.001)
: usleep(1_000);
}

$summary = new RunSummary($runStart, $this->clock->now(), $jobSummaries);

if ($suppressed !== []) {
throw RunFailure::create($summary, $suppressed);
if ($suppressedExceptions !== []) {
throw RunFailure::create($summary, $suppressedExceptions);
}

return $summary;
}

/**
* @param non-empty-list<int|string> $ids
* @return non-empty-array<int, list<array{int|string, Job, CronExpression}>>
*/
private function getScheduledJobsBySecond(array $ids): array
{
$scheduledJobsBySecond = [];
foreach ($ids as $id) {
$scheduledJob = $this->jobManager->getScheduledJob($id);
assert($scheduledJob !== null);
[$job, $expression, $repeatAfterSeconds] = $scheduledJob;

if ($repeatAfterSeconds === 0) {
$scheduledJobsBySecond[0][] = [$id, $job, $expression];
} else {
for ($second = 0; $second <= 59; $second += $repeatAfterSeconds) {
$scheduledJobsBySecond[$second][] = [$id, $job, $expression];
}
}
}

// $ids are not empty and for cycle is always run at least once
assert($scheduledJobsBySecond !== []);

return $scheduledJobsBySecond;
}

/**
* @param list<array{int|string, Job, CronExpression}> $scheduledJobs
* @param list<Process> $jobExecutions
* @return list<Process>
*/
private function startJobs(array $scheduledJobs, array $jobExecutions): array
{
foreach ($scheduledJobs as [$id, $job, $expression]) {
$jobExecutions[] = $execution = new Process([
PHP_BINARY,
$this->script,
$this->command,
$id,
'--json',
]);
$execution->start();
}

return $jobExecutions;
}

/**
* @param array<mixed> $raw
*/
private function createSummary(array $raw): JobSummary
{
return new JobSummary(
new JobInfo(
$raw['info']['id'],
$raw['info']['name'],
$raw['info']['expression'],
//TODO - second - měla by být získatelná tady, bez předání z vnitřního jobu
// - a tím se vyřeší i předání pro run-job
0,
DateTimeImmutable::createFromFormat('U.v', $raw['info']['start']),
),
new JobResult(
new CronExpression($raw['info']['expression']),
DateTimeImmutable::createFromFormat('U.v', $raw['result']['end']),
JobResultState::from($raw['result']['state']),
),
);
}

/**
* More accurate than (float) $dateTime->format('U.u')
*/
private function getMicroTimestamp(DateTimeImmutable $dateTime): float
{
$seconds = (float) $dateTime->format('U');
$microseconds = (float) $dateTime->format('u') / 1e6;

return $seconds + $microseconds;
}

private function sleepTillNextSecond(int $second, int $lastSecond, DateTimeImmutable $secondInitiatedAt): void
{
$sleepTime = $this->getTimeTillNextSecond($second, $lastSecond, $secondInitiatedAt);

//TODO - nové hodiny
$this->clock instanceof FrozenClock
? $this->clock->move($sleepTime)
: usleep((int) ($sleepTime * 1e6));
}

private function getTimeTillNextSecond(int $second, int $lastSecond, DateTimeImmutable $secondInitiatedAt): float
{
if ($second === $lastSecond) {
return 0;
}

$startOfSecond = $this->getMicroTimestamp($secondInitiatedAt);
$endOfSecond = $this->getMicroTimestamp($this->clock->now());
$timeElapsed = $endOfSecond - $startOfSecond;

return 1 - $timeElapsed;
}

}
Loading

0 comments on commit 387c95c

Please sign in to comment.