Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Td/test/grpc oe #2237

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/protocol-build-and-push-snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on: # yamllint disable-line rule:truthy
- main
- 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x
- 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x
- 'td/*'

jobs:
build-and-push-snapshot-dev:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/protocol-build-and-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on: # yamllint disable-line rule:truthy
- main
- 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x
- 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x
- 'td/*'

jobs:
build-and-push-dev:
Expand Down
8 changes: 8 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ message StreamUpdate {
}
}

message StagedFinalizeBlockEvent {
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
}
}

// StreamOrderbookUpdate provides information on an orderbook update. Used in
// the full node GRPC stream.
message StreamOrderbookUpdate {
Expand Down
4 changes: 4 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ func New(
statsmoduletypes.TransientStoreKey,
rewardsmoduletypes.TransientStoreKey,
indexer_manager.TransientStoreKey,
streaming.StreamingManagerTransientStoreKey,
perpetualsmoduletypes.TransientStoreKey,
)
memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey)
Expand Down Expand Up @@ -764,6 +765,7 @@ func New(
appFlags,
appCodec,
logger,
tkeys[streaming.StreamingManagerTransientStoreKey],
)

timeProvider := &timelib.TimeProviderImpl{}
Expand Down Expand Up @@ -2058,6 +2060,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags flags.Flags,
cdc codec.Codec,
logger log.Logger,
streamingManagerTransientStoreKey storetypes.StoreKey,
) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) {
logger = logger.With(log.ModuleKey, "full-node-streaming")
if appFlags.GrpcStreamingEnabled {
Expand All @@ -2071,6 +2074,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
streamingManagerTransientStoreKey,
)

// Start websocket server.
Expand Down
1 change: 1 addition & 0 deletions protocol/lib/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ const (
UpdateType = "update_type"
ValidateMatches = "validate_matches"
ValidateOrder = "validate_order"
StreamStagedEventsAfterFinalizeBlock = "stream_staged_events_after_finalize_block"

// MemCLOB.
AddedToOrderBook = "added_to_orderbook"
Expand Down
3 changes: 3 additions & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ const (
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
GrpcStagedAllFinalizeBlockUpdates = "grpc_staged_all_finalize_block_updates"
GrpcStagedFillFinalizeBlockUpdates = "grpc_staged_finalize_block_fill_updates"
GrpcStagedSubaccountFinalizeBlockUpdates = "grpc_staged_finalize_block_subaccount_updates"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
12 changes: 12 additions & 0 deletions protocol/streaming/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package streaming

const (
// Transient store key for storing staged events.
StreamingManagerTransientStoreKey = "tmp_streaming"

// Key for storing the count of staged events.
StagedEventsCountKey = "EvtCnt"

// Key prefix for staged events.
StagedEventsKeyPrefix = "Evt:"
)
86 changes: 86 additions & 0 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package streaming

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/dydxprotocol/v4-chain/protocol/lib"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
Expand Down Expand Up @@ -50,6 +55,9 @@ type FullNodeStreamingManagerImpl struct {
// Block interval in which snapshot info should be sent out in.
// Defaults to 0, which means only one snapshot will be sent out.
snapshotBlockInterval uint32

// stores the staged FinalizeBlock events for full node streaming.
streamingManagerTransientStoreKey storetypes.StoreKey
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand Down Expand Up @@ -86,6 +94,7 @@ func NewFullNodeStreamingManager(
maxUpdatesInCache uint32,
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
streamingManagerTransientStoreKey storetypes.StoreKey,
) *FullNodeStreamingManagerImpl {
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
Expand All @@ -102,6 +111,8 @@ func NewFullNodeStreamingManager(
maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
snapshotBlockInterval: snapshotBlockInterval,

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -367,6 +378,81 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
}
}

func getStagedEventsCount(store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(StagedEventsCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
)
}

func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &fill,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
)
}

func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
clobtypes.Amino.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
}

func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEvents(store)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)

// Increment events count.
count := getStagedEventsCount(store)
store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
}

// SendCombinedSnapshot sends messages to a particular subscriber without buffering.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
Expand Down
18 changes: 18 additions & 0 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,21 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams(

func (sm *NoopGrpcStreamingManager) Stop() {
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
}

func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
return nil
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
}
11 changes: 11 additions & 0 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ type FullNodeStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
)
StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
)
GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent
TracksSubaccountId(id satypes.SubaccountId) bool
}

Expand Down
11 changes: 11 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ func BeginBlocker(
keeper.ResetAllDeliveredOrderIds(ctx)
}

// Precommit executes all ABCI Precommit logic respective to the clob module.
func Precommit(
ctx sdk.Context,
keeper keeper.Keeper,
) {
if streamingManager := keeper.GetFullNodeStreamingManager(); !streamingManager.Enabled() {
return
}
keeper.StreamStagedEventsAfterFinalizeBlock(ctx)
}

// EndBlocker executes all ABCI EndBlock logic respective to the clob module.
func EndBlocker(
ctx sdk.Context,
Expand Down
72 changes: 72 additions & 0 deletions protocol/x/clob/keeper/grpc_stream_finalize_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package keeper

import (
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

// StreamStagedEventsAfterFinalizeBlock streams all events emitted during `FinalizeBlock`
// (PreBlocker + EndBlocker + DeliverTx + EndBlocker).
// It should be called after the consensus agrees on a block (e.g. Precommitter).
func (k Keeper) StreamStagedEventsAfterFinalizeBlock(
ctx sdk.Context,
) {
defer telemetry.MeasureSince(
time.Now(),
types.ModuleName,
metrics.StreamStagedEventsAfterFinalizeBlock,
metrics.Latency,
)

// Get onchain stream events stored in transient store.
stagedEvents := k.GetFullNodeStreamingManager().GetStagedFinalizeBlockEvents(ctx)

telemetry.SetGauge(
float32(len(stagedEvents)),
types.ModuleName,
metrics.GrpcStagedAllFinalizeBlockUpdates,
metrics.Count,
)

finalizedFillUpdates := []types.StreamOrderbookFill{}
finalizedSubaccountUpdates := []satypes.StreamSubaccountUpdate{}

for _, stagedEvent := range stagedEvents {
switch event := stagedEvent.Event.(type) {
case *types.StagedFinalizeBlockEvent_OrderFill:
finalizedFillUpdates = append(finalizedFillUpdates, *event.OrderFill)
case *types.StagedFinalizeBlockEvent_SubaccountUpdate:
finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate)
}
}

k.SendOrderbookFillUpdates(
ctx,
finalizedFillUpdates,
)

telemetry.SetGauge(
float32(len(finalizedFillUpdates)),
types.ModuleName,
metrics.GrpcStagedFillFinalizeBlockUpdates,
metrics.Count,
)

k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

telemetry.SetGauge(
float32(len(finalizedSubaccountUpdates)),
types.ModuleName,
metrics.GrpcStagedSubaccountFinalizeBlockUpdates,
metrics.Count,
)
}
Loading
Loading