Skip to content

Commit 8de82b5

Browse files
authored
Merge pull request #10 from ooemperor/development
Development
2 parents a425321 + b2ce85d commit 8de82b5

File tree

11 files changed

+53
-24
lines changed

11 files changed

+53
-24
lines changed

.github/workflows/docker_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ on:
55
branches: [ "**" ]
66

77
jobs:
8-
build:
8+
build-test:
99
runs-on: ubuntu-latest
1010
steps:
1111
- uses: actions/checkout@v4

.github/workflows/docker_build_publish_development.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ env:
1919

2020

2121
jobs:
22-
build:
22+
build-publish-development:
2323

2424
runs-on: ubuntu-latest
2525
permissions:

.github/workflows/docker_build_publish_main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ env:
1717

1818

1919
jobs:
20-
build:
20+
build-publish-main:
2121

2222
runs-on: ubuntu-latest
2323
permissions:

.github/workflows/go_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: Go test
22
on: [push]
33

44
jobs:
5-
build:
5+
go-test:
66
runs-on: ubuntu-latest
77

88
steps:

pkg/builder/Global.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,6 @@ ScriptTransactionWrapper Wraps string in transction for postgresql
5858
*/
5959
func ScriptTransactionWrapper(query string) string {
6060
prefix := "DO $$ BEGIN"
61-
suffix := "END $$ ;"
61+
suffix := "END $$; COMMIT;"
6262
return fmt.Sprintf("%v %v %v", prefix, query, suffix)
6363
}

pkg/builder/Global_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ func TestScriptTransactionWrapper(t *testing.T) {
4141
args args
4242
want string
4343
}{
44-
{name: "InbPackageBuildTest1", args: args{query: "SELECT 1;"}, want: "DO $$ BEGIN SELECT 1; END $$ ;"},
45-
{name: "InbPackageBuildTest2", args: args{query: "TRUNCATE TABLE public.test;"}, want: "DO $$ BEGIN TRUNCATE TABLE public.test; END $$ ;"},
44+
{name: "InbPackageBuildTest1", args: args{query: "SELECT 1;"}, want: "DO $$ BEGIN SELECT 1; END $$; COMMIT;"},
45+
{name: "InbPackageBuildTest2", args: args{query: "TRUNCATE TABLE public.test;"}, want: "DO $$ BEGIN TRUNCATE TABLE public.test; END $$; COMMIT;"},
4646
}
4747
for _, tt := range tests {
4848
t.Run(tt.name, func(t *testing.T) {

pkg/builder/InbRdv.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,14 @@ func BuildInbRdvSatDeleteQuery(tableName string) (string, error) {
5353
if tableName == "" {
5454
return "", fmt.Errorf("the tablename cannot be blank")
5555
}
56-
script += fmt.Sprintf("UPDATE rdv.%s_sat ", tableName)
56+
script += fmt.Sprintf("UPDATE rdv.%s_sat s ", tableName)
5757
script += "SET delete_dts = NOW() "
58-
script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName)
59-
script += "AND delete_dts IS NULL;"
58+
script += fmt.Sprintf("WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.%s_sat_cur sc WHERE sc.frh = s.frh);", tableName)
59+
// script += fmt.Sprintf("FROM rdv.%s_sat AS s ", tableName)
60+
// script += fmt.Sprintf("LEFT JOIN rdv.%s_sat_cur AS sc on s.frh = sc.frh ", tableName)
61+
// script += "WHERE sc.frh IS NULL AND s.delete_dts IS NULL;"
62+
// script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName)
63+
// script += "AND delete_dts IS NULL;"
6064

6165
return script, nil
6266
}
@@ -71,8 +75,9 @@ func BuildInbRdvSatInsertQuery(tableName string) (string, error) {
7175
}
7276

7377
script += fmt.Sprintf("INSERT INTO rdv.%s_sat ", tableName)
74-
script += fmt.Sprintf("SELECT * FROM rdv.%s_sat_cur ", tableName)
75-
script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat);", tableName)
78+
script += fmt.Sprintf("SELECT sc.* FROM rdv.%s_sat_cur AS sc LEFT JOIN rdv.%s_sat AS s ON s.frh = sc.frh ", tableName, tableName)
79+
script += "WHERE s.frh IS NULL;"
80+
// script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat);", tableName)
7681

7782
return script, nil
7883
}

