feat: improve datanode snapshot creation #11396

Draft · wants to merge 3 commits into develop
3 changes: 2 additions & 1 deletion cmd/data-node/commands/start/node_pre.go
@@ -375,7 +375,8 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi
l.snapshotService,
networkHistoryStore,
l.conf.API.Port,
l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo))
l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo),
)
if err != nil {
return fmt.Errorf("failed to create networkHistory service:%w", err)
}
23 changes: 23 additions & 0 deletions datanode/networkhistory/service.go
@@ -70,8 +70,26 @@ func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, c
datanodeGrpcAPIPort: datanodeGrpcAPIPort,
}

go func() {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ctx.Done():
s.log.Info("saving network history before before leaving")
// consume all pending files
s.snapshotService.Flush()
s.log.Info("network history saved (maybe)")
return
case <-ticker.C:
s.snapshotService.Flush()
}
}
}()

if cfg.Publish {
var err error

// publish all files that are ready
go func() {
ticker := time.NewTicker(5 * time.Second)
for {
@@ -182,6 +200,11 @@ func (d *Service) CreateAndPublishSegment(ctx context.Context, chainID string, t
}
}

// empty the file worker
d.log.Info("saving network history to disk")
d.snapshotService.Flush()
d.log.Info("network history saved to disk")

if err = d.PublishSegments(ctx); err != nil {
return fmt.Errorf("failed to publish snapshots: %w", err)
}
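
The changes above route snapshot file writes through a queue that is drained on a timer, once more on shutdown, and explicitly before segments are published. A minimal standalone sketch of that drain-on-ticker-or-shutdown pattern, with an illustrative flusher type standing in for the snapshot service (not the PR's actual API):

package main

import (
	"context"
	"fmt"
	"time"
)

// flusher stands in for the snapshot service's Flush method.
type flusher struct{}

func (f *flusher) Flush() { fmt.Println("flushed pending snapshot files") }

// runPeriodicFlush drains pending writes every interval and one last time on shutdown.
func runPeriodicFlush(ctx context.Context, f *flusher, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// final drain so nothing queued is lost when the node stops
			f.Flush()
			return
		case <-ticker.C:
			f.Flush()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
	defer cancel()
	runPeriodicFlush(ctx, &flusher{}, 5*time.Second)
}
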
57 changes: 48 additions & 9 deletions datanode/networkhistory/service_test.go
@@ -94,6 +94,7 @@ var (
)

func TestMain(t *testing.M) {
dirs := map[string]string{}
outerCtx, cancelOuterCtx := context.WithCancel(context.Background())
defer cancelOuterCtx()

@@ -148,7 +149,7 @@ func TestMain(t *testing.M) {
pgLog *bytes.Buffer,
) {
sqlConfig = config
log.Infof("DB Connection String: ", sqlConfig.ConnectionConfig.GetConnectionString())
log.Infof("DB Connection String: %v", sqlConfig.ConnectionConfig.GetConnectionString())

pool, err := sqlstore.CreateConnectionPool(outerCtx, sqlConfig.ConnectionConfig)
if err != nil {
@@ -184,7 +185,11 @@ func TestMain(t *testing.M) {
panic(fmt.Errorf("failed to create snapshot: %w", err))
}

waitForSnapshotToComplete(ss)
dirs["dir1"] = ss.Directory
// fmt.Printf("unpublished: %v\n", ss.Directory)
// os.Exit(1)

waitForSnapshotToComplete2(ss, snapshotService.Flush)

snapshots = append(snapshots, ss)

@@ -211,7 +216,8 @@ func TestMain(t *testing.M) {
panic(fmt.Errorf("failed to create snapshot:%w", err))
}

waitForSnapshotToComplete(lastSnapshot)
dirs["dir2"] = lastSnapshot.Directory
waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush)
snapshots = append(snapshots, lastSnapshot)
md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory())
if err != nil {
@@ -268,7 +274,8 @@ func TestMain(t *testing.M) {
panic(fmt.Errorf("failed to create snapshot:%w", err))
}

waitForSnapshotToComplete(lastSnapshot)
dirs["dir3"] = lastSnapshot.Directory
waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush)
snapshots = append(snapshots, lastSnapshot)
md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory())
if err != nil {
@@ -378,7 +385,8 @@ func TestMain(t *testing.M) {
log.Infof("%s", goldenSourceHistorySegment[3000].HistorySegmentID)
log.Infof("%s", goldenSourceHistorySegment[4000].HistorySegmentID)
log.Infof("%s", goldenSourceHistorySegment[5000].HistorySegmentID)

fmt.Printf("DIRECTORIES: %v\n", dirs)
os.Exit(1)
panicIfHistorySegmentIdsNotEqual(goldenSourceHistorySegment[1000].HistorySegmentID, "QmQX6n82ex2XDh1tWL1gCv2viDttUwRSdyG1XaekYfLpJk", snapshots)
panicIfHistorySegmentIdsNotEqual(goldenSourceHistorySegment[2000].HistorySegmentID, "QmaWdp5RPui6ePszzPvk48e7FxHmPGx2VMpbXD2NTgtFMT", snapshots)
panicIfHistorySegmentIdsNotEqual(goldenSourceHistorySegment[2500].HistorySegmentID, "QmRmAX4AfQ9xAdLN8GjVCBmDH7Cm6q1ts7TBF9UjLdjMG9", snapshots)
@@ -421,6 +429,7 @@ func TestLoadingDataFetchedAsynchronously(t *testing.T) {
require.Equal(t, int64(1000), fetched)

networkhistoryService := setupNetworkHistoryService(ctx, log, snapshotService, networkHistoryStore, snapshotCopyToPath)

segments, err := networkhistoryService.ListAllHistorySegments()
require.NoError(t, err)

@@ -583,7 +592,7 @@ func TestRestoringNodeThatAlreadyContainsData(t *testing.T) {
ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)

waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, snapshotService.Flush)

md5Hash, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
require.NoError(t, err)
@@ -888,7 +897,7 @@ func TestRestoreFromPartialHistoryAndProcessEvents(t *testing.T) {
if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 {
ss, err = service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)
waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, service.Flush)

if lastCommittedBlockHeight == 4000 {
newSnapshotFileHashAt4000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
@@ -993,7 +1002,7 @@ func TestRestoreFromFullHistorySnapshotAndProcessEvents(t *testing.T) {
if lastCommittedBlockHeight == 3000 {
ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)
waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, service.Flush)

snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
require.NoError(t, err)
@@ -1096,7 +1105,7 @@ func TestRestoreFromFullHistorySnapshotWithIndexesAndOrderTriggersAndProcessEven
if lastCommittedBlockHeight == 3000 {
ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)
waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, service.Flush)

snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
require.NoError(t, err)
@@ -1578,6 +1587,36 @@ func waitForSnapshotToComplete(sf segment.Unpublished) {
}
}

func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) {
for {
time.Sleep(10 * time.Millisecond)
// wait for the snapshot data directory to be created
_, err := os.Stat(sf.UnpublishedSnapshotDataDirectory())
if err != nil {
if errors.Is(err, os.ErrNotExist) {
continue
} else {
panic(err)
}
}

flush()

// wait for the snapshot's in-progress marker file to be removed

_, err = os.Stat(sf.InProgressFilePath())
if err != nil {
if errors.Is(err, os.ErrNotExist) {
break
} else {
panic(err)
}
} else {
continue
}
}
}

func decompressEventFile() {
sourceFile, err := os.Open(compressedEventsFile)
if err != nil {
107 changes: 107 additions & 0 deletions datanode/networkhistory/snapshot/file_worker.go
@@ -0,0 +1,107 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package snapshot

import (
"bytes"
"fmt"
"os"
"sync"
)

type bufAndPath struct {
buf *bytes.Buffer
isProgressFile bool
path string
}

type FileWorker struct {
mu sync.Mutex
queue []*bufAndPath
}

func NewFileWorker() *FileWorker {
return &FileWorker{
queue: []*bufAndPath{},
}
}

func (fw *FileWorker) Add(buf *bytes.Buffer, path string) {
fw.mu.Lock()
defer fw.mu.Unlock()

fw.queue = append(fw.queue, &bufAndPath{buf, false, path})
}

func (fw *FileWorker) AddLockFile(path string) {
fw.mu.Lock()
defer fw.mu.Unlock()

fw.queue = append(fw.queue, &bufAndPath{nil, true, path})
}

func (fw *FileWorker) peek() (bp *bufAndPath) {
fw.mu.Lock()
defer fw.mu.Unlock()

if len(fw.queue) <= 0 {
return
}

bp, fw.queue = fw.queue[0], fw.queue[1:]

return
}

func (fw *FileWorker) Empty() bool {
fw.mu.Lock()
defer fw.mu.Unlock()

return len(fw.queue) <= 0
}

func (fw *FileWorker) Consume() error {
bp := fw.peek()
if bp == nil {
return nil // nothing to do
}

if bp.isProgressFile {
return fw.removeLockFile(bp.path)
}

return fw.writeSegment(bp)
}

func (fw *FileWorker) removeLockFile(path string) error {
return os.Remove(path)
}

func (fw *FileWorker) writeSegment(bp *bufAndPath) error {
file, err := os.Create(bp.path)
if err != nil {
return fmt.Errorf("failed to create file %s : %w", bp.path, err)
}

defer file.Close()

_, err = bp.buf.WriteTo(file)
if err != nil {
return fmt.Errorf("couldn't writer to file %v : %w", bp.path, err)
}

return nil
}
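
FileWorker is a mutex-guarded FIFO of pending writes: Add queues a buffer destined for a path, AddLockFile queues the removal of an in-progress marker, and Consume pops and performs one entry. A rough usage sketch, assuming it sits alongside the type above in the snapshot package (paths and contents are illustrative):

package snapshot

import (
	"bytes"
	"fmt"
	"os"
	"path/filepath"
)

// flushQueueSketch queues one data segment and one lock-file removal,
// then drains the queue the same way Service.Flush does.
func flushQueueSketch() error {
	dir := os.TempDir()
	segmentPath := filepath.Join(dir, "segment.csv")
	lockPath := filepath.Join(dir, "segment.inprogress")

	// the in-progress marker must already exist, since consuming it calls os.Remove
	lock, err := os.Create(lockPath)
	if err != nil {
		return err
	}
	lock.Close()

	fw := NewFileWorker()
	fw.Add(bytes.NewBufferString("height,hash\n100,abc\n"), segmentPath)
	fw.AddLockFile(lockPath)

	// drain until empty, exactly as Service.Flush does
	for !fw.Empty() {
		if err := fw.Consume(); err != nil {
			fmt.Println("write failed:", err)
		}
	}
	return nil
}
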
29 changes: 21 additions & 8 deletions datanode/networkhistory/snapshot/service.go
@@ -40,6 +40,9 @@ type Service struct {

historyStore HistoryStore

fw *FileWorker
tableSnapshotFileSizesCached map[string]int

createSnapshotLock mutex.CtxMutex
copyToPath string
migrateSchemaUpToVersion func(version int64) error
@@ -59,14 +62,16 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po
}

s := &Service{
log: log,
config: config,
connPool: connPool,
createSnapshotLock: mutex.New(),
copyToPath: snapshotsCopyToPath,
migrateSchemaUpToVersion: migrateDatabaseToVersion,
migrateSchemaDownToVersion: migrateSchemaDownToVersion,
historyStore: historyStore,
log: log,
config: config,
connPool: connPool,
createSnapshotLock: mutex.New(),
copyToPath: snapshotsCopyToPath,
migrateSchemaUpToVersion: migrateDatabaseToVersion,
migrateSchemaDownToVersion: migrateSchemaDownToVersion,
historyStore: historyStore,
fw: NewFileWorker(),
tableSnapshotFileSizesCached: map[string]int{},
}

err = os.MkdirAll(s.copyToPath, fs.ModePerm)
@@ -77,6 +82,14 @@
return s, nil
}

func (b *Service) Flush() {
for !b.fw.Empty() {
if err := b.fw.Consume(); err != nil {
b.log.Error("failed to write all files to disk", logging.Error(err))
}
}
}

func (b *Service) SnapshotData(ctx context.Context, chainID string, toHeight int64) error {
_, err := b.CreateSnapshotAsynchronously(ctx, chainID, toHeight)
if err != nil {
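
Because SnapshotData / CreateSnapshotAsynchronously now queue their file writes instead of performing them inline, a caller that needs the segment on disk pairs snapshot creation with a Flush, which is what the updated tests do via waitForSnapshotToComplete2. A caller-side sketch under that assumption; the helper name and error handling are illustrative, and the usual context/fmt imports plus the datanode snapshot and segment packages are assumed:

// createAndFlush creates a snapshot and then drains the queued writes so the
// unpublished segment files and the removal of the in-progress marker hit disk.
func createAndFlush(ctx context.Context, svc *snapshot.Service, chainID string, toHeight int64) (segment.Unpublished, error) {
	ss, err := svc.CreateSnapshotAsynchronously(ctx, chainID, toHeight)
	if err != nil {
		return segment.Unpublished{}, fmt.Errorf("failed to create snapshot: %w", err)
	}
	// drain queued writes: segment data files plus removal of the in-progress marker
	svc.Flush()
	return ss, nil
}
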