Skip to content

Commit 52d9e62

Browse files
committed
feat(concurrency): running the pipelines in the packages in paralell
1 parent 01a73c1 commit 52d9e62

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

pkg/packages/inbrdv/InbPackage.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"database/sql"
55
"errors"
66
"fmt"
7+
"sync"
78

89
"github.com/ooemperor/go-db-etl/pkg/logging"
910
"github.com/ooemperor/go-db-etl/pkg/pipeline/inbrdv"
@@ -34,12 +35,19 @@ func (inbP *InbPackage) Run() error {
3435
logging.EtlLogger.Error(msg)
3536
return errors.New(msg)
3637
}
38+
39+
var wg sync.WaitGroup
40+
wg.Add(len(inbP.pipelines))
3741
for _, tablePipeline := range inbP.pipelines {
38-
c := <-tablePipeline.Run()
39-
if c != nil {
40-
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
41-
}
42+
go func() {
43+
defer wg.Done()
44+
c := <-tablePipeline.Run()
45+
if c != nil {
46+
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
47+
}
48+
}()
4249
}
50+
wg.Wait()
4351
logging.EtlLogger.Info(fmt.Sprintf("END %v", inbP.Name()))
4452
return nil
4553
}

pkg/packages/srcinb/SystemPackage.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"database/sql"
55
"errors"
66
"fmt"
7+
"sync"
78

89
"github.com/ooemperor/go-db-etl/pkg/logging"
910
"github.com/ooemperor/go-db-etl/pkg/pipeline/srcinb"
@@ -34,12 +35,19 @@ func (srcP *SystemPackage) Run() error {
3435
logging.EtlLogger.Error(msg)
3536
return errors.New(msg)
3637
}
38+
39+
var wg sync.WaitGroup
40+
wg.Add(len(srcP.pipelines))
3741
for _, tablePipeline := range srcP.pipelines {
38-
c := <-tablePipeline.Run()
39-
if c != nil {
40-
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
41-
}
42+
go func() {
43+
defer wg.Done()
44+
c := <-tablePipeline.Run()
45+
if c != nil {
46+
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
47+
}
48+
}()
4249
}
50+
wg.Wait()
4351
logging.EtlLogger.Info(fmt.Sprintf("END %v", srcP.Name()))
4452
return nil
4553
}

0 commit comments

Comments
 (0)