Skip to content

Commit 1f67361

Browse files
committed
feat: track Handshake sync cursor
This allows us to resume the sync process where we last left off Signed-off-by: Aurora Gaffney <[email protected]>
1 parent 06fc1fd commit 1f67361

File tree

2 files changed

+61
-4
lines changed

2 files changed

+61
-4
lines changed

internal/indexer/handshake.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ package indexer
88

99
import (
1010
"encoding/base32"
11+
"encoding/hex"
12+
"errors"
1113
"fmt"
1214
"log/slog"
1315
"time"
@@ -22,7 +24,6 @@ type handshakeState struct {
2224
peer *handshake.Peer
2325
peerAddress string
2426
peerBackoffDelay time.Duration
25-
blockHeight int
2627
}
2728

2829
func (i *Indexer) startHandshake() error {
@@ -59,8 +60,30 @@ func (i *Indexer) handshakeConnectPeer() error {
5960
// Stop waiting on connection shutdown
6061
}
6162
}()
63+
var locator [][32]byte = nil
64+
cursorBlockHash, err := state.GetState().GetHandshakeCursor()
65+
if err != nil {
66+
return err
67+
}
68+
if cursorBlockHash != "" {
69+
slog.Info(
70+
"found previous Handshake cursor: " + cursorBlockHash,
71+
)
72+
hashBytes, err := hex.DecodeString(cursorBlockHash)
73+
if err != nil {
74+
return err
75+
}
76+
if len(hashBytes) != 32 {
77+
// This isn't a condition we can really recover from, since it implies database corruption
78+
slog.Error(
79+
fmt.Sprintf("bad Handshake cursor block hash: %x", hashBytes),
80+
)
81+
return errors.New("bad Handshake locator")
82+
}
83+
locator = [][32]byte{[32]byte(hashBytes)}
84+
}
6285
// Start sync
63-
if err := i.handshakeState.peer.Sync(nil, i.handshakeHandleSync); err != nil {
86+
if err := i.handshakeState.peer.Sync(locator, i.handshakeHandleSync); err != nil {
6487
_ = i.handshakeState.peer.Close()
6588
return err
6689
}
@@ -102,10 +125,8 @@ func (i *Indexer) handshakeReconnectPeer() {
102125
}
103126

104127
func (i *Indexer) handshakeHandleSync(block *handshake.Block) error {
105-
i.handshakeState.blockHeight++
106128
slog.Debug(
107129
"synced Handshake block",
108-
"height", i.handshakeState.blockHeight,
109130
"hash", fmt.Sprintf("%x", block.Hash()),
110131
"prevHash", fmt.Sprintf("%x", block.Header.PrevBlock),
111132
)
@@ -152,6 +173,11 @@ func (i *Indexer) handshakeHandleSync(block *handshake.Block) error {
152173
}
153174
}
154175
}
176+
// Update cursor
177+
blockHash := block.Hash()
178+
if err := state.GetState().UpdateHandshakeCursor(hex.EncodeToString(blockHash[:])); err != nil {
179+
return err
180+
}
155181
return nil
156182
}
157183

internal/state/state.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const (
2727
chainsyncCursorKey = "chainsync_cursor"
2828
discoveredAddrKey = "discovered_addresses"
2929
fingerprintKey = "config_fingerprint"
30+
handshakeCursorKey = "handshake_cursor"
3031

3132
cardanoRecordKeyPrefix = "r_"
3233
cardanoDomainKeyPrefix = "d_"
@@ -359,6 +360,36 @@ func (s *State) lookupRecords(
359360
return ret, nil
360361
}
361362

363+
func (s *State) UpdateHandshakeCursor(blockHash string) error {
364+
err := s.db.Update(func(txn *badger.Txn) error {
365+
if err := txn.Set([]byte(handshakeCursorKey), []byte(blockHash)); err != nil {
366+
return err
367+
}
368+
return nil
369+
})
370+
return err
371+
}
372+
373+
func (s *State) GetHandshakeCursor() (string, error) {
374+
var blockHash string
375+
err := s.db.View(func(txn *badger.Txn) error {
376+
item, err := txn.Get([]byte(handshakeCursorKey))
377+
if err != nil {
378+
return err
379+
}
380+
val, err := item.ValueCopy(nil)
381+
if err != nil {
382+
return err
383+
}
384+
blockHash = string(val)
385+
return nil
386+
})
387+
if errors.Is(err, badger.ErrKeyNotFound) {
388+
return "", nil
389+
}
390+
return blockHash, err
391+
}
392+
362393
func (s *State) AddHandshakeName(name string) error {
363394
nameHash := sha3.Sum256([]byte(name))
364395
nameHashKey := fmt.Sprintf("%s%x", handshakeNameHashKeyPrefix, nameHash)

0 commit comments

Comments
 (0)