Skip to content

Commit

Permalink
dcrdtest: Automatic TearDown on SetUp failures
Browse files Browse the repository at this point in the history
This adds an automatic TearDown() call for an rpc harness that fails to
be setup, so that callers do not need to remember to do it themselves.

This improves usability of the package by ensuring goroutines do not
leak after either a failed SetUp() or a TearDown().

Unit tests are added that assert the correct behavior on both a
successfull setup and when certain specific setup calls fail.

This also fixes a goroutine leak in the internal memory wallet.
  • Loading branch information
matheusd authored and davecgh committed Oct 18, 2023
1 parent 53bcbaa commit 5f326da
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 29 deletions.
3 changes: 2 additions & 1 deletion dcrdtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ require (
github.com/decred/dcrd/rpcclient/v8 v8.0.0
github.com/decred/dcrd/txscript/v4 v4.1.0
github.com/decred/dcrd/wire v1.6.0
github.com/decred/slog v1.2.0
matheusd.com/testctx v0.1.0
)

require (
Expand All @@ -42,7 +44,6 @@ require (
github.com/decred/dcrd/math/uint256 v1.0.1 // indirect
github.com/decred/dcrd/peer/v3 v3.0.2 // indirect
github.com/decred/go-socks v1.1.0 // indirect
github.com/decred/slog v1.2.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions dcrdtest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,5 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
matheusd.com/testctx v0.1.0 h1:MBpaNuqr23ugnkA59gz8Bd6BQIGkvZr7M4vYAc/Apzc=
matheusd.com/testctx v0.1.0/go.mod h1:u9la0YA1XIBcEpTU/aHJ9q4/L0VttkwhkG2m4lrj7Ls=
29 changes: 27 additions & 2 deletions dcrdtest/memwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ type memWallet struct {

net *chaincfg.Params

// quit is closed when the harness node is stopped.
quit chan struct{}

// wg tracks the mem wallet's goroutines.
wg sync.WaitGroup

rpc *rpcclient.Client

sync.RWMutex
Expand Down Expand Up @@ -164,14 +170,23 @@ func newMemWallet(net *chaincfg.Params, harnessID uint32) (*memWallet, error) {
utxos: make(map[wire.OutPoint]*utxo),
chainUpdateSignal: make(chan struct{}),
reorgJournal: make(map[int64]*undoEntry),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
}, nil
}

// Start launches all goroutines required for the wallet to function properly.
func (m *memWallet) Start() {
m.wg.Add(1)
go m.chainSyncer()
}

// Stop stops all goroutines required for the wallet to function properly.
func (m *memWallet) Stop() {
close(m.quit)
m.wg.Wait()
}

// SyncedHeight returns the height the wallet is known to be synced to.
//
// This function is safe for concurrent access.
Expand Down Expand Up @@ -219,7 +234,10 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) {
// available. We do this in a new goroutine in order to avoid blocking
// the main loop of the rpc client.
go func() {
m.chainUpdateSignal <- struct{}{}
select {
case m.chainUpdateSignal <- struct{}{}:
case <-m.quit:
}
}()
}

Expand All @@ -230,10 +248,17 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) {
func (m *memWallet) chainSyncer() {
log.Tracef("memwallet.chainSyncer")
defer log.Tracef("memwallet.chainSyncer exit")
defer m.wg.Done()

var update *chainUpdate

for range m.chainUpdateSignal {
for {
select {
case <-m.chainUpdateSignal:
case <-m.quit:
return
}

// A new update is available, so pop the new chain update from
// the front of the update queue.
m.chainMtx.Lock()
Expand Down
45 changes: 36 additions & 9 deletions dcrdtest/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
rpc "github.com/decred/dcrd/rpcclient/v8"
)

// errDcrdCmdExec is the error returned when the dcrd binary is not executed.
var errDcrdCmdExec = errors.New("unable to exec dcrd binary")

// nodeConfig contains all the args, and data required to launch a dcrd process
// and connect the rpc client to it.
type nodeConfig struct {
Expand Down Expand Up @@ -201,7 +204,6 @@ func newNode(config *nodeConfig, dataDir string) (*node, error) {
return &node{
config: config,
dataDir: dataDir,
cmd: config.command(),
}, nil
}

Expand All @@ -216,8 +218,10 @@ func (n *node) start(ctx context.Context) error {
var pid sync.WaitGroup
pid.Add(1)

cmd := n.config.command()

// Redirect stderr.
n.stderr, err = n.cmd.StderrPipe()
n.stderr, err = cmd.StderrPipe()
if err != nil {
return err
}
Expand All @@ -229,15 +233,18 @@ func (n *node) start(ctx context.Context) error {
for {
line, err := r.ReadBytes('\n')
if errors.Is(err, io.EOF) {
log.Tracef("stderr: EOF")
n.logf("stderr: EOF")
return
} else if err != nil {
n.logf("stderr: Unable to read stderr: %v", err)
return
}
n.logf("stderr: %s", line)
}
}()

// Redirect stdout.
n.stdout, err = n.cmd.StdoutPipe()
n.stdout, err = cmd.StdoutPipe()
if err != nil {
return err
}
Expand All @@ -249,7 +256,10 @@ func (n *node) start(ctx context.Context) error {
for {
line, err := r.ReadBytes('\n')
if errors.Is(err, io.EOF) {
log.Tracef("stdout: EOF")
n.logf("stdout: EOF")
return
} else if err != nil {
n.logf("stdout: Unable to read stdout: %v", err)
return
}
log.Tracef("stdout: %s", line)
Expand All @@ -269,8 +279,10 @@ func (n *node) start(ctx context.Context) error {
switch msg := msg.(type) {
case boundP2PListenAddrEvent:
p2pAddr = string(msg)
n.logf("P2P listen addr: %s", p2pAddr)
case boundRPCListenAddrEvent:
rpcAddr = string(msg)
n.logf("RPC listen addr: %s", rpcAddr)
}
if p2pAddr != "" && rpcAddr != "" {
close(gotSubsysAddrs)
Expand All @@ -283,33 +295,45 @@ func (n *node) start(ctx context.Context) error {
for err == nil {
_, err = nextIPCMessage(n.config.pipeRX.r)
}
n.logf("IPC messages drained")
}()

// Launch command and store pid.
if err := n.cmd.Start(); err != nil {
return err
if err := cmd.Start(); err != nil {
// When failing to execute, wait until running goroutines are
// closed.
pid.Done()
n.wg.Wait()
n.config.pipeTX.close()
n.config.pipeRX.close()
return fmt.Errorf("%w: %v", errDcrdCmdExec, err)
}
n.cmd = cmd
n.pid = n.cmd.Process.Pid

// Unblock pipes now pid is available
// Unblock pipes now that pid is available.
pid.Done()

f, err := os.Create(filepath.Join(n.config.String(), "dcrd.pid"))
if err != nil {
_ = n.stop() // Cleanup what has been done so far.
return err
}

n.pidFile = f.Name()
if _, err = fmt.Fprintf(f, "%d\n", n.cmd.Process.Pid); err != nil {
_ = n.stop() // Cleanup what has been done so far.
return err
}
if err := f.Close(); err != nil {
_ = n.stop() // Cleanup what has been done so far.
return err
}

// Read the RPC and P2P addresses.
select {
case <-ctx.Done():
_ = n.stop() // Cleanup what has been done so far.
return ctx.Err()
case <-gotSubsysAddrs:
n.p2pAddr = p2pAddr
Expand All @@ -323,7 +347,7 @@ func (n *node) start(ctx context.Context) error {
// properly. On windows, interrupt is not supported, so a kill signal is used
// instead
func (n *node) stop() error {
log.Tracef("stop %p %p", n.cmd, n.cmd.Process)
log.Tracef("stop %p", n.cmd)
defer log.Tracef("stop done")

if n.cmd == nil || n.cmd.Process == nil {
Expand Down Expand Up @@ -366,6 +390,9 @@ func (n *node) stop() error {
if err := n.config.pipeTX.close(); err != nil {
n.logf("Unable to close pipe TX: %v", err)
}

// Mark command terminated.
n.cmd = nil
return nil
}

Expand Down
38 changes: 31 additions & 7 deletions dcrdtest/rpc_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package dcrdtest

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -38,6 +39,8 @@ var (
// throughout the life of this package.
pathToDCRD string
pathToDCRDMtx sync.RWMutex

errNilCoinbaseAddr = errors.New("memWallet coinbase addr is nil")
)

// Harness fully encapsulates an active dcrd process to provide a unified
Expand Down Expand Up @@ -218,7 +221,14 @@ func New(t *testing.T, activeNet *chaincfg.Params, handlers *rpcclient.Notificat
//
// NOTE: This method and TearDown should always be called from the same
// goroutine as they are not concurrent safe.
func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) error {
func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) (err error) {
defer func() {
if err != nil {
tearErr := h.TearDown()
log.Warnf("Teardown error after setup error %v: %v", err, tearErr)
}
}()

// Start the dcrd node itself. This spawns a new process which will be
// managed
if err := h.node.start(ctx); err != nil {
Expand All @@ -231,6 +241,9 @@ func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutp

// Filter transactions that pay to the coinbase associated with the
// wallet.
if h.wallet.coinbaseAddr == nil {
return errNilCoinbaseAddr
}
filterAddrs := []stdaddr.Address{h.wallet.coinbaseAddr}
if err := h.Node.LoadTxFilter(ctx, true, filterAddrs, nil); err != nil {
return err
Expand Down Expand Up @@ -282,17 +295,28 @@ func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutp
// NOTE: This method and SetUp should always be called from the same goroutine
// as they are not concurrent safe.
func (h *Harness) TearDown() error {
log.Tracef("TearDown %p %p", h.Node, h.node)
defer log.Tracef("TearDown done")
log.Debugf("TearDown %p %p", h.Node, h.node)
defer log.Debugf("TearDown done")

if h.Node != nil {
log.Tracef("TearDown: Node")
log.Debugf("TearDown: Node")
h.Node.Shutdown()
h.Node = nil
}

log.Tracef("TearDown: node")
if err := h.node.shutdown(); err != nil {
return err
if h.node != nil {
log.Debugf("TearDown: node")
node := h.node
h.node = nil
if err := node.shutdown(); err != nil {
return err
}
}

log.Debugf("TearDown: wallet")
if h.wallet != nil {
h.wallet.Stop()
h.wallet = nil
}

return nil
Expand Down
Loading

0 comments on commit 5f326da

Please sign in to comment.