Skip to content

Commit 34273fb

Browse files
authored
goroutine leak fixed (#5214)
Signed-off-by: Fedor Partanskiy <[email protected]>
1 parent ba9d6ed commit 34273fb

File tree

3 files changed

+38
-12
lines changed

3 files changed

+38
-12
lines changed

common/deliverclient/blocksprovider/bft_censorship_monitor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func (m *BFTCensorshipMonitor) launchHeaderReceivers() error {
356356
hrRcvMon := m.hdrRcvTrackers[ep.Address]
357357
// This may fail if the orderer is down or faulty. If it fails, we back off and retry.
358358
// We count connection failure attempts (here) and stop failures separately.
359-
headerClient, _, err := m.newHeaderClient(ep, hrRcvMon.headerReceiver) // TODO use the clientCloser function
359+
headerClient, clientCloser, err := m.newHeaderClient(ep, hrRcvMon.headerReceiver)
360360
if err != nil {
361361
dur := backOffDuration(2.0, hrRcvMon.connectFailureCounter, m.timeoutConfig.MinRetryInterval, m.timeoutConfig.MaxRetryInterval)
362362
hrRcvMon.retryDeadline = time.Now().Add(dur)
@@ -365,7 +365,7 @@ func (m *BFTCensorshipMonitor) launchHeaderReceivers() error {
365365
continue
366366
}
367367

368-
hrRcvMon.headerReceiver = NewBFTHeaderReceiver(m.chainID, ep.Address, headerClient, m.updatableHeaderVerifier, hrRcvMon.headerReceiver, flogging.MustGetLogger("BFTHeaderReceiver"))
368+
hrRcvMon.headerReceiver = NewBFTHeaderReceiver(m.chainID, ep.Address, headerClient, clientCloser, m.updatableHeaderVerifier, hrRcvMon.headerReceiver, flogging.MustGetLogger("BFTHeaderReceiver"))
369369
hrRcvMon.connectFailureCounter = 0
370370
hrRcvMon.retryDeadline = time.Time{}
371371

common/deliverclient/blocksprovider/bft_header_receiver.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type BFTHeaderReceiver struct {
3232
errorStopTime time.Time
3333
endpoint string
3434
client orderer.AtomicBroadcast_DeliverClient
35+
clientCloserFunc func()
3536
updatableBlockVerifier UpdatableBlockVerifier
3637

3738
// A block with Header & Metadata, without Data (i.e. lastHeader.Data==nil); except from config blocks, which are full.
@@ -50,6 +51,7 @@ func NewBFTHeaderReceiver(
5051
chainID string,
5152
endpoint string,
5253
client orderer.AtomicBroadcast_DeliverClient,
54+
clientCloser func(),
5355
updatableBlockVerifier UpdatableBlockVerifier,
5456
previousReceiver *BFTHeaderReceiver,
5557
logger *flogging.FabricLogger,
@@ -59,6 +61,7 @@ func NewBFTHeaderReceiver(
5961
stopChan: make(chan struct{}, 1),
6062
endpoint: endpoint,
6163
client: client,
64+
clientCloserFunc: clientCloser,
6265
updatableBlockVerifier: updatableBlockVerifier.Clone(),
6366
logger: logger,
6467
}
@@ -186,8 +189,7 @@ func (hr *BFTHeaderReceiver) Stop() error {
186189

187190
hr.logger.Infof("[%s][%s] Stopping", hr.chainID, hr.endpoint)
188191
hr.stop = true
189-
_ = hr.client.CloseSend()
190-
// TODO close the underlying connection as well
192+
hr.clientCloserFunc()
191193
close(hr.stopChan)
192194

193195
return nil

common/deliverclient/blocksprovider/bft_header_receiver_test.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ func TestBftHeaderReceiver_NoBlocks_RecvError(t *testing.T) {
3030
streamClientMock := &fake.DeliverClient{}
3131
streamClientMock.RecvReturns(nil, errors.New("oops"))
3232
streamClientMock.CloseSendReturns(nil)
33+
clientCloser := func() {
34+
_ = streamClientMock.CloseSend()
35+
}
3336

34-
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
37+
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
3538
assert.NotNil(t, hr)
3639
assert.False(t, hr.IsStarted())
3740
assert.False(t, hr.IsStopped())
@@ -57,9 +60,12 @@ func TestBftHeaderReceiver_BadStatus(t *testing.T) {
5760
streamClientMock.RecvReturnsOnCall(1, &orderer.DeliverResponse{Type: &orderer.DeliverResponse_Status{Status: common.Status_BAD_REQUEST}}, nil)
5861
streamClientMock.RecvReturnsOnCall(2, &orderer.DeliverResponse{Type: &orderer.DeliverResponse_Status{Status: common.Status_SERVICE_UNAVAILABLE}}, nil)
5962
streamClientMock.CloseSendReturns(nil)
63+
clientCloser := func() {
64+
_ = streamClientMock.CloseSend()
65+
}
6066

6167
for i := 0; i < 3; i++ {
62-
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
68+
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
6369
assert.NotNil(t, hr)
6470

6571
hr.DeliverHeaders() // it will get a bad status and exit
@@ -78,8 +84,11 @@ func TestBftHeaderReceiver_NilResponse(t *testing.T) {
7884
streamClientMock := &fake.DeliverClient{}
7985
streamClientMock.RecvReturns(nil, nil)
8086
streamClientMock.CloseSendReturns(nil)
87+
clientCloser := func() {
88+
_ = streamClientMock.CloseSend()
89+
}
8190

82-
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
91+
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
8392
assert.NotNil(t, hr)
8493

8594
hr.DeliverHeaders() // it will get a bad status and exit
@@ -96,7 +105,10 @@ func TestBftHeaderReceiver_WithBlocks_Renew(t *testing.T) {
96105
fakeBlockVerifier.VerifyBlockAttestationCalls(naiveBlockVerifier)
97106
fakeBlockVerifier.CloneReturns(fakeBlockVerifier)
98107
streamClientMock := &fake.DeliverClient{}
99-
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
108+
clientCloser := func() {
109+
_ = streamClientMock.CloseSend()
110+
}
111+
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
100112

101113
seqCh := make(chan uint64)
102114
streamClientMock.RecvCalls(
@@ -147,7 +159,10 @@ func TestBftHeaderReceiver_WithBlocks_Renew(t *testing.T) {
147159
// === Create a new BFTHeaderReceiver with the last good header of the previous receiver
148160
fakeBlockVerifier = &fake.UpdatableBlockVerifier{}
149161
streamClientMock = &fake.DeliverClient{}
150-
hr2 := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, hr, flogging.MustGetLogger("test.BFTHeaderReceiver.2"))
162+
clientCloser = func() {
163+
_ = streamClientMock.CloseSend()
164+
}
165+
hr2 := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, hr, flogging.MustGetLogger("test.BFTHeaderReceiver.2"))
151166
assert.False(t, hr2.IsStarted())
152167
assert.False(t, hr2.IsStopped())
153168
bNum, bTime, err = hr2.LastBlockNum()
@@ -162,7 +177,10 @@ func TestBftHeaderReceiver_WithBlocks_StopOnVerificationFailure(t *testing.T) {
162177
fakeBlockVerifier.VerifyBlockAttestationCalls(naiveBlockVerifier)
163178
fakeBlockVerifier.CloneReturns(fakeBlockVerifier)
164179
streamClientMock := &fake.DeliverClient{}
165-
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
180+
clientCloser := func() {
181+
_ = streamClientMock.CloseSend()
182+
}
183+
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
166184

167185
seqCh := make(chan uint64)
168186
goodSig := uint32(1)
@@ -221,7 +239,10 @@ func TestBftHeaderReceiver_WithBlocks_ConfigVerification(t *testing.T) {
221239
fakeBlockVerifier.VerifyBlockAttestationCalls(naiveBlockVerifier)
222240
fakeBlockVerifier.CloneReturns(fakeBlockVerifier)
223241
streamClientMock := &fake.DeliverClient{}
224-
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
242+
clientCloser := func() {
243+
_ = streamClientMock.CloseSend()
244+
}
245+
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
225246

226247
seqCh := make(chan uint64)
227248
streamClientMock.RecvCalls(
@@ -285,7 +306,10 @@ func TestBftHeaderReceiver_VerifyOnce(t *testing.T) {
285306
fakeBlockVerifier.VerifyBlockAttestationCalls(naiveBlockVerifier)
286307
fakeBlockVerifier.CloneReturns(fakeBlockVerifier)
287308
streamClientMock := &fake.DeliverClient{}
288-
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
309+
clientCloser := func() {
310+
_ = streamClientMock.CloseSend()
311+
}
312+
hr := blocksprovider.NewBFTHeaderReceiver("testchannel", "10.10.10.11:666", streamClientMock, clientCloser, fakeBlockVerifier, nil, flogging.MustGetLogger("test.BFTHeaderReceiver"))
289313

290314
seqCh := make(chan uint64)
291315
goodSig := uint32(1)

0 commit comments

Comments
 (0)