Skip to content

Commit a832f92

Browse files
authored
Merge pull request #487 from keboola/PST-2444-ondra
keboola.flow is orchestration container job type
2 parents 209eac8 + e695eba commit a832f92

File tree

5 files changed

+113
-2
lines changed

5 files changed

+113
-2
lines changed

src/JobFactory.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
abstract class JobFactory
1313
{
1414
public const ORCHESTRATOR_COMPONENT = 'keboola.orchestrator';
15+
16+
public const FLOW_COMPONENT = 'keboola.flow';
17+
1518
public const PROTECTED_DEFAULT_BRANCH_FEATURE = 'protected-default-branch';
1619

1720
/**

src/JobFactory/JobRuntimeResolver.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,9 @@ private function resolveJobType(array $jobData): JobType
275275
if ((intval($jobData['parallelism']) > 0) || $jobData['parallelism'] === JobInterface::PARALLELISM_INFINITY) {
276276
return JobType::ROW_CONTAINER;
277277
} else {
278-
if ($jobData['componentId'] === JobFactory::ORCHESTRATOR_COMPONENT) {
278+
if ($jobData['componentId'] === JobFactory::FLOW_COMPONENT) {
279+
return JobType::ORCHESTRATION_CONTAINER;
280+
} elseif ($jobData['componentId'] === JobFactory::ORCHESTRATOR_COMPONENT) {
279281
if (isset($jobData['configData']['phaseId']) && (string) ($jobData['configData']['phaseId']) !== '') {
280282
return JobType::PHASE_CONTAINER;
281283
} else {

src/Orchestration/OrchestrationJobMatcher.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private function validateInputs(JobInterface $job, array $configuration): void
9292
/* since the matcher accepts a jobId, we need to check that it is a sort of sensible input -
9393
the main use case is root orchestration job, but it seems that a phaseContainer might be
9494
equally valid input. */
95-
if ($job->getComponentId() !== JobFactory::ORCHESTRATOR_COMPONENT) {
95+
if (!in_array($job->getComponentId(), [JobFactory::ORCHESTRATOR_COMPONENT, JobFactory::FLOW_COMPONENT], true)) {
9696
throw new OrchestrationJobMatcherValidationException(sprintf(
9797
'Job "%s" is not an orchestration job.',
9898
$job->getId(),

tests/JobFactory/JobRuntimeResolverTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ public function resolveDefaultBackendContextData(): Generator
6969
JobType::ORCHESTRATION_CONTAINER,
7070
null,
7171
];
72+
yield 'flow job' => [
73+
JobFactory::FLOW_COMPONENT,
74+
[],
75+
JobType::ORCHESTRATION_CONTAINER,
76+
null,
77+
];
7278
}
7379

7480
/**

tests/Orchestration/OrchestrationJobMatcherTest.php

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,106 @@ public function testMatcherPartial(): void
293293
$componentsApi->deleteConfiguration(JobFactory::ORCHESTRATOR_COMPONENT, $orchestrationConfigurationId);
294294
}
295295

296+
public function testMatcherFlowComponent(): void
297+
{
298+
$client = $this->getClient();
299+
$configuration = $this->getOrchestrationConfiguration();
300+
301+
// Vytvoříme nový job s FLOW_COMPONENT
302+
$storageClient = new StorageClient([
303+
'token' => self::getRequiredEnv('TEST_STORAGE_API_TOKEN_MASTER'),
304+
'url' => self::getRequiredEnv('TEST_STORAGE_API_URL'),
305+
]);
306+
307+
// create token for job
308+
$tokenOptions = (new TokenCreateOptions())
309+
->setDescription(__CLASS__)
310+
->setCanManageBuckets(true)
311+
->setExpiresIn(60 * 5)
312+
;
313+
314+
$tokens = new Tokens($storageClient);
315+
$token = $tokens->createToken($tokenOptions);
316+
317+
$componentsApi = new Components($storageClient);
318+
$componentConfig = new Configuration();
319+
$componentConfig->setConfiguration($configuration);
320+
$componentConfig->setName('testMatcherFlowComponent');
321+
$componentConfig->setComponentId(JobFactory::FLOW_COMPONENT);
322+
$this->componentId = JobFactory::FLOW_COMPONENT;
323+
$this->configurationId = $componentsApi->addConfiguration($componentConfig)['id'];
324+
325+
$flowJob = $this->getNewJobFactory()->createNewJob(
326+
[
327+
'#tokenString' => $token['token'],
328+
'configData' => [],
329+
'componentId' => JobFactory::FLOW_COMPONENT,
330+
'configId' => $this->configurationId,
331+
'mode' => 'run',
332+
'parentRunId' => '',
333+
'orchestrationJobId' => null,
334+
],
335+
);
336+
$client->createJob($flowJob);
337+
338+
// Vytvoříme child jobs
339+
$jobIds = [];
340+
foreach ($configuration['tasks'] as $task) {
341+
$jobIds[] = $client->createJob($this->getNewJobFactory()->createNewJob(
342+
[
343+
'#tokenString' => $token['token'],
344+
'componentId' => $task['task']['componentId'],
345+
'configData' => $task['task']['configData'],
346+
'mode' => $task['task']['mode'],
347+
'orchestrationJobId' => $flowJob->getId(),
348+
'orchestrationTaskId' => (string) $task['id'],
349+
'parentRunId' => $flowJob->getId(),
350+
],
351+
))->getId();
352+
}
353+
354+
$tokens->dropToken($token['id']);
355+
356+
$matcher = new OrchestrationJobMatcher($client, $this->createStorageClientPlainFactory());
357+
$results = $matcher->matchTaskJobsForOrchestrationJob(
358+
$flowJob->getId(),
359+
self::getRequiredEnv('TEST_STORAGE_API_TOKEN'),
360+
);
361+
self::assertEquals(
362+
new OrchestrationJobMatcherResults(
363+
$flowJob->getId(),
364+
$this->configurationId,
365+
[
366+
new OrchestrationTaskMatched(
367+
'30679',
368+
true,
369+
$jobIds[0],
370+
'keboola.ex-db-snowflake',
371+
null,
372+
'created',
373+
),
374+
new OrchestrationTaskMatched(
375+
'92543',
376+
true,
377+
$jobIds[1],
378+
'keboola.snowflake-transformation',
379+
null,
380+
'created',
381+
),
382+
new OrchestrationTaskMatched(
383+
'25052',
384+
true,
385+
$jobIds[2],
386+
'keboola.ex-sample-data',
387+
null,
388+
'created',
389+
),
390+
],
391+
),
392+
$results,
393+
);
394+
}
395+
296396
/**
297397
* @dataProvider invalidConfigurationProvider
298398
*/

0 commit comments

Comments
 (0)