Skip to content

Commit

Permalink
Merge pull request #740 from onflow/implementMissingStreaming
Browse files Browse the repository at this point in the history
Implement missing streaming methods
  • Loading branch information
franklywatson authored Sep 3, 2024
2 parents b340328 + aa2ecc7 commit 5e1792d
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 16 deletions.
15 changes: 15 additions & 0 deletions emulator/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1870,3 +1870,18 @@ func (b *Blockchain) executeSystemChunkTransaction() error {

return nil
}

func (b *Blockchain) GetRegisterValues(registerIDs flowgo.RegisterIDs, height uint64) (values []flowgo.RegisterValue, err error) {
ledger, err := b.storage.LedgerByHeight(context.Background(), height)
if err != nil {
return nil, err
}
for _, registerID := range registerIDs {
value, err := ledger.Get(registerID)
if err != nil {
return nil, err
}
values = append(values, value)
}
return values, nil
}
93 changes: 77 additions & 16 deletions server/access/streamBackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,50 +68,87 @@ func NewStateStreamBackend(blockchain *emulator.Blockchain, log zerolog.Logger)

var _ state_stream.API = &StateStreamBackend{}

func (b *StateStreamBackend) newSubscriptionByBlockId(
ctx context.Context,
startBlockID flow.Identifier,
f subscription.GetDataByHeightFunc,
) subscription.Subscription {
block, err := b.blockchain.GetBlockByID(startBlockID)
if err != nil {
return subscription.NewFailedSubscription(err, "could not get block by ID")
}
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f)
go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)
return sub
}

func (b *StateStreamBackend) newSubscriptionByHeight(
ctx context.Context,
startHeight uint64,
f subscription.GetDataByHeightFunc,
) subscription.Subscription {
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, startHeight, f)
go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)
return sub
}

func (b *StateStreamBackend) newSubscriptionByLatestHeight(
ctx context.Context,
f subscription.GetDataByHeightFunc,
) subscription.Subscription {
block, err := b.blockchain.GetLatestBlock()
if err != nil {
return subscription.NewFailedSubscription(err, "could not get latest block")
}
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f)
go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)
return sub
}

func (b *StateStreamBackend) SubscribeEventsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter state_stream.EventFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByBlockId(ctx, startBlockID, b.getEventsResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeEventsFromStartHeight(
ctx context.Context,
startHeight uint64,
filter state_stream.EventFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByHeight(ctx, startHeight, b.getEventsResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeEventsFromLatest(
ctx context.Context,
filter state_stream.EventFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByLatestHeight(ctx, b.getEventsResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter state_stream.AccountStatusFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByBlockId(ctx, startBlockID, b.getAccountStatusResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
filter state_stream.AccountStatusFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByHeight(ctx, startHeight, b.getAccountStatusResponseFactory(filter))
}

func (b *StateStreamBackend) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter state_stream.AccountStatusFilter,
) subscription.Subscription {
return nil
return b.newSubscriptionByLatestHeight(ctx, b.getAccountStatusResponseFactory(filter))
}

func getStartHeightFunc(blockchain *emulator.Blockchain) GetStartHeightFunc {
Expand Down Expand Up @@ -261,26 +298,26 @@ func (b *StateStreamBackend) SubscribeExecutionData(ctx context.Context, startBl
return subscription.NewFailedSubscription(err, "could not get start height")
}

sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponse)
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getExecutionDataResponse)

go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)

return sub
}

func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockID(ctx context.Context, startBlockID flow.Identifier) subscription.Subscription {
return nil
return b.newSubscriptionByBlockId(ctx, startBlockID, b.getExecutionDataResponse)
}

func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockHeight(ctx context.Context, startBlockHeight uint64) subscription.Subscription {
return nil
return b.newSubscriptionByHeight(ctx, startBlockHeight, b.getExecutionDataResponse)
}

func (b *StateStreamBackend) SubscribeExecutionDataFromLatest(ctx context.Context) subscription.Subscription {
return nil
return b.newSubscriptionByLatestHeight(ctx, b.getExecutionDataResponse)
}

func (b *StateStreamBackend) getResponse(ctx context.Context, height uint64) (interface{}, error) {
func (b *StateStreamBackend) getExecutionDataResponse(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
Expand All @@ -296,20 +333,20 @@ type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockEx

type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)

func (b StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription {
func (b *StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription {
nextHeight, err := b.getStartHeight(startBlockID, startHeight)
if err != nil {
return subscription.NewFailedSubscription(err, "could not get start height")
}

sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponseFactory(filter))
sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getEventsResponseFactory(filter))

go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx)

return sub
}

func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc {
func (b *StateStreamBackend) getEventsResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
Expand All @@ -334,6 +371,30 @@ func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter)
}
}

func (b StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) {
return nil, status.Errorf(codes.Unimplemented, "not implemented")
func (b *StateStreamBackend) getAccountStatusResponseFactory(
filter state_stream.AccountStatusFilter,
) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}

events := []flow.Event{}
for _, chunkExecutionData := range executionData.ChunkExecutionDatas {
events = append(events, filter.Filter(chunkExecutionData.Events)...)
}

allAccountProtocolEvents := filter.GroupCoreEventsByAccountAddress(events, b.log)

return &backend.AccountStatusesResponse{
BlockID: executionData.BlockID,
Height: height,
AccountEvents: allAccountProtocolEvents,
}, nil
}
}

func (b *StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) {
return b.blockchain.GetRegisterValues(registerIDs, height)
}

0 comments on commit 5e1792d

Please sign in to comment.