Skip to content

Commit

Permalink
refresh on-ledger requests after congestion makes mempool drop them
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgemmsilva committed Apr 23, 2024
1 parent 526e864 commit 3311201
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 79 deletions.
13 changes: 7 additions & 6 deletions components/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ func provide(c *dig.Container) error {
deps.ConsensusStateRegistry,
deps.ChainListener,
mempool.Settings{
TTL: ParamsChains.MempoolTTL,
MaxOffledgerInPool: ParamsChains.MempoolMaxOffledgerInPool,
MaxOnledgerInPool: ParamsChains.MempoolMaxOnledgerInPool,
MaxTimedInPool: ParamsChains.MempoolMaxTimedInPool,
MaxOnledgerToPropose: ParamsChains.MempoolMaxOnledgerToPropose,
MaxOffledgerToPropose: ParamsChains.MempoolMaxOffledgerToPropose,
TTL: ParamsChains.MempoolTTL,
OnLedgerRefreshMinInterval: ParamsChains.MempoolOnLedgerRefreshMinInterval,
MaxOffledgerInPool: ParamsChains.MempoolMaxOffledgerInPool,
MaxOnledgerInPool: ParamsChains.MempoolMaxOnledgerInPool,
MaxTimedInPool: ParamsChains.MempoolMaxTimedInPool,
MaxOnledgerToPropose: ParamsChains.MempoolMaxOnledgerToPropose,
MaxOffledgerToPropose: ParamsChains.MempoolMaxOffledgerToPropose,
},
ParamsChains.BroadcastInterval,
shutdown.NewCoordinator("chains", Component.Logger().Named("Shutdown")),
Expand Down
37 changes: 19 additions & 18 deletions components/chains/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@ import (
)

type ParametersChains struct {
BroadcastUpToNPeers int `default:"2" usage:"number of peers an offledger request is broadcasted to"`
BroadcastInterval time.Duration `default:"0s" usage:"time between re-broadcast of offledger requests; 0 value means that re-broadcasting is disabled"`
APICacheTTL time.Duration `default:"300s" usage:"time to keep processed offledger requests in api cache"`
PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"`
DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."`
PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."`
ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."`
RecoveryTimeout time.Duration `default:"20s" usage:"Time after which another consensus attempt is made."`
RedeliveryPeriod time.Duration `default:"2s" usage:"the resend period for msg."`
PrintStatusPeriod time.Duration `default:"3s" usage:"the period to print consensus instance status."`
ConsensusInstsInAdvance int `default:"3" usage:""`
AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"`
MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"`
MempoolMaxOffledgerInPool int `default:"2000" usage:"Maximum number of off-ledger requests kept in the mempool"`
MempoolMaxOnledgerInPool int `default:"1000" usage:"Maximum number of on-ledger requests kept in the mempool"`
MempoolMaxTimedInPool int `default:"100" usage:"Maximum number of timed on-ledger requests kept in the mempool"`
MempoolMaxOffledgerToPropose int `default:"500" usage:"Maximum number of off-ledger requests to propose for the next block"`
MempoolMaxOnledgerToPropose int `default:"100" usage:"Maximum number of on-ledger requests to propose for the next block (includes timed requests)"`
BroadcastUpToNPeers int `default:"2" usage:"number of peers an offledger request is broadcasted to"`
BroadcastInterval time.Duration `default:"0s" usage:"time between re-broadcast of offledger requests; 0 value means that re-broadcasting is disabled"`
APICacheTTL time.Duration `default:"300s" usage:"time to keep processed offledger requests in api cache"`
PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"`
DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."`
PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."`
ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."`
RecoveryTimeout time.Duration `default:"20s" usage:"Time after which another consensus attempt is made."`
RedeliveryPeriod time.Duration `default:"2s" usage:"the resend period for msg."`
PrintStatusPeriod time.Duration `default:"3s" usage:"the period to print consensus instance status."`
ConsensusInstsInAdvance int `default:"3" usage:""`
AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"`
MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"`
MempoolMaxOffledgerInPool int `default:"2000" usage:"Maximum number of off-ledger requests kept in the mempool"`
MempoolMaxOnledgerInPool int `default:"1000" usage:"Maximum number of on-ledger requests kept in the mempool"`
MempoolMaxTimedInPool int `default:"100" usage:"Maximum number of timed on-ledger requests kept in the mempool"`
MempoolMaxOffledgerToPropose int `default:"500" usage:"Maximum number of off-ledger requests to propose for the next block"`
MempoolMaxOnledgerToPropose int `default:"100" usage:"Maximum number of on-ledger requests to propose for the next block (includes timed requests)"`
MempoolOnLedgerRefreshMinInterval time.Duration `default:"10m" usage:"Minimum interval to try to refresh the list of on-ledger requests after some have been dropped from the pool (this interval is introduced to avoid dropping/refreshing cycle if there are too many requests on L1 to process)"`
}

type ParametersWAL struct {
Expand Down
40 changes: 21 additions & 19 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,13 @@ type Mempool interface {
}

type Settings struct {
TTL time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed)
MaxOffledgerInPool int
MaxOnledgerInPool int
MaxTimedInPool int
MaxOnledgerToPropose int // (including timed-requests)
MaxOffledgerToPropose int
}

type RequestPool[V isc.Request] interface {
Has(reqRef *isc.RequestRef) bool
Get(reqRef *isc.RequestRef) V
Add(request V)
Remove(request V)
// this removes requests from the pool if predicate returns false
Cleanup(predicate func(request V, ts time.Time) bool)
Iterate(f func(e *typedPoolEntry[V]) bool)
StatusString() string
WriteContent(io.Writer)
TTL time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed)
OnLedgerRefreshMinInterval time.Duration
MaxOffledgerInPool int
MaxOnledgerInPool int
MaxTimedInPool int
MaxOnledgerToPropose int // (including timed-requests)
MaxOffledgerToPropose int
}

