diff --git a/cmd/data-node/commands/start/node_pre.go b/cmd/data-node/commands/start/node_pre.go
index 3b0b8b2a45..dfb7947ef8 100644
--- a/cmd/data-node/commands/start/node_pre.go
+++ b/cmd/data-node/commands/start/node_pre.go
@@ -375,7 +375,8 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi
 		l.snapshotService,
 		networkHistoryStore,
 		l.conf.API.Port,
-		l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo))
+		l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo),
+	)
 	if err != nil {
 		return fmt.Errorf("failed to create networkHistory service:%w", err)
 	}
diff --git a/datanode/networkhistory/service.go b/datanode/networkhistory/service.go
index b60f77ef4b..58a78c1f42 100644
--- a/datanode/networkhistory/service.go
+++ b/datanode/networkhistory/service.go
@@ -70,8 +70,26 @@ func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, c
 		datanodeGrpcAPIPort: datanodeGrpcAPIPort,
 	}
 
+	// flush the file worker periodically, and drain it one last time on shutdown
+	go func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				s.log.Info("saving network history before leaving")
+				// consume all pending files
+				s.snapshotService.Flush()
+				s.log.Info("network history saved")
+				return
+			case <-ticker.C:
+				s.snapshotService.Flush()
+			}
+		}
+	}()
+
 	if cfg.Publish {
 		var err error
+
+		// publish all files which are ready
 		go func() {
 			ticker := time.NewTicker(5 * time.Second)
 			for {
@@ -182,6 +200,11 @@ func (d *Service) CreateAndPublishSegment(ctx context.Context, chainID string, t
 		}
 	}
 
+	// drain the file worker so the whole segment is on disk before publishing
+	d.log.Info("saving network history to disk")
+	d.snapshotService.Flush()
+	d.log.Info("network history saved to disk")
+
 	if err = d.PublishSegments(ctx); err != nil {
 		return fmt.Errorf("failed to publish snapshots: %w", err)
 	}
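Review note: the goroutine added to `New` is a periodic-flush loop with a final drain when the context is cancelled. A minimal standalone sketch of the pattern (the `Flusher` interface and `main` wiring here are hypothetical illustrations, not part of this change):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Flusher is a stand-in for the snapshot service's Flush method.
type Flusher interface {
	Flush()
}

// runFlushLoop drains pending writes every interval, and once more on shutdown
// so nothing left in the queue is lost on exit.
func runFlushLoop(ctx context.Context, f Flusher, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			f.Flush() // final drain before leaving
			return
		case <-ticker.C:
			f.Flush()
		}
	}
}

type noopFlusher struct{}

func (noopFlusher) Flush() { fmt.Println("flushed") }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	runFlushLoop(ctx, noopFlusher{}, 200*time.Millisecond)
}
```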
diff --git a/datanode/networkhistory/service_test.go b/datanode/networkhistory/service_test.go
index 2cd98f02c8..1bce0ffe7a 100644
--- a/datanode/networkhistory/service_test.go
+++ b/datanode/networkhistory/service_test.go
@@ -148,7 +149,7 @@ func TestMain(t *testing.M) {
 		pgLog *bytes.Buffer,
 	) {
 		sqlConfig = config
-		log.Infof("DB Connection String: ", sqlConfig.ConnectionConfig.GetConnectionString())
+		log.Infof("DB Connection String: %v", sqlConfig.ConnectionConfig.GetConnectionString())
 
 		pool, err := sqlstore.CreateConnectionPool(outerCtx, sqlConfig.ConnectionConfig)
 		if err != nil {
@@ -184,7 +185,11 @@ func TestMain(t *testing.M) {
 				panic(fmt.Errorf("failed to create snapshot: %w", err))
 			}
 
-			waitForSnapshotToComplete(ss)
+			waitForSnapshotToComplete2(ss, snapshotService.Flush)
 
 			snapshots = append(snapshots, ss)
 
@@ -211,7 +216,8 @@ func TestMain(t *testing.M) {
 			panic(fmt.Errorf("failed to create snapshot:%w", err))
 		}
 
-		waitForSnapshotToComplete(lastSnapshot)
+		waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush)
+
 		snapshots = append(snapshots, lastSnapshot)
 		md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory())
 		if err != nil {
@@ -268,7 +274,8 @@ func TestMain(t *testing.M) {
 			panic(fmt.Errorf("failed to create snapshot:%w", err))
 		}
 
-		waitForSnapshotToComplete(lastSnapshot)
+		waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush)
+
 		snapshots = append(snapshots, lastSnapshot)
 		md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory())
 		if err != nil {
@@ -421,6 +429,7 @@ func TestLoadingDataFetchedAsynchronously(t *testing.T) {
 	require.Equal(t, int64(1000), fetched)
 
 	networkhistoryService := setupNetworkHistoryService(ctx, log, snapshotService, networkHistoryStore, snapshotCopyToPath)
+
 	segments, err := networkhistoryService.ListAllHistorySegments()
 	require.NoError(t, err)
 
@@ -583,7 +592,7 @@ func TestRestoringNodeThatAlreadyContainsData(t *testing.T) {
 	ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
 	require.NoError(t, err)
 
-	waitForSnapshotToComplete(ss)
+	waitForSnapshotToComplete2(ss, snapshotService.Flush)
 
 	md5Hash, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
 	require.NoError(t, err)
@@ -888,7 +897,7 @@ func TestRestoreFromPartialHistoryAndProcessEvents(t *testing.T) {
 		if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 {
 			ss, err = service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
 			require.NoError(t, err)
-			waitForSnapshotToComplete(ss)
+			waitForSnapshotToComplete2(ss, service.Flush)
 
 			if lastCommittedBlockHeight == 4000 {
 				newSnapshotFileHashAt4000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
@@ -993,7 +1002,7 @@ func TestRestoreFromFullHistorySnapshotAndProcessEvents(t *testing.T) {
 		if lastCommittedBlockHeight == 3000 {
 			ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
 			require.NoError(t, err)
-			waitForSnapshotToComplete(ss)
+			waitForSnapshotToComplete2(ss, service.Flush)
 
 			snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
 			require.NoError(t, err)
@@ -1096,7 +1105,7 @@ func TestRestoreFromFullHistorySnapshotWithIndexesAndOrderTriggersAndProcessEven
 		if lastCommittedBlockHeight == 3000 {
 			ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
 			require.NoError(t, err)
-			waitForSnapshotToComplete(ss)
+			waitForSnapshotToComplete2(ss, service.Flush)
 
 			snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
 			require.NoError(t, err)
@@ -1578,6 +1587,36 @@ func waitForSnapshotToComplete(sf segment.Unpublished) {
 	}
 }
 
+// waitForSnapshotToComplete2 polls until the snapshot data directory exists,
+// then drives the file worker via flush until the in-progress lock file has
+// been removed, which marks the segment as fully written to disk.
+func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) {
+	for {
+		time.Sleep(10 * time.Millisecond)
+
+		// wait for the snapshot data directory to appear
+		_, err := os.Stat(sf.UnpublishedSnapshotDataDirectory())
+		if err != nil {
+			if errors.Is(err, os.ErrNotExist) {
+				continue
+			}
+			panic(err)
+		}
+
+		flush()
+
+		// wait for the in-progress lock file to be removed
+		_, err = os.Stat(sf.InProgressFilePath())
+		if err != nil {
+			if errors.Is(err, os.ErrNotExist) {
+				break
+			}
+			panic(err)
+		}
+	}
+}
+
 func decompressEventFile() {
 	sourceFile, err := os.Open(compressedEventsFile)
 	if err != nil {
diff --git a/datanode/networkhistory/snapshot/file_worker.go b/datanode/networkhistory/snapshot/file_worker.go
new file mode 100644
index 0000000000..df0ce8b947
--- /dev/null
+++ b/datanode/networkhistory/snapshot/file_worker.go
@@ -0,0 +1,107 @@
+// Copyright (C) 2023 Gobalsky Labs Limited
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package snapshot
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"sync"
+)
+
+type bufAndPath struct {
+	buf            *bytes.Buffer
+	isProgressFile bool
+	path           string
+}
+
+// FileWorker queues in-memory segment buffers and lock-file removals so that
+// the disk writes can happen off the snapshot hot path.
+type FileWorker struct {
+	mu    sync.Mutex
+	queue []*bufAndPath
+}
+
+func NewFileWorker() *FileWorker {
+	return &FileWorker{
+		queue: []*bufAndPath{},
+	}
+}
+
+// Add queues a buffer to be written to path.
+func (fw *FileWorker) Add(buf *bytes.Buffer, path string) {
+	fw.mu.Lock()
+	defer fw.mu.Unlock()
+
+	fw.queue = append(fw.queue, &bufAndPath{buf, false, path})
+}
+
+// AddLockFile queues the removal of the in-progress lock file at path.
+func (fw *FileWorker) AddLockFile(path string) {
+	fw.mu.Lock()
+	defer fw.mu.Unlock()
+
+	fw.queue = append(fw.queue, &bufAndPath{nil, true, path})
+}
+
+// pop removes and returns the head of the queue, or nil if the queue is empty.
+func (fw *FileWorker) pop() (bp *bufAndPath) {
+	fw.mu.Lock()
+	defer fw.mu.Unlock()
+
+	if len(fw.queue) == 0 {
+		return
+	}
+
+	bp, fw.queue = fw.queue[0], fw.queue[1:]
+
+	return
+}
+
+func (fw *FileWorker) Empty() bool {
+	fw.mu.Lock()
+	defer fw.mu.Unlock()
+
+	return len(fw.queue) == 0
+}
+
+// Consume processes the next queued item, if any.
+func (fw *FileWorker) Consume() error {
+	bp := fw.pop()
+	if bp == nil {
+		return nil // nothing to do
+	}
+
+	if bp.isProgressFile {
+		return fw.removeLockFile(bp.path)
+	}
+
+	return fw.writeSegment(bp)
+}
+
+func (fw *FileWorker) removeLockFile(path string) error {
+	return os.Remove(path)
+}
+
+func (fw *FileWorker) writeSegment(bp *bufAndPath) error {
+	file, err := os.Create(bp.path)
+	if err != nil {
+		return fmt.Errorf("failed to create file %s: %w", bp.path, err)
+	}
+	defer file.Close()
+
+	_, err = bp.buf.WriteTo(file)
+	if err != nil {
+		return fmt.Errorf("couldn't write to file %v: %w", bp.path, err)
+	}
+
+	return nil
+}
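Review note: a minimal test sketch of the `FileWorker` contract introduced above — queued buffers are written out in FIFO order by draining `Consume` until `Empty` reports true. The test name and file layout are illustrative only:

```go
package snapshot_test

import (
	"bytes"
	"os"
	"path/filepath"
	"testing"

	"code.vegaprotocol.io/vega/datanode/networkhistory/snapshot"
)

// TestFileWorkerDrain checks that queued buffers end up on disk once the
// queue has been fully consumed.
func TestFileWorkerDrain(t *testing.T) {
	dir := t.TempDir()
	fw := snapshot.NewFileWorker()

	fw.Add(bytes.NewBufferString("table-a"), filepath.Join(dir, "a.csv"))
	fw.Add(bytes.NewBufferString("table-b"), filepath.Join(dir, "b.csv"))

	// drain the queue, exactly as Service.Flush does
	for !fw.Empty() {
		if err := fw.Consume(); err != nil {
			t.Fatalf("consume failed: %v", err)
		}
	}

	got, err := os.ReadFile(filepath.Join(dir, "a.csv"))
	if err != nil || string(got) != "table-a" {
		t.Fatalf("unexpected contents: %q, err: %v", got, err)
	}
}
```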
diff --git a/datanode/networkhistory/snapshot/service.go b/datanode/networkhistory/snapshot/service.go
index f81ae8f8bb..bfebd2d068 100644
--- a/datanode/networkhistory/snapshot/service.go
+++ b/datanode/networkhistory/snapshot/service.go
@@ -40,6 +40,9 @@ type Service struct {
 
 	historyStore HistoryStore
 
+	fw                           *FileWorker
+	tableSnapshotFileSizesCached map[string]int
+
 	createSnapshotLock       mutex.CtxMutex
 	copyToPath               string
 	migrateSchemaUpToVersion func(version int64) error
@@ -59,14 +62,16 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po
 	}
 
 	s := &Service{
-		log:                        log,
-		config:                     config,
-		connPool:                   connPool,
-		createSnapshotLock:         mutex.New(),
-		copyToPath:                 snapshotsCopyToPath,
-		migrateSchemaUpToVersion:   migrateDatabaseToVersion,
-		migrateSchemaDownToVersion: migrateSchemaDownToVersion,
-		historyStore:               historyStore,
+		log:                          log,
+		config:                       config,
+		connPool:                     connPool,
+		createSnapshotLock:           mutex.New(),
+		copyToPath:                   snapshotsCopyToPath,
+		migrateSchemaUpToVersion:     migrateDatabaseToVersion,
+		migrateSchemaDownToVersion:   migrateSchemaDownToVersion,
+		historyStore:                 historyStore,
+		fw:                           NewFileWorker(),
+		tableSnapshotFileSizesCached: map[string]int{},
 	}
 
 	err = os.MkdirAll(s.copyToPath, fs.ModePerm)
@@ -77,6 +82,14 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po
 	return s, nil
 }
 
+// Flush drains the file worker, writing all pending segment data to disk.
+func (b *Service) Flush() {
+	for !b.fw.Empty() {
+		if err := b.fw.Consume(); err != nil {
+			b.log.Error("failed to write all files to disk", logging.Error(err))
+		}
+	}
+}
+
 func (b *Service) SnapshotData(ctx context.Context, chainID string, toHeight int64) error {
 	_, err := b.CreateSnapshotAsynchronously(ctx, chainID, toHeight)
 	if err != nil {
diff --git a/datanode/networkhistory/snapshot/service_create_snapshot.go b/datanode/networkhistory/snapshot/service_create_snapshot.go
index b4d46bb300..d83d432c52 100644
--- a/datanode/networkhistory/snapshot/service_create_snapshot.go
+++ b/datanode/networkhistory/snapshot/service_create_snapshot.go
@@ -16,6 +16,7 @@ package snapshot
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -23,18 +24,21 @@ import (
 	"os"
 	"path"
 	"sort"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"code.vegaprotocol.io/vega/datanode/metrics"
 	"code.vegaprotocol.io/vega/datanode/networkhistory/segment"
 	"code.vegaprotocol.io/vega/datanode/sqlstore"
 	"code.vegaprotocol.io/vega/libs/fs"
-	vio "code.vegaprotocol.io/vega/libs/io"
 	"code.vegaprotocol.io/vega/logging"
 
 	"github.com/georgysavva/scany/pgxscan"
 	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v4/pgxpool"
 	"golang.org/x/exp/maps"
+	"golang.org/x/sync/errgroup"
 )
 
 var (
@@ -50,7 +54,10 @@ func (b *Service) CreateSnapshotAsynchronously(ctx context.Context, chainID stri
 	return b.createNewSnapshot(ctx, chainID, toHeight, true)
 }
 
-func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeight int64,
+func (b *Service) createNewSnapshot(
+	ctx context.Context,
+	chainID string,
+	toHeight int64,
 	async bool,
 ) (segment.Unpublished, error) {
 	var err error
@@ -115,29 +122,34 @@ func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeigh
 		runAllInReverseOrder(cleanUp)
 		return segment.Unpublished{}, fmt.Errorf("failed to create write lock file:%w", err)
 	}
-	cleanUp = append(cleanUp, func() { _ = os.Remove(s.InProgressFilePath()) })
 
 	// To ensure reads are isolated from this point forward execute a read on last block
 	_, err = sqlstore.GetLastBlockUsingConnection(ctx, copyDataTx)
 	if err != nil {
+		_ = os.Remove(s.InProgressFilePath())
 		runAllInReverseOrder(cleanUp)
 		return segment.Unpublished{}, fmt.Errorf("failed to get last block using connection: %w", err)
 	}
 
 	snapshotData := func() {
 		defer func() { runAllInReverseOrder(cleanUp) }()
-		err = b.snapshotData(ctx, copyDataTx, dbMetaData, s)
+		err = b.snapshotData(ctx, copyDataTx, dbMetaData, s, b.connPool)
 		if err != nil {
 			b.log.Panic("failed to snapshot data", logging.Error(err))
 		}
+
+		// the lock file is now removed by the file worker, once all of the
+		// segment's data has been flushed to disk
+		b.fw.AddLockFile(s.InProgressFilePath())
 	}
 
-	if async {
-		go snapshotData()
-	} else {
-		snapshotData()
-	}
+	// NOTE: snapshotting now always runs synchronously; the expensive file
+	// writes are deferred to the file worker instead
+	snapshotData()
+
 	return s, nil
 }
@@ -178,6 +190,21 @@ func getNextSnapshotSpan(ctx context.Context, toHeight int64, copyDataTx pgx.Tx)
 	return nextSpan, nil
 }
 
+// txSnapshot holds the identifier returned by pg_export_snapshot().
+type txSnapshot struct {
+	snapshotID string
+}
+
+func getExportSnapshot(ctx context.Context, tx pgx.Tx) (*txSnapshot, error) {
+	row := tx.QueryRow(ctx, "SELECT pg_export_snapshot();")
+	var s string
+	err := row.Scan(&s)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't scan pg_export_snapshot result: %w", err)
+	}
+
+	return &txSnapshot{s}, nil
+}
+
 type Span struct {
 	FromHeight int64
 	ToHeight   int64
@@ -212,7 +239,7 @@ func runAllInReverseOrder(functions []func()) {
 	}
 }
 
-func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData DatabaseMetadata, seg segment.Unpublished) error {
+func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData DatabaseMetadata, seg segment.Unpublished, pool *pgxpool.Pool) error {
 	defer func() {
 		// Calling rollback on a committed transaction has no effect, hence we can rollback in defer to ensure
 		// always rolled back if the transaction was not successfully committed
@@ -239,27 +266,46 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa
 		return fmt.Errorf("failed to create history state directory:%w", err)
 	}
 
 	// Write Current State
-	currentSQL := currentStateCopySQL(dbMetaData)
-	currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir)
+	currentSQL, lastSpan := currentStateCopySQL(dbMetaData)
+
+	// copy the last snapshot span on the outer transaction first
+	lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached)
+	if err != nil {
+		return fmt.Errorf("failed to copy last span table data:%w", err)
+	}
+
+	// export the outer transaction's snapshot so every concurrent copy below
+	// starts from the same database state
+	pgTxSnapshot, err := getExportSnapshot(ctx, tx)
+	if err != nil {
+		return fmt.Errorf("failed to export transaction snapshot:%w", err)
+	}
+
+	currentRowsCopied, currentStateBytesCopied, err := copyTablesDataAsync(ctx, pool, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot)
 	if err != nil {
 		return fmt.Errorf("failed to copy current state table data:%w", err)
 	}
 
 	// Write History
 	historySQL := historyCopySQL(dbMetaData, seg)
-	historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir)
+	historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot)
 	if err != nil {
 		return fmt.Errorf("failed to copy history table data:%w", err)
 	}
 
 	err = tx.Commit(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to commit snapshot transaction:%w", err)
 	}
 
-	metrics.SetLastSnapshotRowcount(float64(currentRowsCopied + historyRowsCopied))
-	metrics.SetLastSnapshotCurrentStateBytes(float64(currentStateBytesCopied))
+	metrics.SetLastSnapshotRowcount(float64(currentRowsCopied + historyRowsCopied + lastSpanCopied))
+	metrics.SetLastSnapshotCurrentStateBytes(float64(currentStateBytesCopied + lastSpanBytesCopied))
 	metrics.SetLastSnapshotHistoryBytes(float64(historyBytesCopied))
 	metrics.SetLastSnapshotSeconds(time.Since(start).Seconds())
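Review note: the consistency mechanism here is Postgres snapshot export: `pg_export_snapshot()` inside a long-lived transaction returns an ID that other transactions can adopt with `SET TRANSACTION SNAPSHOT`, so all parallel COPY workers read exactly the same database state. A standalone sketch of the technique with pgx v4 (the DSN and the `blocks` table are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v4/pgxpool"
)

func main() {
	ctx := context.Background()
	pool, err := pgxpool.Connect(ctx, "postgres://localhost/vega") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// Exporter: keep this transaction open for as long as workers need the snapshot.
	exportTx, err := pool.Begin(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer exportTx.Rollback(ctx)

	var snapID string
	if err := exportTx.QueryRow(ctx, "SELECT pg_export_snapshot()").Scan(&snapID); err != nil {
		log.Fatal(err)
	}

	// Worker: a second transaction adopts the exported snapshot before its
	// first query and therefore sees exactly the state the exporter saw.
	workerTx, err := pool.Begin(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer workerTx.Rollback(ctx)

	if _, err := workerTx.Exec(ctx, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"); err != nil {
		log.Fatal(err)
	}
	if _, err := workerTx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", snapID)); err != nil {
		log.Fatal(err)
	}

	var n int64
	if err := workerTx.QueryRow(ctx, "SELECT count(*) FROM blocks").Scan(&n); err != nil { // placeholder table
		log.Fatal(err)
	}
	fmt.Println("rows visible in snapshot:", n)
}
```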
@@ -274,8 +320,9 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa
 	return nil
 }
 
-func currentStateCopySQL(dbMetaData DatabaseMetadata) []TableCopySql {
+func currentStateCopySQL(dbMetaData DatabaseMetadata) ([]TableCopySql, TableCopySql) {
 	var copySQL []TableCopySql
+	var lastSpan TableCopySql
 	tablesNames := maps.Keys(dbMetaData.TableNameToMetaData)
 
 	sort.Strings(tablesNames)
@@ -284,10 +331,15 @@ func currentStateCopySQL(dbMetaData DatabaseMetadata) []TableCopySql {
 		if !dbMetaData.TableNameToMetaData[tableName].Hypertable {
 			tableCopySQL := fmt.Sprintf(`copy (select * from %s order by %s) TO STDOUT WITH (FORMAT csv, HEADER) `, tableName,
 				meta.SortOrder)
-			copySQL = append(copySQL, TableCopySql{meta, tableCopySQL})
+
+			// last_snapshot_span is returned separately so it can be copied
+			// on the outer transaction before the snapshot is exported
+			if tableName == "last_snapshot_span" {
+				lastSpan = TableCopySql{meta, tableCopySQL}
+			} else {
+				copySQL = append(copySQL, TableCopySql{meta, tableCopySQL})
+			}
 		}
 	}
-	return copySQL
+	return copySQL, lastSpan
 }
 
 func historyCopySQL(dbMetaData DatabaseMetadata, segment interface{ GetFromHeight() int64 }) []TableCopySql {
@@ -310,49 +362,264 @@ func historyCopySQL(dbMetaData DatabaseMetadata, segment interface{ GetFromHeigh
 	return copySQL
 }
 
-func copyTablesData(ctx context.Context, tx pgx.Tx, copySQL []TableCopySql, toDir string) (int64, int64, error) {
-	var totalRowsCopied int64
-	var totalBytesCopied int64
-	for _, tableSql := range copySQL {
-		filePath := path.Join(toDir, tableSql.metaData.Name)
-		numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql)
-		if err != nil {
-			return 0, 0, fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err)
-		}
-		totalRowsCopied += numRowsCopied
-		totalBytesCopied += bytesCopied
-	}
-
-	return totalRowsCopied, totalBytesCopied, nil
-}
-
+func copyTablesDataLastSpan(
+	ctx context.Context,
+	tx pgx.Tx,
+	tableSql TableCopySql,
+	toDir string,
+	fw *FileWorker,
+	lenCache map[string]int,
+) (int64, int64, error) {
+	filePath := path.Join(toDir, tableSql.metaData.Name)
+	var mtx sync.Mutex
+	return extractTableData2(ctx, tx, filePath, tableSql, fw, lenCache, &mtx)
+}
+
+func copyTablesDataAsync(
+	ctx context.Context,
+	pool *pgxpool.Pool,
+	copySQL []TableCopySql,
+	toDir string,
+	fw *FileWorker,
+	lenCache map[string]int,
+	pgTxSnapshot *txSnapshot,
+) (int64, int64, error) {
+	var (
+		totalRowsCopied  atomic.Int64
+		totalBytesCopied atomic.Int64
+		mtx              sync.Mutex
+	)
+
+	errg, newCtx := errgroup.WithContext(ctx)
+
+	for _, tSql := range copySQL {
+		tableSql := tSql
+
+		errg.Go(func() error {
+			filePath := path.Join(toDir, tableSql.metaData.Name)
+			numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx, pgTxSnapshot)
+			if err != nil {
+				return fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err)
+			}
+
+			totalRowsCopied.Add(numRowsCopied)
+			totalBytesCopied.Add(bytesCopied)
+
+			return nil
+		})
+	}
+
+	if err := errg.Wait(); err != nil {
+		return 0, 0, err
+	}
+
+	return totalRowsCopied.Load(), totalBytesCopied.Load(), nil
+}
-func writeTableToDataFile(ctx context.Context, tx pgx.Tx, filePath string, tableSql TableCopySql) (int64, int64, error) {
-	file, err := os.Create(filePath)
-	if err != nil {
-		return 0, 0, fmt.Errorf("failed to create file %s:%w", filePath, err)
-	}
-	defer file.Close()
-
-	fileWriter := vio.NewCountWriter(file)
-
-	numRowsCopied, err := executeCopy(ctx, tx, tableSql, fileWriter)
-	if err != nil {
-		return 0, 0, fmt.Errorf("failed to execute copy: %w", err)
-	}
-	return numRowsCopied, fileWriter.Count(), nil
-}
-
+// extractTableData copies a table into an in-memory buffer, pre-sized from
+// the previous snapshot's size for that table, and queues the buffer on the
+// file worker rather than writing it to disk inline.
+func extractTableData(
+	ctx context.Context,
+	pool *pgxpool.Pool,
+	filePath string,
+	tableSql TableCopySql,
+	fw *FileWorker,
+	lenCache map[string]int,
+	cacheMu *sync.Mutex,
+	pgTxSnapshot *txSnapshot,
+) (int64, int64, error) {
+	if cacheMu != nil {
+		cacheMu.Lock()
+	}
+	allocCap := lenCache[tableSql.metaData.Name]
+	if cacheMu != nil {
+		cacheMu.Unlock()
+	}
+
+	if allocCap == 0 {
+		allocCap = 1000000 // roughly 1MB as a default for unseen tables
+	} else {
+		// the table may have grown since the last snapshot; 30% headroom
+		// should avoid most buffer reallocations
+		allocCap += allocCap / 3
+	}
+
+	b := bytes.NewBuffer(make([]byte, 0, allocCap))
+
+	numRowsCopied, err := executeCopy(ctx, pool, tableSql, b, pgTxSnapshot)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to execute copy: %w", err)
+	}
+
+	bufLen := int64(b.Len())
+
+	// schedule the buffer to be written to disk by the file worker
+	fw.Add(b, filePath)
+
+	// remember the actual size to pre-size the next snapshot's buffer
+	if cacheMu != nil {
+		cacheMu.Lock()
+	}
+	lenCache[tableSql.metaData.Name] = int(bufLen)
+	if cacheMu != nil {
+		cacheMu.Unlock()
+	}
+
+	return numRowsCopied, bufLen, nil
+}
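Review note: `lenCache` is a simple pre-sizing heuristic — allocate each table's buffer at its previous snapshot size plus roughly 30% headroom, falling back to ~1MB for tables not seen before. A standalone sketch of the heuristic (the helper names are illustrative, not part of this change):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// sizeHint returns a buffer capacity for a table: the last observed size plus
// 30% headroom, or a 1MB default when the table has not been seen before.
func sizeHint(mu *sync.Mutex, cache map[string]int, table string) int {
	mu.Lock()
	defer mu.Unlock()

	c := cache[table]
	if c == 0 {
		return 1_000_000 // ~1MB default for unseen tables
	}
	return c + c/3
}

// remember records the actual size for the next snapshot's allocation.
func remember(mu *sync.Mutex, cache map[string]int, table string, size int) {
	mu.Lock()
	defer mu.Unlock()
	cache[table] = size
}

func main() {
	var mu sync.Mutex
	cache := map[string]int{}

	buf := bytes.NewBuffer(make([]byte, 0, sizeHint(&mu, cache, "orders")))
	buf.WriteString("csv rows would be copied here")
	remember(&mu, cache, "orders", buf.Len())

	fmt.Println(sizeHint(&mu, cache, "orders")) // next time: previous len + 30%
}
```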
+func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer, pgTxSnapshot *txSnapshot) (int64, error) {
+	defer metrics.StartNetworkHistoryCopy(tableSql.metaData.Name)()
+
+	tx, err := pool.Begin(ctx)
+	if err != nil {
+		return 0, fmt.Errorf("failed to begin copy table data transaction: %w", err)
+	}
+	defer tx.Rollback(ctx)
+
+	if _, err = tx.Exec(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); err != nil {
+		return 0, fmt.Errorf("failed to set transaction isolation level to serializable: %w", err)
+	}
+
+	// import the snapshot exported by the outer transaction so this worker
+	// sees exactly the same database state
+	if _, err = tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%v'", pgTxSnapshot.snapshotID)); err != nil {
+		return 0, fmt.Errorf("failed to set transaction snapshot: %w", err)
+	}
+
+	tag, err := tx.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql)
+	if err != nil {
+		return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err)
+	}
+
+	rowsCopied := tag.RowsAffected()
+	metrics.NetworkHistoryRowsCopied(tableSql.metaData.Name, rowsCopied)
+
+	if err := tx.Commit(ctx); err != nil {
+		return 0, err
+	}
+
+	return rowsCopied, nil
+}
+
+// extractTableData2 is the transaction-bound variant of extractTableData,
+// used for the last snapshot span, which must be copied on the outer
+// transaction before the snapshot is exported.
+func extractTableData2(
+	ctx context.Context,
+	tx pgx.Tx,
+	filePath string,
+	tableSql TableCopySql,
+	fw *FileWorker,
+	lenCache map[string]int,
+	cacheMu *sync.Mutex,
+) (int64, int64, error) {
+	if cacheMu != nil {
+		cacheMu.Lock()
+	}
+	allocCap := lenCache[tableSql.metaData.Name]
+	if cacheMu != nil {
+		cacheMu.Unlock()
+	}
+
+	if allocCap == 0 {
+		allocCap = 1000000 // roughly 1MB as a default for unseen tables
+	} else {
+		// 30% headroom, as in extractTableData
+		allocCap += allocCap / 3
+	}
+
+	b := bytes.NewBuffer(make([]byte, 0, allocCap))
+
+	numRowsCopied, err := executeCopy2(ctx, tx, tableSql, b)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to execute copy: %w", err)
+	}
+
+	bufLen := int64(b.Len())
+
+	// schedule the buffer to be written to disk by the file worker
+	fw.Add(b, filePath)
+
+	if cacheMu != nil {
+		cacheMu.Lock()
+	}
+	lenCache[tableSql.metaData.Name] = int(bufLen)
+	if cacheMu != nil {
+		cacheMu.Unlock()
+	}
+
+	return numRowsCopied, bufLen, nil
+}
+
-func executeCopy(ctx context.Context, tx pgx.Tx, tableSql TableCopySql, w io.Writer) (int64, error) {
+func executeCopy2(ctx context.Context, tx pgx.Tx, tableSql TableCopySql, w io.Writer) (int64, error) {
 	defer metrics.StartNetworkHistoryCopy(tableSql.metaData.Name)()
 
 	tag, err := tx.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql)
 	if err != nil {
 		return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err)
 	}
 
 	rowsCopied := tag.RowsAffected()
 	metrics.NetworkHistoryRowsCopied(tableSql.metaData.Name, rowsCopied)
 
 	return rowsCopied, nil
 }
diff --git a/datanode/networkhistory/snapshot/service_create_snapshot_test.go b/datanode/networkhistory/snapshot/service_create_snapshot_test.go
index a887102467..6e43b1a68f 100644
--- a/datanode/networkhistory/snapshot/service_create_snapshot_test.go
+++ b/datanode/networkhistory/snapshot/service_create_snapshot_test.go
@@ -28,6 +28,7 @@ import (
 
 func TestGetHistorySnapshots(t *testing.T) {
 	snapshotsDir := t.TempDir()
+
 	service, err := snapshot.NewSnapshotService(logging.NewTestLogger(), snapshot.NewDefaultConfig(), nil, nil, snapshotsDir, nil, nil)
 	if err != nil {
 		panic(err)
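Review note: the tests above lean on an ordering invariant of the file worker — the lock-file removal is queued after all of a segment's buffers, so once a flush has removed the lock file the segment is known to be complete on disk. A sketch of a poller built on that invariant, with a timeout added for safety (the helper name and signature are illustrative, not part of this change):

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"time"
)

// waitForSegment polls until the data directory exists and the in-progress
// lock file has been removed, calling flush to drive the file worker.
func waitForSegment(dataDir, lockFile string, flush func(), timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		time.Sleep(10 * time.Millisecond)

		if _, err := os.Stat(dataDir); err != nil {
			if errors.Is(err, os.ErrNotExist) {
				continue // snapshot not started yet
			}
			return err
		}

		flush() // drive pending writes, including the final lock-file removal

		if _, err := os.Stat(lockFile); errors.Is(err, os.ErrNotExist) {
			return nil // lock gone: segment fully written
		} else if err != nil {
			return err
		}
	}
	return fmt.Errorf("timed out waiting for segment in %s", dataDir)
}

func main() {} // placeholder so the sketch compiles standalone
```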