Skip to content

Commit

Permalink
feat(x/async): implement PreBlocker
Browse files Browse the repository at this point in the history
PreBlocker takes the tx #1 of the block, unmarshal the AsyncInjectedTx
and executes it.

The AsyncInjectedTx includes results and votes for futures, coming from
different validators. The PreBlocker updates the local application state
accordingly.
  • Loading branch information
Pitasi committed Dec 19, 2024
1 parent 3a86453 commit f91e0d2
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 0 deletions.
59 changes: 59 additions & 0 deletions warden/x/async/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package keeper

import (
"context"
"fmt"

cometabci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/skip-mev/slinky/abci/ve"
"github.com/warden-protocol/wardenprotocol/prophet"
"github.com/warden-protocol/wardenprotocol/warden/app/vemanager"
types "github.com/warden-protocol/wardenprotocol/warden/x/async/types/v1beta1"
)

Expand Down Expand Up @@ -141,10 +143,67 @@ func (k Keeper) ProcessProposalHandler() sdk.ProcessProposalHandler {
func (k Keeper) PreBlocker() sdk.PreBlocker {
return func(ctx sdk.Context, req *cometabci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) {
resp := &sdk.ResponsePreBlock{}
if !ve.VoteExtensionsEnabled(ctx) || len(req.Txs) < 2 {
return resp, nil
}

log := ctx.Logger().With("module", "x/async")
asyncTx := req.Txs[1]
if len(asyncTx) == 0 {
return resp, nil
}

var tx types.AsyncInjectedTx
if err := tx.Unmarshal(asyncTx); err != nil {
log.Error("failed to unmarshal async tx", "err", err, "tx", asyncTx)
// probably not an async tx?
// but slinky in this case rejects their proposal so maybe we
// should do the same?
return resp, nil
}

for _, v := range tx.ExtendedVotesInfo {
var w vemanager.VoteExtensions
if err := w.Unmarshal(v.VoteExtension); err != nil {
return resp, fmt.Errorf("failed to unmarshal vote extension wrapper: %w", err)
}

// todo: check VE signature, or maybe do it in the verify ve handler?

if len(w.Extensions) < 2 {
continue
}

var asyncve types.AsyncVoteExtension
if err := asyncve.Unmarshal(w.Extensions[1]); err != nil {
return resp, fmt.Errorf("failed to unmarshal x/async vote extension: %w", err)
}

if err := k.processVE(ctx, v.Validator.Address, asyncve); err != nil {
return resp, fmt.Errorf("failed to process vote extension: %w", err)
}
}

return resp, nil
}
}

func (k Keeper) processVE(ctx sdk.Context, fromAddr []byte, ve types.AsyncVoteExtension) error {
for _, r := range ve.Results {
if err := k.AddFutureResult(ctx, r.FutureId, fromAddr, r.Output); err != nil {
return fmt.Errorf("failed to add future result: %w", err)
}
}

for _, vote := range ve.Votes {
if err := k.SetFutureVote(ctx, vote.FutureId, fromAddr, vote.Vote); err != nil {
return fmt.Errorf("failed to set task vote: %w", err)
}
}

return nil
}

func (k Keeper) buildAsyncTx(votes []cometabci.ExtendedVoteInfo) ([]byte, error) {
tx := types.AsyncInjectedTx{
ExtendedVotesInfo: votes,
Expand Down
24 changes: 24 additions & 0 deletions warden/x/async/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewKeeper(
logger log.Logger,
authority string,
p *prophet.P,
//selfValAddr sdk.ConsAddress,
) Keeper {
if _, err := sdk.AccAddressFromBech32(authority); err != nil {
panic(fmt.Sprintf("invalid authority address: %s", authority))
Expand Down Expand Up @@ -90,6 +91,29 @@ func (k Keeper) Logger() log.Logger {
return k.logger.With("module", fmt.Sprintf("x/%s", types.ModuleName))
}

func (k Keeper) AddFutureResult(ctx context.Context, id uint64, submitter, output []byte) error {
if err := k.futures.SetResult(ctx, types.FutureResult{
Id: id,
Output: output,
Submitter: submitter,
}); err != nil {
return err
}

if err := k.SetFutureVote(ctx, id, submitter, types.FutureVoteType_VOTE_TYPE_VERIFIED); err != nil {
return err
}

return nil
}

func (k Keeper) SetFutureVote(ctx context.Context, id uint64, voter []byte, vote types.FutureVoteType) error {
if !vote.IsValid() {
return fmt.Errorf("invalid vote type: %v", vote)
}
return k.votes.Set(ctx, collections.Join(id, voter), int32(vote))
}

func (k Keeper) GetFutureVotes(ctx context.Context, futureId uint64) ([]types.FutureVote, error) {
it, err := k.votes.Iterate(ctx, collections.NewPrefixedPairRange[uint64, []byte](futureId))
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions warden/x/async/types/v1beta1/future.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package v1beta1

func (v FutureVoteType) IsValid() bool {
_, ok := FutureVoteType_name[int32(v)]
return ok
}

0 comments on commit f91e0d2

Please sign in to comment.