Skip to content

Commit e7b81af

Browse files
committed
cleanup(inbrdv): cleanup the workarounds to proper code for the inbrdv build
1 parent 6153db5 commit e7b81af

File tree

3 files changed

+132
-33
lines changed

3 files changed

+132
-33
lines changed

pkg/builder/Global.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package builder
22

33
import "fmt"
44

5+
/*
6+
BuildTruncateTableSql return truncate script for a given table
7+
*/
58
func BuildTruncateTableSql(schema string, tableName string) (string, error) {
69
var script string
710

@@ -16,3 +19,45 @@ func BuildTruncateTableSql(schema string, tableName string) (string, error) {
1619

1720
return script, nil
1821
}
22+
23+
/*
24+
ScriptSetTableUnlogged constructs script to make table unlogged
25+
*/
26+
func ScriptSetTableUnlogged(schema string, tableName string) (string, error) {
27+
var script string
28+
29+
if schema == "" {
30+
return "", fmt.Errorf("the schema cannot be blank")
31+
}
32+
if tableName == "" {
33+
return "", fmt.Errorf("the tablename cannot be blank")
34+
}
35+
36+
script += fmt.Sprintf("ALTER TABLE %s.%s SET UNLOGGED;", schema, tableName)
37+
return script, nil
38+
}
39+
40+
/*
41+
ScriptSetTableLogged constructs script to make table unlogged
42+
*/
43+
func ScriptSetTableLogged(schema string, tableName string) (string, error) {
44+
var script string
45+
46+
if schema == "" {
47+
return "", fmt.Errorf("the schema cannot be blank")
48+
}
49+
if tableName == "" {
50+
return "", fmt.Errorf("the tablename cannot be blank")
51+
}
52+
script += fmt.Sprintf("ALTER TABLE %s.%s SET LOGGED;", schema, tableName)
53+
return script, nil
54+
}
55+
56+
/*
57+
ScriptTransactionWrapper Wraps string in transction for postgresql
58+
*/
59+
func ScriptTransactionWrapper(query string) string {
60+
prefix := "DO $$ BEGIN"
61+
suffix := "END $$ ;"
62+
return fmt.Sprintf("%v %v %v", prefix, query, suffix)
63+
}

pkg/builder/Global_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,84 @@ func TestBuilderGlobalInsertSchemaEmpty(t *testing.T) {
3131
t.Fatalf("incorrect error: %v", err.Error())
3232
}
3333
}
34+
35+
func TestScriptTransactionWrapper(t *testing.T) {
36+
type args struct {
37+
query string
38+
}
39+
tests := []struct {
40+
name string
41+
args args
42+
want string
43+
}{
44+
{name: "InbPackageBuildTest1", args: args{query: "SELECT 1;"}, want: "DO $$ BEGIN SELECT 1; END $$ ;"},
45+
{name: "InbPackageBuildTest2", args: args{query: "TRUNCATE TABLE public.test;"}, want: "DO $$ BEGIN TRUNCATE TABLE public.test; END $$ ;"},
46+
}
47+
for _, tt := range tests {
48+
t.Run(tt.name, func(t *testing.T) {
49+
if got := ScriptTransactionWrapper(tt.args.query); got != tt.want {
50+
t.Errorf("ScriptTransactionWrapper() = %v, want %v", got, tt.want)
51+
}
52+
})
53+
}
54+
}
55+
56+
func TestScriptSetTableLogged(t *testing.T) {
57+
type args struct {
58+
schema string
59+
tableName string
60+
}
61+
tests := []struct {
62+
name string
63+
args args
64+
want string
65+
wantErr bool
66+
}{
67+
{name: "ScriptSetTableLoggedTest1", args: args{schema: "public", tableName: "test"}, want: "ALTER TABLE public.test SET LOGGED;", wantErr: false},
68+
{name: "ScriptSetTableLoggedTest1", args: args{schema: "", tableName: "test"}, want: "", wantErr: true},
69+
{name: "ScriptSetTableLoggedTest1", args: args{schema: "public", tableName: ""}, want: "", wantErr: true},
70+
{name: "ScriptSetTableLoggedTest1", args: args{schema: "", tableName: ""}, want: "", wantErr: true},
71+
}
72+
for _, tt := range tests {
73+
t.Run(tt.name, func(t *testing.T) {
74+
got, err := ScriptSetTableLogged(tt.args.schema, tt.args.tableName)
75+
if (err != nil) != tt.wantErr {
76+
t.Errorf("ScriptSetTableLogged() error = %v, wantErr %v", err, tt.wantErr)
77+
return
78+
}
79+
if got != tt.want {
80+
t.Errorf("ScriptSetTableLogged() got = %v, want %v", got, tt.want)
81+
}
82+
})
83+
}
84+
}
85+
86+
func TestScriptSetTableUnlogged(t *testing.T) {
87+
type args struct {
88+
schema string
89+
tableName string
90+
}
91+
tests := []struct {
92+
name string
93+
args args
94+
want string
95+
wantErr bool
96+
}{
97+
{name: "ScriptSetTableUnloggedTest1", args: args{schema: "public", tableName: "test"}, want: "ALTER TABLE public.test SET UNLOGGED;", wantErr: false},
98+
{name: "ScriptSetTableUnloggedTest2", args: args{schema: "", tableName: "test"}, want: "", wantErr: true},
99+
{name: "ScriptSetTableUnloggedTest3", args: args{schema: "public", tableName: ""}, want: "", wantErr: true},
100+
{name: "ScriptSetTableUnloggedTest4", args: args{schema: "", tableName: ""}, want: "", wantErr: true},
101+
}
102+
for _, tt := range tests {
103+
t.Run(tt.name, func(t *testing.T) {
104+
got, err := ScriptSetTableUnlogged(tt.args.schema, tt.args.tableName)
105+
if (err != nil) != tt.wantErr {
106+
t.Errorf("ScriptSetTableLogged() error = %v, wantErr %v", err, tt.wantErr)
107+
return
108+
}
109+
if got != tt.want {
110+
t.Errorf("ScriptSetTableLogged() got = %v, want %v", got, tt.want)
111+
}
112+
})
113+
}
114+
}

