From 6a17e2f562a7a8a716f72ceb4fc3a196c26a85d3 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 8 Oct 2025 18:17:10 +0100 Subject: [PATCH 1/3] initial --- .../consensus/requests/handler.go | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/pkg/capabilities/consensus/requests/handler.go b/pkg/capabilities/consensus/requests/handler.go index 3186be5563..23a019f769 100644 --- a/pkg/capabilities/consensus/requests/handler.go +++ b/pkg/capabilities/consensus/requests/handler.go @@ -23,6 +23,11 @@ type ConsensusRequest[T any, R ConsensusResponse] interface { SendTimeout(ctx context.Context) } +type responseWithRequestStoreRemovalChan[R ConsensusResponse] struct { + response R + requestRemovedFromStore chan struct{} +} + type ConsensusResponse interface { RequestID() string } @@ -38,7 +43,7 @@ type Handler[T ConsensusRequest[T, R], R ConsensusResponse] struct { responseCache map[string]*responseCacheEntry[R] cacheExpiryTime time.Duration - responseCh chan R + responseCh chan responseWithRequestStoreRemovalChan[R] requestCh chan T clock clockwork.Clock @@ -49,7 +54,7 @@ func NewHandler[T ConsensusRequest[T, R], R ConsensusResponse](lggr logger.Logge store: s, pendingRequests: map[string]T{}, responseCache: map[string]*responseCacheEntry[R]{}, - responseCh: make(chan R), + responseCh: make(chan responseWithRequestStoreRemovalChan[R]), requestCh: make(chan T), clock: clock, cacheExpiryTime: responseExpiryTime, @@ -62,11 +67,23 @@ func NewHandler[T ConsensusRequest[T, R], R ConsensusResponse](lggr logger.Logge } func (h *Handler[T, R]) SendResponse(ctx context.Context, resp R) { + syncResponse := responseWithRequestStoreRemovalChan[R]{ + response: resp, + requestRemovedFromStore: make(chan struct{}, 1), + } + + select { + case <-ctx.Done(): + return + case h.responseCh <- syncResponse: + } + select { case <-ctx.Done(): return - case h.responseCh <- resp: + case <-syncResponse.requestRemovedFromStore: } + } func (h *Handler[T, R]) SendRequest(ctx context.Context, r T) { @@ -113,8 +130,10 @@ func (h *Handler[T, R]) worker(ctx context.Context) { h.eng.Errorw("failed to add request to store", "err", err) } - case resp := <-h.responseCh: + case syncResp := <-h.responseCh: + resp := syncResp.response req, wasPresent := h.store.Evict(resp.RequestID()) + syncResp.requestRemovedFromStore <- struct{}{} if !wasPresent { h.responseCache[resp.RequestID()] = &responseCacheEntry[R]{ response: resp, From e2666540146f67250f86596cf0d8c5fb4dac90a9 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 8 Oct 2025 18:19:05 +0100 Subject: [PATCH 2/3] tidy --- pkg/capabilities/consensus/requests/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/capabilities/consensus/requests/handler.go b/pkg/capabilities/consensus/requests/handler.go index 23a019f769..9fe3b08709 100644 --- a/pkg/capabilities/consensus/requests/handler.go +++ b/pkg/capabilities/consensus/requests/handler.go @@ -130,10 +130,10 @@ func (h *Handler[T, R]) worker(ctx context.Context) { h.eng.Errorw("failed to add request to store", "err", err) } - case syncResp := <-h.responseCh: - resp := syncResp.response + case respWithChannel := <-h.responseCh: + resp := respWithChannel.response req, wasPresent := h.store.Evict(resp.RequestID()) - syncResp.requestRemovedFromStore <- struct{}{} + respWithChannel.requestRemovedFromStore <- struct{}{} if !wasPresent { h.responseCache[resp.RequestID()] = &responseCacheEntry[R]{ response: resp, From 0ae59d6237955d583b18bd3b8b1df3cdee7b9cdb Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 8 Oct 2025 18:19:49 +0100 Subject: [PATCH 3/3] tidy --- pkg/capabilities/consensus/requests/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/capabilities/consensus/requests/handler.go b/pkg/capabilities/consensus/requests/handler.go index 9fe3b08709..181564b2a9 100644 --- a/pkg/capabilities/consensus/requests/handler.go +++ b/pkg/capabilities/consensus/requests/handler.go @@ -67,7 +67,7 @@ func NewHandler[T ConsensusRequest[T, R], R ConsensusResponse](lggr logger.Logge } func (h *Handler[T, R]) SendResponse(ctx context.Context, resp R) { - syncResponse := responseWithRequestStoreRemovalChan[R]{ + respWithRemovalChan := responseWithRequestStoreRemovalChan[R]{ response: resp, requestRemovedFromStore: make(chan struct{}, 1), } @@ -75,13 +75,13 @@ func (h *Handler[T, R]) SendResponse(ctx context.Context, resp R) { select { case <-ctx.Done(): return - case h.responseCh <- syncResponse: + case h.responseCh <- respWithRemovalChan: } select { case <-ctx.Done(): return - case <-syncResponse.requestRemovedFromStore: + case <-respWithRemovalChan.requestRemovedFromStore: } }