// This implementation tracks single branch of the chain only. I.e. all the consensus
Expand Down Expand Up @@ -175,6 +164,8 @@ type mempoolImpl struct {
log *logger.Logger
metrics *metrics.ChainMempoolMetrics
listener ChainListener
refreshOnLedgerRequests func()
lastRefreshTimestamp time.Time
}

var _ Mempool = &mempoolImpl{}
Expand Down Expand Up @@ -235,6 +226,7 @@ func New(
listener ChainListener,
settings Settings,
broadcastInterval time.Duration,
refreshOnLedgerRequests func(),
) Mempool {
netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool
waitReq := NewWaitReq(waitRequestCleanupEvery)
Expand Down Expand Up @@ -269,6 +261,8 @@ func New(
log: log,
metrics: metrics,
listener: listener,
refreshOnLedgerRequests: refreshOnLedgerRequests,
lastRefreshTimestamp: time.Now(),
}

pipeMetrics.TrackPipeLen("mp-serverNodesUpdatedPipe", mpi.serverNodesUpdatedPipe.Len)
Expand Down Expand Up @@ -392,7 +386,7 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc)
debugTicker := time.NewTicker(distShareDebugTick)
timeTicker := time.NewTicker(distShareTimeTick)
rePublishTicker := time.NewTicker(distShareRePublishTick)
forceCleanMempoolTicker := time.NewTicker(forceCleanMempoolTick)
forceCleanMempoolTicker := time.NewTicker(forceCleanMempoolTick) // this exists to force mempool cleanup on access nodes // thought: maybe access nodes shouldn't have a mempool at all
for {
select {
case recv, ok := <-serverNodesUpdatedPipeOutCh:
Expand Down Expand Up @@ -950,6 +944,14 @@ func (mpi *mempoolImpl) handleRePublishTimeTick() {
}
return true
})

// periodically try to refresh On-ledger requests that might have been dropped
if time.Since(mpi.lastRefreshTimestamp) > mpi.settings.OnLedgerRefreshMinInterval {
if mpi.onLedgerPool.ShouldRefreshRequests() || mpi.timePool.ShouldRefreshRequests() {
mpi.refreshOnLedgerRequests()
mpi.lastRefreshTimestamp = time.Now()
}
}
}

