Skip to content

Commit 1bcc5e9

Browse files
authored
Merge pull request #347 from keboola/odin-SOX-407
JobMatcher for orchestration retry
2 parents 358c0db + cdede0d commit 1bcc5e9

File tree

7 files changed

+498
-0
lines changed

7 files changed

+498
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@
1010
/build/logs
1111
/infection.log
1212
/.phpunit.result.cache
13+
/var/

provisioning/local/.terraform.lock.hcl

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Exception;
6+
7+
class OrchestrationJobMatcherValidationException extends ClientException
8+
{
9+
10+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Orchestration;
6+
7+
use Keboola\JobQueueInternalClient\Client;
8+
use Keboola\JobQueueInternalClient\Exception\ClientException;
9+
use Keboola\JobQueueInternalClient\Exception\OrchestrationJobMatcherValidationException;
10+
use Keboola\JobQueueInternalClient\JobFactory;
11+
use Keboola\JobQueueInternalClient\JobFactory\Job;
12+
use Keboola\JobQueueInternalClient\JobFactory\JobInterface;
13+
use Keboola\JobQueueInternalClient\JobListOptions;
14+
15+
// https://keboola.atlassian.net/wiki/spaces/ENGG/pages/3074195457/DRAFT+RFC-2023-011+-+Rerun+orchestration#Pair-Jobs-and-Tasks
16+
class OrchestrationJobMatcher
17+
{
18+
public function __construct(
19+
private readonly Client $internalClient,
20+
) {
21+
}
22+
23+
public function matchTaskJobsForOrchestrationJob(string $jobId): OrchestrationJobMatcherResults
24+
{
25+
$job = $this->internalClient->getJob($jobId);
26+
$childJobs = $this->getOrchestrationTaskJobs($job);
27+
$configuration = $this->getCurrentOrchestrationConfiguration($job);
28+
$this->validateInputs($job, $configuration);
29+
$matchedTasks = [];
30+
foreach ($configuration['tasks'] as $task) {
31+
$matched = false;
32+
foreach ($childJobs as $index => $childJob) {
33+
if (((string) $task['id']) === $childJob->getOrchestrationTaskId()) {
34+
$matchedTasks[] = new OrchestrationTaskMatched(
35+
(string) $task['id'],
36+
true,
37+
$childJob->getId(),
38+
$childJob->getComponentId(),
39+
$childJob->getConfigId(),
40+
$childJob->getStatus(),
41+
);
42+
unset($childJobs[$index]);
43+
$matched = true;
44+
break;
45+
}
46+
}
47+
if (!$matched) {
48+
$matchedTasks[] = new OrchestrationTaskMatched((string) $task['id'], false, null, null, null, null);
49+
}
50+
}
51+
return new OrchestrationJobMatcherResults(
52+
$jobId,
53+
$job->getConfigId(),
54+
$matchedTasks,
55+
);
56+
}
57+
58+
/**
59+
* @return array<JobInterface>
60+
*/
61+
private function getOrchestrationTaskJobs(JobInterface $job): array
62+
{
63+
return $this->internalClient->listJobs(
64+
(new JobListOptions())->setParentRunId($job->getId()),
65+
true,
66+
);
67+
}
68+
69+
private function getCurrentOrchestrationConfiguration(JobInterface $job): array
70+
{
71+
$configuration = $job->getConfigData();
72+
if (!$configuration) {
73+
$configuration = $job->getComponentConfiguration()['configuration'];
74+
};
75+
return $configuration;
76+
}
77+
78+
private function validateInputs(JobInterface $job, array $configuration): void
79+
{
80+
/* since the matcher accepts a jobId, we need to check that it is a sort of sensible input -
81+
the main use case is root orchestration job, but it seems that a phaseContainer might be
82+
equally valid input. */
83+
if ($job->getComponentId() !== JobFactory::ORCHESTRATOR_COMPONENT) {
84+
throw new OrchestrationJobMatcherValidationException(sprintf(
85+
'Job "%s" is not an orchestration job.',
86+
$job->getId(),
87+
));
88+
}
89+
if (!isset($configuration['tasks']) || !is_array($configuration['tasks'])) {
90+
throw new OrchestrationJobMatcherValidationException(sprintf(
91+
'Orchestration "%s" tasks must be an array.',
92+
$job->getId(),
93+
));
94+
}
95+
array_walk($configuration['tasks'], function (array $task) {
96+
if (!isset($task['id'])) {
97+
throw new OrchestrationJobMatcherValidationException(sprintf(
98+
'Task does not have an id. (%s)',
99+
json_encode($task),
100+
));
101+
}
102+
});
103+
}
104+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Orchestration;
6+
7+
class OrchestrationJobMatcherResults
8+
{
9+
/**
10+
* @param array<OrchestrationTaskMatched> $matchedTasks
11+
*/
12+
public function __construct(
13+
public readonly string $jobId,
14+
public readonly ?string $configId,
15+
public readonly array $matchedTasks,
16+
) {
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Orchestration;
6+
7+
class OrchestrationTaskMatched
8+
{
9+
public function __construct(
10+
public readonly string $taskId,
11+
public readonly bool $matched,
12+
public readonly ?string $jobId,
13+
public readonly ?string $componentId,
14+
public readonly ?string $configId,
15+
public readonly ?string $status,
16+
) {
17+
}
18+
}

0 commit comments

Comments
 (0)