Skip to content

Commit fca8805

Browse files
committed
feat(x/async): EndBlocker schedules pending Future
Implement an EndBlocker that takes all the Futures that do not have a result yet, and schedule them to the prophet's process for asynchronous execution.
1 parent c209ff6 commit fca8805

File tree

3 files changed

+59
-0
lines changed

3 files changed

+59
-0
lines changed

warden/x/async/keeper/abci.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,32 @@ import (
55

66
cometabci "github.com/cometbft/cometbft/abci/types"
77
sdk "github.com/cosmos/cosmos-sdk/types"
8+
"github.com/warden-protocol/wardenprotocol/prophet"
89
)
910

1011
func (k Keeper) BeginBlocker(ctx context.Context) error {
1112
return nil
1213
}
1314

15+
// EndBlocker schedules some pending futures to the Prophet's process.
16+
//
17+
// Note: if a future remains pending for more blocks, it could be re-added to
18+
// Prophet even if it's already in the Prophet's queue or it's being processed.
19+
// This is not a problem as Prophet filters out incoming duplicate futures.
1420
func (k Keeper) EndBlocker(ctx context.Context) error {
21+
futures, err := k.futures.PendingFutures(ctx, 10)
22+
if err != nil {
23+
return err
24+
}
25+
26+
for _, f := range futures {
27+
k.p.AddFuture(prophet.Future{
28+
ID: f.Id,
29+
Handler: f.Handler,
30+
Input: f.Input,
31+
})
32+
}
33+
1534
return nil
1635
}
1736

warden/x/async/keeper/futures.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type FutureKeeper struct {
1616
futures repo.SeqCollection[types.Future]
1717
futureByCreator collections.KeySet[collections.Pair[sdk.AccAddress, uint64]]
1818
results collections.Map[uint64, types.FutureResult]
19+
pendingFutures collections.KeySet[uint64]
1920
}
2021

2122
func NewFutureKeeper(sb *collections.SchemaBuilder, cdc codec.Codec) *FutureKeeper {
@@ -27,10 +28,13 @@ func NewFutureKeeper(sb *collections.SchemaBuilder, cdc codec.Codec) *FutureKeep
2728

2829
results := collections.NewMap(sb, ResultsPrefix, "future_results", collections.Uint64Key, codec.CollValue[types.FutureResult](cdc))
2930

31+
pendingFutures := collections.NewKeySet(sb, PendingFuturesPrefix, "pending_futures", collections.Uint64Key)
32+
3033
return &FutureKeeper{
3134
futures: futures,
3235
futureByCreator: futureByCreator,
3336
results: results,
37+
pendingFutures: pendingFutures,
3438
}
3539
}
3640

@@ -49,6 +53,10 @@ func (k *FutureKeeper) Append(ctx context.Context, t *types.Future) (uint64, err
4953
return 0, err
5054
}
5155

56+
if err := k.pendingFutures.Set(ctx, id); err != nil {
57+
return 0, err
58+
}
59+
5260
return id, nil
5361
}
5462

@@ -61,6 +69,9 @@ func (k *FutureKeeper) Set(ctx context.Context, f types.Future) error {
6169
}
6270

6371
func (k *FutureKeeper) SetResult(ctx context.Context, result types.FutureResult) error {
72+
if err := k.pendingFutures.Remove(ctx, result.Id); err != nil {
73+
return err
74+
}
6475
return k.results.Set(ctx, result.Id, result)
6576
}
6677

@@ -75,3 +86,31 @@ func (k *FutureKeeper) HasResult(ctx context.Context, id uint64) (bool, error) {
7586
func (k *FutureKeeper) Futures() repo.SeqCollection[types.Future] {
7687
return k.futures
7788
}
89+
90+
func (k *FutureKeeper) PendingFutures(ctx context.Context, limit int) ([]types.Future, error) {
91+
it, err := k.pendingFutures.IterateRaw(ctx, nil, nil, collections.OrderAscending)
92+
if err != nil {
93+
return nil, err
94+
}
95+
defer it.Close()
96+
97+
futures := make([]types.Future, 0, limit)
98+
for ; it.Valid(); it.Next() {
99+
id, err := it.Key()
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
fut, err := k.futures.Get(ctx, id)
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
futures = append(futures, fut)
110+
if len(futures) == limit {
111+
break
112+
}
113+
}
114+
115+
return futures, nil
116+
}

warden/x/async/keeper/keeper.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ var (
3737
FutureByAddressPrefix = collections.NewPrefix(2)
3838
ResultsPrefix = collections.NewPrefix(3)
3939
VotesPrefix = collections.NewPrefix(4)
40+
PendingFuturesPrefix = collections.NewPrefix(5)
4041
)
4142

4243
func NewKeeper(

0 commit comments

Comments
 (0)