Skip to content

Commit

Permalink
feat: improve datanode snapshot creation
Browse files Browse the repository at this point in the history
As of now, the snapshot are created in a sequential and blocking way in the datanode. This means
that while a snapshot is being taken, no block can be processed.

The following approach is made:
- the database is locked with a transaction
- queries are generated
- one by one the query are:
 - executed
 - the result piped into the file system
- finally the lock is released, and later the files are added to ipfs.

The bottle neck here is that the results are being save on the fs as they arrive, which is unecessary
and amount for 95% of the time spent snapshoting (and so blocking anything else).

To prevent this, we keep those results from the database in buffers, and only save them to file via a worker
go routine.

Signed-off-by: Jeremy Letang <[email protected]>
  • Loading branch information
jeremyletang committed Jun 19, 2024
1 parent eb07bfd commit 7b3aa5f
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 20 deletions.
5 changes: 3 additions & 2 deletions cmd/data-node/commands/networkhistory/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func createNetworkHistoryService(ctx context.Context, log *logging.Logger, vegaC
return nil, fmt.Errorf("failed to create network history store: %w", err)
}

fw := snapshot.NewFileWorker()
snapshotService, err := snapshot.NewSnapshotService(log, vegaConfig.NetworkHistory.Snapshot, connPool, networkHistoryStore,
vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo), func(version int64) error {
if err := sqlstore.MigrateUpToSchemaVersion(log, vegaConfig.SQLStore, version, sqlstore.EmbedMigrations); err != nil {
Expand All @@ -234,14 +235,14 @@ func createNetworkHistoryService(ctx context.Context, log *logging.Logger, vegaC
return fmt.Errorf("failed to migrate down to schema version %d: %w", version, err)
}
return nil
})
}, fw)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot service: %w", err)
}

networkHistoryService, err := networkhistory.New(ctx, log, vegaConfig.ChainID, vegaConfig.NetworkHistory,
connPool, snapshotService, networkHistoryStore, vegaConfig.API.Port,
vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo))
vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo), fw)
if err != nil {
return nil, fmt.Errorf("failed new networkhistory service:%w", err)
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/data-node/commands/start/node_pre.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi
networkHistoryServiceLog := networkHistoryLog.Named("service")
home := l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistoryHome)

fw := snapshot.NewFileWorker()

networkHistoryStore, err := store.New(l.ctx, networkHistoryServiceLog, l.conf.ChainID, l.conf.NetworkHistory.Store, home,
l.conf.MaxMemoryPercent)
if err != nil {
Expand All @@ -365,7 +367,7 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi
return fmt.Errorf("failed to migrate down to schema version %d: %w", version, err)
}
return nil
})
}, fw)
if err != nil {
return fmt.Errorf("failed to create snapshot service:%w", err)
}
Expand All @@ -375,7 +377,9 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi
l.snapshotService,
networkHistoryStore,
l.conf.API.Port,
l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo))
l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo),
fw,
)
if err != nil {
return fmt.Errorf("failed to create networkHistory service:%w", err)
}
Expand Down
35 changes: 35 additions & 0 deletions datanode/networkhistory/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@ type Service struct {
datanodeGrpcAPIPort int

publishLock sync.Mutex

fw *snapshot.FileWorker
}

func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, connPool *pgxpool.Pool,
snapshotService *snapshot.Service,
networkHistoryStore *store.Store, datanodeGrpcAPIPort int,
snapshotsCopyToPath string,
fw *snapshot.FileWorker,
) (*Service, error) {
s := &Service{
cfg: cfg,
Expand All @@ -68,10 +71,35 @@ func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, c
chainID: chainID,
snapshotsCopyToPath: snapshotsCopyToPath,
datanodeGrpcAPIPort: datanodeGrpcAPIPort,
fw: fw,
}

if cfg.Publish {
var err error
go func() {

Check failure on line 79 in datanode/networkhistory/service.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

Check failure on line 80 in datanode/networkhistory/service.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
// consume all pending files
f := func() {
for !fw.Empty() {
if err = fw.Consume(); err != nil {
s.log.Error("failed to write all files to disk", logging.Error(err))
}
}
}

ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ctx.Done():
f()
return
case <-ticker.C:
f()
}
}
}()

