From f91e0d2d97d5ced87bde6ee41703df79b81f7c08 Mon Sep 17 00:00:00 2001 From: Antonio Pitasi Date: Wed, 11 Dec 2024 16:48:00 +0100 Subject: [PATCH] feat(x/async): implement PreBlocker PreBlocker takes the tx #1 of the block, unmarshal the AsyncInjectedTx and executes it. The AsyncInjectedTx includes results and votes for futures, coming from different validators. The PreBlocker updates the local application state accordingly. --- warden/x/async/keeper/abci.go | 59 ++++++++++++++++++++++++++ warden/x/async/keeper/keeper.go | 24 +++++++++++ warden/x/async/types/v1beta1/future.go | 6 +++ 3 files changed, 89 insertions(+) create mode 100644 warden/x/async/types/v1beta1/future.go diff --git a/warden/x/async/keeper/abci.go b/warden/x/async/keeper/abci.go index c4d293d8b..84a409cfb 100644 --- a/warden/x/async/keeper/abci.go +++ b/warden/x/async/keeper/abci.go @@ -2,11 +2,13 @@ package keeper import ( "context" + "fmt" cometabci "github.com/cometbft/cometbft/abci/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/skip-mev/slinky/abci/ve" "github.com/warden-protocol/wardenprotocol/prophet" + "github.com/warden-protocol/wardenprotocol/warden/app/vemanager" types "github.com/warden-protocol/wardenprotocol/warden/x/async/types/v1beta1" ) @@ -141,10 +143,67 @@ func (k Keeper) ProcessProposalHandler() sdk.ProcessProposalHandler { func (k Keeper) PreBlocker() sdk.PreBlocker { return func(ctx sdk.Context, req *cometabci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) { resp := &sdk.ResponsePreBlock{} + if !ve.VoteExtensionsEnabled(ctx) || len(req.Txs) < 2 { + return resp, nil + } + + log := ctx.Logger().With("module", "x/async") + asyncTx := req.Txs[1] + if len(asyncTx) == 0 { + return resp, nil + } + + var tx types.AsyncInjectedTx + if err := tx.Unmarshal(asyncTx); err != nil { + log.Error("failed to unmarshal async tx", "err", err, "tx", asyncTx) + // probably not an async tx? + // but slinky in this case rejects their proposal so maybe we + // should do the same? + return resp, nil + } + + for _, v := range tx.ExtendedVotesInfo { + var w vemanager.VoteExtensions + if err := w.Unmarshal(v.VoteExtension); err != nil { + return resp, fmt.Errorf("failed to unmarshal vote extension wrapper: %w", err) + } + + // todo: check VE signature, or maybe do it in the verify ve handler? + + if len(w.Extensions) < 2 { + continue + } + + var asyncve types.AsyncVoteExtension + if err := asyncve.Unmarshal(w.Extensions[1]); err != nil { + return resp, fmt.Errorf("failed to unmarshal x/async vote extension: %w", err) + } + + if err := k.processVE(ctx, v.Validator.Address, asyncve); err != nil { + return resp, fmt.Errorf("failed to process vote extension: %w", err) + } + } + return resp, nil } } +func (k Keeper) processVE(ctx sdk.Context, fromAddr []byte, ve types.AsyncVoteExtension) error { + for _, r := range ve.Results { + if err := k.AddFutureResult(ctx, r.FutureId, fromAddr, r.Output); err != nil { + return fmt.Errorf("failed to add future result: %w", err) + } + } + + for _, vote := range ve.Votes { + if err := k.SetFutureVote(ctx, vote.FutureId, fromAddr, vote.Vote); err != nil { + return fmt.Errorf("failed to set task vote: %w", err) + } + } + + return nil +} + func (k Keeper) buildAsyncTx(votes []cometabci.ExtendedVoteInfo) ([]byte, error) { tx := types.AsyncInjectedTx{ ExtendedVotesInfo: votes, diff --git a/warden/x/async/keeper/keeper.go b/warden/x/async/keeper/keeper.go index 7d8819f8e..ea04c6d37 100644 --- a/warden/x/async/keeper/keeper.go +++ b/warden/x/async/keeper/keeper.go @@ -46,6 +46,7 @@ func NewKeeper( logger log.Logger, authority string, p *prophet.P, + //selfValAddr sdk.ConsAddress, ) Keeper { if _, err := sdk.AccAddressFromBech32(authority); err != nil { panic(fmt.Sprintf("invalid authority address: %s", authority)) @@ -90,6 +91,29 @@ func (k Keeper) Logger() log.Logger { return k.logger.With("module", fmt.Sprintf("x/%s", types.ModuleName)) } +func (k Keeper) AddFutureResult(ctx context.Context, id uint64, submitter, output []byte) error { + if err := k.futures.SetResult(ctx, types.FutureResult{ + Id: id, + Output: output, + Submitter: submitter, + }); err != nil { + return err + } + + if err := k.SetFutureVote(ctx, id, submitter, types.FutureVoteType_VOTE_TYPE_VERIFIED); err != nil { + return err + } + + return nil +} + +func (k Keeper) SetFutureVote(ctx context.Context, id uint64, voter []byte, vote types.FutureVoteType) error { + if !vote.IsValid() { + return fmt.Errorf("invalid vote type: %v", vote) + } + return k.votes.Set(ctx, collections.Join(id, voter), int32(vote)) +} + func (k Keeper) GetFutureVotes(ctx context.Context, futureId uint64) ([]types.FutureVote, error) { it, err := k.votes.Iterate(ctx, collections.NewPrefixedPairRange[uint64, []byte](futureId)) if err != nil { diff --git a/warden/x/async/types/v1beta1/future.go b/warden/x/async/types/v1beta1/future.go new file mode 100644 index 000000000..abd8be0d1 --- /dev/null +++ b/warden/x/async/types/v1beta1/future.go @@ -0,0 +1,6 @@ +package v1beta1 + +func (v FutureVoteType) IsValid() bool { + _, ok := FutureVoteType_name[int32(v)] + return ok +}