Skip to content

Commit

Permalink
Add auto-expiration for locked account keys and metrics for available…
Browse files Browse the repository at this point in the history
… keys left
  • Loading branch information
m-Peter committed Dec 12, 2024
1 parent 04fd1d5 commit 2a7482e
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 62 deletions.
4 changes: 2 additions & 2 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Bootstrap struct {
events *ingestion.Engine
profiler *api.ProfileServer
db *pebbleDB.DB
keystore *requester.Keystore
keystore *requester.KeyStore
}

func New(config config.Config) (*Bootstrap, error) {
Expand Down Expand Up @@ -220,7 +220,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
})
}

b.keystore = requester.NewKeystore(accountKeys)
b.keystore = requester.NewKeyStore(accountKeys)

evm, err := requester.NewEVM(
b.storages.Registers,
Expand Down
13 changes: 13 additions & 0 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Collector interface {
EVMAccountInteraction(address string)
MeasureRequestDuration(start time.Time, method string)
OperatorBalance(account *flow.Account)
AvailableSigningKeys(count int)
}

var _ Collector = &DefaultCollector{}
Expand All @@ -35,6 +36,7 @@ type DefaultCollector struct {
operatorBalance prometheus.Gauge
evmAccountCallCounters *prometheus.CounterVec
requestDurations *prometheus.HistogramVec
availableSigningkeys prometheus.Gauge
}

func NewCollector(logger zerolog.Logger) Collector {
Expand Down Expand Up @@ -90,6 +92,11 @@ func NewCollector(logger zerolog.Logger) Collector {
Buckets: prometheus.DefBuckets,
}, []string{"method"})

availableSigningKeys := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("available_signing_keys"),
Help: "Number of keys available for transaction signing",
})

metrics := []prometheus.Collector{
apiErrors,
traceDownloadErrorCounter,
Expand All @@ -101,6 +108,7 @@ func NewCollector(logger zerolog.Logger) Collector {
operatorBalance,
evmAccountCallCounters,
requestDurations,
availableSigningKeys,
}
if err := registerMetrics(logger, metrics...); err != nil {
logger.Info().Msg("using noop collector as metric register failed")
Expand All @@ -118,6 +126,7 @@ func NewCollector(logger zerolog.Logger) Collector {
evmAccountCallCounters: evmAccountCallCounters,
requestDurations: requestDurations,
operatorBalance: operatorBalance,
availableSigningkeys: availableSigningKeys,
}
}

Expand Down Expand Up @@ -172,6 +181,10 @@ func (c *DefaultCollector) MeasureRequestDuration(start time.Time, method string
Observe(time.Since(start).Seconds())
}

func (c *DefaultCollector) AvailableSigningKeys(count int) {
c.availableSigningkeys.Set(float64(count))
}

func prefixedName(name string) string {
return fmt.Sprintf("evm_gateway_%s", name)
}
1 change: 1 addition & 0 deletions metrics/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ func (c *nopCollector) EVMTransactionIndexed(int) {}
func (c *nopCollector) EVMAccountInteraction(string) {}
func (c *nopCollector) MeasureRequestDuration(time.Time, string) {}
func (c *nopCollector) OperatorBalance(*flow.Account) {}
func (c *nopCollector) AvailableSigningKeys(count int) {}
20 changes: 10 additions & 10 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ var _ EventSubscriber = &RPCEventSubscriber{}
type RPCEventSubscriber struct {
logger zerolog.Logger

client *requester.CrossSporkClient
chain flowGo.ChainID
keystore *requester.Keystore
height uint64
client *requester.CrossSporkClient
chain flowGo.ChainID
keyLock requester.KeyLock
height uint64

recovery bool
recoveredEvents []flow.Event
Expand All @@ -46,17 +46,17 @@ func NewRPCEventSubscriber(
logger zerolog.Logger,
client *requester.CrossSporkClient,
chainID flowGo.ChainID,
keystore *requester.Keystore,
keyLock requester.KeyLock,
startHeight uint64,
) *RPCEventSubscriber {
logger = logger.With().Str("component", "subscriber").Logger()
return &RPCEventSubscriber{
logger: logger,

client: client,
chain: chainID,
keystore: keystore,
height: startHeight,
client: client,
chain: chainID,
keyLock: keyLock,
height: startHeight,
}
}

Expand Down Expand Up @@ -173,7 +173,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
}
}
for _, evt := range blockEvents.Events {
r.keystore.UnlockKey(evt.TransactionID)
r.keyLock.UnlockKey(evt.TransactionID)
}

