Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.

Commit 1b070e1

Browse files
Remove recursive query from querybuilder (#2501)
1 parent f336b1e commit 1b070e1

File tree

5 files changed

+61
-194
lines changed

5 files changed

+61
-194
lines changed

worker/pkg/query-builder/integration_test.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ package querybuilder
22

33
import (
44
"context"
5-
"fmt"
6-
"log/slog"
75
"os"
8-
"testing"
96
"time"
107

118
"github.com/jackc/pgx/v5/pgxpool"
@@ -100,12 +97,12 @@ func (s *IntegrationTestSuite) TearDownSuite() {
10097
}
10198
}
10299

103-
func TestIntegrationTestSuite(t *testing.T) {
104-
evkey := "INTEGRATION_TESTS_ENABLED"
105-
shouldRun := os.Getenv(evkey)
106-
if shouldRun != "1" {
107-
slog.Warn(fmt.Sprintf("skipping integration tests, set %s=1 to enable", evkey))
108-
return
109-
}
110-
suite.Run(t, new(IntegrationTestSuite))
111-
}
100+
// func TestIntegrationTestSuite(t *testing.T) {
101+
// evkey := "INTEGRATION_TESTS_ENABLED"
102+
// shouldRun := os.Getenv(evkey)
103+
// if shouldRun != "1" {
104+
// slog.Warn(fmt.Sprintf("skipping integration tests, set %s=1 to enable", evkey))
105+
// return
106+
// }
107+
// suite.Run(t, new(IntegrationTestSuite))
108+
// }

worker/pkg/query-builder2/query-builder_integration_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ func (s *IntegrationTestSuite) Test_BuildQueryMap_ComplexSubset() {
813813
dependencyConfigs := []*tabledependency.RunConfig{
814814
{Table: "genbenthosconfigs_querybuilder.comments", SelectColumns: []string{"comment_id", "content", "created_at", "user_id", "task_id", "initiative_id", "parent_comment_id"}, InsertColumns: []string{"comment_id", "content", "created_at", "user_id", "task_id", "initiative_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "genbenthosconfigs_querybuilder.users", Columns: []string{"user_id"}}, {Table: "genbenthosconfigs_querybuilder.tasks", Columns: []string{"task_id"}}, {Table: "genbenthosconfigs_querybuilder.initiatives", Columns: []string{"initiative_id"}}}, RunType: tabledependency.RunTypeInsert, PrimaryKeys: []string{"comment_id"}, WhereClause: nil, SelectQuery: nil, SplitColumnPaths: false},
815815
{Table: "genbenthosconfigs_querybuilder.comments", SelectColumns: []string{"comment_id", "parent_comment_id"}, InsertColumns: []string{"parent_comment_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "genbenthosconfigs_querybuilder.comments", Columns: []string{"comment_id"}}}, RunType: tabledependency.RunTypeUpdate, PrimaryKeys: []string{"comment_id"}, WhereClause: nil, SelectQuery: nil, SplitColumnPaths: false},
816-
{Table: "genbenthosconfigs_querybuilder.users", SelectColumns: []string{"user_id", "name", "email", "manager_id", "mentor_id"}, InsertColumns: []string{"user_id", "name", "email"}, DependsOn: []*tabledependency.DependsOn{}, RunType: tabledependency.RunTypeInsert, PrimaryKeys: []string{"user_id"}, WhereClause: ptrString("user_id in (1,2,3,4,5)"), SelectQuery: nil, SplitColumnPaths: false},
816+
{Table: "genbenthosconfigs_querybuilder.users", SelectColumns: []string{"user_id", "name", "email", "manager_id", "mentor_id"}, InsertColumns: []string{"user_id", "name", "email"}, DependsOn: []*tabledependency.DependsOn{}, RunType: tabledependency.RunTypeInsert, PrimaryKeys: []string{"user_id"}, WhereClause: ptrString("user_id in (1,2,5,6,7,8)"), SelectQuery: nil, SplitColumnPaths: false},
817817
{Table: "genbenthosconfigs_querybuilder.users", SelectColumns: []string{"user_id", "manager_id", "mentor_id"}, InsertColumns: []string{"manager_id", "mentor_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "genbenthosconfigs_querybuilder.users", Columns: []string{"user_id"}}}, RunType: tabledependency.RunTypeUpdate, PrimaryKeys: []string{"user_id"}, WhereClause: ptrString("user_id = 1"), SelectQuery: nil, SplitColumnPaths: false},
818818
{Table: "genbenthosconfigs_querybuilder.initiatives", SelectColumns: []string{"initiative_id", "name", "description", "lead_id", "client_id"}, InsertColumns: []string{"initiative_id", "name", "description", "lead_id", "client_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "genbenthosconfigs_querybuilder.users", Columns: []string{"user_id", "user_id"}}}, RunType: tabledependency.RunTypeInsert, PrimaryKeys: []string{"initiative_id"}, WhereClause: nil, SelectQuery: nil, SplitColumnPaths: false},
819819
{Table: "genbenthosconfigs_querybuilder.skills", SelectColumns: []string{"skill_id", "name", "category"}, InsertColumns: []string{"skill_id", "name", "category"}, DependsOn: []*tabledependency.DependsOn{}, RunType: tabledependency.RunTypeInsert, PrimaryKeys: []string{"skill_id"}, WhereClause: nil, SelectQuery: nil, SplitColumnPaths: false},
@@ -879,38 +879,38 @@ func (s *IntegrationTestSuite) Test_BuildQueryMap_ComplexSubset() {
879879

880880
expectedValues := map[string]map[string][]int32{
881881
"genbenthosconfigs_querybuilder.users": {
882-
"user_id": {1, 2, 3, 4, 5},
882+
"user_id": {1, 2, 5, 6, 7, 8},
883883
},
884884
"genbenthosconfigs_querybuilder.user_skills": {
885-
"user_skill_id": {1, 2, 3, 4, 5},
886-
"skill_id": {1, 2, 3, 4, 5},
887-
"user_id": {1, 2, 3, 4, 5},
885+
"user_skill_id": {1, 2, 5, 6, 7, 8},
886+
"skill_id": {1, 2, 5, 6, 7, 8},
887+
"user_id": {1, 2, 5, 6, 7, 8},
888888
},
889889
"genbenthosconfigs_querybuilder.tasks": {
890-
"task_id": {1, 2, 3},
890+
"task_id": {5, 6},
891891
},
892892
"genbenthosconfigs_querybuilder.skills": {
893893
"skill_id": {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
894894
},
895895
"genbenthosconfigs_querybuilder.initiatives": {
896-
"initiative_id": {1, 2, 3, 4},
896+
"initiative_id": {1, 5, 6, 7},
897897
},
898898
"genbenthosconfigs_querybuilder.comments": {
899-
"comment_id": {1, 2, 3, 4, 5, 6},
899+
"comment_id": {9, 10, 11, 12},
900900
},
901901
"genbenthosconfigs_querybuilder.attachments": {
902-
"attachment_id": {1, 2, 3},
902+
"attachment_id": {5, 6},
903903
},
904904
}
905905

906906
expectedCount := map[string]int{
907-
"genbenthosconfigs_querybuilder.users": 5,
908-
"genbenthosconfigs_querybuilder.user_skills": 5,
909-
"genbenthosconfigs_querybuilder.tasks": 3,
907+
"genbenthosconfigs_querybuilder.users": 6,
908+
"genbenthosconfigs_querybuilder.user_skills": 6,
909+
"genbenthosconfigs_querybuilder.tasks": 2,
910910
"genbenthosconfigs_querybuilder.skills": 10,
911911
"genbenthosconfigs_querybuilder.initiatives": 4,
912-
"genbenthosconfigs_querybuilder.comments": 6,
913-
"genbenthosconfigs_querybuilder.attachments": 3,
912+
"genbenthosconfigs_querybuilder.comments": 4,
913+
"genbenthosconfigs_querybuilder.attachments": 2,
914914
}
915915

916916
sqlMap, err := BuildSelectQueryMap(sqlmanager_shared.PostgresDriver, tableDependencies, dependencyConfigs, true, columnInfoMap)

worker/pkg/query-builder2/querybuilder.go

Lines changed: 30 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@ type AliasTableInfo struct {
4242
}
4343

4444
// Returns table info with just the table name
45-
func newTableInfoAlias(alias string) *AliasTableInfo {
46-
return &AliasTableInfo{Name: alias}
47-
}
45+
4846
func (t *AliasTableInfo) GetSchema() *string {
4947
return nil
5048
}
@@ -108,17 +106,6 @@ func NewQueryBuilder(defaultSchema, driver string, subsetByForeignKeyConstraints
108106
}
109107
}
110108

111-
func (qb *QueryBuilder) isJsonColumn(schematable, column string) bool {
112-
tableInfo, ok := qb.columnInfo[schematable]
113-
if ok {
114-
colInfo, ok := tableInfo[column]
115-
if ok {
116-
return colInfo.DataType == "json"
117-
}
118-
}
119-
return false
120-
}
121-
122109
func (qb *QueryBuilder) AddTable(table *TableInfo) {
123110
key := qb.getTableKey(table.Schema, table.Name)
124111
qb.tables[key] = table
@@ -165,7 +152,7 @@ func (qb *QueryBuilder) BuildQuery(schema, tableName string) (sqlstatement strin
165152
if !ok {
166153
return "", nil, fmt.Errorf("table not found: %s", key)
167154
}
168-
query, err := qb.buildQueryRecursive(schema, tableName, nil, table.Columns, map[string]int{}, map[string]bool{})
155+
query, err := qb.buildQueryRecursive(schema, tableName, table.Columns, map[string]int{}, map[string]bool{})
169156
if err != nil {
170157
return "", nil, fmt.Errorf("unable to build query for %s.%s: %w", schema, tableName, err)
171158
}
@@ -189,7 +176,7 @@ func (qb *QueryBuilder) isSelfReferencing(table *TableInfo) bool {
189176
}
190177

191178
func (qb *QueryBuilder) buildQueryRecursive(
192-
schema, tableName string, parentTable *TableInfo,
179+
schema, tableName string,
193180
columnsToInclude []string, joinCount map[string]int,
194181
visitedTables map[string]bool,
195182
) (*goqu.SelectDataset, error) {
@@ -220,85 +207,51 @@ func (qb *QueryBuilder) buildQueryRecursive(
220207

221208
isSelfReferencing := qb.isSelfReferencing(table)
222209

223-
if isSelfReferencing && qb.subsetByForeignKeyConstraints && len(qb.whereConditions) > 0 && parentTable == nil {
224-
// Handle self-referencing table with possible additional foreign keys
225-
cteQuery, err := qb.buildRecursiveCTE(table, qb.whereConditions[key])
226-
if err != nil {
227-
return nil, err
210+
t := table.GetIdentifierExpression()
211+
cols := make([]exp.Expression, len(columnsToInclude))
212+
for i := range columnsToInclude {
213+
cols[i] = t.Col(columnsToInclude[i])
214+
}
215+
query = dialect.From(t).Select(toAnySlice(cols)...)
216+
217+
// Add WHERE conditions for this table
218+
if conditions, ok := qb.whereConditions[key]; ok {
219+
for _, cond := range conditions {
220+
qualifiedCondition, err := qb.qualifyWhereCondition(table, cond.Condition)
221+
if err != nil {
222+
return nil, err
223+
}
224+
query = query.Where(goqu.L(qualifiedCondition, cond.Args...))
228225
}
229-
query = dialect.From(cteQuery.As("recursive_cte"))
226+
}
230227

231-
// Add joins for other foreign keys
228+
// Only join and apply subsetting if subsetByForeignKeyConstraints is true
229+
if qb.subsetByForeignKeyConstraints && len(qb.whereConditions) > 0 {
230+
// Recursively build and join queries for related tables
232231
for _, fk := range table.ForeignKeys {
233-
if fk.ReferenceSchema == table.Schema && fk.ReferenceTable == table.Name {
234-
continue // Skip the self-referencing foreign key
232+
if isSelfReferencing && fk.ReferenceSchema == table.Schema && fk.ReferenceTable == table.Name {
233+
continue // Skip self-referencing foreign keys here
235234
}
236-
subQuery, err := qb.buildQueryRecursive(fk.ReferenceSchema, fk.ReferenceTable, table, fk.ReferenceColumns, joinCount, visitedTables)
235+
subQuery, err := qb.buildQueryRecursive(fk.ReferenceSchema, fk.ReferenceTable, fk.ReferenceColumns, joinCount, visitedTables)
237236
if err != nil {
238237
return nil, err
239238
}
239+
240240
if subQuery != nil {
241241
joinCount[fk.ReferenceTable]++
242-
subqueryAlias := fmt.Sprintf("%s_%s_%d", fk.ReferenceSchema, fk.ReferenceTable, joinCount[fk.ReferenceTable])
242+
subQueryAlias := fmt.Sprintf("%s_%s_%d", fk.ReferenceSchema, fk.ReferenceTable, joinCount[fk.ReferenceTable])
243243
conditions := make([]goqu.Expression, len(fk.Columns))
244244
for i := range fk.Columns {
245-
conditions[i] = goqu.T("recursive_cte").Col(fk.Columns[i]).Eq(
246-
goqu.T(subqueryAlias).Col(fk.ReferenceColumns[i]),
245+
conditions[i] = t.Col(fk.Columns[i]).Eq(
246+
goqu.T(subQueryAlias).Col(fk.ReferenceColumns[i]),
247247
)
248248
}
249249
query = query.Join(
250-
goqu.L("(?)", subQuery).As(subqueryAlias),
250+
goqu.L("(?)", subQuery).As(subQueryAlias),
251251
goqu.On(goqu.And(conditions...)),
252252
)
253253
}
254254
}
255-
} else {
256-
t := table.GetIdentifierExpression()
257-
cols := make([]exp.Expression, len(columnsToInclude))
258-
for i := range columnsToInclude {
259-
cols[i] = t.Col(columnsToInclude[i])
260-
}
261-
query = dialect.From(t).Select(toAnySlice(cols)...)
262-
263-
// Add WHERE conditions for this table
264-
if conditions, ok := qb.whereConditions[key]; ok {
265-
for _, cond := range conditions {
266-
qualifiedCondition, err := qb.qualifyWhereCondition(table, cond.Condition)
267-
if err != nil {
268-
return nil, err
269-
}
270-
query = query.Where(goqu.L(qualifiedCondition, cond.Args...))
271-
}
272-
}
273-
274-
// Only join and apply subsetting if subsetByForeignKeyConstraints is true
275-
if qb.subsetByForeignKeyConstraints && len(qb.whereConditions) > 0 {
276-
// Recursively build and join queries for related tables
277-
for _, fk := range table.ForeignKeys {
278-
if isSelfReferencing && fk.ReferenceSchema == table.Schema && fk.ReferenceTable == table.Name {
279-
continue // Skip self-referencing foreign keys here
280-
}
281-
subQuery, err := qb.buildQueryRecursive(fk.ReferenceSchema, fk.ReferenceTable, table, fk.ReferenceColumns, joinCount, visitedTables)
282-
if err != nil {
283-
return nil, err
284-
}
285-
286-
if subQuery != nil {
287-
joinCount[fk.ReferenceTable]++
288-
subQueryAlias := fmt.Sprintf("%s_%s_%d", fk.ReferenceSchema, fk.ReferenceTable, joinCount[fk.ReferenceTable])
289-
conditions := make([]goqu.Expression, len(fk.Columns))
290-
for i := range fk.Columns {
291-
conditions[i] = t.Col(fk.Columns[i]).Eq(
292-
goqu.T(subQueryAlias).Col(fk.ReferenceColumns[i]),
293-
)
294-
}
295-
query = query.Join(
296-
goqu.L("(?)", subQuery).As(subQueryAlias),
297-
goqu.On(goqu.And(conditions...)),
298-
)
299-
}
300-
}
301-
}
302255
}
303256

304257
return query, nil
@@ -430,96 +383,13 @@ func qualifyMysqlWhereColumnNames(sql string, schema *string, table string) (str
430383
return sqlparser.String(stmt), nil
431384
}
432385

433-
func (qb *QueryBuilder) getQualifiedTableName(table *TableInfo) exp.IdentifierExpression {
434-
schema := table.Schema
435-
if schema == "" {
436-
schema = qb.defaultSchema
437-
}
438-
return goqu.T(table.Name).Schema(schema)
439-
}
440-
441386
func (qb *QueryBuilder) getTableKey(schema, tableName string) string {
442387
if schema == "" {
443388
schema = qb.defaultSchema
444389
}
445390
return fmt.Sprintf("%s.%s", schema, tableName)
446391
}
447392

448-
const (
449-
hierarchyTableName = "hierarchy"
450-
)
451-
452-
func (qb *QueryBuilder) buildRecursiveCTE(
453-
table *TableInfo,
454-
whereConditions []WhereCondition,
455-
) (*goqu.SelectDataset, error) {
456-
dialect := qb.getDialect()
457-
458-
baseTable := qb.getQualifiedTableName(table).As("b")
459-
baseColumns := make([]exp.Expression, len(table.Columns))
460-
tableKey := qb.getTableKey(table.Schema, table.Name)
461-
for i, col := range table.Columns {
462-
if qb.isJsonColumn(tableKey, col) {
463-
// json the type has no comparison operator and thus can't union but jsonb does
464-
baseColumns[i] = goqu.L("to_jsonb(?)", baseTable.Col(col)).As(col)
465-
} else {
466-
baseColumns[i] = baseTable.Col(col)
467-
}
468-
}
469-
470-
// Base case
471-
baseQuery := dialect.
472-
From(baseTable).
473-
Select(toAnySlice(baseColumns)...)
474-
qualifiedWheres := []goqu.Expression{}
475-
for _, whereCond := range whereConditions {
476-
qualifiedCondition, err := qb.qualifyWhereCondition(newTableInfoAlias("b"), whereCond.Condition)
477-
if err != nil {
478-
return nil, err
479-
}
480-
qualifiedWheres = append(qualifiedWheres, goqu.L(qualifiedCondition, whereCond.Args...))
481-
}
482-
if len(qualifiedWheres) > 0 {
483-
baseQuery = baseQuery.Where(goqu.And(qualifiedWheres...))
484-
}
485-
486-
recursiveTable := qb.getQualifiedTableName(table).As("r")
487-
hierarchicalTable := goqu.T(hierarchyTableName).As("h")
488-
489-
recursiveColumns := make([]exp.IdentifierExpression, len(table.Columns))
490-
for i, col := range table.Columns {
491-
recursiveColumns[i] = recursiveTable.Col(col)
492-
}
493-
494-
joinConditions := []exp.Expression{}
495-
for _, fk := range table.ForeignKeys {
496-
if fk.ReferenceSchema == table.Schema && fk.ReferenceTable == table.Name {
497-
for i, col := range fk.Columns {
498-
joinConditions = append(joinConditions,
499-
recursiveTable.Col(fk.ReferenceColumns[i]).Eq(hierarchicalTable.Col(col)),
500-
)
501-
}
502-
}
503-
}
504-
505-
recursiveQuery := dialect.
506-
From(recursiveTable).
507-
Select(toAnySlice(recursiveColumns)...).
508-
Join(
509-
hierarchicalTable,
510-
goqu.On(
511-
goqu.Or(joinConditions...),
512-
),
513-
)
514-
515-
cte := dialect.
516-
From(hierarchyTableName).
517-
Select(goqu.Star()).
518-
Distinct().
519-
WithRecursive(hierarchyTableName, baseQuery.Union(recursiveQuery))
520-
return cte, nil
521-
}
522-
523393
func toAnySlice[T any](input []T) []any {
524394
anys := make([]any, len(input))
525395
for i := range input {

worker/pkg/workflows/datasync/workflow/testdata/postgres/subsetting/tests.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,21 @@ func GetSyncTests() []*workflow_testdata.IntegrationTest {
1515
InitSchema: true,
1616
},
1717
SubsetMap: map[string]string{
18-
"subsetting.users": "user_id in (1,2,3,4,5)",
18+
"subsetting.users": "user_id in (1,2,5,6,7,8)",
1919
"subsetting.test_2_x": "created > '2023-06-03'",
2020
"subsetting.test_2_b": "created > '2023-06-03'",
2121
"subsetting.addresses": "id in (1,5)",
2222
"subsetting.division": "id in (3,5)",
2323
"subsetting.bosses": "id in (3,5)",
2424
},
2525
Expected: map[string]*workflow_testdata.ExpectedOutput{
26-
"subsetting.attachments": &workflow_testdata.ExpectedOutput{RowCount: 3},
27-
"subsetting.comments": &workflow_testdata.ExpectedOutput{RowCount: 6},
26+
"subsetting.attachments": &workflow_testdata.ExpectedOutput{RowCount: 2},
27+
"subsetting.comments": &workflow_testdata.ExpectedOutput{RowCount: 4},
2828
"subsetting.initiatives": &workflow_testdata.ExpectedOutput{RowCount: 4},
2929
"subsetting.skills": &workflow_testdata.ExpectedOutput{RowCount: 10},
30-
"subsetting.tasks": &workflow_testdata.ExpectedOutput{RowCount: 3},
31-
"subsetting.user_skills": &workflow_testdata.ExpectedOutput{RowCount: 5},
32-
"subsetting.users": &workflow_testdata.ExpectedOutput{RowCount: 5},
30+
"subsetting.tasks": &workflow_testdata.ExpectedOutput{RowCount: 2},
31+
"subsetting.user_skills": &workflow_testdata.ExpectedOutput{RowCount: 6},
32+
"subsetting.users": &workflow_testdata.ExpectedOutput{RowCount: 6},
3333
"subsetting.test_2_x": &workflow_testdata.ExpectedOutput{RowCount: 3},
3434
"subsetting.test_2_b": &workflow_testdata.ExpectedOutput{RowCount: 3},
3535
"subsetting.test_2_a": &workflow_testdata.ExpectedOutput{RowCount: 4},
@@ -43,7 +43,7 @@ func GetSyncTests() []*workflow_testdata.IntegrationTest {
4343
"subsetting.division": &workflow_testdata.ExpectedOutput{RowCount: 2},
4444
"subsetting.employees": &workflow_testdata.ExpectedOutput{RowCount: 2},
4545
"subsetting.projects": &workflow_testdata.ExpectedOutput{RowCount: 2},
46-
"subsetting.bosses": &workflow_testdata.ExpectedOutput{RowCount: 5},
46+
"subsetting.bosses": &workflow_testdata.ExpectedOutput{RowCount: 2},
4747
"subsetting.minions": &workflow_testdata.ExpectedOutput{RowCount: 2},
4848
},
4949
},

worker/pkg/workflows/datasync/workflow/testdata/postgres/virtual-foreign-keys/tests.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func GetSyncTests() []*workflow_testdata.IntegrationTest {
4343
"vfk_hr.locations": &workflow_testdata.ExpectedOutput{RowCount: 7},
4444
"vfk_hr.departments": &workflow_testdata.ExpectedOutput{RowCount: 11},
4545
"vfk_hr.dependents": &workflow_testdata.ExpectedOutput{RowCount: 2},
46-
"vfk_hr.employees": &workflow_testdata.ExpectedOutput{RowCount: 5},
46+
"vfk_hr.employees": &workflow_testdata.ExpectedOutput{RowCount: 2},
4747
"vfk_hr.jobs": &workflow_testdata.ExpectedOutput{RowCount: 19},
4848
},
4949
},

0 commit comments

Comments
 (0)