44
55namespace Keboola \JobQueueInternalClient ;
66
7- use Keboola \JobQueueInternalClient \DataPlane \DataPlaneConfigRepository ;
87use Keboola \JobQueueInternalClient \Exception \ClientException ;
9- use Keboola \JobQueueInternalClient \JobFactory \Behavior ;
108use Keboola \JobQueueInternalClient \JobFactory \FullJobDefinition ;
11- use Keboola \JobQueueInternalClient \JobFactory \Job ;
12- use Keboola \JobQueueInternalClient \JobFactory \JobInterface ;
13- use Keboola \JobQueueInternalClient \JobFactory \JobRuntimeResolver ;
149use Keboola \JobQueueInternalClient \JobFactory \NewJobDefinition ;
15- use Keboola \ObjectEncryptor \ObjectEncryptor ;
16- use Keboola \StorageApi \ClientException as StorageClientException ;
17- use Keboola \StorageApiBranch \Factory \ClientOptions ;
18- use Keboola \StorageApiBranch \Factory \StorageClientPlainFactory ;
1910use Symfony \Component \Config \Definition \Exception \InvalidConfigurationException ;
2011
21- class JobFactory
12+ abstract class JobFactory
2213{
2314 public const STATUS_CANCELLED = 'cancelled ' ;
2415 public const STATUS_CREATED = 'created ' ;
@@ -43,26 +34,6 @@ class JobFactory
4334
4435 public const PAY_AS_YOU_GO_FEATURE = 'pay-as-you-go ' ;
4536
46- private StorageClientPlainFactory $ storageClientFactory ;
47- private JobRuntimeResolver $ jobRuntimeResolver ;
48- private ObjectEncryptor $ controlPlaneObjectEncryptor ;
49- private DataPlaneConfigRepository $ dataPlaneConfigRepository ;
50- private bool $ supportsDataPlanes ;
51-
52- public function __construct (
53- StorageClientPlainFactory $ storageClientFactory ,
54- JobRuntimeResolver $ jobRuntimeResolver ,
55- ObjectEncryptor $ controlPlaneEncryptor ,
56- DataPlaneConfigRepository $ dataPlaneConfigRepository ,
57- bool $ supportsDataPlanes
58- ) {
59- $ this ->storageClientFactory = $ storageClientFactory ;
60- $ this ->jobRuntimeResolver = $ jobRuntimeResolver ;
61- $ this ->controlPlaneObjectEncryptor = $ controlPlaneEncryptor ;
62- $ this ->dataPlaneConfigRepository = $ dataPlaneConfigRepository ;
63- $ this ->supportsDataPlanes = $ supportsDataPlanes ;
64- }
65-
6637 public static function getFinishedStatuses (): array
6738 {
6839 return [self ::STATUS_SUCCESS , self ::STATUS_WARNING , self ::STATUS_ERROR , self ::STATUS_CANCELLED ,
@@ -102,141 +73,15 @@ public static function getAllowedParallelismValues(): array
10273 return array_merge ($ intValues , ['infinity ' , null ]);
10374 }
10475
105- public function createNewJob (array $ data ): JobInterface
106- {
107- $ data = $ this ->validateJobData ($ data , NewJobDefinition::class);
108-
109- try {
110- $ client = $ this ->storageClientFactory ->createClientWrapper (new ClientOptions (
111- null ,
112- $ data ['#tokenString ' ]
113- ))->getBasicClient ();
114- $ tokenInfo = $ client ->verifyToken ();
115- $ jobId = $ client ->generateId ();
116- $ runId = empty ($ data ['parentRunId ' ]) ? $ jobId :
117- $ data ['parentRunId ' ] . JobInterface::RUN_ID_DELIMITER . $ jobId ;
118- } catch (StorageClientException $ e ) {
119- throw new ClientException (
120- 'Cannot create job: " ' . $ e ->getMessage () . '". ' ,
121- $ e ->getCode (),
122- $ e
123- );
124- }
125-
126- if (!empty ($ data ['variableValuesId ' ]) && !empty ($ data ['variableValuesData ' ]['values ' ])) {
127- throw new ClientException (
128- 'Provide either "variableValuesId" or "variableValuesData", but not both. '
129- );
130- }
131-
132- if ($ this ->supportsDataPlanes ) {
133- $ dataPlaneConfig = $ this ->dataPlaneConfigRepository ->fetchProjectDataPlane (
134- (string ) $ tokenInfo ['owner ' ]['id ' ],
135- );
136- } else {
137- $ dataPlaneConfig = null ;
138- }
139-
140- if ($ dataPlaneConfig !== null ) {
141- $ objectEncryptor = $ dataPlaneConfig ->getEncryption ()->createEncryptor ();
142- } else {
143- $ objectEncryptor = $ this ->controlPlaneObjectEncryptor ;
144- }
145-
146- $ jobData = [
147- 'id ' => $ jobId ,
148- 'runId ' => $ runId ,
149- 'projectId ' => $ tokenInfo ['owner ' ]['id ' ],
150- 'projectName ' => $ tokenInfo ['owner ' ]['name ' ],
151- 'dataPlaneId ' => $ dataPlaneConfig ? $ dataPlaneConfig ->getId () : null ,
152- 'tokenId ' => $ tokenInfo ['id ' ],
153- '#tokenString ' => $ data ['#tokenString ' ],
154- 'tokenDescription ' => $ tokenInfo ['description ' ],
155- 'status ' => self ::STATUS_CREATED ,
156- 'desiredStatus ' => self ::DESIRED_STATUS_PROCESSING ,
157- 'mode ' => $ data ['mode ' ],
158- 'componentId ' => $ data ['componentId ' ],
159- 'configId ' => $ data ['configId ' ] ?? null ,
160- 'configData ' => $ data ['configData ' ] ?? null ,
161- 'configRowIds ' => $ data ['configRowIds ' ] ?? null ,
162- 'tag ' => $ data ['tag ' ] ?? null ,
163- 'parallelism ' => $ data ['parallelism ' ] ?? null ,
164- 'backend ' => $ data ['backend ' ] ?? null ,
165- 'behavior ' => $ data ['behavior ' ] ?? (new Behavior ())->toDataArray (),
166- 'result ' => [],
167- 'usageData ' => [],
168- 'isFinished ' => false ,
169- 'branchId ' => $ data ['branchId ' ] ?? null ,
170- 'variableValuesId ' => $ data ['variableValuesId ' ] ?? null ,
171- 'variableValuesData ' => $ data ['variableValuesData ' ] ?? [],
172- 'orchestrationJobId ' => $ data ['orchestrationJobId ' ] ?? null ,
173- ];
174-
175- $ jobData = $ this ->jobRuntimeResolver ->resolveJobData ($ jobData , $ tokenInfo );
176- // set type after resolving parallelism
177- $ jobData ['type ' ] = $ data ['type ' ] ?? $ this ->getJobType ($ jobData );
178-
179- $ data = $ objectEncryptor ->encryptForProject (
180- $ jobData ,
181- (string ) $ data ['componentId ' ],
182- (string ) $ tokenInfo ['owner ' ]['id ' ]
183- );
184-
185- $ data = $ this ->validateJobData ($ data , FullJobDefinition::class);
186- return new Job ($ objectEncryptor , $ this ->storageClientFactory , $ data );
187- }
188-
189- public function loadFromExistingJobData (array $ data ): JobInterface
190- {
191- $ data = $ this ->validateJobData ($ data , FullJobDefinition::class);
192-
193- // combination $this->supportsDataPlanes === false && data['dataPlaneId'] !== null should be considered an error
194- // in the future, but we can't do that now as Job Runner does use this now but knows nothing about the data
195- // plane
196-
197- if ($ this ->supportsDataPlanes && ($ data ['dataPlaneId ' ] ?? null )) {
198- $ dataPlaneConfig = $ this ->dataPlaneConfigRepository ->fetchDataPlaneConfig ($ data ['dataPlaneId ' ]);
199- $ objectEncryptor = $ dataPlaneConfig ->getEncryption ()->createEncryptor ();
200- } else {
201- $ objectEncryptor = $ this ->controlPlaneObjectEncryptor ;
202- }
203-
204- return new Job ($ objectEncryptor , $ this ->storageClientFactory , $ data );
205- }
206-
207- public function modifyJob (JobInterface $ job , array $ patchData ): JobInterface
208- {
209- $ data = $ job ->jsonSerialize ();
210- $ data = array_replace_recursive ($ data , $ patchData );
211-
212- return $ this ->loadFromExistingJobData ($ data );
213- }
214-
21576 /**
21677 * @param class-string<FullJobDefinition|NewJobDefinition> $validatorClass
21778 */
218- private function validateJobData (array $ data , string $ validatorClass ): array
79+ protected function validateJobData (array $ data , string $ validatorClass ): array
21980 {
22081 try {
22182 return (new $ validatorClass ())->processData ($ data );
22283 } catch (InvalidConfigurationException $ e ) {
22384 throw new ClientException ($ e ->getMessage (), $ e ->getCode (), $ e );
22485 }
22586 }
226-
227- private function getJobType (array $ data ): string
228- {
229- if ((intval ($ data ['parallelism ' ]) > 0 ) || $ data ['parallelism ' ] === self ::PARALLELISM_INFINITY ) {
230- return self ::TYPE_ROW_CONTAINER ;
231- } else {
232- if ($ data ['componentId ' ] === self ::ORCHESTRATOR_COMPONENT ) {
233- if (isset ($ data ['configData ' ]['phaseId ' ]) && (string ) ($ data ['configData ' ]['phaseId ' ]) !== '' ) {
234- return self ::TYPE_PHASE_CONTAINER ;
235- } else {
236- return self ::TYPE_ORCHESTRATION_CONTAINER ;
237- }
238- }
239- }
240- return self ::TYPE_STANDARD ;
241- }
24287}
0 commit comments