Skip to content

Commit

Permalink
chore wip
Browse files Browse the repository at this point in the history
Signed-off-by: Jeremy Letang <[email protected]>
  • Loading branch information
jeremyletang committed Jul 8, 2024
1 parent 9593fa4 commit 882fadd
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 22 deletions.
4 changes: 2 additions & 2 deletions datanode/networkhistory/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,7 +1561,7 @@ func getSnapshotIntervalToHistoryTableDeltaSummary(ctx context.Context,

func waitForSnapshotToComplete(sf segment.Unpublished) {
for {
time.Sleep(5 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
// wait for snapshot current file
_, err := os.Stat(sf.UnpublishedSnapshotDataDirectory())
if err != nil {
Expand Down Expand Up @@ -1589,7 +1589,7 @@ func waitForSnapshotToComplete(sf segment.Unpublished) {

func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) {
for {
time.Sleep(5 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
// wait for snapshot current file
_, err := os.Stat(sf.UnpublishedSnapshotDataDirectory())
if err != nil {
Expand Down
86 changes: 66 additions & 20 deletions datanode/networkhistory/snapshot/service_create_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,13 @@ func (b *Service) createNewSnapshot(
b.fw.AddLockFile(s.InProgressFilePath())
}

if async {
go snapshotData()
} else {
snapshotData()
}
fmt.Printf("IS ASYNC???? > %v\n", async)

Check failure on line 145 in datanode/networkhistory/snapshot/service_create_snapshot.go

View workflow job for this annotation

GitHub Actions / lint

use of `fmt.Printf` forbidden by pattern `fmt\.Print.*` (forbidigo)

// if async {
// go snapshotData()
// } else {
snapshotData()
// }

return s, nil
}
Expand Down Expand Up @@ -188,6 +190,21 @@ func getNextSnapshotSpan(ctx context.Context, toHeight int64, copyDataTx pgx.Tx)
return nextSpan, nil
}

// txSnapshot wraps the identifier of a snapshot exported from a PostgreSQL
// transaction (see getExportSnapshot), so that it can be handed to the code
// which copies the tables data and imported there with
// SET TRANSACTION SNAPSHOT.
type txSnapshot struct {
	// snapshotID is the string returned by pg_export_snapshot().
	snapshotID string
}

// getExportSnapshot runs pg_export_snapshot() inside tx and returns the
// resulting snapshot identifier wrapped in a txSnapshot.
//
// The identifier is meant to be imported by other transactions through
// SET TRANSACTION SNAPSHOT so that they all read the same database state as
// tx. Per the PostgreSQL documentation an exported snapshot can only be
// imported while the exporting transaction is still open, so callers must
// keep tx alive until every importer has set its snapshot.
//
// It returns a nil snapshot and a wrapped error when the query result cannot
// be scanned.
func getExportSnapshot(ctx context.Context, tx pgx.Tx) (*txSnapshot, error) {
	var snapshotID string
	if err := tx.QueryRow(ctx, "SELECT pg_export_snapshot();").Scan(&snapshotID); err != nil {
		// No trailing newline: this error gets wrapped and logged by callers.
		return nil, fmt.Errorf("couldn't scan pg_export_snapshot result: %w", err)
	}

	return &txSnapshot{snapshotID: snapshotID}, nil
}

type Span struct {
FromHeight int64
ToHeight int64
Expand Down Expand Up @@ -252,8 +269,20 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa
fmt.Printf("\n\nCURRENT STATE\n\n")

Check failure on line 269 in datanode/networkhistory/snapshot/service_create_snapshot.go

View workflow job for this annotation

GitHub Actions / lint

use of `fmt.Printf` forbidden by pattern `fmt\.Print.*` (forbidigo)
// Write Current State
currentSQL, lastSpan := currentStateCopySQL(dbMetaData)

lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached)
if err != nil {
return fmt.Errorf("failed to copy last span table data:%w", err)
}

// export a snapshot of this transaction so that all the other copy transactions start from the same state
pgTxSnapshot, err := getExportSnapshot(ctx, tx)
if err != nil {
panic(err)
}

// currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached)
currentRowsCopied, currentStateBytesCopied, err := copyTablesDataAsync(ctx, pool, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached)
currentRowsCopied, currentStateBytesCopied, err := copyTablesDataAsync(ctx, pool, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot)
if err != nil {
return fmt.Errorf("failed to copy current state table data:%w", err)
}
Expand All @@ -264,17 +293,12 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa
// Write History
historySQL := historyCopySQL(dbMetaData, seg)
// historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached)
historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached)
historyRowsCopied, historyBytesCopied, err := copyTablesDataAsync(ctx, pool, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached, pgTxSnapshot)
if err != nil {
return fmt.Errorf("failed to copy history table data:%w", err)
}
fmt.Printf("\n\nHISTORY STATE DONE\n\n")

Check failure on line 300 in datanode/networkhistory/snapshot/service_create_snapshot.go

View workflow job for this annotation

GitHub Actions / lint

use of `fmt.Printf` forbidden by pattern `fmt\.Print.*` (forbidigo)

lastSpanCopied, lastSpanBytesCopied, err := copyTablesDataLastSpan(ctx, tx, lastSpan, currentStateDir, b.fw, b.tableSnapshotFileSizesCached)
if err != nil {
return fmt.Errorf("failed to copy last span table data:%w", err)
}

err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit snapshot transaction:%w", err)
Expand Down Expand Up @@ -308,7 +332,7 @@ func currentStateCopySQL(dbMetaData DatabaseMetadata) ([]TableCopySql, TableCopy
tableCopySQL := fmt.Sprintf(`copy (select * from %s order by %s) TO STDOUT WITH (FORMAT csv, HEADER) `, tableName,
meta.SortOrder)

if meta.Name == "last_snapshot_span" {
if tableName == "last_snapshot_span" {
lastSpan = TableCopySql{meta, tableCopySQL}
} else {
copySQL = append(copySQL, TableCopySql{meta, tableCopySQL})
Expand Down Expand Up @@ -385,6 +409,7 @@ func copyTablesDataAsync(
toDir string,
fw *FileWorker,
lenCache map[string]int,
pgTxSnapshot *txSnapshot,
) (int64, int64, error) {
var (
totalRowsCopied atomic.Int64
Expand All @@ -401,7 +426,7 @@ func copyTablesDataAsync(

filePath := path.Join(toDir, tableSql.metaData.Name)
// numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql)
numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx)
numRowsCopied, bytesCopied, err := extractTableData(newCtx, pool, filePath, tableSql, fw, lenCache, &mtx, pgTxSnapshot)
if err != nil {
return fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err)
}
Expand Down Expand Up @@ -431,6 +456,7 @@ func extractTableData(
fw *FileWorker,
lenCache map[string]int,
cacheMu *sync.Mutex,
pgTxSnapshot *txSnapshot,
) (int64, int64, error) {

if cacheMu != nil {
Expand All @@ -455,7 +481,7 @@ func extractTableData(

b := bytes.NewBuffer(make([]byte, 0, allocCap))

numRowsCopied, err := executeCopy(ctx, pool, tableSql, b)
numRowsCopied, err := executeCopy(ctx, pool, tableSql, b, pgTxSnapshot)
if err != nil {
return 0, 0, fmt.Errorf("failed to execute copy: %w", err)
}
Expand All @@ -479,16 +505,32 @@ func extractTableData(
return numRowsCopied, len, nil
}

func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer) (int64, error) {
func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql, w io.Writer, pgTxSnapshot *txSnapshot) (int64, error) {
defer metrics.StartNetworkHistoryCopy(tableSql.metaData.Name)()

conn, err := pool.Acquire(ctx)
// conn, err := pool.Acquire(ctx)
// if err != nil {
// fmt.Sprintf("\n\n\n\nBOIIIIIIIIIIIIIIIIIIII couldn't acquire connection: %v\n\n\n\n\n", err)
// }
// defer conn.Release()

tx, err := pool.Begin(ctx)
if err != nil {
fmt.Sprintf("\n\n\n\nBOIIIIIIIIIIIIIIIIIIII couldn't acquire connection: %v\n\n\n\n\n", err)
return 0, fmt.Errorf("failed to begin copy table data transaction: %w", err)
}
defer conn.Release()

tag, err := conn.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql)
defer tx.Rollback(ctx)

if _, err = tx.Exec(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); err != nil {
return 0, fmt.Errorf("failed to set transaction isolation level to serilizable: %w", err)
}

if _, err = tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%v'", pgTxSnapshot.snapshotID)); err != nil {
fmt.Printf("COULDN'T IMPORT!!! %v", err)
return 0, fmt.Errorf("failed to set transaction isolation level to serilizable: %w", err)
}

tag, err := tx.Conn().PgConn().CopyTo(ctx, w, tableSql.copySql)
if err != nil {
return 0, fmt.Errorf("failed to execute copy sql %s: %w", tableSql.copySql, err)
}
Expand All @@ -498,6 +540,10 @@ func executeCopy(ctx context.Context, pool *pgxpool.Pool, tableSql TableCopySql,
fmt.Printf("SQL: %v / COPIED: %v\n", tableSql.copySql, rowsCopied)
metrics.NetworkHistoryRowsCopied(tableSql.metaData.Name, rowsCopied)

if err := tx.Commit(ctx); err != nil {
return 0, err
}

return rowsCopied, nil
}

Expand Down

0 comments on commit 882fadd

Please sign in to comment.