Skip to content

Commit

Permalink
Test heartbeat service, restrict free heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
jannotti committed Nov 21, 2024
1 parent 5191eb5 commit 1f93f49
Show file tree
Hide file tree
Showing 36 changed files with 675 additions and 540 deletions.
2 changes: 1 addition & 1 deletion agreement/gossip/networkFull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func spinNetwork(t *testing.T, nodesCount int, cfg config.Local) ([]*networkImpl
break
}
}
log.Infof("network established, %d nodes connected in %s", nodesCount, time.Now().Sub(start).String())
log.Infof("network established, %d nodes connected in %s", nodesCount, time.Since(start).String())
return networkImpls, msgCounters
}

Expand Down
2 changes: 1 addition & 1 deletion catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (uf *universalBlockFetcher) fetchBlock(ctx context.Context, round basics.Ro
} else {
return nil, nil, time.Duration(0), fmt.Errorf("fetchBlock: UniversalFetcher only supports HTTPPeer and UnicastPeer")
}
downloadDuration = time.Now().Sub(blockDownloadStartTime)
downloadDuration = time.Since(blockDownloadStartTime)
block, cert, err := processBlockBytes(fetchedBuf, round, address)
if err != nil {
return nil, nil, time.Duration(0), err
Expand Down
3 changes: 1 addition & 2 deletions cmd/goal/clerk.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ func waitForCommit(client libgoal.Client, txid string, transactionLastValidRound
}

reportInfof(infoTxPending, txid, stat.LastRound)
// WaitForRound waits until round "stat.LastRound+1" is committed
stat, err = client.WaitForRound(stat.LastRound)
stat, err = client.WaitForRound(stat.LastRound + 1)

Check warning on line 224 in cmd/goal/clerk.go

View check run for this annotation

Codecov / codecov/patch

cmd/goal/clerk.go#L224

Added line #L224 was not covered by tests
if err != nil {
return model.PendingTransactionResponse{}, fmt.Errorf(errorRequestFail, err)
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/loadgenerator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,23 @@ func waitForRound(restClient client.RestClient, cfg config, spendingRound bool)
time.Sleep(1 * time.Second)
continue
}
if isSpendRound(cfg, nodeStatus.LastRound) == spendingRound {
lastRound := nodeStatus.LastRound
if isSpendRound(cfg, lastRound) == spendingRound {
// time to send transactions.
return
}
if spendingRound {
fmt.Printf("Last round %d, waiting for spending round %d\n", nodeStatus.LastRound, nextSpendRound(cfg, nodeStatus.LastRound))
fmt.Printf("Last round %d, waiting for spending round %d\n", lastRound, nextSpendRound(cfg, nodeStatus.LastRound))
}
for {
// wait for the next round.
nodeStatus, err = restClient.WaitForBlock(basics.Round(nodeStatus.LastRound))
err = restClient.WaitForRoundWithTimeout(lastRound + 1)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to wait for next round node status : %v", err)
time.Sleep(1 * time.Second)
break
}
if isSpendRound(cfg, nodeStatus.LastRound) == spendingRound {
lastRound++
if isSpendRound(cfg, lastRound) == spendingRound {
// time to send transactions.
return
}
Expand Down
90 changes: 80 additions & 10 deletions daemon/algod/api/client/restClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/google/go-querystring/query"

Expand All @@ -39,6 +40,8 @@ import (
"github.com/algorand/go-algorand/ledger/eval"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/test/e2e-go/globals"
)

const (
Expand Down Expand Up @@ -283,12 +286,77 @@ func (client RestClient) Status() (response model.NodeStatusResponse, err error)
return
}

// WaitForBlock returns the node status after waiting for the given round.
func (client RestClient) WaitForBlock(round basics.Round) (response model.NodeStatusResponse, err error) {
// WaitForBlockAfter returns the node status after trying to wait for the given
// round+1. This REST API has the documented misfeatures of returning after 1
// minute, regardless of whether the given block has been reached.
func (client RestClient) WaitForBlockAfter(round basics.Round) (response model.NodeStatusResponse, err error) {
err = client.get(&response, fmt.Sprintf("/v2/status/wait-for-block-after/%d/", round), nil)
return
}

// WaitForRound returns the node status after waiting for the given round.
func (client RestClient) WaitForRound(round uint64, waitTime time.Duration) (status model.NodeStatusResponse, err error) {
timeout := time.NewTimer(waitTime)
for {
status, err = client.Status()
if err != nil {
return
}

if status.LastRound >= round {
return
}
select {
case <-timeout.C:
return model.NodeStatusResponse{}, fmt.Errorf("timeout waiting for round %v with last round = %v", round, status.LastRound)
case <-time.After(200 * time.Millisecond):
}
}
}

const singleRoundMaxTime = globals.MaxTimePerRound * 40

// WaitForRoundWithTimeout waits for a given round to be reached. As it
// waits, it returns early with an error if the wait time for any round exceeds
// globals.MaxTimePerRound so we can alert when we're getting "hung" waiting.
func (client RestClient) WaitForRoundWithTimeout(roundToWaitFor uint64) error {
status, err := client.Status()
if err != nil {
return err
}
lastRound := status.LastRound

// If node is already at or past target round, we're done
if lastRound >= roundToWaitFor {
return nil
}

roundComplete := make(chan error, 2)

for nextRound := lastRound + 1; lastRound < roundToWaitFor; nextRound++ {
roundStarted := time.Now()

go func(done chan error) {
stat, err := client.WaitForRound(nextRound, singleRoundMaxTime)
lastRound = stat.LastRound
done <- err
}(roundComplete)

select {
case lastError := <-roundComplete:
if lastError != nil {
close(roundComplete)
return lastError
}
case <-time.After(singleRoundMaxTime):
// we've timed out.
time := time.Since(roundStarted)
return fmt.Errorf("fixture.WaitForRound took %3.2f seconds between round %d and %d", time.Seconds(), lastRound, nextRound)
}
}
return nil
}

// HealthCheck does a health check on the potentially running node,
// returning an error if the API is down
func (client RestClient) HealthCheck() error {
Expand All @@ -301,14 +369,6 @@ func (client RestClient) ReadyCheck() error {
return client.get(nil, "/ready", nil)
}

// StatusAfterBlock waits for a block to occur then returns the StatusResponse after that block
// blocks on the node end
// Not supported
func (client RestClient) StatusAfterBlock(blockNum uint64) (response model.NodeStatusResponse, err error) {
err = client.get(&response, fmt.Sprintf("/v2/status/wait-for-block-after/%d", blockNum), nil)
return
}

type pendingTransactionsParams struct {
Max uint64 `url:"max"`
Format string `url:"format"`
Expand Down Expand Up @@ -557,6 +617,16 @@ func (client RestClient) RawBlock(round uint64) (response []byte, err error) {
return
}

// EncodedBlockCert takes a round and returns its parsed block and certificate
func (client RestClient) EncodedBlockCert(round uint64) (blockCert rpcs.EncodedBlockCert, err error) {
resp, err := client.RawBlock(round)
if err != nil {
return
}
err = protocol.Decode(resp, &blockCert)
return
}

// Shutdown requests the node to shut itself down
func (client RestClient) Shutdown() (err error) {
response := 1
Expand Down
4 changes: 2 additions & 2 deletions data/transactions/verify/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr *bookkeeping.Bl
if stxn.Txn.Type == protocol.StateProofTx {
continue
}
if stxn.Txn.Type == protocol.HeartbeatTx && len(stxs) == 1 {
// TODO: Only allow free HB if the HbAddress is challenged
if stxn.Txn.Type == protocol.HeartbeatTx && stxn.Txn.Group.IsZero() {
// in apply.Heartbeat, we further confirm that the heartbeat is for a challenged node
continue

Check warning on line 229 in data/transactions/verify/txn.go

View check run for this annotation

Codecov / codecov/patch

data/transactions/verify/txn.go#L229

Added line #L229 was not covered by tests
}
minFeeCount++
Expand Down
4 changes: 2 additions & 2 deletions data/transactions/verify/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func TestPaysetGroups(t *testing.T) {
startPaysetGroupsTime := time.Now()
err := PaysetGroups(context.Background(), txnGroups, blkHdr, verificationPool, MakeVerifiedTransactionCache(50000), nil)
require.NoError(t, err)
paysetGroupDuration := time.Now().Sub(startPaysetGroupsTime)
paysetGroupDuration := time.Since(startPaysetGroupsTime)

// break the signature and see if it fails.
txnGroups[0][0].Sig[0] = txnGroups[0][0].Sig[0] + 1
Expand Down Expand Up @@ -609,7 +609,7 @@ func TestPaysetGroups(t *testing.T) {
// channel is closed without a return
require.Failf(t, "Channel got closed ?!", "")
} else {
actualDuration := time.Now().Sub(startPaysetGroupsTime)
actualDuration := time.Since(startPaysetGroupsTime)
if err == nil {
if actualDuration > 4*time.Second {
// it took at least 2.5 seconds more than it should have had!
Expand Down
2 changes: 1 addition & 1 deletion data/transactions/verify/verifiedTxnCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func BenchmarkGetUnverifiedTransactionGroups50(b *testing.B) {
for i := 0; i < measuringMultipler; i++ {
impl.GetUnverifiedTransactionGroups(queryTxnGroups, spec, protocol.ConsensusCurrentVersion)
}
duration := time.Now().Sub(startTime)
duration := time.Since(startTime)
// calculate time per 10K verified entries:
t := int(duration*10000) / (measuringMultipler * b.N)
b.ReportMetric(float64(t)/float64(time.Millisecond), "ms/10K_cache_compares")
Expand Down
8 changes: 4 additions & 4 deletions heartbeat/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger/eval"
"github.com/algorand/go-algorand/ledger/apply"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
)
Expand Down Expand Up @@ -80,7 +80,7 @@ func (s *Service) Stop() {
func (s *Service) findChallenged(rules config.ProposerPayoutRules) []account.ParticipationRecordForRound {
current := s.ledger.LastRound()

ch := eval.FindChallenge(rules, current, s.ledger, eval.ChRisky)
ch := apply.FindChallenge(rules, current, s.ledger, apply.ChRisky)
if ch.IsZero() {
return nil
}
Expand All @@ -93,8 +93,7 @@ func (s *Service) findChallenged(rules config.ProposerPayoutRules) []account.Par
continue

Check warning on line 93 in heartbeat/service.go

View check run for this annotation

Codecov / codecov/patch

heartbeat/service.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}
if acct.Status == basics.Online {
lastSeen := max(acct.LastProposed, acct.LastHeartbeat)
if ch.Failed(pr.Account, lastSeen) {
if ch.Failed(pr.Account, acct.LastSeen()) {
s.log.Infof(" %v needs a heartbeat\n", pr.Account)
found = append(found, pr)
}
Expand Down Expand Up @@ -135,6 +134,7 @@ func (s *Service) loop() {

for _, pr := range s.findChallenged(proto.Payouts) {
stxn := s.prepareHeartbeat(pr, lastHdr)
s.log.Infof("sending heartbeat %v for %v\n", stxn.Txn.HeartbeatTxnFields, pr.Account)
err = s.bcast.BroadcastInternalSignedTxGroup([]transactions.SignedTxn{stxn})
if err != nil {
s.log.Errorf("error broadcasting heartbeat %v for %v: %v", stxn, pr.Account, err)

Check warning on line 140 in heartbeat/service.go

View check run for this annotation

Codecov / codecov/patch

heartbeat/service.go#L140

Added line #L140 was not covered by tests
Expand Down
8 changes: 4 additions & 4 deletions heartbeat/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func makeBlock(r basics.Round) bookkeeping.Block {
}
}

func TestHeartBeatOnlyWhenChallenged(t *testing.T) {
func TestHeartbeatOnlyWhenChallenged(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

Expand All @@ -234,9 +234,9 @@ func TestHeartBeatOnlyWhenChallenged(t *testing.T) {
// now they are online, but not challenged, so no heartbeat
acct.Status = basics.Online
acct.VoteKeyDilution = 100
otss := crypto.GenerateOneTimeSignatureSecrets(
basics.OneTimeIDForRound(ledger.LastRound(), acct.VoteKeyDilution).Batch,
5)
startBatch := basics.OneTimeIDForRound(ledger.LastRound(), acct.VoteKeyDilution).Batch
const batches = 50 // gives 50 * kd rounds = 5000
otss := crypto.GenerateOneTimeSignatureSecrets(startBatch, batches)
acct.VoteID = otss.OneTimeSignatureVerifier
ledger.addParticipant(joe, otss)
ledger.addParticipant(mary, otss)
Expand Down
116 changes: 116 additions & 0 deletions ledger/apply/challenge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package apply

import (
"math/bits"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/committee"
)

// ChallengePeriod indicates which part of the challenge period is under discussion.
type ChallengePeriod int

const (
// ChRisky indicates that a challenge is in effect, and the initial grace period is running out.
ChRisky ChallengePeriod = iota
// ChActive indicates that a challenege is in effect, and the grace period
// has run out, so accounts can be suspended
ChActive
)

type challenge struct {
// round is when the challenge occurred. 0 means this is not a challenge.
round basics.Round
// accounts that match the first `bits` of `seed` must propose or heartbeat to stay online
seed committee.Seed
bits int
}

type headerSource interface {
BlockHdr(round basics.Round) (bookkeeping.BlockHeader, error)
}

// FindChallenge returns the Challenge that was last issued if it's in the period requested.
func FindChallenge(rules config.ProposerPayoutRules, current basics.Round, headers headerSource, period ChallengePeriod) challenge {
// are challenges active?
interval := basics.Round(rules.ChallengeInterval)
if rules.ChallengeInterval == 0 || current < interval {
return challenge{}
}
lastChallenge := current - (current % interval)
grace := basics.Round(rules.ChallengeGracePeriod)
// FindChallenge is structured this way, instead of returning the challenge
// and letting the caller determine the period it cares about, to avoid
// using BlockHdr unnecessarily.
switch period {
case ChRisky:
if current <= lastChallenge+grace/2 || current > lastChallenge+grace {
return challenge{}
}
case ChActive:
if current <= lastChallenge+grace || current > lastChallenge+2*grace {
return challenge{}
}
}
challengeHdr, err := headers.BlockHdr(lastChallenge)
if err != nil {
panic(err)

Check warning on line 75 in ledger/apply/challenge.go

View check run for this annotation

Codecov / codecov/patch

ledger/apply/challenge.go#L75

Added line #L75 was not covered by tests
}
challengeProto := config.Consensus[challengeHdr.CurrentProtocol]
// challenge is not considered if rules have changed since that round
if challengeProto.Payouts != rules {
return challenge{}
}
return challenge{lastChallenge, challengeHdr.Seed, rules.ChallengeBits}
}

// IsZero returns true if the challenge is empty (used to indicate no challenege)
func (ch challenge) IsZero() bool {
return ch == challenge{}

Check warning on line 87 in ledger/apply/challenge.go

View check run for this annotation

Codecov / codecov/patch

ledger/apply/challenge.go#L86-L87

Added lines #L86 - L87 were not covered by tests
}

// Failed returns true iff ch is in effect, matches address, and lastSeen is
// before the challenge issue.
func (ch challenge) Failed(address basics.Address, lastSeen basics.Round) bool {
return ch.round != 0 && bitsMatch(ch.seed[:], address[:], ch.bits) && lastSeen < ch.round
}

// bitsMatch checks if the first n bits of two byte slices match. Written to
// work on arbitrary slices, but we expect that n is small. Only user today
// calls with n=5.
func bitsMatch(a, b []byte, n int) bool {
// Ensure n is a valid number of bits to compare
if n < 0 || n > len(a)*8 || n > len(b)*8 {
return false
}

// Compare entire bytes when n is bigger than 8
for i := 0; i < n/8; i++ {
if a[i] != b[i] {
return false
}
}
remaining := n % 8
if remaining == 0 {
return true
}
return bits.LeadingZeros8(a[n/8]^b[n/8]) >= remaining
}
Loading

0 comments on commit 1f93f49

Please sign in to comment.