Skip to content

Commit c9916af

Browse files
Merge pull request #20 from hbelmiro/cherry-pick-cannot-save-parameters
fix(backend): fixes "cannot save parameter" error message. Fixes kubeflow#9678 (kubeflow#10459)
2 parents 57830bf + 90a92f9 commit c9916af

File tree

3 files changed

+126
-10
lines changed

3 files changed

+126
-10
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package main
2+
3+
type ExecutionPaths struct {
4+
ExecutionID string
5+
IterationCount string
6+
CachedDecision string
7+
Condition string
8+
PodSpecPatch string
9+
}

backend/src/v2/cmd/driver/main.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ import (
3737

3838
const (
3939
driverTypeArg = "type"
40+
ROOT_DAG = "ROOT_DAG"
41+
DAG = "DAG"
42+
CONTAINER = "CONTAINER"
4043
)
4144

4245
var (
@@ -160,12 +163,12 @@ func drive() (err error) {
160163
var execution *driver.Execution
161164
var driverErr error
162165
switch *driverType {
163-
case "ROOT_DAG":
166+
case ROOT_DAG:
164167
options.RuntimeConfig = runtimeConfig
165168
execution, driverErr = driver.RootDAG(ctx, options, client)
166-
case "DAG":
169+
case DAG:
167170
execution, driverErr = driver.DAG(ctx, options, client)
168-
case "CONTAINER":
171+
case CONTAINER:
169172
options.Container = containerSpec
170173
options.KubernetesExecutorConfig = k8sExecCfg
171174
execution, driverErr = driver.Container(ctx, options, client, cacheClient)
@@ -183,35 +186,60 @@ func drive() (err error) {
183186
err = driverErr
184187
}()
185188
}
189+
190+
executionPaths := &ExecutionPaths{
191+
ExecutionID: *executionIDPath,
192+
IterationCount: *iterationCountPath,
193+
CachedDecision: *cachedDecisionPath,
194+
Condition: *conditionPath,
195+
PodSpecPatch: *podSpecPatchPath}
196+
197+
return handleExecution(execution, *driverType, executionPaths)
198+
}
199+
200+
func handleExecution(execution *driver.Execution, driverType string, executionPaths *ExecutionPaths) error {
186201
if execution.ID != 0 {
187202
glog.Infof("output execution.ID=%v", execution.ID)
188-
if *executionIDPath != "" {
189-
if err = writeFile(*executionIDPath, []byte(fmt.Sprint(execution.ID))); err != nil {
203+
if executionPaths.ExecutionID != "" {
204+
if err := writeFile(executionPaths.ExecutionID, []byte(fmt.Sprint(execution.ID))); err != nil {
190205
return fmt.Errorf("failed to write execution ID to file: %w", err)
191206
}
192207
}
193208
}
194209
if execution.IterationCount != nil {
195-
if err = writeFile(*iterationCountPath, []byte(fmt.Sprintf("%v", *execution.IterationCount))); err != nil {
210+
if err := writeFile(executionPaths.IterationCount, []byte(fmt.Sprintf("%v", *execution.IterationCount))); err != nil {
196211
return fmt.Errorf("failed to write iteration count to file: %w", err)
197212
}
213+
} else {
214+
if driverType == ROOT_DAG {
215+
if err := writeFile(executionPaths.IterationCount, []byte("0")); err != nil {
216+
return fmt.Errorf("failed to write iteration count to file: %w", err)
217+
}
218+
}
198219
}
199220
if execution.Cached != nil {
200-
if err = writeFile(*cachedDecisionPath, []byte(strconv.FormatBool(*execution.Cached))); err != nil {
221+
if err := writeFile(executionPaths.CachedDecision, []byte(strconv.FormatBool(*execution.Cached))); err != nil {
201222
return fmt.Errorf("failed to write cached decision to file: %w", err)
202223
}
203224
}
204225
if execution.Condition != nil {
205-
if err = writeFile(*conditionPath, []byte(strconv.FormatBool(*execution.Condition))); err != nil {
226+
if err := writeFile(executionPaths.Condition, []byte(strconv.FormatBool(*execution.Condition))); err != nil {
206227
return fmt.Errorf("failed to write condition to file: %w", err)
207228
}
229+
} else {
230+
// nil is a valid value for Condition
231+
if driverType == ROOT_DAG || driverType == CONTAINER {
232+
if err := writeFile(executionPaths.Condition, []byte("nil")); err != nil {
233+
return fmt.Errorf("failed to write condition to file: %w", err)
234+
}
235+
}
208236
}
209237
if execution.PodSpecPatch != "" {
210238
glog.Infof("output podSpecPatch=\n%s\n", execution.PodSpecPatch)
211-
if *podSpecPatchPath == "" {
239+
if executionPaths.PodSpecPatch == "" {
212240
return fmt.Errorf("--pod_spec_patch_path is required for container executor drivers")
213241
}
214-
if err = writeFile(*podSpecPatchPath, []byte(execution.PodSpecPatch)); err != nil {
242+
if err := writeFile(executionPaths.PodSpecPatch, []byte(execution.PodSpecPatch)); err != nil {
215243
return fmt.Errorf("failed to write pod spec patch to file: %w", err)
216244
}
217245
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package main
2+
3+
import (
4+
"github.com/kubeflow/pipelines/backend/src/v2/driver"
5+
"os"
6+
"testing"
7+
)
8+
9+
func Test_handleExecutionContainer(t *testing.T) {
10+
execution := &driver.Execution{}
11+
12+
executionPaths := &ExecutionPaths{
13+
Condition: "condition.txt",
14+
}
15+
16+
err := handleExecution(execution, CONTAINER, executionPaths)
17+
18+
if err != nil {
19+
t.Errorf("Unexpected error: %v", err)
20+
}
21+
22+
verifyFileContent(t, executionPaths.Condition, "nil")
23+
24+
cleanup(t, executionPaths)
25+
}
26+
27+
func Test_handleExecutionRootDAG(t *testing.T) {
28+
execution := &driver.Execution{}
29+
30+
executionPaths := &ExecutionPaths{
31+
IterationCount: "iteration_count.txt",
32+
Condition: "condition.txt",
33+
}
34+
35+
err := handleExecution(execution, ROOT_DAG, executionPaths)
36+
37+
if err != nil {
38+
t.Errorf("Unexpected error: %v", err)
39+
}
40+
41+
verifyFileContent(t, executionPaths.IterationCount, "0")
42+
verifyFileContent(t, executionPaths.Condition, "nil")
43+
44+
cleanup(t, executionPaths)
45+
}
46+
47+
func cleanup(t *testing.T, executionPaths *ExecutionPaths) {
48+
removeIfExists(t, executionPaths.IterationCount)
49+
removeIfExists(t, executionPaths.ExecutionID)
50+
removeIfExists(t, executionPaths.Condition)
51+
removeIfExists(t, executionPaths.PodSpecPatch)
52+
removeIfExists(t, executionPaths.CachedDecision)
53+
}
54+
55+
func removeIfExists(t *testing.T, filePath string) {
56+
_, err := os.Stat(filePath)
57+
if err == nil {
58+
err = os.Remove(filePath)
59+
if err != nil {
60+
t.Errorf("Unexpected error while removing the created file: %v", err)
61+
}
62+
}
63+
}
64+
65+
func verifyFileContent(t *testing.T, filePath string, expectedContent string) {
66+
_, err := os.Stat(filePath)
67+
if os.IsNotExist(err) {
68+
t.Errorf("Expected file %s to be created, but it doesn't exist", filePath)
69+
}
70+
71+
fileContent, err := os.ReadFile(filePath)
72+
if err != nil {
73+
t.Errorf("Failed to read file contents: %v", err)
74+
}
75+
76+
if string(fileContent) != expectedContent {
77+
t.Errorf("Expected file fileContent to be %q, got %q", expectedContent, string(fileContent))
78+
}
79+
}

0 commit comments

Comments
 (0)