From 882faddea19c8aee571c76e4ca6aca6d69be5870 Mon Sep 17 00:00:00 2001 From: Jeremy Letang Date: Mon, 8 Jul 2024 18:22:42 +0100 Subject: [PATCH] chore wip Signed-off-by: Jeremy Letang --- datanode/networkhistory/service_test.go | 4 +- .../snapshot/service_create_snapshot.go | 86 ++++++++++++++----- 2 files changed, 68 insertions(+), 22 deletions(-) diff --git a/datanode/networkhistory/service_test.go b/datanode/networkhistory/service_test.go index 1cb75c6eb8..1bce0ffe7a 100644 --- a/datanode/networkhistory/service_test.go +++ b/datanode/networkhistory/service_test.go @@ -1561,7 +1561,7 @@ func getSnapshotIntervalToHistoryTableDeltaSummary(ctx context.Context, func waitForSnapshotToComplete(sf segment.Unpublished) { for { - time.Sleep(5 * time.Millisecond) + time.Sleep(10 * time.Millisecond) // wait for snapshot current file _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -1589,7 +1589,7 @@ func waitForSnapshotToComplete(sf segment.Unpublished) { func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) { for { - time.Sleep(5 * time.Millisecond) + time.Sleep(10 * time.Millisecond) // wait for snapshot current file _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) if err != nil { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot.go b/datanode/networkhistory/snapshot/service_create_snapshot.go index dd58facdc0..d83d432c52 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot.go @@ -142,11 +142,13 @@ func (b *Service) createNewSnapshot( b.fw.AddLockFile(s.InProgressFilePath()) } - if async { - go snapshotData() - } else { - snapshotData() - } + fmt.Printf("IS ASYNC???? 
> %v\n", async) + + // if async { + // go snapshotData() + // } else { + snapshotData() + // } return s, nil } @@ -188,6 +190,21 @@ func getNextSnapshotSpan(ctx context.Context, toHeight int64, copyDataTx pgx.Tx) return nextSpan, nil } +type txSnapshot struct { + snapshotID string +} + +func getExportSnapshot(ctx context.Context, tx pgx.Tx) (*txSnapshot, error) { + row := tx.QueryRow(ctx, "SELECT pg_export_snapshot();") + var s string + err := row.Scan(&s) + if err != nil { + return nil, fmt.Errorf("couldn't scan pg_export_snapshot result: %w", err) + } + + return &txSnapshot{s}, nil +} + type Span struct { FromHeight int64 ToHeight int64 @@ -252,8 +269,20 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa fmt.Printf("\n\nCURRENT STATE\n\n") // Write Current State currentSQL, lastSpan := currentStateCopySQL(dbMetaData) + + lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) + if err != nil { + return fmt.Errorf("failed to copy last span table data:%w", err) + } + + // export this transaction's snapshot so every concurrent table copy reads the same database state + pgTxSnapshot, err := getExportSnapshot(ctx, tx) + if err != nil { + return fmt.Errorf("failed to export transaction snapshot:%w", err) + } + // currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) - currentRowsCopied, currentStateBytesCopied, err := copyTablesDataAsync(ctx, pool, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) + currentRowsCopied, currentStateBytesCopied, err := copyTablesDataAsync(ctx, pool, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot) if err != nil { return fmt.Errorf("failed to copy current state table data:%w", err) } @@ -264,17 +293,12 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa // Write History historySQL := historyCopySQL(dbMetaData, seg) // historyRowsCopied, 
historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) - historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) + historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot) if err != nil { return fmt.Errorf("failed to copy history table data:%w", err) } fmt.Printf("\n\nHISTORY STATE DONE\n\n") - lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) - if err != nil { - return fmt.Errorf("failed to copy last span table data:%w", err) - } - err = tx.Commit(ctx) if err != nil { return fmt.Errorf("failed to commit snapshot transaction:%w", err) @@ -308,7 +332,7 @@ func currentStateCopySQL(dbMetaData DatabaseMetadata) ([]TableCopySql, TableCopy tableCopySQL := fmt.Sprintf(`copy (select * from %s order by %s) TO STDOUT WITH (FORMAT csv, HEADER) `, tableName, meta.SortOrder) - if meta.Name == "last_snapshot_span" { + if tableName == "last_snapshot_span" { lastSpan = TableCopySql{meta, tableCopySQL} } else { copySQL = append(copySQL, TableCopySql{meta, tableCopySQL}) @@ -385,6 +409,7 @@ func copyTablesDataAsync( toDir string, fw *FileWorker, lenCache map[string]int, + pgTxSnapshot *txSnapshot, ) (int64, int64, error) { var ( totalRowsCopied atomic.Int64 @@ -401,7 +426,7 @@ func copyTablesDataAsync( filePath := path.Join(toDir, tableSql.metaData.Name) // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) - numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx) + numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx, pgTxSnapshot) if err != nil { return fmt.Errorf("failed to write table %s to file %s:%w", 
tableSql.metaData.Name, filePath, err) } @@ -431,6 +456,7 @@ func extractTableData( fw *FileWorker, lenCache map[string]int, cacheMu *sync.Mutex, + pgTxSnapshot *txSnapshot, ) (int64, int64, error) { if cacheMu != nil { @@ -455,7 +481,7 @@ func extractTableData( b := bytes.NewBuffer(make([]byte, 0, allocCap)) - numRowsCopied, err := executeCopy(ctx, pool, tableSql, b) + numRowsCopied, err := executeCopy(ctx, pool, tableSql, b, pgTxSnapshot) if err != nil { return 0, 0, fmt.Errorf("failed to execute copy: %w", err) } @@ -479,16 +505,32 @@ func extractTableData( return numRowsCopied, len, nil } -func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer) (int64, error) { +func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer, pgTxSnapshot *txSnapshot) (int64, error) { defer metrics.StartNetworkHistoryCopy(tableSql.metaData.Name)() - conn, err := pool.Acquire(ctx) + // conn, err := pool.Acquire(ctx) + // if err != nil { + // fmt.Sprintf("\n\n\n\nBOIIIIIIIIIIIIIIIIIIII couldn't acquire connection: %v\n\n\n\n\n", err) + // } + // defer conn.Release() + + tx, err := pool.Begin(ctx) if err != nil { - fmt.Sprintf("\n\n\n\nBOIIIIIIIIIIIIIIIIIIII couldn't acquire connection: %v\n\n\n\n\n", err) + return 0, fmt.Errorf("failed to begin copy table data transaction: %w", err) } - defer conn.Release() - tag, err := conn.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql) + defer tx.Rollback(ctx) + + if _, err = tx.Exec(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); err != nil { + return 0, fmt.Errorf("failed to set transaction isolation level to serializable: %w", err) + } + + if _, err = tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%v'", pgTxSnapshot.snapshotID)); err != nil { + fmt.Printf("COULDN'T IMPORT!!! 
%v", err) + return 0, fmt.Errorf("failed to import transaction snapshot %q: %w", pgTxSnapshot.snapshotID, err) + } + + tag, err := tx.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql) if err != nil { return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err) } @@ -498,6 +540,10 @@ func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, fmt.Printf("SQL: %v / COPIED: %v\n", tableSql.copySql, rowsCopied) metrics.NetworkHistoryRowsCopied(tableSql.metaData.Name, rowsCopied) + if err := tx.Commit(ctx); err != nil { + return 0, fmt.Errorf("failed to commit copy table data transaction: %w", err) + } + return rowsCopied, nil }