pkg/pipeline/inbrdv/RdvPipelineBuilder.go

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ func (rdv *RdvPipeline) buildExecuteAll() (*processors.SQLExecutor, error) {
109109
return nil, err
110110
}
111111

112-
fullQuery := "DO $$ BEGIN " + truncateQuery + " END $$ ; DO $$ BEGIN " + satCurWriterQuery + " END $$ ; DO $$ BEGIN " + deleteQuery + " END $$ ; DO $$ BEGIN " + satWriterQuery + " END $$ ;"
112+
truncateQuery = builder.ScriptTransactionWrapper(truncateQuery)
113+
satCurWriterQuery = builder.ScriptTransactionWrapper(satCurWriterQuery)
114+
deleteQuery = builder.ScriptTransactionWrapper(deleteQuery)
115+
satWriterQuery = builder.ScriptTransactionWrapper(satWriterQuery)
116+
117+
fullQuery := fmt.Sprintf("%v %v %v %v", truncateQuery, satCurWriterQuery, deleteQuery, satWriterQuery)
113118
fullExecutor := processors.NewSQLExecutor(rdv.Db, fullQuery)
114119
return fullExecutor, nil
115120
}
@@ -120,42 +125,10 @@ Build constructs the pipeline for a given table.
120125
func (rdv *RdvPipeline) Build() (*goetl.Pipeline, error) {
121126
satCurTableName, _ := builder.GetRdvSatCurTableName(rdv.Table)
122127

123-
// build processors
124-
truncator, err := rdv.buildTruncator()
125-
if err != nil || truncator == nil {
126-
return nil, err
127-
}
128-
reader, err := rdv.buildInbReader()
129-
if err != nil || reader == nil {
130-
return nil, err
131-
}
132-
writerSatCur, err := rdv.buildSatCurWriter()
133-
if err != nil || writerSatCur == nil {
134-
return nil, err
135-
}
136-
updateDeleted, err := rdv.buildSatMarkDelete()
137-
if err != nil || updateDeleted == nil {
138-
return nil, err
139-
}
140-
satInserter, err := rdv.buildSatInsertExecutor()
141-
if err != nil || satInserter == nil {
142-
return nil, err
143-
}
144128
fullExecutor, err := rdv.buildExecuteAll()
145129
if err != nil || fullExecutor == nil {
146130
return nil, err
147131
}
148-
149-
// dummy selectors since executor after executor will never work
150-
//dummy := builder.BuildInbRdvDummySelect(rdv.Db)
151-
//pass2 := &processors.Passthrough{}
152-
//pass3 := &processors.Passthrough{}
153-
154-
// build stages in order of later usage
155-
//truncateAndReadStage := goetl.NewPipelineStage(goetl.Do(truncator).Outputs(writerSatCur, pass2), goetl.Do(dummy).Outputs(writerSatCur, pass2))
156-
//writerSatCurStage := goetl.NewPipelineStage(goetl.Do(writerSatCur).Outputs(updateDeleted, pass3), goetl.Do(pass2).Outputs(updateDeleted, pass3))
157-
//updateSatStage := goetl.NewPipelineStage(goetl.Do(updateDeleted).Outputs(satInserter), goetl.Do(pass3).Outputs(satInserter))
158-
//insertSatStage := goetl.NewPipelineStage(goetl.Do(satInserter))
159132
fullStage := goetl.NewPipelineStage(goetl.Do(fullExecutor))
160133

161134
layout, err := goetl.NewPipelineLayout(fullStage)

0 commit comments

Comments
 (0)