diff --git a/backend/pkg/table-dependency/circular-dependencies.go b/backend/pkg/table-dependency/circular-dependencies.go index f25774422b..8c7c2e3037 100644 --- a/backend/pkg/table-dependency/circular-dependencies.go +++ b/backend/pkg/table-dependency/circular-dependencies.go @@ -101,14 +101,15 @@ func FindCircularDependencies(dependencies map[string][]string) [][]string { var result [][]string for node := range dependencies { - visited, recStack := make(map[string]bool), make(map[string]bool) - dfsCycles(node, node, dependencies, visited, recStack, []string{}, &result) + recStack := make(map[string]bool) + path := []string{} + dfsCycles(node, node, dependencies, recStack, path, &result) } return uniqueCycles(result) } // finds all possible path variations -func dfsCycles(start, current string, dependencies map[string][]string, visited, recStack map[string]bool, path []string, result *[][]string) { +func dfsCycles(start, current string, dependencies map[string][]string, recStack map[string]bool, path []string, result *[][]string) { if recStack[current] { if current == start { // make copy to prevent reference issues @@ -123,15 +124,10 @@ func dfsCycles(start, current string, dependencies map[string][]string, visited, path = append(path, current) for _, neighbor := range dependencies[current] { - if !visited[neighbor] { - dfsCycles(start, neighbor, dependencies, visited, recStack, path, result) - } + dfsCycles(start, neighbor, dependencies, recStack, path, result) } recStack[current] = false - if start == current { - visited[current] = true - } } func uniqueCycles(cycles [][]string) [][]string { diff --git a/backend/pkg/table-dependency/circular-dependencies_test.go b/backend/pkg/table-dependency/circular-dependencies_test.go index 1ff58dd50f..15318881be 100644 --- a/backend/pkg/table-dependency/circular-dependencies_test.go +++ b/backend/pkg/table-dependency/circular-dependencies_test.go @@ -87,6 +87,99 @@ func Test_FindCircularDependencies(t *testing.T) { }, expect: [][]string{{"public.a", "public.b", "public.c"}, {"public.b"}}, }, + { + name: "Nested cycles", + dependencies: map[string][]string{ + "public.a": {"public.b"}, + "public.b": {"public.c"}, + "public.c": {"public.d", "public.a"}, + "public.d": {"public.b"}, + }, + expect: [][]string{ + {"public.a", "public.b", "public.c"}, + {"public.b", "public.c", "public.d"}, + }, + }, + { + name: "Multiple overlapping cycles with shared nodes", + dependencies: map[string][]string{ + "public.a": {"public.b", "public.d"}, + "public.b": {"public.c"}, + "public.c": {"public.a"}, + "public.d": {"public.e"}, + "public.e": {"public.f"}, + "public.f": {"public.d", "public.a"}, + }, + expect: [][]string{ + {"public.a", "public.b", "public.c"}, + {"public.a", "public.d", "public.e", "public.f"}, + {"public.d", "public.e", "public.f"}, + }, + }, + { + name: "Diamond shape with multiple paths", + dependencies: map[string][]string{ + "public.a": {"public.b", "public.c"}, + "public.b": {"public.d"}, + "public.c": {"public.d"}, + "public.d": {"public.a"}, + }, + expect: [][]string{ + {"public.a", "public.b", "public.d"}, + {"public.a", "public.c", "public.d"}, + }, + }, + { + name: "Complex web of dependencies", + dependencies: map[string][]string{ + "public.a": {"public.b", "public.c"}, + "public.b": {"public.d", "public.e"}, + "public.c": {"public.e", "public.f"}, + "public.d": {"public.g"}, + "public.e": {"public.g", "public.h"}, + "public.f": {"public.h"}, + "public.g": {"public.i"}, + "public.h": {"public.i"}, + "public.i": {"public.a"}, + }, + expect: [][]string{ + {"public.a", "public.b", "public.d", "public.g", "public.i"}, + {"public.a", "public.b", "public.e", "public.g", "public.i"}, + {"public.a", "public.b", "public.e", "public.h", "public.i"}, + {"public.a", "public.c", "public.e", "public.g", "public.i"}, + {"public.a", "public.c", "public.e", "public.h", "public.i"}, + {"public.a", "public.c", "public.f", "public.h", "public.i"}, + }, + }, + { + name: "Multiple self-references with shared dependencies", + dependencies: map[string][]string{ + "public.a": {"public.a", "public.b"}, + "public.b": {"public.b", "public.c"}, + "public.c": {"public.a", "public.c"}, + }, + expect: [][]string{ + {"public.a"}, + {"public.b"}, + {"public.c"}, + {"public.a", "public.b", "public.c"}, + }, + }, + { + name: "Cycle with branching paths", + dependencies: map[string][]string{ + "public.root": {"public.a1", "public.a2"}, + "public.a1": {"public.b1"}, + "public.a2": {"public.b2"}, + "public.b1": {"public.c"}, + "public.b2": {"public.c"}, + "public.c": {"public.root"}, + }, + expect: [][]string{ + {"public.root", "public.a1", "public.b1", "public.c"}, + {"public.root", "public.a2", "public.b2", "public.c"}, + }, + }, } for _, tt := range tests { diff --git a/backend/pkg/table-dependency/table-dependency.go b/backend/pkg/table-dependency/table-dependency.go index 5ed7515ce2..8dfbeb9090 100644 --- a/backend/pkg/table-dependency/table-dependency.go +++ b/backend/pkg/table-dependency/table-dependency.go @@ -183,7 +183,7 @@ func GetRunConfigs( } cycleConfigs, err := processCycles(group, tableColumnsMap, primaryKeyMap, subsets, dependencyMap, d.foreignKeyCols) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to process cycles: %w", err) } // update table processed map for _, cfg := range cycleConfigs { @@ -336,7 +336,8 @@ func processCycles( if isTableInCycles(cycles, fkTable) { if len(fkCols.NullableColumns) > 0 { updateConfig.appendDependsOn(fkTable, fkCols.NullableColumns) - } else { + } + if len(fkCols.NonNullableColumns) > 0 { insertConfig.appendDependsOn(fkTable, fkCols.NonNullableColumns) } } else { diff --git a/internal/benthos/benthos-builder/builders/processors_test.go b/internal/benthos/benthos-builder/builders/processors_test.go index a113a7a019..8a0da513ae 100644 --- a/internal/benthos/benthos-builder/builders/processors_test.go +++ b/internal/benthos/benthos-builder/builders/processors_test.go @@ -23,7 +23,7 @@ func Test_buildProcessorConfigsJavascript(t *testing.T) { Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{ - Code: `return "hello " + value;`, + Code: `return "hello " + value + " " + input.extra;`, }, }, }, @@ -51,7 +51,7 @@ func Test_buildProcessorConfigsJavascript(t *testing.T) { wrappedCode := fmt.Sprintf(` let programOutput = undefined; const benthos = { - v0_msg_as_structured: () => ({address: "world"}), + v0_msg_as_structured: () => ({address: "world", extra: "foobar"}), }; const neosync = { patchStructuredMessage: (val) => { @@ -70,7 +70,7 @@ const neosync = { require.NotNil(t, programOutput) outputMap, ok := programOutput.(map[string]any) require.True(t, ok) - require.Equal(t, "hello world", outputMap["address"]) + require.Equal(t, "hello world foobar", outputMap["address"]) } func Test_buildProcessorConfigsGenerateJavascript(t *testing.T) { diff --git a/worker/pkg/workflows/datasync/activities/sync/activity_test.go b/worker/pkg/workflows/datasync/activities/sync/activity_test.go index c1671676a9..7370f5582d 100644 --- a/worker/pkg/workflows/datasync/activities/sync/activity_test.go +++ b/worker/pkg/workflows/datasync/activities/sync/activity_test.go @@ -191,7 +191,7 @@ pipeline: threads: 1 processors: - mutation: | - root.name = fake("first_name") + root.name = generate_first_name() output: label: "" stdout: