Skip to content

Commit ccf5875

Browse files
authored
Merge pull request #365 from keboola/erik-SOX-461
re-run orchestration fixup
2 parents 6c031d1 + d8a9125 commit ccf5875

File tree

6 files changed

+202
-40
lines changed

6 files changed

+202
-40
lines changed

src/JobFactory/Job.php

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -381,19 +381,10 @@ public function getComponentConfiguration(): array
381381
if ($this->componentConfiguration !== null) {
382382
return $this->componentConfiguration;
383383
}
384-
385-
if (!$this->getConfigId()) {
386-
throw new ClientException('Can\'t fetch component configuration: job has no configId set');
387-
}
388-
389-
try {
390-
return $this->componentConfiguration = $this->getComponentsApiClient()->getConfiguration(
391-
$this->getComponentId(),
392-
$this->getConfigId(),
393-
);
394-
} catch (StorageApiClientException $e) {
395-
throw new ClientException('Failed to fetch component configuration: '.$e->getMessage(), 0, $e);
396-
}
384+
return $this->componentConfiguration = JobConfigurationResolver::resolveJobConfiguration(
385+
$this,
386+
$this->getComponentsApiClient(),
387+
);
397388
}
398389

399390
private function getComponentsApiClient(): ComponentsApiClient
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\JobFactory;
6+
7+
use Keboola\JobQueueInternalClient\Exception\ClientException;
8+
use Keboola\StorageApi\ClientException as StorageApiClientException;
9+
use Keboola\StorageApi\Components as ComponentsApiClient;
10+
11+
class JobConfigurationResolver
12+
{
13+
public static function resolveJobConfiguration(JobInterface $job, ComponentsApiClient $componentsApiClient): array
14+
{
15+
if (!$job->getConfigId()) {
16+
throw new ClientException('Can\'t fetch component configuration: job has no configId set');
17+
}
18+
19+
try {
20+
return $componentsApiClient->getConfiguration(
21+
$job->getComponentId(),
22+
$job->getConfigId(),
23+
);
24+
} catch (StorageApiClientException $e) {
25+
throw new ClientException('Failed to fetch component configuration: '.$e->getMessage(), 0, $e);
26+
}
27+
}
28+
}

src/Orchestration/OrchestrationJobMatcher.php

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,34 @@
55
namespace Keboola\JobQueueInternalClient\Orchestration;
66

77
use Keboola\JobQueueInternalClient\Client;
8-
use Keboola\JobQueueInternalClient\Exception\ClientException;
98
use Keboola\JobQueueInternalClient\Exception\OrchestrationJobMatcherValidationException;
109
use Keboola\JobQueueInternalClient\JobFactory;
11-
use Keboola\JobQueueInternalClient\JobFactory\Job;
1210
use Keboola\JobQueueInternalClient\JobFactory\JobInterface;
1311
use Keboola\JobQueueInternalClient\JobListOptions;
12+
use Keboola\StorageApi\Components;
13+
use Keboola\StorageApiBranch\Factory\ClientOptions;
14+
use Keboola\StorageApiBranch\Factory\StorageClientPlainFactory;
15+
use SensitiveParameter;
1416

1517
// https://keboola.atlassian.net/wiki/spaces/ENGG/pages/3074195457/DRAFT+RFC-2023-011+-+Rerun+orchestration#Pair-Jobs-and-Tasks
1618
class OrchestrationJobMatcher
1719
{
1820
public function __construct(
1921
private readonly Client $internalClient,
22+
private readonly StorageClientPlainFactory $storageClientFactory,
2023
) {
2124
}
2225

23-
public function matchTaskJobsForOrchestrationJob(string $jobId): OrchestrationJobMatcherResults
24-
{
26+
public function matchTaskJobsForOrchestrationJob(
27+
string $jobId,
28+
#[SensitiveParameter] string $token,
29+
): OrchestrationJobMatcherResults {
2530
$job = $this->internalClient->getJob($jobId);
2631
$childJobs = $this->getOrchestrationTaskJobs($job);
27-
$configuration = $this->getCurrentOrchestrationConfiguration($job);
32+
$configuration = $this->getCurrentOrchestrationConfiguration(
33+
$job,
34+
$this->createComponentsApi($token, $job->getBranchId()),
35+
);
2836
$this->validateInputs($job, $configuration);
2937
$matchedTasks = [];
3038
foreach ($configuration['tasks'] as $task) {
@@ -49,7 +57,7 @@ public function matchTaskJobsForOrchestrationJob(string $jobId): OrchestrationJo
4957
}
5058
}
5159
return new OrchestrationJobMatcherResults(
52-
$jobId,
60+
$job->getId(),
5361
$job->getConfigId(),
5462
$matchedTasks,
5563
);
@@ -66,13 +74,17 @@ private function getOrchestrationTaskJobs(JobInterface $job): array
6674
);
6775
}
6876

