Skip to content

Commit

Permalink
Merge pull request #5792 from onflow/gregor/evm/stable-cadence-updated
Browse files Browse the repository at this point in the history
Sync master with stable cadence
  • Loading branch information
sideninja authored Apr 30, 2024
2 parents 74b0c3b + b465be2 commit 26cb8ee
Show file tree
Hide file tree
Showing 68 changed files with 3,026 additions and 1,992 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,14 @@ jobs:
#env:
# RACE_DETECTOR: 1
- name: Upload coverage report
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
timeout-minutes: 1
continue-on-error: true
with:
file: ./coverage.txt
flags: unittests
name: codecov-umbrella
token: ${{ secrets.CODECOV_TOKEN }}

unit-test-insecure:
name: Unit Tests Insecure (${{ matrix.targets.name }})
Expand Down Expand Up @@ -193,11 +196,14 @@ jobs:
#env:
# RACE_DETECTOR: 1
- name: Upload coverage report
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
timeout-minutes: 1
continue-on-error: true
with:
file: ./coverage.txt
flags: unittests
name: codecov-umbrella
token: ${{ secrets.CODECOV_TOKEN }}

docker-build:
name: Docker Build
Expand Down Expand Up @@ -270,11 +276,14 @@ jobs:
#env:
# RACE_DETECTOR: 1
- name: Upload coverage report
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
timeout-minutes: 1
continue-on-error: true
with:
file: ./coverage.txt
flags: unittests
name: codecov-umbrella
token: ${{ secrets.CODECOV_TOKEN }}

