Skip to content

Commit cc2dd27

Browse files
committed
JobMatcher for orchestration retry
https://keboola.atlassian.net/browse/SOX-407
1 parent 358c0db commit cc2dd27

File tree

5 files changed

+333
-0
lines changed

5 files changed

+333
-0
lines changed

provisioning/local/.terraform.lock.hcl

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Orchestration/JobMatcher.php

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Orchestration;
6+
7+
use Keboola\JobQueueInternalClient\Client;
8+
use Keboola\JobQueueInternalClient\Exception\ClientException;
9+
use Keboola\JobQueueInternalClient\JobFactory;
10+
use Keboola\JobQueueInternalClient\JobFactory\Job;
11+
use Keboola\JobQueueInternalClient\JobFactory\JobInterface;
12+
use Keboola\JobQueueInternalClient\JobListOptions;
13+
14+
// https://keboola.atlassian.net/wiki/spaces/ENGG/pages/3074195457/DRAFT+RFC-2023-011+-+Rerun+orchestration#Pair-Jobs-and-Tasks
15+
class JobMatcher
16+
{
17+
public function __construct(
18+
private readonly Client $internalClient,
19+
) {
20+
}
21+
22+
/**
23+
* @return array<JobInterface>
24+
*/
25+
private function getOrchestrationTaskJobs(JobInterface $job): array
26+
{
27+
$childJobs = $this->internalClient->listJobs(
28+
(new JobListOptions())->setParentRunId($job->getId()),
29+
true,
30+
);
31+
return $childJobs;
32+
}
33+
34+
private function getCurrentOrchestrationConfiguration(JobInterface $job): array
35+
{
36+
$configuration = $job->getConfigData();
37+
if (!$configuration) {
38+
$configuration = $job->getComponentConfiguration()['configuration'];
39+
};
40+
return $configuration;
41+
}
42+
43+
private function validateInputs(JobInterface $job, array $configuration): void
44+
{
45+
/* since the matcher accepts a jobId, we need to check that it is a sort of sensible input -
46+
the main use case is root orchestration job, but it seems that a phaseContainer might be
47+
equally valid input. */
48+
if ($job->getComponentId() !== JobFactory::ORCHESTRATOR_COMPONENT) {
49+
throw new ClientException(sprintf(
50+
'Job "%s" is not an orchestration job.',
51+
$job->getId(),
52+
));
53+
}
54+
if (!isset($configuration['tasks']) || !is_array($configuration['tasks'])) {
55+
throw new ClientException(sprintf(
56+
'Orchestration "%s" does not have tasks.',
57+
$job->getId(),
58+
));
59+
}
60+
array_walk($configuration['tasks'], function (array $task) {
61+
if (!isset($task['id'])) {
62+
throw new ClientException(sprintf(
63+
'Task does not have an id. (%s)',
64+
json_encode($task),
65+
));
66+
}
67+
});
68+
}
69+
70+
public function matchTaskJobsForOrchestrationJob(string $jobId): JobMatcherResults
71+
{
72+
$job = $this->internalClient->getJob($jobId);
73+
$childJobs = $this->getOrchestrationTaskJobs($job);
74+
$configuration = $this->getCurrentOrchestrationConfiguration($job);
75+
$this->validateInputs($job, $configuration);
76+
$matchedTasks = [];
77+
foreach ($configuration['tasks'] as $task) {
78+
foreach ($childJobs as $index => $childJob) {
79+
if ((string) $task['id'] === $childJob->getOrchestrationTaskId()) {
80+
$matchedTasks[] = new MatchedTask(
81+
(string) $task['id'],
82+
$childJob->getId(),
83+
$childJob->getComponentId(),
84+
$childJob->getConfigId(),
85+
$childJob->getStatus(),
86+
);
87+
unset($childJobs[$index]);
88+
break;
89+
}
90+
}
91+
}
92+
return new JobMatcherResults(
93+
$jobId,
94+
$job->getConfigId(),
95+
$matchedTasks,
96+
);
97+
}
98+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Orchestration;
6+
7+
class JobMatcherResults
8+
{
9+
/**
10+
* @param array<MatchedTask> $matchedTasks
11+
*/
12+
public function __construct(
13+
public readonly string $jobId,
14+
public readonly ?string $configId,
15+
public readonly array $matchedTasks,
16+
) {
17+
}
18+
}

src/Orchestration/MatchedTask.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Orchestration;
6+
7+
class MatchedTask
8+
{
9+
public function __construct(
10+
public readonly string $taskId,
11+
public readonly string $jobId,
12+
public readonly string $componentId,
13+
public readonly ?string $configId,
14+
public readonly string $status,
15+
) {
16+
}
17+
}
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Tests\Orchestration;
6+
7+
use Keboola\JobQueueInternalClient\JobFactory;
8+
use Keboola\JobQueueInternalClient\Orchestration\JobMatcher;
9+
use Keboola\JobQueueInternalClient\Orchestration\JobMatcherResults;
10+
use Keboola\JobQueueInternalClient\Orchestration\MatchedTask;
11+
use Keboola\JobQueueInternalClient\Tests\BaseClientFunctionalTest;
12+
use Keboola\StorageApi\Client as StorageClient;
13+
use Keboola\StorageApi\Components;
14+
use Keboola\StorageApi\Options\Components\Configuration;
15+
16+
class JobMatcherTest extends BaseClientFunctionalTest
17+
{
18+
private function getOrchestrationConfiguration(): array
19+
{
20+
return [
21+
'phases' => [
22+
[
23+
'id' => 26427,
24+
'name' => 'Extractors',
25+
'dependsOn' => [],
26+
],
27+
[
28+
'id' => 27406,
29+
'name' => 'Transformations',
30+
'dependsOn' => [26427],
31+
],
32+
],
33+
'tasks' => [
34+
[
35+
'id' => 30679,
36+
'name' => 'keboola.ex-db-snowflake-493493',
37+
'phase' => 26427,
38+
'task' => [
39+
'componentId' => 'keboola.ex-db-snowflake',
40+
'configData' => [], # not important, we're not going to start the job
41+
'mode' => 'run',
42+
],
43+
'continueOnFailure' => false,
44+
],
45+
[
46+
'id' => 92543,
47+
'name' => 'keboola.snowflake-transformation-11072450',
48+
'phase' => 27406,
49+
'task' => [
50+
'componentId' => 'keboola.snowflake-transformation',
51+
'configData' => [],
52+
'mode' => 'run',
53+
],
54+
'continueOnFailure' => false,
55+
'enabled' => true,
56+
],
57+
[
58+
'id' => 25052,
59+
'name' => 'keboola.ex-sample-data-7796763',
60+
'phase' => 26427,
61+
'task' => [
62+
'componentId' => 'keboola.ex-sample-data',
63+
'configData' => [],
64+
'mode' => 'run',
65+
],
66+
'continueOnFailure' => false,
67+
'enabled' => true,
68+
],
69+
],
70+
];
71+
}
72+
73+
74+
/**
75+
* @param array $configurationData
76+
* @return array{orchestrationJobId: string, orchestrationConfigurationId: string, jobIds: string[]}
77+
*/
78+
private function createOrchestrationLikeJobs(array $configurationData): array
79+
{
80+
$storageClient = new StorageClient([
81+
'token' => (string) getenv('TEST_STORAGE_API_TOKEN'),
82+
'url' => (string) getenv('TEST_STORAGE_API_URL'),
83+
]);
84+
$queueClient = $this->getClient();
85+
$componentsApi = new Components($storageClient);
86+
$configuration = new Configuration();
87+
$configuration->setConfiguration($configurationData);
88+
$configuration->setName($this->getName());
89+
$configuration->setComponentId(JobFactory::ORCHESTRATOR_COMPONENT);
90+
$orchestrationConfigurationId = $componentsApi->addConfiguration($configuration)['id'];
91+
$orchestrationJob = $this->getNewJobFactory()->createNewJob(
92+
[
93+
'#tokenString' => getenv('TEST_STORAGE_API_TOKEN'),
94+
'configData' => [],
95+
'componentId' => JobFactory::ORCHESTRATOR_COMPONENT,
96+
'configId' => $orchestrationConfigurationId,
97+
'mode' => 'run',
98+
'parentRunId' => '',
99+
'orchestrationJobId' => null,
100+
],
101+
);
102+
$queueClient->createJob($orchestrationJob);
103+
$phaseJobIds = [];
104+
foreach ($configurationData['phases'] as $phase) {
105+
$phaseTasks = array_filter(
106+
$configurationData['tasks'],
107+
fn ($task) => $task['phase'] === $phase['id'],
108+
);
109+
$configData = [
110+
'tasks' => $phaseTasks,
111+
'phaseId' => $phase['id'],
112+
'dependsOn' => $phase['dependsOn'],
113+
'orchestrationJobId' => $orchestrationJob->getId(),
114+
];
115+
$phaseJob = $queueClient->createJob($this->getNewJobFactory()->createNewJob(
116+
[
117+
'#tokenString' => getenv('TEST_STORAGE_API_TOKEN'),
118+
'configData' => $configData,
119+
'componentId' => JobFactory::ORCHESTRATOR_COMPONENT,
120+
'configId' => $orchestrationConfigurationId,
121+
'mode' => 'run',
122+
'orchestrationJobId' => $orchestrationJob->getId(),
123+
'parentRunId' => $orchestrationJob->getId(),
124+
],
125+
));
126+
$phaseJobIds[$phase['id']] = $phaseJob->getId();
127+
}
128+
$jobIds = [];
129+
foreach ($configurationData['tasks'] as $task) {
130+
$jobIds[] = $queueClient->createJob($this->getNewJobFactory()->createNewJob(
131+
[
132+
'#tokenString' => getenv('TEST_STORAGE_API_TOKEN'),
133+
'componentId' => $task['task']['componentId'],
134+
'configData' => $task['task']['configData'],
135+
'mode' => $task['task']['mode'],
136+
'orchestrationJobId' => $orchestrationJob->getId(),
137+
'orchestrationTaskId' => (string) $task['id'],
138+
'parentRunId' => $orchestrationJob->getId() . '.' . $phaseJobIds[$task['phase']],
139+
],
140+
))->getId();
141+
}
142+
/*
143+
$jobListOptions = new JobListOptions();
144+
$jobListOptions->setParentRunId($orchestrationJob->getId());
145+
$jobs = $this->getClient()->listJobs($jobListOptions, true);
146+
foreach ($jobs as $job) {
147+
var_dump($job->jsonSerialize());
148+
}
149+
*/
150+
return [
151+
'orchestrationJobId' => $orchestrationJob->getId(),
152+
'orchestrationConfigurationId' => $orchestrationConfigurationId,
153+
'jobIds' => $jobIds,
154+
];
155+
}
156+
157+
public function testMatcher(): void
158+
{
159+
$client = $this->getClient();
160+
$result = $this->createOrchestrationLikeJobs($this->getOrchestrationConfiguration());
161+
extract($result);
162+
163+
$matcher = new JobMatcher($client);
164+
$results = $matcher->matchTaskJobsForOrchestrationJob($orchestrationJobId);
165+
self::assertEquals(
166+
new JobMatcherResults(
167+
$orchestrationJobId,
168+
$orchestrationConfigurationId,
169+
[
170+
new MatchedTask(
171+
'30679',
172+
$jobIds[0],
173+
'keboola.ex-db-snowflake',
174+
null,
175+
'created',
176+
),
177+
new MatchedTask(
178+
'92543',
179+
$jobIds[1],
180+
'keboola.snowflake-transformation',
181+
null,
182+
'created',
183+
),
184+
new MatchedTask(
185+
'25052',
186+
$jobIds[2],
187+
'keboola.ex-sample-data',
188+
null,
189+
'created',
190+
),
191+
],
192+
),
193+
$results,
194+
);
195+
}
196+
}

0 commit comments

Comments
 (0)