func (mpi *mempoolImpl) handleForceCleanMempool() {
Expand Down
2 changes: 2 additions & 0 deletions packages/chain/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ func TestTTL(t *testing.T) {
MaxOffledgerToPropose: 1000,
},
1*time.Second,
func() {},
)
defer te.close()
start := time.Now()
Expand Down Expand Up @@ -820,6 +821,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
MaxOffledgerToPropose: 1000,
},
1*time.Second,
func() {},
)
}
return te
Expand Down
36 changes: 26 additions & 10 deletions packages/chain/mempool/time_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ type TimePool interface {
TakeTill(timestamp time.Time) []isc.OnLedgerRequest
Has(reqID *isc.RequestRef) bool
Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool)
ShouldRefreshRequests() bool
}

// Here we implement TimePool. We maintain the request in a list ordered by a timestamp.
// The list is organized in slots. Each slot contains a list of requests that fit to the
// slot boundaries.
type timePoolImpl struct {
requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool.
slots *timeSlot // Structure to fetch them fast by their time.
maxPoolSize int
sizeMetric func(int)
log *logger.Logger
requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool.
slots *timeSlot // Structure to fetch them fast by their time.
hasDroppedRequests bool
maxPoolSize int
sizeMetric func(int)
log *logger.Logger
}

type timeSlot struct {
Expand All @@ -44,11 +46,12 @@ var _ TimePool = &timePoolImpl{}

func NewTimePool(maxTimedInPool int, sizeMetric func(int), log *logger.Logger) TimePool {
return &timePoolImpl{
requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](),
slots: nil,
maxPoolSize: maxTimedInPool,
sizeMetric: sizeMetric,
log: log,
requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](),
slots: nil,
hasDroppedRequests: false,
maxPoolSize: maxTimedInPool,
sizeMetric: sizeMetric,
log: log,
}
}

Expand Down Expand Up @@ -117,13 +120,26 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.OnLedgerReq
tpi.requests.Delete(rKey)
}
}
tpi.hasDroppedRequests = true
}

// log and update metrics
tpi.log.Debugf("ADD %v as key=%v", request.ID(), reqRefKey)
tpi.sizeMetric(tpi.requests.Size())
}

// ShouldRefreshRequests reports whether the timed on-ledger requests should be
// re-fetched. It returns true only when requests have been dropped from this
// pool (hasDroppedRequests is set) and the pool has since become empty.
//
// Returning true clears hasDroppedRequests: the caller is assumed to perform
// the refresh, so a subsequent call returns false until another drop happens.
func (tpi *timePoolImpl) ShouldRefreshRequests() bool {
	// Nothing was dropped, or the pool still holds requests: hold off and
	// only refresh once the pool has been fully drained.
	if !tpi.hasDroppedRequests || tpi.requests.Size() > 0 {
		return false
	}
	tpi.hasDroppedRequests = false
	return true
}

func (tpi *timePoolImpl) TakeTill(timestamp time.Time) []isc.OnLedgerRequest {
resp := []isc.OnLedgerRequest{}
for slot := tpi.slots; slot != nil; slot = slot.next {
Expand Down
56 changes: 42 additions & 14 deletions packages/chain/mempool/typed_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,30 @@ import (
"github.com/iotaledger/wasp/packages/kv/codec"
)

// RequestPool is the generic pool interface for requests of type V.
// typedPool is the implementation asserted against it in this file.
type RequestPool[V isc.Request] interface {
// NOTE(review): Has/Get presumably look a request up by its reference; what
// Get returns for an unknown reference is not visible here — confirm.
Has(reqRef *isc.RequestRef) bool
Get(reqRef *isc.RequestRef) V
Add(request V)
Remove(request V)
// Cleanup removes requests from the pool if predicate returns false.
// The predicate receives the request and a timestamp associated with it.
Cleanup(predicate func(request V, ts time.Time) bool)
// Iterate calls f on pool entries.
// NOTE(review): the bool returned by f presumably controls whether the
// iteration continues — confirm against the implementation.
Iterate(f func(e *typedPoolEntry[V]) bool)
StatusString() string
WriteContent(io.Writer)
// ShouldRefreshRequests reports whether the pool's requests should be
// refreshed. In typedPool it returns true only when requests have been
// dropped and the pool is empty, and it clears the dropped flag when it
// returns true.
ShouldRefreshRequests() bool
}

// TODO add gas price to on-ledger requests
// TODO this list needs to be periodically re-filled from L1 once the activity is lower
type typedPool[V isc.Request] struct {
waitReq WaitReq
requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]]
ordered []*typedPoolEntry[V] // TODO use a better data structure instead!!! (probably RedBlackTree)
maxPoolSize int
sizeMetric func(int)
timeMetric func(time.Duration)
log *logger.Logger
waitReq WaitReq
requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]]
ordered []*typedPoolEntry[V] // TODO use a better data structure instead!!! (probably RedBlackTree)
hasDroppedRequests bool
maxPoolSize int
sizeMetric func(int)
timeMetric func(time.Duration)
log *logger.Logger
}

