From 9a29175abb47ba3b74949bb60301510e08a0c185 Mon Sep 17 00:00:00 2001 From: Jeremy Letang Date: Wed, 19 Jun 2024 15:31:15 +0100 Subject: [PATCH 1/3] feat: improve datanode snapshot creation As of now, snapshots are created in a sequential and blocking way in the datanode. This means that while a snapshot is being taken, no block can be processed. The current approach is the following: - the database is locked with a transaction - queries are generated - one by one, the queries are: - executed - the results piped into the file system - finally the lock is released, and later the files are added to ipfs. The bottleneck here is that the results are saved on the fs as they arrive, which is unnecessary and accounts for 95% of the time spent snapshotting (and so blocks everything else). To prevent this, we keep those results from the database in buffers, and only save them to file via a worker goroutine. The buffer size used for each table is also cached in the datanode snapshot service, so later snapshots can pre-allocate it. Signed-off-by: Jeremy Letang --- cmd/data-node/commands/start/node_pre.go | 3 +- datanode/networkhistory/service.go | 23 ++++ datanode/networkhistory/service_test.go | 47 ++++++-- .../networkhistory/snapshot/file_worker.go | 107 ++++++++++++++++++ datanode/networkhistory/snapshot/service.go | 29 +++-- .../snapshot/service_create_snapshot.go | 71 +++++++++--- .../snapshot/service_create_snapshot_test.go | 1 + 7 files changed, 249 insertions(+), 32 deletions(-) create mode 100644 datanode/networkhistory/snapshot/file_worker.go diff --git a/cmd/data-node/commands/start/node_pre.go b/cmd/data-node/commands/start/node_pre.go index 3b0b8b2a45..dfb7947ef8 100644 --- a/cmd/data-node/commands/start/node_pre.go +++ b/cmd/data-node/commands/start/node_pre.go @@ -375,7 +375,8 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi l.snapshotService, networkHistoryStore, l.conf.API.Port, - l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo)) + l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo), + ) if err != nil { return fmt.Errorf("failed to create networkHistory service:%w", err) } diff --git a/datanode/networkhistory/service.go b/datanode/networkhistory/service.go index b60f77ef4b..58a78c1f42 100644 --- a/datanode/networkhistory/service.go +++ b/datanode/networkhistory/service.go @@ -70,8 +70,26 @@ func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, c datanodeGrpcAPIPort: datanodeGrpcAPIPort, } + go func() { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ctx.Done(): + s.log.Info("saving network history before leaving") + // consume all pending files + s.snapshotService.Flush() + s.log.Info("network history saved") + return + case <-ticker.C: + s.snapshotService.Flush() + } + } + }() + if cfg.Publish { var err error + + // publish all files which are ready go func() { ticker := time.NewTicker(5 * time.Second) for { @@ -182,6 +200,11 @@ func (d *Service) CreateAndPublishSegment(ctx context.Context, chainID string, t } } + // empty the file worker + d.log.Info("saving network history to disk") + d.snapshotService.Flush() + d.log.Info("network history saved to disk") + if err = d.PublishSegments(ctx); err != nil { return fmt.Errorf("failed to publish snapshots: %w", err) } diff --git a/datanode/networkhistory/service_test.go b/datanode/networkhistory/service_test.go index 2cd98f02c8..5806d5648c 100644 --- a/datanode/networkhistory/service_test.go +++ b/datanode/networkhistory/service_test.go @@ -148,7 +148,7 @@ func TestMain(t *testing.M) { pgLog 
*bytes.Buffer, ) { sqlConfig = config - log.Infof("DB Connection String: ", sqlConfig.ConnectionConfig.GetConnectionString()) + log.Infof("DB Connection String: %v", sqlConfig.ConnectionConfig.GetConnectionString()) pool, err := sqlstore.CreateConnectionPool(outerCtx, sqlConfig.ConnectionConfig) if err != nil { @@ -184,7 +184,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot: %w", err)) } - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, snapshotService.Flush) snapshots = append(snapshots, ss) @@ -211,7 +211,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } - waitForSnapshotToComplete(lastSnapshot) + waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -268,7 +268,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } - waitForSnapshotToComplete(lastSnapshot) + waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -421,6 +421,7 @@ func TestLoadingDataFetchedAsynchronously(t *testing.T) { require.Equal(t, int64(1000), fetched) networkhistoryService := setupNetworkHistoryService(ctx, log, snapshotService, networkHistoryStore, snapshotCopyToPath) + segments, err := networkhistoryService.ListAllHistorySegments() require.NoError(t, err) @@ -583,7 +584,7 @@ func TestRestoringNodeThatAlreadyContainsData(t *testing.T) { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, snapshotService.Flush) md5Hash, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -888,7 +889,7 @@ func TestRestoreFromPartialHistoryAndProcessEvents(t *testing.T) { if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 { ss, err = service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) if lastCommittedBlockHeight == 4000 { newSnapshotFileHashAt4000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) @@ -993,7 +994,7 @@ func TestRestoreFromFullHistorySnapshotAndProcessEvents(t *testing.T) { if lastCommittedBlockHeight == 3000 { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -1096,7 +1097,7 @@ func TestRestoreFromFullHistorySnapshotWithIndexesAndOrderTriggersAndProcessEven if lastCommittedBlockHeight == 3000 { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -1578,6 +1579,36 @@ func waitForSnapshotToComplete(sf segment.Unpublished) { } } +func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) { + for { + time.Sleep(10 * time.Millisecond) + // wait for 
snapshot current file + _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + continue + } else { + panic(err) + } + } + + flush() + + // wait for the snapshot's in-progress lock file to be removed + + _, err = os.Stat(sf.InProgressFilePath()) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + break + } else { + panic(err) + } + } else { + continue + } + } +} + func decompressEventFile() { sourceFile, err := os.Open(compressedEventsFile) if err != nil { diff --git a/datanode/networkhistory/snapshot/file_worker.go b/datanode/networkhistory/snapshot/file_worker.go new file mode 100644 index 0000000000..df0ce8b947 --- /dev/null +++ b/datanode/networkhistory/snapshot/file_worker.go @@ -0,0 +1,107 @@ +// Copyright (C) 2023 Gobalsky Labs Limited +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +package snapshot + +import ( + "bytes" + "fmt" + "os" + "sync" +) + +type bufAndPath struct { + buf *bytes.Buffer + isProgressFile bool + path string +} + +type FileWorker struct { + mu sync.Mutex + queue []*bufAndPath +} + +func NewFileWorker() *FileWorker { + return &FileWorker{ + queue: []*bufAndPath{}, + } +} + +func (fw *FileWorker) Add(buf *bytes.Buffer, path string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + fw.queue = append(fw.queue, &bufAndPath{buf, false, path}) +} + +func (fw *FileWorker) AddLockFile(path string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + fw.queue = append(fw.queue, &bufAndPath{nil, true, path}) +} + +func (fw *FileWorker) pop() (bp *bufAndPath) { + fw.mu.Lock() + defer fw.mu.Unlock() + + if len(fw.queue) == 0 { + return + } + + bp, fw.queue = fw.queue[0], fw.queue[1:] + + return +} + +func (fw *FileWorker) Empty() bool { + fw.mu.Lock() + defer fw.mu.Unlock() + + return len(fw.queue) == 0 +} + +func (fw *FileWorker) Consume() error { + bp := fw.pop() + if bp == nil { + return nil // nothing to do + } + + if bp.isProgressFile { + return fw.removeLockFile(bp.path) + } + + return fw.writeSegment(bp) +} + +func (fw *FileWorker) removeLockFile(path string) error { + return os.Remove(path) +} + +func (fw *FileWorker) writeSegment(bp *bufAndPath) error { + file, err := os.Create(bp.path) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", bp.path, err) + } + + defer file.Close() + + _, err = bp.buf.WriteTo(file) + if err != nil { + return fmt.Errorf("couldn't write to file %v: %w", bp.path, err) + } + + return nil +} diff --git a/datanode/networkhistory/snapshot/service.go b/datanode/networkhistory/snapshot/service.go index f81ae8f8bb..bfebd2d068 100644 --- a/datanode/networkhistory/snapshot/service.go +++ b/datanode/networkhistory/snapshot/service.go @@ -40,6 +40,9 @@ type Service struct { historyStore HistoryStore + fw *FileWorker + tableSnapshotFileSizesCached map[string]int // last written size per table, used to pre-allocate buffers + createSnapshotLock mutex.CtxMutex copyToPath string migrateSchemaUpToVersion func(version int64) error @@ -59,14 +62,16 @@ 
func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po } s := &Service{ - log: log, - config: config, - connPool: connPool, - createSnapshotLock: mutex.New(), - copyToPath: snapshotsCopyToPath, - migrateSchemaUpToVersion: migrateDatabaseToVersion, - migrateSchemaDownToVersion: migrateSchemaDownToVersion, - historyStore: historyStore, + log: log, + config: config, + connPool: connPool, + createSnapshotLock: mutex.New(), + copyToPath: snapshotsCopyToPath, + migrateSchemaUpToVersion: migrateDatabaseToVersion, + migrateSchemaDownToVersion: migrateSchemaDownToVersion, + historyStore: historyStore, + fw: NewFileWorker(), + tableSnapshotFileSizesCached: map[string]int{}, } err = os.MkdirAll(s.copyToPath, fs.ModePerm) @@ -77,6 +82,14 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po return s, nil } +func (b *Service) Flush() { + for !b.fw.Empty() { + if err := b.fw.Consume(); err != nil { + b.log.Error("failed to write all files to disk", logging.Error(err)) + } + } +} + func (b *Service) SnapshotData(ctx context.Context, chainID string, toHeight int64) error { _, err := b.CreateSnapshotAsynchronously(ctx, chainID, toHeight) if err != nil { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot.go b/datanode/networkhistory/snapshot/service_create_snapshot.go index b4d46bb300..c4e6939929 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot.go @@ -16,6 +16,7 @@ package snapshot import ( + "bytes" "context" "errors" "fmt" @@ -29,7 +30,6 @@ import ( "code.vegaprotocol.io/vega/datanode/networkhistory/segment" "code.vegaprotocol.io/vega/datanode/sqlstore" "code.vegaprotocol.io/vega/libs/fs" - vio "code.vegaprotocol.io/vega/libs/io" "code.vegaprotocol.io/vega/logging" "github.com/georgysavva/scany/pgxscan" @@ -50,7 +50,10 @@ func (b *Service) CreateSnapshotAsynchronously(ctx context.Context, chainID stri return b.createNewSnapshot(ctx, chainID, toHeight, true) } -func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeight int64, +func (b *Service) createNewSnapshot( + ctx context.Context, + chainID string, + toHeight int64, async bool, ) (segment.Unpublished, error) { var err error @@ -115,11 +118,12 @@ func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeigh runAllInReverseOrder(cleanUp) return segment.Unpublished{}, fmt.Errorf("failed to create write lock file:%w", err) } - cleanUp = append(cleanUp, func() { _ = os.Remove(s.InProgressFilePath()) }) + // the in-progress lock file is no longer removed here: the file worker removes it once all buffers are flushed (see b.fw.AddLockFile below) // To ensure reads are isolated from this point forward execute a read on last block _, err = sqlstore.GetLastBlockUsingConnection(ctx, copyDataTx) if err != nil { + _ = os.Remove(s.InProgressFilePath()) runAllInReverseOrder(cleanUp) return segment.Unpublished{}, fmt.Errorf("failed to get last block using connection: %w", err) } @@ -130,6 +134,8 @@ func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeigh if err != nil { b.log.Panic("failed to snapshot data", logging.Error(err)) } + + b.fw.AddLockFile(s.InProgressFilePath()) } if async { @@ -241,14 +247,14 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa // Write Current State currentSQL := currentStateCopySQL(dbMetaData) - currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir) + currentRowsCopied, currentStateBytesCopied, 
err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) if err != nil { return fmt.Errorf("failed to copy current state table data:%w", err) } // Write History historySQL := historyCopySQL(dbMetaData, seg) - historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir) + historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) if err != nil { return fmt.Errorf("failed to copy history table data:%w", err) } @@ -310,15 +316,25 @@ func historyCopySQL(dbMetaData DatabaseMetadata, segment interface{ GetFromHeigh return copySQL } -func copyTablesData(ctx context.Context, tx pgx.Tx, copySQL []TableCopySql, toDir string) (int64, int64, error) { +func copyTablesData( + ctx context.Context, + tx pgx.Tx, + copySQL []TableCopySql, + toDir string, + fw *FileWorker, + lenCache map[string]int, +) (int64, int64, error) { var totalRowsCopied int64 var totalBytesCopied int64 + for _, tableSql := range copySQL { filePath := path.Join(toDir, tableSql.metaData.Name) - numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) + // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) + numRowsCopied, bytesCopied, err := extractTableData(ctx, tx, filePath, tableSql, fw, lenCache) if err != nil { return 0, 0, fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err) } + totalRowsCopied += numRowsCopied totalBytesCopied += bytesCopied } @@ -326,20 +342,45 @@ func copyTablesData(ctx context.Context, tx pgx.Tx, copySQL []TableCopySql, toDi return totalRowsCopied, totalBytesCopied, nil } -func writeTableToDataFile(ctx context.Context, tx pgx.Tx, filePath string, tableSql TableCopySql) (int64, int64, error) { - file, err := os.Create(filePath) - if err != nil { - return 0, 0, fmt.Errorf("failed to create file %s:%w", filePath, err) +func extractTableData( + ctx context.Context, + tx pgx.Tx, + filePath string, + tableSql TableCopySql, + fw *FileWorker, + lenCache map[string]int, +) (int64, int64, error) { + allocCap := lenCache[tableSql.metaData.Name] + + fmt.Printf("DEBUGTEMP: %v - initialAllocCap(%v)\n", tableSql.metaData.Name, allocCap) + + if allocCap == 0 { + allocCap = 1000000 // roughly 1mb, a sensible default + } else { + // if we already have a cached size, the table has probably grown since + // growing the allocation by 30% is a reasonable guess 
+ // and should leave us some room + allocCap += allocCap / 3 } - defer file.Close() - fileWriter := vio.NewCountWriter(file) + fmt.Printf("DEBUGTEMP: %v - finalAllocCap(%v)", tableSql.metaData.Name, allocCap) + + b := bytes.NewBuffer(make([]byte, 0, allocCap)) - numRowsCopied, err := executeCopy(ctx, tx, tableSql, fileWriter) + numRowsCopied, err := executeCopy(ctx, tx, tableSql, b) if err != nil { return 0, 0, fmt.Errorf("failed to execute copy: %w", err) } - return numRowsCopied, fileWriter.Count(), nil + + len := int64(b.Len()) + // schedule it + fw.Add(b, filePath) + + // save the new len for this table + lenCache[tableSql.metaData.Name] = int(len) + fmt.Printf("DEBUGTEMP: %v - actualLen(%v)\n", tableSql.metaData.Name, len) + + return numRowsCopied, len, nil } func executeCopy(ctx context.Context, tx pgx.Tx, tableSql TableCopySql, w io.Writer) (int64, error) { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot_test.go b/datanode/networkhistory/snapshot/service_create_snapshot_test.go index a887102467..6e43b1a68f 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot_test.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot_test.go @@ -28,6 +28,7 @@ import ( func TestGetHistorySnapshots(t *testing.T) { snapshotsDir := t.TempDir() + service, err := snapshot.NewSnapshotService(logging.NewTestLogger(), snapshot.NewDefaultConfig(), nil, nil, snapshotsDir, nil, nil) if err != nil { panic(err) From 9593fa4fd4ac9d7d0d49e54aaeb93f1f52426f4c Mon Sep 17 00:00:00 2001 From: Jeremy Letang Date: Mon, 8 Jul 2024 14:19:57 +0100 Subject: [PATCH 2/3] chore: debug and such Signed-off-by: Jeremy Letang --- datanode/networkhistory/service_test.go | 14 +- .../snapshot/service_create_snapshot.go | 234 ++++++++++++++++-- 2 files changed, 218 insertions(+), 30 deletions(-) diff --git a/datanode/networkhistory/service_test.go b/datanode/networkhistory/service_test.go index 5806d5648c..1cb75c6eb8 100644 --- a/datanode/networkhistory/service_test.go +++ b/datanode/networkhistory/service_test.go @@ -94,6 +94,7 @@ var ( ) func TestMain(t *testing.M) { + dirs := map[string]string{} outerCtx, cancelOuterCtx := context.WithCancel(context.Background()) defer cancelOuterCtx() @@ -184,6 +185,10 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot: %w", err)) } + dirs["dir1"] = ss.Directory + // fmt.Printf("unpublished: %v\n", ss.Directory) + // os.Exit(1) + waitForSnapshotToComplete2(ss, snapshotService.Flush) snapshots = append(snapshots, ss) @@ -211,6 +216,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } + dirs["dir2"] = lastSnapshot.Directory waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) @@ -268,6 +274,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } + dirs["dir3"] = lastSnapshot.Directory waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) @@ -378,7 +385,8 @@ func TestMain(t *testing.M) { log.Infof("%s", goldenSourceHistorySegment[3000].HistorySegmentID) log.Infof("%s", goldenSourceHistorySegment[4000].HistorySegmentID) log.Infof("%s", goldenSourceHistorySegment[5000].HistorySegmentID) - + fmt.Printf("DIRECTORIES: %v\n", dirs) + os.Exit(1) 
panicIfHistorySegmentIdsNotEqual(goldenSourceHistorySegment[1000].HistorySegmentID, "QmQX6n82ex2XDh1tWL1gCv2viDttUwRSdyG1XaekYfLpJk", snapshots) panicIfHistorySegmentIdsNotEqual(goldenSourceHistorySegment[2000].HistorySegmentID, "QmaWdp5RPui6ePszzPvk48e7FxHmPGx2VMpbXD2NTgtFMT", snapshots) panicIfHistorySegmentIdsNotEqual(goldenSourceHistorySegment[2500].HistorySegmentID, "QmRmAX4AfQ9xAdLN8GjVCBmDH7Cm6q1ts7TBF9UjLdjMG9", snapshots) @@ -1553,7 +1561,7 @@ func getSnapshotIntervalToHistoryTableDeltaSummary(ctx context.Context, func waitForSnapshotToComplete(sf segment.Unpublished) { for { - time.Sleep(10 * time.Millisecond) + time.Sleep(5 * time.Millisecond) // wait for snapshot current file _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -1581,7 +1589,7 @@ func waitForSnapshotToComplete(sf segment.Unpublished) { func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) { for { - time.Sleep(10 * time.Millisecond) + time.Sleep(5 * time.Millisecond) // wait for snapshot current file _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) if err != nil { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot.go b/datanode/networkhistory/snapshot/service_create_snapshot.go index c4e6939929..dd58facdc0 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot.go @@ -24,6 +24,8 @@ import ( "os" "path" "sort" + "sync" + "sync/atomic" "time" "code.vegaprotocol.io/vega/datanode/metrics" @@ -34,7 +36,9 @@ import ( "github.com/georgysavva/scany/pgxscan" "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" ) var ( @@ -130,7 +134,7 @@ func (b *Service) createNewSnapshot( snapshotData := func() { defer func() { runAllInReverseOrder(cleanUp) }() - err = b.snapshotData(ctx, copyDataTx, dbMetaData, s) + err = b.snapshotData(ctx, copyDataTx, dbMetaData, s, b.connPool) if err != nil { b.log.Panic("failed to snapshot data", logging.Error(err)) } @@ -218,7 +222,7 @@ func runAllInReverseOrder(functions []func()) { } } -func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData DatabaseMetadata, seg segment.Unpublished) error { +func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData DatabaseMetadata, seg segment.Unpublished, pool *pgxpool.Pool) error { defer func() { // Calling rollback on a committed transaction has no effect, hence we can rollback in defer to ensure // always rolled back if the transaction was not successfully committed @@ -245,27 +249,39 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa return fmt.Errorf("failed to create history state directory:%w", err) } + fmt.Printf("\n\nCURRENT STATE\n\n") // Write Current State - currentSQL := currentStateCopySQL(dbMetaData) - currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) + currentSQL, lastSpan := currentStateCopySQL(dbMetaData) + // currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) + currentRowsCopied, currentStateBytesCopied, err := copyTablesDataAsync(ctx, pool, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) if err != nil { return fmt.Errorf("failed to copy current state table data:%w", err) } + fmt.Printf("\n\nCURRENT STATE DONE\n\n") + + fmt.Printf("\n\nHISTORY STATE\n\n") // Write 
History historySQL := historyCopySQL(dbMetaData, seg) - historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) + // historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) + historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) if err != nil { return fmt.Errorf("failed to copy history table data:%w", err) } + fmt.Printf("\n\nHISTORY STATE DONE\n\n") + + lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) + if err != nil { + return fmt.Errorf("failed to copy last span table data:%w", err) + } err = tx.Commit(ctx) if err != nil { return fmt.Errorf("failed to commit snapshot transaction:%w", err) } - metrics.SetLastSnapshotRowcount(float64(currentRowsCopied + historyRowsCopied)) - metrics.SetLastSnapshotCurrentStateBytes(float64(currentStateBytesCopied)) + metrics.SetLastSnapshotRowcount(float64(currentRowsCopied + historyRowsCopied + lastSpanCopied)) + metrics.SetLastSnapshotCurrentStateBytes(float64(currentStateBytesCopied + lastSpanBytesCopied)) metrics.SetLastSnapshotHistoryBytes(float64(historyBytesCopied)) metrics.SetLastSnapshotSeconds(time.Since(start).Seconds()) @@ -280,8 +296,9 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa return nil } -func currentStateCopySQL(dbMetaData DatabaseMetadata) []TableCopySql { +func currentStateCopySQL(dbMetaData DatabaseMetadata) ([]TableCopySql, TableCopySql) { var copySQL []TableCopySql + var lastSpan TableCopySql tablesNames := maps.Keys(dbMetaData.TableNameToMetaData) sort.Strings(tablesNames) @@ -290,10 +307,15 @@ func currentStateCopySQL(dbMetaData DatabaseMetadata) []TableCopySql { if !dbMetaData.TableNameToMetaData[tableName].Hypertable { tableCopySQL := fmt.Sprintf(`copy (select * from %s order by %s) TO STDOUT WITH (FORMAT csv, HEADER) `, tableName, meta.SortOrder) - copySQL = append(copySQL, TableCopySql{meta, tableCopySQL}) + + if meta.Name == "last_snapshot_span" { + lastSpan = TableCopySql{meta, tableCopySQL} + } else { + copySQL = append(copySQL, TableCopySql{meta, tableCopySQL}) + } } } - return copySQL + return copySQL, lastSpan } func historyCopySQL(dbMetaData DatabaseMetadata, segment interface{ GetFromHeight() int64 }) []TableCopySql { @@ -316,42 +338,186 @@ func historyCopySQL(dbMetaData DatabaseMetadata, segment interface{ GetFromHeigh return copySQL } -func copyTablesData( +// func copyTablesData( +// ctx context.Context, +// tx pgx.Tx, +// copySQL []TableCopySql, +// toDir string, +// fw *FileWorker, +// lenCache map[string]int, +// ) (int64, int64, error) { +// var totalRowsCopied int64 +// var totalBytesCopied int64 + +// for _, tableSql := range copySQL { +// filePath := path.Join(toDir, tableSql.metaData.Name) +// // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) +// numRowsCopied, bytesCopied, err := extractTableData(ctx, tx, filePath, tableSql, fw, lenCache, nil) +// if err != nil { +// return 0, 0, fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err) +// } + +// totalRowsCopied += numRowsCopied +// totalBytesCopied += bytesCopied +// } + +// return totalRowsCopied, totalBytesCopied, nil +// } + +func copyTablesDataLastSpan( ctx context.Context, tx pgx.Tx, + tableSql 
TableCopySql, + toDir string, + fw *FileWorker, + lenCache map[string]int, +) (int64, int64, error) { + + filePath := path.Join(toDir, tableSql.metaData.Name) + var mtx sync.Mutex + return extractTableData2(ctx, tx, filePath, tableSql, fw, lenCache, &mtx) +} + +func copyTablesDataAsync( + ctx context.Context, + pool *pgxpool.Pool, copySQL []TableCopySql, toDir string, fw *FileWorker, lenCache map[string]int, ) (int64, int64, error) { - var totalRowsCopied int64 - var totalBytesCopied int64 + var ( + totalRowsCopied atomic.Int64 + totalBytesCopied atomic.Int64 + mtx sync.Mutex + ) - for _, tableSql := range copySQL { - filePath := path.Join(toDir, tableSql.metaData.Name) - // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) - numRowsCopied, bytesCopied, err := extractTableData(ctx, tx, filePath, tableSql, fw, lenCache) - if err != nil { - return 0, 0, fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err) - } + errg, newCtx := errgroup.WithContext(ctx) + + for _, tSql := range copySQL { + tableSql := tSql + + errg.Go(func() error { + + filePath := path.Join(toDir, tableSql.metaData.Name) + // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) + numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx) + if err != nil { + return fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err) + } - totalRowsCopied += numRowsCopied - totalBytesCopied += bytesCopied + totalRowsCopied.Add(numRowsCopied) + totalBytesCopied.Add(bytesCopied) + + return nil + }) + } + + fmt.Printf("\n\n\n\nWAITING\n\n\n\n") + if err := errg.Wait(); err != nil { + return 0, 0, err } - return totalRowsCopied, totalBytesCopied, nil + fmt.Printf("ALL DONE BABY\n\n\n\n") + + return totalRowsCopied.Load(), totalBytesCopied.Load(), nil } func extractTableData( ctx context.Context, - tx pgx.Tx, + pool *pgxpool.Pool, filePath string, tableSql TableCopySql, fw *FileWorker, lenCache map[string]int, + cacheMu *sync.Mutex, ) (int64, int64, error) { + + if cacheMu != nil { + cacheMu.Lock() + } allocCap := lenCache[tableSql.metaData.Name] + if cacheMu != nil { + cacheMu.Unlock() + } fmt.Printf("DEBUGTEMP: %v - initialAllocCap(%v)\n", tableSql.metaData.Name, allocCap) if allocCap == 0 { allocCap = 1000000 // roughly 1mb, a sensible default } else { // if we already have a cached size, the table has probably grown since // growing the allocation by 30% is a reasonable guess 
+ // and should leave us some room + allocCap += allocCap / 3 } + fmt.Printf("DEBUGTEMP: %v - finalAllocCap(%v)\n", tableSql.metaData.Name, allocCap) + b := bytes.NewBuffer(make([]byte, 0, allocCap)) + numRowsCopied, err := executeCopy(ctx, pool, tableSql, b) + if err != nil { + return 0, 0, fmt.Errorf("failed to execute copy: %w", err) + } + + fmt.Printf("===============>>>> %v - %v - %v\n", filePath, numRowsCopied, b.Len()) + + len := int64(b.Len()) + // schedule it + fw.Add(b, filePath) + + // save the new len for this table + if cacheMu != nil { + cacheMu.Lock() + } + lenCache[tableSql.metaData.Name] = int(len) + if cacheMu != nil { + cacheMu.Unlock() + } + fmt.Printf("DEBUGTEMP: %v - actualLen(%v)\n", tableSql.metaData.Name, len) + + return numRowsCopied, len, nil +} + +func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer) (int64, error) { + defer metrics.StartNetworkHistoryCopy(tableSql.metaData.Name)() + conn, err := pool.Acquire(ctx) + if err != nil { + fmt.Sprintf("\n\n\n\nBOIIIIIIIIIIIIIIIIIIII couldn't acquire connection: %v\n\n\n\n\n", err) + } + defer conn.Release() + + tag, err := conn.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql) + if err != nil { + return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err) + } + + rowsCopied := tag.RowsAffected() + + fmt.Printf("SQL: %v / COPIED: %v\n", tableSql.copySql, rowsCopied) + metrics.NetworkHistoryRowsCopied(tableSql.metaData.Name, rowsCopied) + + return rowsCopied, nil +} + +func extractTableData2( + ctx context.Context, + tx pgx.Tx, + filePath string, + tableSql TableCopySql, + fw *FileWorker, + lenCache map[string]int, + cacheMu *sync.Mutex, +) (int64, int64, error) { + + if cacheMu != nil { + cacheMu.Lock() + } + allocCap := lenCache[tableSql.metaData.Name] + if cacheMu != nil { + cacheMu.Unlock() + } fmt.Printf("DEBUGTEMP: %v - initialAllocCap(%v)\n", tableSql.metaData.Name, allocCap) if allocCap == 0 { @@ -363,11 +529,11 @@ func extractTableData( allocCap += allocCap / 3 } - fmt.Printf("DEBUGTEMP: %v - finalAllocCap(%v)", tableSql.metaData.Name, allocCap) + fmt.Printf("DEBUGTEMP: %v - finalAllocCap(%v)\n", tableSql.metaData.Name, allocCap) b := bytes.NewBuffer(make([]byte, 0, allocCap)) - numRowsCopied, err := executeCopy(ctx, tx, tableSql, b) + numRowsCopied, err := executeCopy2(ctx, tx, tableSql, b) if err != nil { return 0, 0, fmt.Errorf("failed to execute copy: %w", err) } @@ -377,23 +543,37 @@ func extractTableData( fw.Add(b, filePath) // save the new len for this table + if cacheMu != nil { + cacheMu.Lock() + } lenCache[tableSql.metaData.Name] = int(len) + if cacheMu != nil { + cacheMu.Unlock() + } fmt.Printf("DEBUGTEMP: %v - actualLen(%v)\n", tableSql.metaData.Name, len) return numRowsCopied, len, nil } -func executeCopy(ctx context.Context, tx pgx.Tx, tableSql TableCopySql, w io.Writer) (int64, error) { +func executeCopy2(ctx context.Context, tx pgx.Tx, tableSql TableCopySql, w io.Writer) (int64, error) { defer metrics.StartNetworkHistoryCopy(tableSql.metaData.Name)() tag, err := tx.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql) if err != nil { + // fmt.Printf("YOLOFAILURE: %v\n", err) return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err) } + // fmt.Printf("YOLO: %v\n", string(tag)) + rowsCopied := tag.RowsAffected() metrics.NetworkHistoryRowsCopied(tableSql.metaData.Name, rowsCopied) + // _, err = w.Write(tag) + // if err != nil { + // return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err) + // 
} + return rowsCopied, nil } From 882faddea19c8aee571c76e4ca6aca6d69be5870 Mon Sep 17 00:00:00 2001 From: Jeremy Letang Date: Mon, 8 Jul 2024 18:22:42 +0100 Subject: [PATCH 3/3] chore: wip Signed-off-by: Jeremy Letang --- datanode/networkhistory/service_test.go | 4 +- .../snapshot/service_create_snapshot.go | 86 ++++++++++++++----- 2 files changed, 68 insertions(+), 22 deletions(-) diff --git a/datanode/networkhistory/service_test.go b/datanode/networkhistory/service_test.go index 1cb75c6eb8..1bce0ffe7a 100644 --- a/datanode/networkhistory/service_test.go +++ b/datanode/networkhistory/service_test.go @@ -1561,7 +1561,7 @@ func getSnapshotIntervalToHistoryTableDeltaSummary(ctx context.Context, func waitForSnapshotToComplete(sf segment.Unpublished) { for { - time.Sleep(5 * time.Millisecond) + time.Sleep(10 * time.Millisecond) // wait for snapshot current file _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -1589,7 +1589,7 @@ func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) { for { - time.Sleep(5 * time.Millisecond) + time.Sleep(10 * time.Millisecond) // wait for snapshot current file _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) if err != nil { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot.go b/datanode/networkhistory/snapshot/service_create_snapshot.go index dd58facdc0..d83d432c52 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot.go @@ -142,11 +142,13 @@ func (b *Service) createNewSnapshot( b.fw.AddLockFile(s.InProgressFilePath()) } - if async { - go snapshotData() - } else { - snapshotData() - } + fmt.Printf("IS ASYNC???? > %v\n", async) + + // if async { + // go snapshotData() + // } else { + snapshotData() + // } return s, nil } @@ -188,6 +190,21 @@ func getNextSnapshotSpan(ctx context.Context, toHeight int64, copyDataTx pgx.Tx) return nextSpan, nil } +type txSnapshot struct { + snapshotID string +} + +func getExportSnapshot(ctx context.Context, tx pgx.Tx) (*txSnapshot, error) { + row := tx.QueryRow(ctx, "SELECT pg_export_snapshot();") + var s string + err := row.Scan(&s) + if err != nil { + return nil, fmt.Errorf("couldn't scan pg_export_snapshot result: %w", err) + } + + return &txSnapshot{s}, nil +} + type Span struct { FromHeight int64 ToHeight int64 @@ -252,8 +269,20 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa fmt.Printf("\n\nCURRENT STATE\n\n") // Write Current State currentSQL, lastSpan := currentStateCopySQL(dbMetaData) + + lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) + if err != nil { + return fmt.Errorf("failed to copy last span table data:%w", err) + } + + // export the snapshot so all the other transactions start from the same state + pgTxSnapshot, err := getExportSnapshot(ctx, tx) + if err != nil { + return fmt.Errorf("failed to export the transaction snapshot: %w", err) + } + + // currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) + currentRowsCopied, currentStateBytesCopied, err := copyTablesDataAsync(ctx, pool, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot) if err != nil { return fmt.Errorf("failed to copy current state 
table data:%w", err) } @@ -264,17 +293,12 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa // Write History historySQL := historyCopySQL(dbMetaData, seg) // historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) - historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) + historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot) if err != nil { return fmt.Errorf("failed to copy history table data:%w", err) } fmt.Printf("\n\nHISTORY STATE DONE\n\n") - lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) - if err != nil { - return fmt.Errorf("failed to copy last span table data:%w", err) - } - err = tx.Commit(ctx) if err != nil { return fmt.Errorf("failed to commit snapshot transaction:%w", err) @@ -308,7 +332,7 @@ func currentStateCopySQL(dbMetaData DatabaseMetadata) ([]TableCopySql, TableCopy tableCopySQL := fmt.Sprintf(`copy (select * from %s order by %s) TO STDOUT WITH (FORMAT csv, HEADER) `, tableName, meta.SortOrder) - if meta.Name == "last_snapshot_span" { + if tableName == "last_snapshot_span" { lastSpan = TableCopySql{meta, tableCopySQL} } else { copySQL = append(copySQL, TableCopySql{meta, tableCopySQL}) @@ -385,6 +409,7 @@ func copyTablesDataAsync( toDir string, fw *FileWorker, lenCache map[string]int, + pgTxSnapshot *txSnapshot, ) (int64, int64, error) { var ( totalRowsCopied atomic.Int64 @@ -401,7 +426,7 @@ func copyTablesDataAsync( filePath := path.Join(toDir, tableSql.metaData.Name) // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) - numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx) + numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx, pgTxSnapshot) if err != nil { return fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err) } @@ -431,6 +456,7 @@ func extractTableData( fw *FileWorker, lenCache map[string]int, cacheMu *sync.Mutex, + pgTxSnapshot *txSnapshot, ) (int64, int64, error) { if cacheMu != nil { @@ -455,7 +481,7 @@ func extractTableData( b := bytes.NewBuffer(make([]byte, 0, allocCap)) - numRowsCopied, err := executeCopy(ctx, pool, tableSql, b) + numRowsCopied, err := executeCopy(ctx, pool, tableSql, b, pgTxSnapshot) if err != nil { return 0, 0, fmt.Errorf("failed to execute copy: %w", err) } @@ -479,16 +505,32 @@ func extractTableData( return numRowsCopied, len, nil } -func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer) (int64, error) { +func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer, pgTxSnapshot *txSnapshot) (int64, error) { defer metrics.StartNetworkHistoryCopy(tableSql.metaData.Name)() - conn, err := pool.Acquire(ctx) + // conn, err := pool.Acquire(ctx) + // if err != nil { + // fmt.Sprintf("\n\n\n\nBOIIIIIIIIIIIIIIIIIIII couldn't acquire connection: %v\n\n\n\n\n", err) + // } + // defer conn.Release() + + tx, err := pool.Begin(ctx) if err != nil { - fmt.Sprintf("\n\n\n\nBOIIIIIIIIIIIIIIIIIIII couldn't acquire connection: %v\n\n\n\n\n", err) + return 0, fmt.Errorf("failed to begin copy table data transaction: 
%w", err) } - defer conn.Release() - tag, err := conn.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql) + defer tx.Rollback(ctx) + + if _, err = tx.Exec(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); err != nil { + return 0, fmt.Errorf("failed to set transaction isolation level to serilizable: %w", err) + } + + if _, err = tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%v'", pgTxSnapshot.snapshotID)); err != nil { + fmt.Printf("COULDN'T IMPORT!!! %v", err) + return 0, fmt.Errorf("failed to set transaction isolation level to serilizable: %w", err) + } + + tag, err := tx.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql) if err != nil { return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err) } @@ -498,6 +540,10 @@ func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, fmt.Printf("SQL: %v / COPIED: %v\n", tableSql.copySql, rowsCopied) metrics.NetworkHistoryRowsCopied(tableSql.metaData.Name, rowsCopied) + if err := tx.Commit(ctx); err != nil { + return 0, err + } + return rowsCopied, nil }