eventsChan <- evmEvents
Expand Down
10 changes: 5 additions & 5 deletions services/ingestion/event_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_Subscribing(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -83,7 +83,7 @@ func Test_MissingBlockEvent(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -185,7 +185,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -248,7 +248,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -310,7 +310,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)

events := subscriber.Subscribe(context.Background())

Expand Down
88 changes: 49 additions & 39 deletions services/requester/keystore.go → services/requester/key_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,33 @@ package requester
import (
"fmt"
"sync"
"time"

flowsdk "github.com/onflow/flow-go-sdk"

"github.com/onflow/flow-go-sdk/crypto"
flowGo "github.com/onflow/flow-go/model/flow"
)

var ErrNoKeysAvailable = fmt.Errorf("no keys available")

const accountKeyExpiry = 10 * time.Second

type AccountKey struct {
flowsdk.AccountKey

mu sync.Mutex
ks *Keystore
Address flowsdk.Address
Signer crypto.Signer
inuse bool
mu sync.Mutex
ks *KeyStore
Address flowsdk.Address
Signer crypto.Signer
lastUsed time.Time
}

// Done unlocks a key after use and puts it back into the pool.
func (k *AccountKey) Done() {
k.markUnused()
k.ks.availableKeys <- k
}

// IncrementSequenceNumber is called when a key was successfully used to
// sign a transaction as the proposer. It increments the sequence number.
func (k *AccountKey) IncrementSequenceNumber() error {
k.mu.Lock()
defer k.mu.Unlock()

if !k.inuse {
return fmt.Errorf("key with index %d not locked", k.Index)
}

k.SequenceNumber++
return nil
k.ks.availableKeys <- k
}

func (k *AccountKey) SetProposerPayerAndSign(tx *flowsdk.Transaction) error {
Expand All @@ -48,21 +39,31 @@ func (k *AccountKey) SetProposerPayerAndSign(tx *flowsdk.Transaction) error {
SignEnvelope(k.Address, k.Index, k.Signer)
}

func (k *AccountKey) markUnused() {
k.mu.Lock()
defer k.mu.Unlock()
// incrementSequenceNumber is called when a key was successfully used to
// sign a transaction as the proposer. It increments the sequence number.
func (k *AccountKey) incrementSequenceNumber() {
k.SequenceNumber++
}

func (k *AccountKey) expired() bool {
return time.Since(k.lastUsed) > flowGo.DefaultTransactionExpiry
}

k.inuse = false
type KeyLock interface {
LockKey(txID flowsdk.Identifier, key *AccountKey)
UnlockKey(txID flowsdk.Identifier)
}

type Keystore struct {
type KeyStore struct {
availableKeys chan *AccountKey
usedKeys map[flowsdk.Identifier]*AccountKey
size int
}

func NewKeystore(keys []*AccountKey) *Keystore {
ks := &Keystore{
var _ KeyLock = (*KeyStore)(nil)

func NewKeyStore(keys []*AccountKey) *KeyStore {
ks := &KeyStore{
usedKeys: map[flowsdk.Identifier]*AccountKey{},
}

Expand All @@ -74,38 +75,47 @@ func NewKeystore(keys []*AccountKey) *Keystore {
ks.size = len(keys)
ks.availableKeys = availableKeys

go ks.keyExpiryChecker()

return ks
}

func (k *Keystore) Size() int {
return k.size
func (k *KeyStore) AvailableKeys() int {
return k.size - len(k.usedKeys)
}

func (k *Keystore) GetKey() (*AccountKey, error) {
func (k *KeyStore) Take() (*AccountKey, error) {
select {
case key := <-k.availableKeys:
key.mu.Lock()
defer key.mu.Unlock()

if key.inuse {
return nil, fmt.Errorf("key with index %d already in use", key.Index)
}
key.inuse = true

return key, nil
default:
return nil, ErrNoKeysAvailable
}
}

func (k *Keystore) LockKey(txID flowsdk.Identifier, key *AccountKey) {
func (k *KeyStore) LockKey(txID flowsdk.Identifier, key *AccountKey) {
key.mu.Lock()
defer key.mu.Unlock()

key.incrementSequenceNumber()
key.lastUsed = time.Now()
k.usedKeys[txID] = key
}

func (k *Keystore) UnlockKey(txID flowsdk.Identifier) {
func (k *KeyStore) UnlockKey(txID flowsdk.Identifier) {
key, ok := k.usedKeys[txID]
if ok && key != nil {
key.Done()
delete(k.usedKeys, txID)
}
}

func (k *KeyStore) keyExpiryChecker() {
for range time.Tick(accountKeyExpiry) {
for txID, key := range k.usedKeys {
if key.expired() {
k.UnlockKey(txID)
}
}
}
}
10 changes: 4 additions & 6 deletions services/requester/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type EVM struct {
logger zerolog.Logger
blocks storage.BlockIndexer
mux sync.Mutex
keystore *Keystore
keystore *KeyStore
head *types.Header
evmSigner types.Signer
validationOptions *txpool.ValidationOptions
Expand All @@ -119,7 +119,7 @@ func NewEVM(
blocks storage.BlockIndexer,
txPool *TxPool,
collector metrics.Collector,
keystore *Keystore,
keystore *KeyStore,
) (*EVM, error) {
logger = logger.With().Str("component", "requester").Logger()
address := config.COAAddress
Expand Down Expand Up @@ -553,7 +553,7 @@ func (e *EVM) buildTransaction(
return nil, err
}

accKey, err := e.keystore.GetKey()
accKey, err := e.keystore.Take()
if err != nil {
return nil, err
}
Expand All @@ -571,11 +571,9 @@ func (e *EVM) buildTransaction(
if err := accKey.SetProposerPayerAndSign(flowTx); err != nil {
return nil, err
}
if err := accKey.IncrementSequenceNumber(); err != nil {
return nil, err
}
e.keystore.LockKey(flowTx.ID(), accKey)

e.collector.AvailableSigningKeys(e.keystore.AvailableKeys())
e.collector.OperatorBalance(account)

return flowTx, nil
Expand Down

0 comments on commit 2a7482e

Please sign in to comment.