type typedPoolEntry[V isc.Request] struct {
Expand All @@ -40,13 +54,14 @@ var _ RequestPool[isc.OffLedgerRequest] = &typedPool[isc.OffLedgerRequest]{}

func NewTypedPool[V isc.Request](maxOnledgerInPool int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V] {
return &typedPool[V]{
waitReq: waitReq,
requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](),
ordered: []*typedPoolEntry[V]{},
maxPoolSize: maxOnledgerInPool,
sizeMetric: sizeMetric,
timeMetric: timeMetric,
log: log,
waitReq: waitReq,
requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](),
ordered: []*typedPoolEntry[V]{},
hasDroppedRequests: false,
maxPoolSize: maxOnledgerInPool,
sizeMetric: sizeMetric,
timeMetric: timeMetric,
log: log,
}
}

Expand Down Expand Up @@ -122,6 +137,7 @@ func (olp *typedPool[V]) LimitPoolSize() []*typedPoolEntry[V] {
olp.log.Debugf("LimitPoolSize dropping request: %v", r.req.ID())
olp.Remove(r.req)
}
olp.hasDroppedRequests = true
return reqsToDelete
}

Expand Down Expand Up @@ -168,6 +184,18 @@ func (olp *typedPool[V]) Remove(request V) {
olp.timeMetric(time.Since(entry.ts))
}

// ShouldRefreshRequests reports whether the pool's requests should be
// re-fetched. It returns true only when requests have been dropped from this
// pool (hasDroppedRequests is set) and the pool is currently empty.
//
// A true result also resets hasDroppedRequests, on the assumption that the
// caller goes on to refresh the requests.
func (olp *typedPool[V]) ShouldRefreshRequests() bool {
	switch {
	case !olp.hasDroppedRequests:
		// No request has been dropped since the last refresh.
		return false
	case olp.requests.Size() > 0:
		// Wait until the pool is empty before refreshing.
		return false
	}
	olp.hasDroppedRequests = false
	return true
}

func (olp *typedPool[V]) Cleanup(predicate func(request V, ts time.Time) bool) {
olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool {
if !predicate(entry.req, entry.ts) {
Expand Down
4 changes: 4 additions & 0 deletions packages/chain/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ type ChainNodeConn interface {
onChainConnect func(),
onChainDisconnect func(),
)
// called when the mempool has dropped some requests during congestion and the congestion has since stopped
RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID)
}

type chainNodeImpl struct {
Expand Down Expand Up @@ -433,7 +435,9 @@ func New(
cni.listener,
mempoolSettings,
mempoolBroadcastInterval,
func() { nodeConn.RefreshOnLedgerRequests(ctx, chainID) },
)

cni.chainMgr = gpa.NewAckHandler(cni.me, chainMgr.AsGPA(), RedeliveryPeriod)
cni.stateMgr = stateMgr
cni.mempool = mempool
Expand Down
7 changes: 7 additions & 0 deletions packages/chain/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ type testNodeConn struct {
attachWG *sync.WaitGroup
}

var _ chain.NodeConnection = &testNodeConn{}

func newTestNodeConn(t *testing.T) *testNodeConn {
tnc := &testNodeConn{
t: t,
Expand Down Expand Up @@ -372,6 +374,11 @@ func (tnc *testNodeConn) GetL1ProtocolParams() *iotago.ProtocolParameters {
return testparameters.GetL1ProtocolParamsForTesting()
}

// RefreshOnLedgerRequests implements chain.NodeConnection.
// The test node connection does nothing here; ctx and chainID are ignored.
func (tnc *testNodeConn) RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID) {
// noop: intentionally left empty for the test connection
}

////////////////////////////////////////////////////////////////////////////////
// testEnv

Expand Down
Loading

0 comments on commit 3311201

Please sign in to comment.