From 9098e67e7b5e6a3299df5e456f193b92d52ddbbc Mon Sep 17 00:00:00 2001 From: Jeremy Letang Date: Wed, 19 Jun 2024 15:31:15 +0100 Subject: [PATCH] feat: improve datanode snapshot creation As of now, snapshots are created in a sequential and blocking way in the datanode. This means that while a snapshot is being taken, no block can be processed. The following approach is taken: - the database is locked with a transaction - queries are generated - one by one the queries are: - executed - the result piped into the file system - finally the lock is released, and later the files are added to IPFS. The bottleneck here is that the results are being saved on the fs as they arrive, which is unnecessary and accounts for 95% of the time spent snapshotting (and so blocking anything else). To prevent this, we keep those results from the database in buffers, and only save them to file via a worker goroutine. Signed-off-by: Jeremy Letang --- cmd/data-node/commands/start/node_pre.go | 3 +- datanode/networkhistory/service.go | 23 ++++ datanode/networkhistory/service_test.go | 47 ++++++-- .../networkhistory/snapshot/file_worker.go | 107 ++++++++++++++++++ datanode/networkhistory/snapshot/service.go | 11 ++ .../snapshot/service_create_snapshot.go | 54 ++++++--- .../snapshot/service_create_snapshot_test.go | 1 + 7 files changed, 220 insertions(+), 26 deletions(-) create mode 100644 datanode/networkhistory/snapshot/file_worker.go diff --git a/cmd/data-node/commands/start/node_pre.go b/cmd/data-node/commands/start/node_pre.go index 3b0b8b2a454..dfb7947ef84 100644 --- a/cmd/data-node/commands/start/node_pre.go +++ b/cmd/data-node/commands/start/node_pre.go @@ -375,7 +375,8 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi l.snapshotService, networkHistoryStore, l.conf.API.Port, - l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo)) + l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo), + ) if err != nil { return 
fmt.Errorf("failed to create networkHistory service:%w", err) } diff --git a/datanode/networkhistory/service.go b/datanode/networkhistory/service.go index b60f77ef4b3..58a78c1f429 100644 --- a/datanode/networkhistory/service.go +++ b/datanode/networkhistory/service.go @@ -70,8 +70,26 @@ func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, c datanodeGrpcAPIPort: datanodeGrpcAPIPort, } + go func() { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ctx.Done(): + s.log.Info("saving network history before before leaving") + // consume all pending files + s.snapshotService.Flush() + s.log.Info("network history saved (maybe)") + return + case <-ticker.C: + s.snapshotService.Flush() + } + } + }() + if cfg.Publish { var err error + + // publish all file which are ready go func() { ticker := time.NewTicker(5 * time.Second) for { @@ -182,6 +200,11 @@ func (d *Service) CreateAndPublishSegment(ctx context.Context, chainID string, t } } + // empty the file worker + d.log.Info("saving network history to disk") + d.snapshotService.Flush() + d.log.Info("network history saved to disk") + if err = d.PublishSegments(ctx); err != nil { return fmt.Errorf("failed to publish snapshots: %w", err) } diff --git a/datanode/networkhistory/service_test.go b/datanode/networkhistory/service_test.go index 2cd98f02c86..5806d5648c0 100644 --- a/datanode/networkhistory/service_test.go +++ b/datanode/networkhistory/service_test.go @@ -148,7 +148,7 @@ func TestMain(t *testing.M) { pgLog *bytes.Buffer, ) { sqlConfig = config - log.Infof("DB Connection String: ", sqlConfig.ConnectionConfig.GetConnectionString()) + log.Infof("DB Connection String: %v", sqlConfig.ConnectionConfig.GetConnectionString()) pool, err := sqlstore.CreateConnectionPool(outerCtx, sqlConfig.ConnectionConfig) if err != nil { @@ -184,7 +184,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot: %w", err)) } - waitForSnapshotToComplete(ss) + 
waitForSnapshotToComplete2(ss, snapshotService.Flush) snapshots = append(snapshots, ss) @@ -211,7 +211,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } - waitForSnapshotToComplete(lastSnapshot) + waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -268,7 +268,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } - waitForSnapshotToComplete(lastSnapshot) + waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -421,6 +421,7 @@ func TestLoadingDataFetchedAsynchronously(t *testing.T) { require.Equal(t, int64(1000), fetched) networkhistoryService := setupNetworkHistoryService(ctx, log, snapshotService, networkHistoryStore, snapshotCopyToPath) + segments, err := networkhistoryService.ListAllHistorySegments() require.NoError(t, err) @@ -583,7 +584,7 @@ func TestRestoringNodeThatAlreadyContainsData(t *testing.T) { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, snapshotService.Flush) md5Hash, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -888,7 +889,7 @@ func TestRestoreFromPartialHistoryAndProcessEvents(t *testing.T) { if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 { ss, err = service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) if lastCommittedBlockHeight == 4000 { newSnapshotFileHashAt4000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) @@ -993,7 +994,7 @@ func 
TestRestoreFromFullHistorySnapshotAndProcessEvents(t *testing.T) { if lastCommittedBlockHeight == 3000 { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -1096,7 +1097,7 @@ func TestRestoreFromFullHistorySnapshotWithIndexesAndOrderTriggersAndProcessEven if lastCommittedBlockHeight == 3000 { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -1578,6 +1579,36 @@ func waitForSnapshotToComplete(sf segment.Unpublished) { } } +func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) { + for { + time.Sleep(10 * time.Millisecond) + // wait for snapshot current file + _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + continue + } else { + panic(err) + } + } + + flush() + + // wait for snapshot data dump in progress file to be removed + + _, err = os.Stat(sf.InProgressFilePath()) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + break + } else { + panic(err) + } + } else { + continue + } + } +} + func decompressEventFile() { sourceFile, err := os.Open(compressedEventsFile) if err != nil { diff --git a/datanode/networkhistory/snapshot/file_worker.go b/datanode/networkhistory/snapshot/file_worker.go new file mode 100644 index 00000000000..df0ce8b9472 --- /dev/null +++ b/datanode/networkhistory/snapshot/file_worker.go @@ -0,0 +1,107 @@ +// Copyright (C) 2023 Gobalsky Labs Limited +// +// This program is free software: you can redistribute it and/or 
modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package snapshot + +import ( + "bytes" + "fmt" + "os" + "sync" +) + +type bufAndPath struct { + buf *bytes.Buffer + isProgressFile bool + path string +} + +type FileWorker struct { + mu sync.Mutex + queue []*bufAndPath +} + +func NewFileWorker() *FileWorker { + return &FileWorker{ + queue: []*bufAndPath{}, + } +} + +func (fw *FileWorker) Add(buf *bytes.Buffer, path string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + fw.queue = append(fw.queue, &bufAndPath{buf, false, path}) +} + +func (fw *FileWorker) AddLockFile(path string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + fw.queue = append(fw.queue, &bufAndPath{nil, true, path}) +} + +func (fw *FileWorker) peek() (bp *bufAndPath) { + fw.mu.Lock() + defer fw.mu.Unlock() + + if len(fw.queue) <= 0 { + return + } + + bp, fw.queue = fw.queue[0], fw.queue[1:] + + return +} + +func (fw *FileWorker) Empty() bool { + fw.mu.Lock() + defer fw.mu.Unlock() + + return len(fw.queue) <= 0 +} + +func (fw *FileWorker) Consume() error { + bp := fw.peek() + if bp == nil { + return nil // nothing to do + } + + if bp.isProgressFile { + return fw.removeLockFile(bp.path) + } + + return fw.writeSegment(bp) +} + +func (fw *FileWorker) removeLockFile(path string) error { + return os.Remove(path) +} + +func (fw *FileWorker) writeSegment(bp *bufAndPath) error { + file, err := os.Create(bp.path) + if err != nil { + return fmt.Errorf("failed to create file %s : %w", bp.path, 
err) + } + + defer file.Close() + + _, err = bp.buf.WriteTo(file) + if err != nil { + return fmt.Errorf("couldn't writer to file %v : %w", bp.path, err) + } + + return nil +} diff --git a/datanode/networkhistory/snapshot/service.go b/datanode/networkhistory/snapshot/service.go index f81ae8f8bb4..4af7eee75ca 100644 --- a/datanode/networkhistory/snapshot/service.go +++ b/datanode/networkhistory/snapshot/service.go @@ -40,6 +40,8 @@ type Service struct { historyStore HistoryStore + fw *FileWorker + createSnapshotLock mutex.CtxMutex copyToPath string migrateSchemaUpToVersion func(version int64) error @@ -67,6 +69,7 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po migrateSchemaUpToVersion: migrateDatabaseToVersion, migrateSchemaDownToVersion: migrateSchemaDownToVersion, historyStore: historyStore, + fw: NewFileWorker(), } err = os.MkdirAll(s.copyToPath, fs.ModePerm) @@ -77,6 +80,14 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po return s, nil } +func (b *Service) Flush() { + for !b.fw.Empty() { + if err := b.fw.Consume(); err != nil { + b.log.Error("failed to write all files to disk", logging.Error(err)) + } + } +} + func (b *Service) SnapshotData(ctx context.Context, chainID string, toHeight int64) error { _, err := b.CreateSnapshotAsynchronously(ctx, chainID, toHeight) if err != nil { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot.go b/datanode/networkhistory/snapshot/service_create_snapshot.go index b4d46bb300a..c8fc71d7414 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot.go @@ -16,6 +16,7 @@ package snapshot import ( + "bytes" "context" "errors" "fmt" @@ -29,7 +30,6 @@ import ( "code.vegaprotocol.io/vega/datanode/networkhistory/segment" "code.vegaprotocol.io/vega/datanode/sqlstore" "code.vegaprotocol.io/vega/libs/fs" - vio "code.vegaprotocol.io/vega/libs/io" 
"code.vegaprotocol.io/vega/logging" "github.com/georgysavva/scany/pgxscan" @@ -50,7 +50,10 @@ func (b *Service) CreateSnapshotAsynchronously(ctx context.Context, chainID stri return b.createNewSnapshot(ctx, chainID, toHeight, true) } -func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeight int64, +func (b *Service) createNewSnapshot( + ctx context.Context, + chainID string, + toHeight int64, async bool, ) (segment.Unpublished, error) { var err error @@ -115,11 +118,12 @@ func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeigh runAllInReverseOrder(cleanUp) return segment.Unpublished{}, fmt.Errorf("failed to create write lock file:%w", err) } - cleanUp = append(cleanUp, func() { _ = os.Remove(s.InProgressFilePath()) }) + // cleanUp = append(cleanUp, func() { _ = os.Remove(s.InProgressFilePath()) }) // To ensure reads are isolated from this point forward execute a read on last block _, err = sqlstore.GetLastBlockUsingConnection(ctx, copyDataTx) if err != nil { + _ = os.Remove(s.InProgressFilePath()) runAllInReverseOrder(cleanUp) return segment.Unpublished{}, fmt.Errorf("failed to get last block using connection: %w", err) } @@ -130,6 +134,8 @@ func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeigh if err != nil { b.log.Panic("failed to snapshot data", logging.Error(err)) } + + b.fw.AddLockFile(s.InProgressFilePath()) } if async { @@ -241,14 +247,14 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa // Write Current State currentSQL := currentStateCopySQL(dbMetaData) - currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir) + currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw) if err != nil { return fmt.Errorf("failed to copy current state table data:%w", err) } // Write History historySQL := historyCopySQL(dbMetaData, seg) - historyRowsCopied, 
historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir) + historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw) if err != nil { return fmt.Errorf("failed to copy history table data:%w", err) } @@ -310,15 +316,24 @@ func historyCopySQL(dbMetaData DatabaseMetadata, segment interface{ GetFromHeigh return copySQL } -func copyTablesData(ctx context.Context, tx pgx.Tx, copySQL []TableCopySql, toDir string) (int64, int64, error) { +func copyTablesData( + ctx context.Context, + tx pgx.Tx, + copySQL []TableCopySql, + toDir string, + fw *FileWorker, +) (int64, int64, error) { var totalRowsCopied int64 var totalBytesCopied int64 + for _, tableSql := range copySQL { filePath := path.Join(toDir, tableSql.metaData.Name) - numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) + // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) + numRowsCopied, bytesCopied, err := extractTableData(ctx, tx, filePath, tableSql, fw) if err != nil { return 0, 0, fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err) } + totalRowsCopied += numRowsCopied totalBytesCopied += bytesCopied } @@ -326,20 +341,25 @@ func copyTablesData(ctx context.Context, tx pgx.Tx, copySQL []TableCopySql, toDi return totalRowsCopied, totalBytesCopied, nil } -func writeTableToDataFile(ctx context.Context, tx pgx.Tx, filePath string, tableSql TableCopySql) (int64, int64, error) { - file, err := os.Create(filePath) - if err != nil { - return 0, 0, fmt.Errorf("failed to create file %s:%w", filePath, err) - } - defer file.Close() - - fileWriter := vio.NewCountWriter(file) +func extractTableData( + ctx context.Context, + tx pgx.Tx, + filePath string, + tableSql TableCopySql, + fw *FileWorker, +) (int64, int64, error) { + var b bytes.Buffer - numRowsCopied, err := executeCopy(ctx, tx, tableSql, fileWriter) + numRowsCopied, err := executeCopy(ctx, tx, 
tableSql, &b) if err != nil { return 0, 0, fmt.Errorf("failed to execute copy: %w", err) } - return numRowsCopied, fileWriter.Count(), nil + + len := int64(b.Len()) + // schedule it + fw.Add(&b, filePath) + + return numRowsCopied, len, nil } func executeCopy(ctx context.Context, tx pgx.Tx, tableSql TableCopySql, w io.Writer) (int64, error) { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot_test.go b/datanode/networkhistory/snapshot/service_create_snapshot_test.go index a8871024673..6e43b1a68f2 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot_test.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot_test.go @@ -28,6 +28,7 @@ import ( func TestGetHistorySnapshots(t *testing.T) { snapshotsDir := t.TempDir() + service, err := snapshot.NewSnapshotService(logging.NewTestLogger(), snapshot.NewDefaultConfig(), nil, nil, snapshotsDir, nil, nil) if err != nil { panic(err)