From b688903971f65687296f6ec982fa967b1a703ff4 Mon Sep 17 00:00:00 2001
From: Antonio Pitasi <antonio@pitasi.dev>
Date: Wed, 11 Dec 2024 16:48:00 +0100
Subject: [PATCH] feat(x/async): implement PreBlocker

PreBlocker takes the tx #1 of the block, unmarshal the AsyncInjectedTx
and executes it.

The AsyncInjectedTx includes results and votes for futures, coming from
different validators. The PreBlocker updates the local application state
accordingly.
---
 warden/x/async/keeper/abci.go   | 59 +++++++++++++++++++++++++++++++++
 warden/x/async/keeper/keeper.go | 21 ++++++++++++
 2 files changed, 80 insertions(+)

diff --git a/warden/x/async/keeper/abci.go b/warden/x/async/keeper/abci.go
index 893ef757f..3fe762752 100644
--- a/warden/x/async/keeper/abci.go
+++ b/warden/x/async/keeper/abci.go
@@ -2,11 +2,13 @@ package keeper
 
 import (
 	"context"
+	"fmt"
 
 	cometabci "github.com/cometbft/cometbft/abci/types"
 	sdk "github.com/cosmos/cosmos-sdk/types"
 	"github.com/skip-mev/slinky/abci/ve"
 	"github.com/warden-protocol/wardenprotocol/prophet"
+	"github.com/warden-protocol/wardenprotocol/warden/app/vemanager"
 	types "github.com/warden-protocol/wardenprotocol/warden/x/async/types/v1beta1"
 )
 
@@ -140,10 +142,67 @@ func (k Keeper) ProcessProposalHandler() sdk.ProcessProposalHandler {
 func (k Keeper) PreBlocker() sdk.PreBlocker {
 	return func(ctx sdk.Context, req *cometabci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) {
 		resp := &sdk.ResponsePreBlock{}
+		if !ve.VoteExtensionsEnabled(ctx) || len(req.Txs) < 2 {
+			return resp, nil
+		}
+
+		log := ctx.Logger().With("module", "x/async")
+		asyncTx := req.Txs[1]
+		if len(asyncTx) == 0 {
+			return resp, nil
+		}
+
+		var tx types.AsyncInjectedTx
+		if err := tx.Unmarshal(asyncTx); err != nil {
+			log.Error("failed to unmarshal async tx", "err", err, "tx", asyncTx)
+			// probably not an async tx?
+			// but slinky in this case rejects their proposal so maybe we
+			// should do the same?
+			return resp, nil
+		}
+
+		for _, v := range tx.ExtendedVotesInfo {
+			var w vemanager.VoteExtensions
+			if err := w.Unmarshal(v.VoteExtension); err != nil {
+				return resp, fmt.Errorf("failed to unmarshal vote extension wrapper: %w", err)
+			}
+
+			// todo: check VE signature, or maybe do it in the verify ve handler?
+
+			if len(w.Extensions) < 2 {
+				continue
+			}
+
+			var asyncve types.AsyncVoteExtension
+			if err := asyncve.Unmarshal(w.Extensions[1]); err != nil {
+				return resp, fmt.Errorf("failed to unmarshal x/async vote extension: %w", err)
+			}
+
+			if err := k.processVE(ctx, v.Validator.Address, asyncve); err != nil {
+				return resp, fmt.Errorf("failed to process vote extension: %w", err)
+			}
+		}
+
 		return resp, nil
 	}
 }
 
+func (k Keeper) processVE(ctx sdk.Context, fromAddr []byte, ve types.AsyncVoteExtension) error {
+	for _, r := range ve.Results {
+		if err := k.AddFutureResult(ctx, r.FutureId, fromAddr, r.Output); err != nil {
+			return fmt.Errorf("failed to add future result: %w", err)
+		}
+	}
+
+	for _, vote := range ve.Votes {
+		if err := k.SetFutureVote(ctx, vote.FutureId, fromAddr, vote.Vote); err != nil {
+			return fmt.Errorf("failed to set task vote: %w", err)
+		}
+	}
+
+	return nil
+}
+
 func (k Keeper) buildAsyncTx(votes []cometabci.ExtendedVoteInfo) ([]byte, error) {
 	tx := types.AsyncInjectedTx{
 		ExtendedVotesInfo: votes,
diff --git a/warden/x/async/keeper/keeper.go b/warden/x/async/keeper/keeper.go
index 7d8819f8e..38527cde9 100644
--- a/warden/x/async/keeper/keeper.go
+++ b/warden/x/async/keeper/keeper.go
@@ -46,6 +46,7 @@ func NewKeeper(
 	logger log.Logger,
 	authority string,
 	p *prophet.P,
+	//selfValAddr sdk.ConsAddress,
 ) Keeper {
 	if _, err := sdk.AccAddressFromBech32(authority); err != nil {
 		panic(fmt.Sprintf("invalid authority address: %s", authority))
@@ -90,6 +91,26 @@ func (k Keeper) Logger() log.Logger {
 	return k.logger.With("module", fmt.Sprintf("x/%s", types.ModuleName))
 }
 
+func (k Keeper) AddFutureResult(ctx context.Context, id uint64, submitter, output []byte) error {
+	if err := k.futures.SetResult(ctx, types.FutureResult{
+		Id:        id,
+		Output:    output,
+		Submitter: submitter,
+	}); err != nil {
+		return err
+	}
+
+	if err := k.SetFutureVote(ctx, id, submitter, types.FutureVoteType_VOTE_TYPE_VERIFIED); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (k Keeper) SetFutureVote(ctx context.Context, id uint64, voter []byte, vote types.FutureVoteType) error {
+	return k.votes.Set(ctx, collections.Join(id, voter), int32(vote))
+}
+
 func (k Keeper) GetFutureVotes(ctx context.Context, futureId uint64) ([]types.FutureVote, error) {
 	it, err := k.votes.Iterate(ctx, collections.NewPrefixedPairRange[uint64, []byte](futureId))
 	if err != nil {