69-
private function getCurrentOrchestrationConfiguration(JobInterface $job): array
77+
private function getCurrentOrchestrationConfiguration(JobInterface $job, Components $componentsApi): array
7078
{
7179
$configuration = $job->getConfigData();
72-
if (!$configuration) {
73-
$configuration = $job->getComponentConfiguration()['configuration'];
74-
};
75-
return $configuration;
80+
if ($configuration) {
81+
return $configuration;
82+
}
83+
84+
return JobFactory\JobConfigurationResolver::resolveJobConfiguration(
85+
$job,
86+
$componentsApi,
87+
)['configuration'];
7688
}
7789

7890
private function validateInputs(JobInterface $job, array $configuration): void
@@ -101,4 +113,17 @@ private function validateInputs(JobInterface $job, array $configuration): void
101113
}
102114
});
103115
}
116+
117+
private function createComponentsApi(
118+
#[SensitiveParameter] string $token,
119+
?string $branchId,
120+
): Components {
121+
return new Components(
122+
$this->storageClientFactory->createClientWrapper(
123+
(new ClientOptions())
124+
->setBranchId($branchId)
125+
->setToken($token),
126+
)->getBranchClient(),
127+
);
128+
}
104129
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\JobQueueInternalClient\Tests\JobFactory;
6+
7+
use Keboola\JobQueueInternalClient\Exception\ClientException;
8+
use Keboola\JobQueueInternalClient\JobFactory\JobConfigurationResolver;
9+
use Keboola\JobQueueInternalClient\JobFactory\JobInterface;
10+
use Keboola\StorageApi\ClientException as StorageApiClientException;
11+
use Keboola\StorageApi\Components;
12+
use PHPUnit\Framework\TestCase;
13+
14+
class JobConfigurationResolverTest extends TestCase
15+
{
16+
public function testResolveJobConfiguration(): void
17+
{
18+
$jobMock = $this->createMock(JobInterface::class);
19+
$jobMock->expects(self::exactly(2))
20+
->method('getConfigId')
21+
->willReturn('123')
22+
;
23+
$jobMock->expects(self::once())
24+
->method('getComponentId')
25+
->willReturn('keboola.test-component')
26+
;
27+
28+
$componentsApiClientMock = $this->createMock(Components::class);
29+
$componentsApiClientMock->expects(self::once())
30+
->method('getConfiguration')
31+
->with('keboola.test-component', '123')
32+
->willReturn(['id' => '123', 'name' => 'test'])
33+
;
34+
35+
self::assertSame(
36+
['id' => '123', 'name' => 'test'],
37+
JobConfigurationResolver::resolveJobConfiguration($jobMock, $componentsApiClientMock),
38+
);
39+
}
40+
41+
public function testResolveJobConfigurationFailsForNoConfigId(): void
42+
{
43+
$jobMock = $this->createMock(JobInterface::class);
44+
$jobMock->expects(self::once())
45+
->method('getConfigId')
46+
->willReturn(null)
47+
;
48+
49+
$this->expectExceptionMessage('Can\'t fetch component configuration: job has no configId set');
50+
JobConfigurationResolver::resolveJobConfiguration(
51+
$jobMock,
52+
$this->createMock(Components::class),
53+
);
54+
}
55+
56+
public function testResolveJobConfigurationHandlesStorageApiErrors(): void
57+
{
58+
$jobMock = $this->createMock(JobInterface::class);
59+
$jobMock->expects(self::exactly(2))
60+
->method('getConfigId')
61+
->willReturn('123')
62+
;
63+
$jobMock->expects(self::once())
64+
->method('getComponentId')
65+
->willReturn('keboola.test-component')
66+
;
67+
68+
$componentsApiClientMock = $this->createMock(Components::class);
69+
$componentsApiClientMock->expects(self::once())
70+
->method('getConfiguration')
71+
->with('keboola.test-component', '123')
72+
->willThrowException(new StorageApiClientException('Sample error'))
73+
;
74+
75+
$this->expectExceptionMessage('Failed to fetch component configuration: Sample error');
76+
$this->expectException(ClientException::class);
77+
JobConfigurationResolver::resolveJobConfiguration($jobMock, $componentsApiClientMock);
78+
}
79+
}