// publish all file which are ready
go func() {
ticker := time.NewTicker(5 * time.Second)
for {
Expand Down Expand Up @@ -182,6 +210,13 @@ func (d *Service) CreateAndPublishSegment(ctx context.Context, chainID string, t
}
}

// empty the file worker
for !d.fw.Empty() {
if err = d.fw.Consume(); err != nil {
d.log.Error("failed to write all files to disk", logging.Error(err))
}
}

if err = d.PublishSegments(ctx); err != nil {
return fmt.Errorf("failed to publish snapshots: %w", err)
}
Expand Down
11 changes: 8 additions & 3 deletions datanode/networkhistory/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,10 @@ func TestMain(t *testing.M) {
datanodeConfig := config2.NewDefaultConfig()
cfg := networkhistory.NewDefaultConfig()

fw := snapshot.NewFileWorker()

_, err = networkhistory.New(outerCtx, log, chainID, cfg, networkHistoryConnPool, snapshotService,
networkHistoryStore, datanodeConfig.API.Port, snapshotCopyToPath)
networkHistoryStore, datanodeConfig.API.Port, snapshotCopyToPath, fw)

if err != nil {
panic(err)
Expand Down Expand Up @@ -1223,8 +1225,10 @@ func setupNetworkHistoryService(ctx context.Context, log *logging.Logger, inputS

datanodeConfig := config2.NewDefaultConfig()

fw := snapshot.NewFileWorker()

networkHistoryService, err := networkhistory.New(ctx, log, chainID, cfg, networkHistoryConnPool,
inputSnapshotService, store, datanodeConfig.API.Port, snapshotCopyToPath)
inputSnapshotService, store, datanodeConfig.API.Port, snapshotCopyToPath, fw)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -1291,9 +1295,10 @@ func assertIntervalHistoryIsEmpty(t *testing.T, historyTableDelta []map[string]t

func setupSnapshotService(snapshotCopyToPath string) *snapshot.Service {
snapshotServiceCfg := snapshot.NewDefaultConfig()
fw := snapshot.NewFileWorker()
snapshotService, err := snapshot.NewSnapshotService(logging.NewTestLogger(), snapshotServiceCfg,
networkHistoryConnPool, networkHistoryStore, snapshotCopyToPath, migrateUpToDatabaseVersion,
migrateDownToDatabaseVersion)
migrateDownToDatabaseVersion, fw)
if err != nil {
panic(err)
}
Expand Down
108 changes: 108 additions & 0 deletions datanode/networkhistory/snapshot/file_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package snapshot

import (
"bytes"
"fmt"
"os"
"sync"
)

type bufAndPath struct {
buf *bytes.Buffer
isProgressFile bool
path string
}

type FileWorker struct {
mu sync.Mutex
queue []*bufAndPath
}

func NewFileWorker() *FileWorker {
return &FileWorker{
queue: []*bufAndPath{},
}
}

func (fw *FileWorker) Add(buf *bytes.Buffer, path string) {
fw.mu.Lock()
defer fw.mu.Unlock()

fw.queue = append(fw.queue, &bufAndPath{buf, false, path})
}

func (fw *FileWorker) AddLockFile(path string) {
fw.mu.Lock()
defer fw.mu.Unlock()

fw.queue = append(fw.queue, &bufAndPath{nil, true, path})
}

func (fw *FileWorker) peek() (bp *bufAndPath) {
fw.mu.Lock()
defer fw.mu.Unlock()

if len(fw.queue) <= 0 {
return
}

bp, fw.queue = fw.queue[0], fw.queue[1:]

return
}

func (fw *FileWorker) Empty() bool {
fw.mu.Lock()
defer fw.mu.Unlock()

return len(fw.queue) <= 0
}

func (fw *FileWorker) Consume() error {
bp := fw.peek()
if bp == nil {
return nil // nothing to do
}

if bp.isProgressFile {
return fw.removeLockFile(bp.path)
}

return fw.writeSegment(bp)
}

func (fw *FileWorker) removeLockFile(path string) error {
return os.Remove(path)
}

func (fw *FileWorker) writeSegment(bp *bufAndPath) error {
file, err := os.Create(bp.path)
if err != nil {
return fmt.Errorf("failed to create file %s : %w", bp.path, err)
}

defer file.Close()

_, err = bp.buf.WriteTo(file)
if err != nil {
return fmt.Errorf("couldn't writer to file %v : %w", bp.path, err)
}

return nil

Check failure on line 107 in datanode/networkhistory/snapshot/file_worker.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
}
4 changes: 4 additions & 0 deletions datanode/networkhistory/snapshot/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Service struct {

historyStore HistoryStore

fw *FileWorker

createSnapshotLock mutex.CtxMutex
copyToPath string
migrateSchemaUpToVersion func(version int64) error
Expand All @@ -51,6 +53,7 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po
snapshotsCopyToPath string,
migrateDatabaseToVersion func(version int64) error,
migrateSchemaDownToVersion func(version int64) error,
fw *FileWorker,
) (*Service, error) {
var err error

Expand All @@ -67,6 +70,7 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po
migrateSchemaUpToVersion: migrateDatabaseToVersion,
migrateSchemaDownToVersion: migrateSchemaDownToVersion,
historyStore: historyStore,
fw: fw,
}

err = os.MkdirAll(s.copyToPath, fs.ModePerm)
Expand Down
Loading

0 comments on commit 7b3aa5f

Please sign in to comment.