pkg/builder/InbRdv_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ func TestBuildInbRdvSatDeleteQuery(t *testing.T) {
6464
t.Fatalf("Error on BuildInbRdvSatDeleteQuery build: %v", err)
6565
}
6666

67-
if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat_cur) AND delete_dts IS NULL;" {
67+
// if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat_cur) AND delete_dts IS NULL;" {
68+
// t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query)
69+
// }
70+
71+
if query != "UPDATE rdv.TestTable_sat s SET delete_dts = NOW() WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.TestTable_sat_cur sc WHERE sc.frh = s.frh);" {
6872
t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query)
6973
}
7074
}
@@ -77,7 +81,11 @@ func TestBuildInbRdvSatInsertQuery(t *testing.T) {
7781
t.Fatalf("Error on BuildInbRdvSatInsertQuery build: %v", err)
7882
}
7983

80-
if query != "INSERT INTO rdv.TestTable_sat SELECT * FROM rdv.TestTable_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat);" {
84+
// if query != "INSERT INTO rdv.TestTable_sat SELECT * FROM rdv.TestTable_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat);" {
85+
// t.Fatalf("BuildInbRdvSatInsertQuery incorrect: %v", query)
86+
// }
87+
if query != "INSERT INTO rdv.TestTable_sat SELECT sc.* FROM rdv.TestTable_sat_cur AS sc LEFT JOIN rdv.TestTable_sat AS s ON s.frh = sc.frh WHERE s.frh IS NULL;" {
8188
t.Fatalf("BuildInbRdvSatInsertQuery incorrect: %v", query)
8289
}
90+
8391
}

pkg/packages/inbrdv/InbPackage.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"database/sql"
55
"errors"
66
"fmt"
7+
"sync"
78

89
"github.com/ooemperor/go-db-etl/pkg/logging"
910
"github.com/ooemperor/go-db-etl/pkg/pipeline/inbrdv"
@@ -34,12 +35,19 @@ func (inbP *InbPackage) Run() error {
3435
logging.EtlLogger.Error(msg)
3536
return errors.New(msg)
3637
}
38+
39+
var wg sync.WaitGroup
40+
wg.Add(len(inbP.pipelines))
3741
for _, tablePipeline := range inbP.pipelines {
38-
c := <-tablePipeline.Run()
39-
if c != nil {
40-
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
41-
}
42+
go func() {
43+
defer wg.Done()
44+
c := <-tablePipeline.Run()
45+
if c != nil {
46+
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
47+
}
48+
}()
4249
}
50+
wg.Wait()
4351
logging.EtlLogger.Info(fmt.Sprintf("END %v", inbP.Name()))
4452
return nil
4553
}

pkg/packages/srcinb/SystemPackage.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"database/sql"
55
"errors"
66
"fmt"
7+
"sync"
78

89
"github.com/ooemperor/go-db-etl/pkg/logging"
910
"github.com/ooemperor/go-db-etl/pkg/pipeline/srcinb"
@@ -34,12 +35,19 @@ func (srcP *SystemPackage) Run() error {
3435
logging.EtlLogger.Error(msg)
3536
return errors.New(msg)
3637
}
38+
39+
var wg sync.WaitGroup
40+
wg.Add(len(srcP.pipelines))
3741
for _, tablePipeline := range srcP.pipelines {
38-
c := <-tablePipeline.Run()
39-
if c != nil {
40-
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
41-
}
42+
go func() {
43+
defer wg.Done()
44+
c := <-tablePipeline.Run()
45+
if c != nil {
46+
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
47+
}
48+
}()
4249
}
50+
wg.Wait()
4351
logging.EtlLogger.Info(fmt.Sprintf("END %v", srcP.Name()))
4452
return nil
4553
}

0 commit comments

Comments
 (0)