diff --git a/CHANGELOG.md b/CHANGELOG.md index 423599bc9..e2cc98858 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,7 +54,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * (wardend) Bump IAVL to v1.2.2. Fixes some potential apphash mismatches that happen in some rare cases. * (x/async) Scaffold new module with create/read operations * (x/warden) Sign requests query return all request (not only with broadcastType=BroadcastType.Disabled) -* (prophet) First version. Includes the task runner for Futures and Votes, and the interface for handlers. ### Bug Fixes diff --git a/go.mod b/go.mod index f2e279994..94ceb4b77 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,6 @@ require ( github.com/gorilla/mux v1.8.1 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/iancoleman/strcase v0.3.0 github.com/labstack/echo/v4 v4.12.0 github.com/prometheus/client_golang v1.20.5 @@ -218,6 +217,7 @@ require ( github.com/hashicorp/go-safetemp v1.0.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/hdevalence/ed25519consensus v0.1.0 // indirect diff --git a/prophet/dedup.go b/prophet/dedup.go deleted file mode 100644 index be6260886..000000000 --- a/prophet/dedup.go +++ /dev/null @@ -1,67 +0,0 @@ -package prophet - -import ( - lru "github.com/hashicorp/golang-lru/v2" -) - -var dedupLRUSize = 10000 - -type getIDer interface { - getID() uint64 -} - -// dedup takes a channel of requests, deduplicates them, and emits -// returns a new channel of unique requests. -// -// It does so by keeping a LRU cache of unique IDs. -// Requests coming after [dedupLRUSize] requests will be emitted -// again, even if duplicated. -type dedup[T getIDer] struct { - in <-chan T - out chan T - c *lru.Cache[uint64, struct{}] -} - -func newDedup[T getIDer](ch <-chan T) (*dedup[T], error) { - c, err := lru.New[uint64, struct{}](dedupLRUSize) - if err != nil { - return nil, err - } - - out := make(chan T) - - go func() { - defer close(out) - for req := range ch { - if c.Contains(req.getID()) { - continue - } - c.Add(req.getID(), struct{}{}) - out <- req - } - }() - - return &dedup[T]{ - in: ch, - c: c, - out: out, - }, nil -} - -// dedupFutureReader wraps a [FutureReader] and deduplicates the incoming -// futures. -type dedupFutureReader struct { - d *dedup[Future] -} - -func newDedupFutureReader(r FutureReader) (*dedupFutureReader, error) { - d, err := newDedup(r.Read()) - if err != nil { - return nil, err - } - return &dedupFutureReader{d: d}, nil -} - -func (d dedupFutureReader) Read() <-chan Future { - return d.d.out -} diff --git a/prophet/doc.go b/prophet/doc.go deleted file mode 100644 index 3c8c001e3..000000000 --- a/prophet/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -// Package prophet implements an asynchronous task-runner for the Warden -// Protocol. -// -// The tasks are called "Futures". -// -// A [Future] consist of a []byte input, and a handler identified by a string. -// -// The handler implements the [FutureHandler] interface and is responsible for -// executing a future, or verifying the correctness of a future output (e.g. by -// re-executing it, or by implementing more sophisticated verification). -// -// The handlers are registered in a global registry, and are looked up by their -// unique string identifier. -// See [Register] for registering a new handler. -// -// The entry point for the prophet package is the [P] struct, which implements -// the asynchronous process. See the methods of [P] for more information on -// starting it, and scheduling new futures. -package prophet diff --git a/prophet/exec.go b/prophet/exec.go deleted file mode 100644 index 1952d7932..000000000 --- a/prophet/exec.go +++ /dev/null @@ -1,70 +0,0 @@ -package prophet - -import ( - "context" - "log/slog" -) - -// FutureResultWriter writes results of future executions. -type FutureResultWriter interface { - Write(result FutureResult) error -} - -// ExecFutures executes futures coming from the specified reader and writes -// them to the specified writer. -// -// This call is non-blocking, the main loop is executed in a goroutine. -func ExecFutures(r FutureReader, w FutureResultWriter) error { - log := slog.With("process", "exec_futures") - - go func() { - for future := range r.Read() { - log := log.With("future", future.ID) - - log.Debug("running future") - output, err := Execute(context.TODO(), future) - if err != nil { - log.Error("failed to run future", "err", err) - continue - } - err = w.Write(output) - if err != nil { - log.Error("failed to write future result", "err", err) - continue - } - } - }() - - return nil -} - -// VoteWriter writes votes of future verifications. -type VoteWriter interface { - Write(result Vote) error -} - -// ExecVotes executes future verifications coming from the specified reader and -// writes them to the specified writer. -// -// This call is non-blocking, the main loop is executed in a goroutine. -func ExecVotes(r FutureResultReader, w VoteWriter) error { - log := slog.With("process", "exec_votes") - - go func() { - for proposal := range r.Read() { - plog := log.With("future", proposal.ID) - - plog.Debug("verifying future proposal") - err := Verify(context.TODO(), proposal) - if err := w.Write(Vote{ - ID: proposal.ID, - Err: err, - }); err != nil { - plog.Error("failed to write vote", "err", err) - continue - } - } - }() - - return nil -} diff --git a/prophet/future.go b/prophet/future.go deleted file mode 100644 index 59b87c8a5..000000000 --- a/prophet/future.go +++ /dev/null @@ -1,46 +0,0 @@ -package prophet - -// Future is a unit of computation. -type Future struct { - // ID is a globally unique identifier for this Future. - ID uint64 - // Handler identifies what handler will execute the computation. - Handler string - // Input is the input data for the handler. - Input []byte -} - -// getID implements [getIDer]. -func (r Future) getID() uint64 { return r.ID } - -// FutureReader is a source of futures. -type FutureReader interface { - Read() <-chan Future -} - -// FutureResult is the result of the computation of a future. -type FutureResult struct { - Future - Output []byte -} - -// getID implements [getIDer]. -func (r FutureResult) getID() uint64 { return r.ID } - -// FutureResultReader is a source of future results. -type FutureResultReader interface { - Read() <-chan FutureResult -} - -// Vote is a vote on a future result, indicating if it could be verified or -// not. -type Vote struct { - // ID is the ID of the future. - ID uint64 - // Err is the error that occurred during the verification. If it is nil, - // the future result was verified. - Err error -} - -// getID implements [getIDer]. -func (v Vote) getID() uint64 { return v.ID } diff --git a/prophet/handlers.go b/prophet/handlers.go deleted file mode 100644 index 1832b209b..000000000 --- a/prophet/handlers.go +++ /dev/null @@ -1,64 +0,0 @@ -package prophet - -import ( - "context" - "fmt" - "log/slog" - "time" -) - -// FutureHandler is the interface implemented by the future handlers. A handler -// is able to execute and verify futures. -type FutureHandler interface { - // Execute the computation with the given input, returning the output. - Execute(ctx context.Context, input []byte) ([]byte, error) - - // Verify the output of the computation with the given input, returning an - // error if the output is invalid. - Verify(ctx context.Context, input []byte, output []byte) error -} - -// Execute executes a given future, by invoking the registered handler. -func Execute(ctx context.Context, f Future) (FutureResult, error) { - s := getHandler(f.Handler) - if s == nil { - return FutureResult{}, fmt.Errorf("no future handler registered for %s", f.Handler) - } - - log := slog.With("task", "Execute", "future", f.ID, "handler", f.Handler) - log.Debug("start") - start := time.Now() - - output, err := s.Execute(ctx, f.Input) - if err != nil { - return FutureResult{}, fmt.Errorf("executing future: %w", err) - } - - log.Debug("end", "took", time.Since(start)) - - return FutureResult{ - Future: f, - Output: output, - }, nil -} - -// Verify verifies a given future result, by invoking the registered handler. -func Verify(ctx context.Context, f FutureResult) error { - s := getHandler(f.Handler) - if s == nil { - return fmt.Errorf("no future handler registered for %s", f.Handler) - } - - log := slog.With("task", "Verify", "future", f.ID, "handler", f.Handler) - log.Debug("start") - start := time.Now() - - err := s.Verify(ctx, f.Input, f.Output) - if err != nil { - return err - } - - log.Debug("end", "took", time.Since(start)) - - return nil -} diff --git a/prophet/prophet.go b/prophet/prophet.go deleted file mode 100644 index 973cd106c..000000000 --- a/prophet/prophet.go +++ /dev/null @@ -1,147 +0,0 @@ -package prophet - -import ( - "fmt" - "log/slog" - - lru "github.com/hashicorp/golang-lru/v2" -) - -// queueBufferSize sets the default size for incoming queues, i.e. the number -// of futures waiting to be executed and the number of future results waiting -// to be verified. -// Trying to add more items to the queue than this size will drop the new -// items. -var queueBufferSize = 100 - -// P is the main prophet process. Use [New] to create a new P. -type P struct { - futures *q[Future] - proposals *q[FutureResult] - - resultsWriter *s[FutureResult] - votesWriter *s[Vote] -} - -// New returns an initialized P. Call [P.Run] to start the main loop. -func New() (*P, error) { - resultsWriter, err := newS[FutureResult]() - if err != nil { - return nil, err - } - - votesWriter, err := newS[Vote]() - if err != nil { - return nil, err - } - - return &P{ - futures: newQ[Future](queueBufferSize), - proposals: newQ[FutureResult](queueBufferSize), - resultsWriter: resultsWriter, - votesWriter: votesWriter, - }, nil -} - -// Run starts the main loop of the prophet process. -// -// Goroutines are started to execute incoming futures and verifying incoming -// future results. -func (p *P) Run() error { - futures, err := newDedupFutureReader(p.futures) - if err != nil { - return fmt.Errorf("failed to create futures dedup reader: %w", err) - } - - if err := ExecFutures(futures, p.resultsWriter); err != nil { - return fmt.Errorf("failed to run futures loop: %w", err) - } - - if err := ExecVotes(p.proposals, p.votesWriter); err != nil { - return fmt.Errorf("failed to run votes loop: %w", err) - } - - return nil -} - -// AddFuture adds a future to be executed. This call is non-blocking. The -// future will be executed in the background, the results can be retrieved by -// calling [P.Results]. -func (p *P) AddFuture(future Future) { - p.futures.Add(future) -} - -// AddFutureResult adds a future result to be voted on. This call is -// non-blocking. -func (p *P) AddFutureResult(proposal FutureResult) { - p.proposals.Add(proposal) -} - -// Results returns a slice with all the results of futures that have been -// executed. -// The returned function must be called to remove the results from the set. -func (p *P) Results() ([]FutureResult, func()) { - values := p.resultsWriter.Values() - if len(values) == 0 { - return nil, func() {} - } - - return values, func() { - p.resultsWriter.Remove(values...) - } -} - -// q is a queue that doesn't block the producer (i.e. q.Add is non-blocking). -type q[T any] struct { - ch chan T -} - -func newQ[T any](buffer int) *q[T] { - return &q[T]{ - ch: make(chan T, buffer), - } -} - -func (q *q[T]) Add(item T) { - select { - case q.ch <- item: - default: - slog.Warn("q.Add: queue is full, dropped item", "item", item) - } -} - -func (q *q[T]) Read() <-chan T { - return q.ch -} - -var defaultSetSize = 10000 - -// s is a set with a size bound, and thread-safe. -// Adding elements to the set may delete older elements, if the set is full. -type s[T getIDer] struct { - l *lru.Cache[uint64, T] -} - -func newS[T getIDer]() (*s[T], error) { - l, err := lru.New[uint64, T](defaultSetSize) - if err != nil { - return nil, err - } - return &s[T]{l: l}, nil -} - -func (s *s[T]) Write(value T) error { - id := value.getID() - s.l.Add(id, value) - return nil -} - -func (s *s[T]) Values() []T { - return s.l.Values() -} - -func (s *s[T]) Remove(values ...T) { - for _, v := range values { - s.l.Remove(v.getID()) - } -} diff --git a/prophet/registry.go b/prophet/registry.go deleted file mode 100644 index dad361053..000000000 --- a/prophet/registry.go +++ /dev/null @@ -1,29 +0,0 @@ -package prophet - -import "sync" - -// registry is a global registry of futures handlers. -var registry = r{ - futures: make(map[string]FutureHandler), -} - -type r struct { - rw sync.RWMutex - futures map[string]FutureHandler -} - -// Register registers a new future handler with the given unique name. -func Register(name string, future FutureHandler) { - registry.rw.Lock() - defer registry.rw.Unlock() - if _, found := registry.futures[name]; found { - panic("future already registered") - } - registry.futures[name] = future -} - -func getHandler(name string) FutureHandler { - registry.rw.RLock() - defer registry.rw.RUnlock() - return registry.futures[name] -}