44
55namespace Keboola \JobQueueInternalClient \JobFactory ;
66
7- use Keboola \BillingApi \CreditsChecker ;
87use Keboola \JobQueueInternalClient \Exception \ClientException ;
98use Keboola \JobQueueInternalClient \Exception \ConfigurationDisabledException ;
9+ use Keboola \JobQueueInternalClient \JobFactory ;
1010use Keboola \StorageApi \ClientException as StorageClientException ;
1111use Keboola \StorageApi \Components ;
1212use Keboola \StorageApiBranch \Factory \ClientOptions ;
1818 */
1919class JobRuntimeResolver
2020{
21+ private const JOB_TYPES_WITH_DEFAULT_BACKEND = [
22+ JobInterface::TYPE_STANDARD ,
23+ ];
24+
2125 private const PAY_AS_YOU_GO_FEATURE = 'pay-as-you-go ' ;
2226
2327 private StorageClientPlainFactory $ storageClientFactory ;
@@ -37,18 +41,18 @@ public function resolveJobData(array $jobData, array $tokenInfo): array
3741
3842 try {
3943 $ this ->componentData = $ this ->getComponentsApiClient (null )
40- ->getComponent ($ this ->jobData ['componentId ' ]);
41-
42- $ tag = $ this ->resolveTag ();
44+ ->getComponent ($ jobData ['componentId ' ]);
45+ $ jobData ['tag ' ] = $ this ->resolveTag ($ jobData );
4346 $ variableValues = $ this ->resolveVariables ();
44- $ backend = $ this ->resolveBackend ($ tokenInfo );
45- $ parallelism = $ this ->resolveParallelism ();
47+ $ jobData ['parallelism ' ] = $ this ->resolveParallelism ($ jobData );
48+ // set type after resolving parallelism
49+ $ jobData ['type ' ] = $ this ->resolveJobType ($ jobData );
50+ // set backend after resolving type
51+ $ jobData ['backend ' ] = $ this ->resolveBackend ($ jobData , $ tokenInfo )->toDataArray ();
52+
4653 foreach ($ variableValues ->asDataArray () as $ key => $ value ) {
4754 $ jobData [$ key ] = $ value ;
4855 }
49- $ jobData ['backend ' ] = $ backend ->toDataArray ();
50- $ jobData ['tag ' ] = $ tag ;
51- $ jobData ['parallelism ' ] = $ parallelism ;
5256 return $ jobData ;
5357 } catch (InvalidConfigurationException $ e ) {
5458 throw new ClientException ('Invalid configuration: ' . $ e ->getMessage (), 0 , $ e );
@@ -57,10 +61,10 @@ public function resolveJobData(array $jobData, array $tokenInfo): array
5761 }
5862 }
5963
60- private function resolveTag (): string
64+ private function resolveTag (array $ jobData ): string
6165 {
62- if (!empty ($ this -> jobData ['tag ' ])) {
63- return (string ) $ this -> jobData ['tag ' ];
66+ if (!empty ($ jobData ['tag ' ])) {
67+ return (string ) $ jobData ['tag ' ];
6468 }
6569 if (!empty ($ this ->getConfigData ()['runtime ' ]['tag ' ])) {
6670 return (string ) $ this ->getConfigData ()['runtime ' ]['tag ' ];
@@ -75,7 +79,7 @@ private function resolveTag(): string
7579 if (!empty ($ this ->componentData ['data ' ]['definition ' ]['tag ' ])) {
7680 return $ this ->componentData ['data ' ]['definition ' ]['tag ' ];
7781 } else {
78- throw new ClientException (sprintf ('The component "%s" is not runnable. ' , $ this -> jobData ['componentId ' ]));
82+ throw new ClientException (sprintf ('The component "%s" is not runnable. ' , $ jobData ['componentId ' ]));
7983 }
8084 }
8185
@@ -96,19 +100,23 @@ private function resolveVariables(): VariableValues
96100 return VariableValues::fromDataArray ($ configuration );
97101 }
98102
99- private function getDefaultBackendContext (string $ componentType ): string
103+ private function getDefaultBackendContext (array $ jobData , string $ componentType ): ? string
100104 {
105+ if (!in_array ($ jobData ['type ' ], self ::JOB_TYPES_WITH_DEFAULT_BACKEND )) {
106+ return null ;
107+ }
108+
101109 return sprintf (
102110 '%s-%s ' ,
103- $ this -> jobData ['projectId ' ],
111+ $ jobData ['projectId ' ],
104112 $ componentType
105113 );
106114 }
107115
108- private function getBackend (): Backend
116+ private function getBackend (array $ jobData ): Backend
109117 {
110- if (!empty ($ this -> jobData ['backend ' ])) {
111- $ backend = Backend::fromDataArray ($ this -> jobData ['backend ' ]);
118+ if (!empty ($ jobData ['backend ' ])) {
119+ $ backend = Backend::fromDataArray ($ jobData ['backend ' ]);
112120 if (!$ backend ->isEmpty ()) {
113121 return $ backend ;
114122 }
@@ -127,21 +135,24 @@ private function getBackend(): Backend
127135 return new Backend (null , null , null );
128136 }
129137
130- private function resolveBackend (array $ tokenInfo ): Backend
138+ private function resolveBackend (array $ jobData , array $ tokenInfo ): Backend
131139 {
132- $ tempBackend = $ this ->getBackend ();
140+ $ tempBackend = $ this ->getBackend ($ jobData );
133141
134142 if ($ tempBackend ->isEmpty ()) {
135143 return new Backend (
136144 $ tempBackend ->getType (),
137145 $ tempBackend ->getContainerType (),
138- $ this ->getDefaultBackendContext ($ this ->componentData ['type ' ])
146+ $ this ->getDefaultBackendContext ($ jobData , $ this ->componentData ['type ' ])
139147 );
140148 }
141149
142150 // decide whether to set "type' (aka workspaceSize) or containerType (aka containerSize)
143151 $ stagingStorage = $ this ->componentData ['data ' ]['staging_storage ' ]['input ' ] ?? '' ;
144- $ backendContext = $ tempBackend ->getContext () ?? $ this ->getDefaultBackendContext ($ this ->componentData ['type ' ]);
152+ $ backendContext = $ tempBackend ->getContext () ?? $ this ->getDefaultBackendContext (
153+ $ jobData ,
154+ $ this ->componentData ['type ' ]
155+ );
145156
146157 /* Possible values of staging storage: https://github.com/keboola/docker-bundle/blob/ec9a628b614a70d0ed8a6ec36f2b6003a8e07ed4/src/Docker/Configuration/Component.php#L87
147158 For the purpose of setting backend, we consider: 'local', 's3', 'abs', 'none' to use container.
@@ -162,10 +173,10 @@ private function resolveBackend(array $tokenInfo): Backend
162173 return new Backend (null , null , $ backendContext );
163174 }
164175
165- private function resolveParallelism (): ?string
176+ private function resolveParallelism (array $ jobData ): ?string
166177 {
167- if (isset ($ this -> jobData ['parallelism ' ]) && ($ this -> jobData ['parallelism ' ] !== null )) {
168- return (string ) $ this -> jobData ['parallelism ' ];
178+ if (isset ($ jobData ['parallelism ' ]) && ($ jobData ['parallelism ' ] !== null )) {
179+ return (string ) $ jobData ['parallelism ' ];
169180 }
170181 if (isset ($ this ->getConfigData ()['runtime ' ]['parallelism ' ])
171182 && $ this ->getConfigData ()['runtime ' ]['parallelism ' ] !== null ) {
@@ -233,4 +244,24 @@ private function resolveIsForceRunMode(): bool
233244 {
234245 return isset ($ this ->jobData ['mode ' ]) && $ this ->jobData ['mode ' ] === JobInterface::MODE_FORCE_RUN ;
235246 }
247+
248+ private function resolveJobType (array $ jobData ): string
249+ {
250+ if (!empty ($ jobData ['type ' ])) {
251+ return (string ) $ jobData ['type ' ];
252+ }
253+
254+ if ((intval ($ jobData ['parallelism ' ]) > 0 ) || $ jobData ['parallelism ' ] === JobInterface::PARALLELISM_INFINITY ) {
255+ return JobInterface::TYPE_ROW_CONTAINER ;
256+ } else {
257+ if ($ jobData ['componentId ' ] === JobFactory::ORCHESTRATOR_COMPONENT ) {
258+ if (isset ($ jobData ['configData ' ]['phaseId ' ]) && (string ) ($ jobData ['configData ' ]['phaseId ' ]) !== '' ) {
259+ return JobInterface::TYPE_PHASE_CONTAINER ;
260+ } else {
261+ return JobInterface::TYPE_ORCHESTRATION_CONTAINER ;
262+ }
263+ }
264+ }
265+ return JobInterface::TYPE_STANDARD ;
266+ }
236267}
0 commit comments