integration-test:
name: Integration Tests
Expand Down
15 changes: 11 additions & 4 deletions engine/access/rpc/backend/backend_stream_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"errors"
"fmt"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -283,7 +284,7 @@ func (b *backendSubscribeBlocks) getBlockDigestResponse(blockStatus flow.BlockSt

// getBlockHeader returns the block header for the given block height.
// Expected errors during normal operation:
// - storage.ErrNotFound: block for the given block height is not available.
// - subscription.ErrBlockNotReady: block for the given block height is not available.
func (b *backendSubscribeBlocks) getBlockHeader(height uint64, expectedBlockStatus flow.BlockStatus) (*flow.Header, error) {
err := b.validateHeight(height, expectedBlockStatus)
if err != nil {
Expand All @@ -293,6 +294,9 @@ func (b *backendSubscribeBlocks) getBlockHeader(height uint64, expectedBlockStat
// since we are querying a finalized or sealed block header, we can use the height index and save an ID computation
header, err := b.headers.ByHeight(height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("failed to retrieve block header for height %d: %w", height, subscription.ErrBlockNotReady)
}
return nil, err
}

Expand All @@ -301,7 +305,7 @@ func (b *backendSubscribeBlocks) getBlockHeader(height uint64, expectedBlockStat

// getBlock returns the block for the given block height.
// Expected errors during normal operation:
// - storage.ErrNotFound: block for the given block height is not available.
// - subscription.ErrBlockNotReady: block for the given block height is not available.
func (b *backendSubscribeBlocks) getBlock(height uint64, expectedBlockStatus flow.BlockStatus) (*flow.Block, error) {
err := b.validateHeight(height, expectedBlockStatus)
if err != nil {
Expand All @@ -311,6 +315,9 @@ func (b *backendSubscribeBlocks) getBlock(height uint64, expectedBlockStatus flo
// since we are querying a finalized or sealed block, we can use the height index and save an ID computation
block, err := b.blocks.ByHeight(height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("failed to retrieve block for height %d: %w", height, subscription.ErrBlockNotReady)
}
return nil, err
}

Expand All @@ -319,7 +326,7 @@ func (b *backendSubscribeBlocks) getBlock(height uint64, expectedBlockStatus flo

// validateHeight checks if the given block height is valid and available based on the expected block status.
// Expected errors during normal operation:
// - storage.ErrNotFound: block for the given block height is not available.
// - subscription.ErrBlockNotReady when unable to retrieve the block by height.
func (b *backendSubscribeBlocks) validateHeight(height uint64, expectedBlockStatus flow.BlockStatus) error {
highestHeight, err := b.blockTracker.GetHighestHeight(expectedBlockStatus)
if err != nil {
Expand All @@ -330,7 +337,7 @@ func (b *backendSubscribeBlocks) validateHeight(height uint64, expectedBlockStat
// note: it's possible for the data to exist in the data store before the notification is
// received. this ensures a consistent view is available to all streams.
if height > highestHeight {
return fmt.Errorf("block %d is not available yet: %w", height, storage.ErrNotFound)
return fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_stream_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran
// When a block with the transaction is available, it is possible to receive a new transaction status while
// searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction
// statuses are the same, the current transaction status should be retrieved.
txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.BlockID, txInfo.blockWithTx.Height, txInfo.txExecuted)
txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.txExecuted)
}
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
Expand Down
12 changes: 6 additions & 6 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (b *backendTransactions) GetTransactionResult(
if block == nil {
txStatus, err = b.DeriveUnknownTransactionStatus(tx.ReferenceBlockID)
} else {
txStatus, err = b.DeriveTransactionStatus(blockID, blockHeight, false)
txStatus, err = b.DeriveTransactionStatus(blockHeight, false)
}

if err != nil {
Expand Down Expand Up @@ -439,7 +439,7 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromExecutionNode(
txResult := resp.TransactionResults[i]

// tx body is irrelevant to status if it's in an executed block
txStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true)
txStatus, err := b.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
Expand Down Expand Up @@ -488,7 +488,7 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromExecutionNode(
}

systemTxResult := resp.TransactionResults[len(resp.TransactionResults)-1]
systemTxStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true)
systemTxStatus, err := b.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
Expand Down Expand Up @@ -573,7 +573,7 @@ func (b *backendTransactions) getTransactionResultByIndexFromExecutionNode(
}

// tx body is irrelevant to status if it's in an executed block
txStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true)
txStatus, err := b.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
Expand Down Expand Up @@ -626,7 +626,7 @@ func (b *backendTransactions) GetSystemTransactionResult(ctx context.Context, bl
}

systemTxResult := resp.TransactionResults[len(resp.TransactionResults)-1]
systemTxStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true)
systemTxStatus, err := b.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
return nil, rpc.ConvertStorageError(err)
}
Expand Down Expand Up @@ -793,7 +793,7 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode(
}

// tx body is irrelevant to status if it's in an executed block
txStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true)
txStatus, err := b.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (r *Retry) retryTxsAtHeight(heightToRetry uint64) error {
if block == nil {
status, err = r.backend.DeriveUnknownTransactionStatus(tx.ReferenceBlockID)
} else {
status, err = r.backend.DeriveTransactionStatus(block.ID(), block.Header.Height, false)
status, err = r.backend.DeriveTransactionStatus(block.Header.Height, false)
}

if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions engine/access/rpc/backend/transactions_local_data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultFromStorage(
txStatusCode = 1 // statusCode of 1 indicates an error and 0 indicates no error, the same as on EN
}

txStatus, err := t.DeriveTransactionStatus(blockID, block.Header.Height, true)
txStatus, err := t.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
Expand Down Expand Up @@ -178,7 +178,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultsByBlockIDFromStorag
txStatusCode = 1
}

txStatus, err := t.DeriveTransactionStatus(blockID, block.Header.Height, true)
txStatus, err := t.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
Expand Down Expand Up @@ -255,7 +255,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultByIndexFromStorage(
txStatusCode = 1 // statusCode of 1 indicates an error and 0 indicates no error, the same as on EN
}

txStatus, err := t.DeriveTransactionStatus(blockID, block.Header.Height, true)
txStatus, err := t.DeriveTransactionStatus(block.Header.Height, true)
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
Expand Down Expand Up @@ -338,9 +338,9 @@ func (t *TransactionsLocalDataProvider) DeriveUnknownTransactionStatus(refBlockI
return flow.TransactionStatusPending, nil
}

// DeriveTransactionStatus is used to determine the status of a transaction based on the provided block ID, block height, and execution status.
// DeriveTransactionStatus is used to determine the status of a transaction based on the provided block height, and execution status.
// No errors expected during normal operations.
func (t *TransactionsLocalDataProvider) DeriveTransactionStatus(blockID flow.Identifier, blockHeight uint64, executed bool) (flow.TransactionStatus, error) {
func (t *TransactionsLocalDataProvider) DeriveTransactionStatus(blockHeight uint64, executed bool) (flow.TransactionStatus, error) {
if !executed {
// If we've gotten here, but the block has not yet been executed, report it as only been finalized
return flow.TransactionStatusFinalized, nil
Expand Down
11 changes: 8 additions & 3 deletions engine/access/state_stream/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -12,7 +13,6 @@ import (
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
Expand Down Expand Up @@ -144,18 +144,23 @@ func New(

// getExecutionData returns the execution data for the given block height.
// Expected errors during normal operation:
// - storage.ErrNotFound or execution_data.BlobNotFoundError: execution data for the given block height is not available.
// - subscription.ErrBlockNotReady: execution data for the given block height is not available.
func (b *StateStreamBackend) getExecutionData(ctx context.Context, height uint64) (*execution_data.BlockExecutionDataEntity, error) {
highestHeight := b.ExecutionDataTracker.GetHighestHeight()
// fail early if no notification has been received for the given block height.
// note: it's possible for the data to exist in the data store before the notification is
// received. this ensures a consistent view is available to all streams.
if height > highestHeight {
return nil, fmt.Errorf("execution data for block %d is not available yet: %w", height, storage.ErrNotFound)
return nil, fmt.Errorf("execution data for block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}

execData, err := b.execDataCache.ByHeight(ctx, height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) ||
execution_data.IsBlobNotFoundError(err) {
err = errors.Join(err, subscription.ErrBlockNotReady)
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}

Expand Down
13 changes: 12 additions & 1 deletion engine/access/state_stream/backend/backend_account_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package backend

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

type AccountStatusesResponse struct {
Expand Down Expand Up @@ -54,7 +57,7 @@ func (b *AccountStatusesBackend) SubscribeAccountStatusesFromStartBlockID(
// SubscribeAccountStatusesFromStartHeight subscribes to the streaming of account status changes starting from
// a specific block height, with an optional status filter.
// Errors:
// - codes.ErrNotFound if could not get block by start height.
// - codes.ErrNotFound if could not get block by start height.
// - codes.Internal if there is an internal error.
func (b *AccountStatusesBackend) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
Expand Down Expand Up @@ -84,12 +87,20 @@ func (b *AccountStatusesBackend) SubscribeAccountStatusesFromLatestBlock(
}

// getAccountStatusResponseFactory returns a function that returns the account statuses response for a given height.
//
// Errors:
// - subscription.ErrBlockNotReady: If block header for the specified block height is not found.
// - error: An error, if any, encountered during getting events from storage or execution data.
func (b *AccountStatusesBackend) getAccountStatusResponseFactory(
filter state_stream.AccountStatusFilter,
) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (interface{}, error) {
eventsResponse, err := b.eventsRetriever.GetAllEventsResponse(ctx, height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) {
return nil, fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}
return nil, err
}
filteredProtocolEvents := filter.Filter(eventsResponse.Events)
Expand Down
11 changes: 9 additions & 2 deletions engine/access/state_stream/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package backend

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

type EventsBackend struct {
Expand Down Expand Up @@ -126,12 +129,16 @@ func (b *EventsBackend) SubscribeEventsFromLatest(ctx context.Context, filter st
// - filter: The event filter used to filter events.
//
// Expected errors during normal operation:
// - codes.NotFound: If block header for the specified block height is not found, if events for the specified block height are not found.
// - subscription.ErrBlockNotReady: execution data for the given block height is not available.
func (b *EventsBackend) getResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (response interface{}, err error) {
eventsResponse, err := b.eventsRetriever.GetAllEventsResponse(ctx, height)
if err != nil {
return nil, err
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) {
return nil, subscription.ErrBlockNotReady
}
return nil, fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}

eventsResponse.Events = filter.Filter(eventsResponse.Events)
Expand Down
7 changes: 1 addition & 6 deletions engine/access/subscription/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"golang.org/x/time/rate"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/storage"
)

// ErrBlockNotReady represents an error indicating that a block is not yet available or ready.
Expand Down Expand Up @@ -105,10 +103,7 @@ func (s *Streamer) sendAllAvailable(ctx context.Context) error {
}

if err != nil {
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) ||
execution_data.IsBlobNotFoundError(err) ||
errors.Is(err, ErrBlockNotReady) {
if errors.Is(err, ErrBlockNotReady) {
// no more available
return nil
}
Expand Down
Loading

0 comments on commit 26cb8ee

Please sign in to comment.