tests/JobFactory/JobTest.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -653,10 +653,17 @@ public function testGetComponentConfiguration(): void
653653
public function testGetComponentConfigurationWhenJobHasNoConfigIdSet(): void
654654
{
655655
$storageClientFactory = $this->createMock(StorageClientPlainFactory::class);
656-
$storageClientFactory->expects(self::never())->method(self::anything());
656+
$storageClientFactory->expects(self::once())
657+
->method('createClientWrapper')
658+
->willReturn($this->createMock(ClientWrapper::class))
659+
;
657660

658661
$objectEncryptorMock = $this->createMock(JobObjectEncryptor::class);
659-
$objectEncryptorMock->expects(self::never())->method('decrypt');
662+
$objectEncryptorMock->expects(self::once())
663+
->method('decrypt')
664+
->with('KBC::ProjectSecure::token')
665+
->willReturn('token-decrypted')
666+
;
660667

661668
$jobData = $this->jobData;
662669
$jobData['configId'] = null;

tests/Orchestration/OrchestrationJobMatcherTest.php

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Keboola\JobQueueInternalClient\Tests\Orchestration;
66

77
use Generator;
8+
use Keboola\JobQueueInternalClient\Client;
89
use Keboola\JobQueueInternalClient\Exception\OrchestrationJobMatcherValidationException;
910
use Keboola\JobQueueInternalClient\JobFactory;
1011
use Keboola\JobQueueInternalClient\Orchestration\OrchestrationJobMatcher;
@@ -14,6 +15,10 @@
1415
use Keboola\StorageApi\Client as StorageClient;
1516
use Keboola\StorageApi\Components;
1617
use Keboola\StorageApi\Options\Components\Configuration;
18+
use Keboola\StorageApi\Options\TokenCreateOptions;
19+
use Keboola\StorageApi\Tokens;
20+
use Keboola\StorageApiBranch\Factory\ClientOptions;
21+
use Keboola\StorageApiBranch\Factory\StorageClientPlainFactory;
1722

1823
class OrchestrationJobMatcherTest extends BaseClientFunctionalTest
1924
{
@@ -96,9 +101,20 @@ private function getOrchestrationConfiguration(): array
96101
private function createOrchestrationLikeJobs(array $configurationData, array $createOnlyTasks): array
97102
{
98103
$storageClient = new StorageClient([
99-
'token' => (string) getenv('TEST_STORAGE_API_TOKEN'),
100-
'url' => (string) getenv('TEST_STORAGE_API_URL'),
104+
'token' => self::getRequiredEnv('TEST_STORAGE_API_TOKEN_MASTER'),
105+
'url' => self::getRequiredEnv('TEST_STORAGE_API_URL'),
101106
]);
107+
108+
// create token for job
109+
$tokenOptions = (new TokenCreateOptions())
110+
->setDescription(__CLASS__)
111+
->setCanManageBuckets(true) // access to all components :)
112+
->setExpiresIn(60 * 5)
113+
;
114+
115+
$tokens = new Tokens($storageClient);
116+
$token = $tokens->createToken($tokenOptions); // new token, created specially for this job
117+
102118
$queueClient = $this->getClient();
103119
$componentsApi = new Components($storageClient);
104120
$configuration = new Configuration();
@@ -107,9 +123,10 @@ private function createOrchestrationLikeJobs(array $configurationData, array $cr
107123
$configuration->setComponentId(JobFactory::ORCHESTRATOR_COMPONENT);
108124
$this->componentId = JobFactory::ORCHESTRATOR_COMPONENT;
109125
$this->configurationId = $componentsApi->addConfiguration($configuration)['id'];
126+
110127
$orchestrationJob = $this->getNewJobFactory()->createNewJob(
111128
[
112-
'#tokenString' => getenv('TEST_STORAGE_API_TOKEN'),
129+
'#tokenString' => $token['token'],
113130
'configData' => [],
114131
'componentId' => JobFactory::ORCHESTRATOR_COMPONENT,
115132
'configId' => $this->configurationId,
@@ -133,7 +150,7 @@ private function createOrchestrationLikeJobs(array $configurationData, array $cr
133150
];
134151
$phaseJob = $queueClient->createJob($this->getNewJobFactory()->createNewJob(
135152
[
136-
'#tokenString' => getenv('TEST_STORAGE_API_TOKEN'),
153+
'#tokenString' => $token['token'],
137154
'configData' => $configData,
138155
'componentId' => JobFactory::ORCHESTRATOR_COMPONENT,
139156
'configId' => $this->configurationId,
@@ -151,7 +168,7 @@ private function createOrchestrationLikeJobs(array $configurationData, array $cr
151168
}
152169
$jobIds[] = $queueClient->createJob($this->getNewJobFactory()->createNewJob(
153170
[
154-
'#tokenString' => getenv('TEST_STORAGE_API_TOKEN'),
171+
'#tokenString' => $token['token'],
155172
'componentId' => $task['task']['componentId'],
156173
'configData' => $task['task']['configData'],
157174
'mode' => $task['task']['mode'],
@@ -162,6 +179,8 @@ private function createOrchestrationLikeJobs(array $configurationData, array $cr
162179
))->getId();
163180
}
164181

182+
$tokens->dropToken($token['id']); // drob token
183+
165184
return [
166185
'orchestrationJobId' => $orchestrationJob->getId(),
167186
'orchestrationConfigurationId' => $this->configurationId,
@@ -178,8 +197,11 @@ public function testMatcherFull(): void
178197
'jobIds' => $jobIds,
179198
] = $this->createOrchestrationLikeJobs($this->getOrchestrationConfiguration(), []);
180199

181-
$matcher = new OrchestrationJobMatcher($client);
182-
$results = $matcher->matchTaskJobsForOrchestrationJob($orchestrationJobId);
200+
$matcher = new OrchestrationJobMatcher($client, $this->createStorageClientPlainFactory());
201+
$results = $matcher->matchTaskJobsForOrchestrationJob(
202+
$orchestrationJobId,
203+
self::getRequiredEnv('TEST_STORAGE_API_TOKEN'),
204+
);
183205
self::assertEquals(
184206
new OrchestrationJobMatcherResults(
185207
$orchestrationJobId,
@@ -224,8 +246,11 @@ public function testMatcherPartial(): void
224246
'jobIds' => $jobIds,
225247
] = $this->createOrchestrationLikeJobs($this->getOrchestrationConfiguration(), ['92543']);
226248

227-
$matcher = new OrchestrationJobMatcher($client);
228-
$results = $matcher->matchTaskJobsForOrchestrationJob($orchestrationJobId);
249+
$matcher = new OrchestrationJobMatcher($client, $this->createStorageClientPlainFactory());
250+
$results = $matcher->matchTaskJobsForOrchestrationJob(
251+
$orchestrationJobId,
252+
self::getRequiredEnv('TEST_STORAGE_API_TOKEN'),
253+
);
229254
self::assertEquals(
230255
new OrchestrationJobMatcherResults(
231256
$orchestrationJobId,
@@ -277,8 +302,8 @@ public function testMatcherInvalidConfiguration(
277302
string $expectedMessage,
278303
): void {
279304
$storageClient = new StorageClient([
280-
'token' => (string) getenv('TEST_STORAGE_API_TOKEN'),
281-
'url' => (string) getenv('TEST_STORAGE_API_URL'),
305+
'token' => self::getRequiredEnv('TEST_STORAGE_API_TOKEN'),
306+
'url' => self::getRequiredEnv('TEST_STORAGE_API_URL'),
282307
]);
283308
$queueClient = $this->getClient();
284309
$componentsApi = new Components($storageClient);
@@ -290,7 +315,7 @@ public function testMatcherInvalidConfiguration(
290315
$this->configurationId = $componentsApi->addConfiguration($configuration)['id'];
291316
$orchestrationJob = $this->getNewJobFactory()->createNewJob(
292317
[
293-
'#tokenString' => getenv('TEST_STORAGE_API_TOKEN'),
318+
'#tokenString' => self::getRequiredEnv('TEST_STORAGE_API_TOKEN'),
294319
'configData' => [],
295320
'componentId' => $componentId,
296321
'configId' => $this->configurationId,
@@ -300,10 +325,10 @@ public function testMatcherInvalidConfiguration(
300325
],
301326
);
302327
$orchestrationJobId = $queueClient->createJob($orchestrationJob)->getId();
303-
$matcher = new OrchestrationJobMatcher($queueClient);
328+
$matcher = new OrchestrationJobMatcher($queueClient, $this->createStorageClientPlainFactory());
304329
$this->expectException(OrchestrationJobMatcherValidationException::class);
305330
$this->expectExceptionMessageMatches($expectedMessage);
306-
$matcher->matchTaskJobsForOrchestrationJob($orchestrationJobId);
331+
$matcher->matchTaskJobsForOrchestrationJob($orchestrationJobId, self::getRequiredEnv('TEST_STORAGE_API_TOKEN'));
307332
}
308333

309334
public function invalidConfigurationProvider(): Generator
@@ -340,4 +365,11 @@ public function invalidConfigurationProvider(): Generator
340365
'expectedMessage' => '#Task does not have an id\. \({"name":"foo","phase":1,"task":{"componentId":"keboola.ex-db-snowflake","configData":\[\],"mode":"run"}}\)#',
341366
];
342367
}
368+
369+
private function createStorageClientPlainFactory(): StorageClientPlainFactory
370+
{
371+
return new StorageClientPlainFactory(new ClientOptions(
372+
self::getRequiredEnv('TEST_STORAGE_API_URL'),
373+
));
374+
}
343375
}

0 commit comments

Comments
 (0)