Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: warden-protocol/wardenprotocol
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 2b3f4bab57aacccb80d50b2e75bd2275137e0db3
Choose a base ref
..
head repository: warden-protocol/wardenprotocol
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b688903971f65687296f6ec982fa967b1a703ff4
Choose a head ref
Showing with 17 additions and 7 deletions.
  1. +17 −7 prophet/prophet.go
24 changes: 17 additions & 7 deletions prophet/prophet.go
Original file line number Diff line number Diff line change
@@ -2,10 +2,18 @@ package prophet

import (
"fmt"
"log/slog"

lru "github.com/hashicorp/golang-lru/v2"
)

// queueBufferSize sets the default size for incoming queues, i.e. the number
// of futures waiting to be executed and the number of future results waiting
// to be verified.
// Trying to add more items to the queue than this size will drop the new
// items.
var queueBufferSize = 100

// P is the main prophet process. Use [New] to create a new P.
type P struct {
futures *q[Future]
@@ -28,8 +36,8 @@ func New() (*P, error) {
}

return &P{
futures: newQ[Future](),
proposals: newQ[FutureResult](),
futures: newQ[Future](queueBufferSize),
proposals: newQ[FutureResult](queueBufferSize),
resultsWriter: resultsWriter,
votesWriter: votesWriter,
}, nil
@@ -88,16 +96,18 @@ type q[T any] struct {
ch chan T
}

func newQ[T any]() *q[T] {
func newQ[T any](buffer int) *q[T] {
return &q[T]{
ch: make(chan T),
ch: make(chan T, buffer),
}
}

func (q *q[T]) Add(item T) {
go func() {
q.ch <- item
}()
select {
case q.ch <- item:
default:
slog.Warn("q.Add: queue is full, dropped item", "item", item)
}
}

func (q *q[T]) Read() <-chan T {