66
77use Keboola \JobQueueInternalClient \Client ;
88use Keboola \JobQueueInternalClient \Exception \ClientException ;
9+ use Keboola \JobQueueInternalClient \Exception \OrchestrationJobMatcherValidationException ;
910use Keboola \JobQueueInternalClient \JobFactory ;
1011use Keboola \JobQueueInternalClient \JobFactory \Job ;
1112use Keboola \JobQueueInternalClient \JobFactory \JobInterface ;
1213use Keboola \JobQueueInternalClient \JobListOptions ;
1314
1415// https://keboola.atlassian.net/wiki/spaces/ENGG/pages/3074195457/DRAFT+RFC-2023-011+-+Rerun+orchestration#Pair-Jobs-and-Tasks
15- class JobMatcher
16+ class OrchestrationJobMatcher
1617{
1718 public function __construct (
1819 private readonly Client $ internalClient ,
1920 ) {
2021 }
2122
23+ public function matchTaskJobsForOrchestrationJob (string $ jobId ): OrchestrationJobMatcherResults
24+ {
25+ $ job = $ this ->internalClient ->getJob ($ jobId );
26+ $ childJobs = $ this ->getOrchestrationTaskJobs ($ job );
27+ $ configuration = $ this ->getCurrentOrchestrationConfiguration ($ job );
28+ $ this ->validateInputs ($ job , $ configuration );
29+ $ matchedTasks = [];
30+ foreach ($ configuration ['tasks ' ] as $ task ) {
31+ $ matched = false ;
32+ foreach ($ childJobs as $ index => $ childJob ) {
33+ if (((string ) $ task ['id ' ]) === $ childJob ->getOrchestrationTaskId ()) {
34+ $ matchedTasks [] = new OrchestrationTaskMatched (
35+ (string ) $ task ['id ' ],
36+ true ,
37+ $ childJob ->getId (),
38+ $ childJob ->getComponentId (),
39+ $ childJob ->getConfigId (),
40+ $ childJob ->getStatus (),
41+ );
42+ unset($ childJobs [$ index ]);
43+ $ matched = true ;
44+ break ;
45+ }
46+ }
47+ if (!$ matched ) {
48+ $ matchedTasks [] = new OrchestrationTaskMatched ((string ) $ task ['id ' ], false , null , null , null , null );
49+ }
50+ }
51+ return new OrchestrationJobMatcherResults (
52+ $ jobId ,
53+ $ job ->getConfigId (),
54+ $ matchedTasks ,
55+ );
56+ }
57+
2258 /**
2359 * @return array<JobInterface>
2460 */
2561 private function getOrchestrationTaskJobs (JobInterface $ job ): array
2662 {
27- $ childJobs = $ this ->internalClient ->listJobs (
63+ return $ this ->internalClient ->listJobs (
2864 (new JobListOptions ())->setParentRunId ($ job ->getId ()),
2965 true ,
3066 );
31- return $ childJobs ;
3267 }
3368
3469 private function getCurrentOrchestrationConfiguration (JobInterface $ job ): array
@@ -46,53 +81,24 @@ private function validateInputs(JobInterface $job, array $configuration): void
4681 the main use case is root orchestration job, but it seems that a phaseContainer might be
4782 equally valid input. */
4883 if ($ job ->getComponentId () !== JobFactory::ORCHESTRATOR_COMPONENT ) {
49- throw new ClientException (sprintf (
84+ throw new OrchestrationJobMatcherValidationException (sprintf (
5085 'Job "%s" is not an orchestration job. ' ,
5186 $ job ->getId (),
5287 ));
5388 }
5489 if (!isset ($ configuration ['tasks ' ]) || !is_array ($ configuration ['tasks ' ])) {
55- throw new ClientException (sprintf (
56- 'Orchestration "%s" does not have tasks . ' ,
90+ throw new OrchestrationJobMatcherValidationException (sprintf (
91+ 'Orchestration "%s" tasks must be an array . ' ,
5792 $ job ->getId (),
5893 ));
5994 }
6095 array_walk ($ configuration ['tasks ' ], function (array $ task ) {
6196 if (!isset ($ task ['id ' ])) {
62- throw new ClientException (sprintf (
97+ throw new OrchestrationJobMatcherValidationException (sprintf (
6398 'Task does not have an id. (%s) ' ,
6499 json_encode ($ task ),
65100 ));
66101 }
67102 });
68103 }
69-
70- public function matchTaskJobsForOrchestrationJob (string $ jobId ): JobMatcherResults
71- {
72- $ job = $ this ->internalClient ->getJob ($ jobId );
73- $ childJobs = $ this ->getOrchestrationTaskJobs ($ job );
74- $ configuration = $ this ->getCurrentOrchestrationConfiguration ($ job );
75- $ this ->validateInputs ($ job , $ configuration );
76- $ matchedTasks = [];
77- foreach ($ configuration ['tasks ' ] as $ task ) {
78- foreach ($ childJobs as $ index => $ childJob ) {
79- if ((string ) $ task ['id ' ] === $ childJob ->getOrchestrationTaskId ()) {
80- $ matchedTasks [] = new MatchedTask (
81- (string ) $ task ['id ' ],
82- $ childJob ->getId (),
83- $ childJob ->getComponentId (),
84- $ childJob ->getConfigId (),
85- $ childJob ->getStatus (),
86- );
87- unset($ childJobs [$ index ]);
88- break ;
89- }
90- }
91- }
92- return new JobMatcherResults (
93- $ jobId ,
94- $ job ->getConfigId (),
95- $ matchedTasks ,
96- );
97- }
98104}
0 commit comments