Skip to content

Commit

Permalink
Implement timeout-based retry method for txns
Browse files Browse the repository at this point in the history
This patch is part of the set of patches aiming to fix
https://bugs.launchpad.net/juju/+bug/2031631.

The idea here is to implement a transaction retry mechanism similar to
the one used in the official mongodb driver, i.e. a 120-second timeout
within which a transaction must finish, including any time spent waiting
between retries.

The context passed to the mgo Run method is necessary in order to cancel
the actual request that's being executed by mgo, either after the timeout
expires or because the user (juju) cancels the passed context.

Tests have been fixed and some have been deleted since they are no
longer relevant (we don't retry for a fixed number of times, instead we
retry until timeout).
  • Loading branch information
nvinuesa committed Sep 1, 2023
1 parent fc5d50e commit e1d4a17
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 117 deletions.
6 changes: 6 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ func SetRunnerFunc(r Runner, f func() TxnRunner) {
}
}

// SetTxnTimeout overrides the transaction timeout of the given Runner so
// that tests can use a value other than the default. It is only used for
// testing and panics if r is not a *transactionRunner.
func SetTxnTimeout(r Runner, t time.Duration) {
	r.(*transactionRunner).txnTimeout = t
}

var CheckMongoSupportsOut = checkMongoSupportsOut

// NewDBOracleNoOut is only used for testing. It forces the DBOracle to not ask
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ require (
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/juju/mgo/v3 => /home/nicolas/workspace/canonical/mgo
5 changes: 3 additions & 2 deletions incrementalprune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package txn

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -302,7 +303,7 @@ func (s *TxnSuite) TearDownTest(c *gc.C) {

func (s *TxnSuite) runTxn(c *gc.C, ops ...txn.Op) bson.ObjectId {
txnId := bson.NewObjectId()
err := s.runner.Run(ops, txnId, nil)
err := s.runner.Run(context.Background(), ops, txnId, nil)
c.Assert(err, jc.ErrorIsNil)
return txnId
}
Expand All @@ -314,7 +315,7 @@ func (s *TxnSuite) runInterruptedTxn(c *gc.C, breakpoint string, ops ...txn.Op)
KillChance: 1,
Breakpoint: breakpoint,
})
err := s.runner.Run(ops, txnId, nil)
err := s.runner.Run(context.Background(), ops, txnId, nil)
c.Assert(err, gc.Equals, txn.ErrChaos)
txn.SetChaos(txn.Chaos{})
return txnId
Expand Down
135 changes: 75 additions & 60 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package txn

import (
"context"
"math/rand"
"strings"
"time"
Expand All @@ -29,14 +30,6 @@ import (
var logger = loggo.GetLogger("juju.txn")

const (
// defaultClientTxnRetries is the default number of times a transaction will be retried
// when there is an invariant assertion failure (for client side transactions).
defaultClientTxnRetries = 3

// defaultServerTxnRetries is the default number of times a transaction will be retried
// when there is an invariant assertion failure (for server side transactions).
defaultServerTxnRetries = 50

// defaultRetryBackoff is the default interval used to pause between
// unsuccessful transaction operations.
defaultRetryBackoff = 1 * time.Millisecond
Expand All @@ -51,6 +44,10 @@ const (

// defaultChangeLogName is the default mgo transaction runner change log.
defaultChangeLogName = "txns.log"

// defaultTxnTimeoutSeconds is the default time length for a
// transaction to finish before it gets cancelled.
defaultTxnTimeoutSeconds = 120
)

const (
Expand Down Expand Up @@ -139,7 +136,7 @@ type Runner interface {
}

type txnRunner interface {
Run([]txn.Op, bson.ObjectId, interface{}) error
Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error
ChangeLog(*mgo.Collection)
ResumeAll() error
}
Expand All @@ -160,12 +157,13 @@ type transactionRunner struct {
clock Clock

serverSideTransactions bool
nrRetries int
retryBackoff time.Duration
retryFuzzPercent int
pauseFunc func(duration time.Duration)

newRunner func() txnRunner

txnTimeout time.Duration
}

var _ Runner = (*transactionRunner)(nil)
Expand Down Expand Up @@ -248,7 +246,6 @@ func NewRunner(params RunnerParams) Runner {
runTransactionObserver: params.RunTransactionObserver,
clock: params.Clock,
serverSideTransactions: sstxn,
nrRetries: params.MaxRetryAttempts,
retryBackoff: params.RetryBackoff,
retryFuzzPercent: params.RetryFuzzPercent,
pauseFunc: params.PauseFunc,
Expand All @@ -261,12 +258,6 @@ func NewRunner(params RunnerParams) Runner {
} else if txnRunner.changeLogName == "" {
txnRunner.changeLogName = defaultChangeLogName
}
if txnRunner.nrRetries == 0 {
txnRunner.nrRetries = defaultClientTxnRetries
if txnRunner.serverSideTransactions {
txnRunner.nrRetries = defaultServerTxnRetries
}
}
if txnRunner.retryBackoff == 0 {
txnRunner.retryBackoff = defaultRetryBackoff
}
Expand All @@ -284,6 +275,7 @@ func NewRunner(params RunnerParams) Runner {
// they also specify a RunTransactionObserver.
txnRunner.clock = clock.WallClock
}
txnRunner.txnTimeout = defaultTxnTimeoutSeconds * time.Second
return txnRunner
}

Expand All @@ -301,53 +293,68 @@ func (tr *transactionRunner) newRunnerImpl() txnRunner {
return runner
}

// Run is defined on Runner.
// Run is defined on Runner. Once the timeout expires the transaction is
// cancelled and the last error returned by the transaction is returned
// (txn.ErrAborted is reported as ErrExcessiveContention).
func (tr *transactionRunner) Run(transactions TransactionSource) error {
var lastErr error
for i := 0; i < tr.nrRetries; i++ {
// If we are retrying, give other txns a chance to have a go.
if i > 0 && tr.serverSideTransactions {
tr.backoff(i)
}
ops, err := transactions(i)
if err == ErrTransientFailure {
continue
}
if err == ErrNoOperations {
return nil
}
if err != nil {
return err
}
if len(ops) == 0 {
// Treat this the same as ErrNoOperations but don't suppress other errors.
return nil
}
if err = tr.RunTransaction(&Transaction{
Ops: ops,
Attempt: i,
}); err == nil {
return nil
} else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) {
// Mongo very occasionally returns an intermittent
// "unexpected message" error. Retry those.
// Also mongo sometimes gets very busy and we get an
// i/o timeout. We retry those too.
// However if this is the last time, return that error
// rather than the excessive contention error.
msg := err.Error()
retryErr := strings.HasSuffix(msg, "unexpected message") ||
strings.HasSuffix(msg, "i/o timeout")
if !retryErr || i == (tr.nrRetries-1) {
ctx, cancel := context.WithTimeout(context.TODO(), tr.txnTimeout)
defer cancel()

var (
lastErr error
i int
)
for {
select {
case <-ctx.Done():
if lastErr == txn.ErrAborted {
return ErrExcessiveContention
}
return lastErr
default:
// If we are retrying, give other txns a chance to have a go.
if i > 0 && tr.serverSideTransactions {
tr.backoff(i)
}
ops, err := transactions(i)
if err == ErrTransientFailure {
i++
continue
}
if err == ErrNoOperations {
return nil
}
if err != nil {
return err
}
if len(ops) == 0 {
// Treat this the same as ErrNoOperations but don't suppress other errors.
return nil
}
if err = tr.runTransaction(
ctx,
&Transaction{
Ops: ops,
Attempt: i,
}); err == nil {
return nil
} else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) {
// Mongo very occasionally returns an intermittent
// "unexpected message" error. Retry those.
// Also mongo sometimes gets very busy and we get an
// i/o timeout. We retry those too.
// However if this is the last time, return that error
// rather than the excessive contention error.
msg := err.Error()
retryErr := strings.HasSuffix(msg, "unexpected message") ||
strings.HasSuffix(msg, "i/o timeout")
if !retryErr {
return err
}
}
lastErr = err
i++
}
lastErr = err
}
if lastErr == txn.ErrAborted {
return ErrExcessiveContention
}
return lastErr
}

func (tr *transactionRunner) backoff(attempt int) {
Expand All @@ -370,6 +377,14 @@ func (tr *transactionRunner) pause(dur time.Duration) {

// RunTransaction is defined on Runner. The transaction is executed under a
// context that is cancelled once tr.txnTimeout has elapsed.
func (tr *transactionRunner) RunTransaction(transaction *Transaction) error {
	timeoutCtx, stop := context.WithTimeout(context.TODO(), tr.txnTimeout)
	// Release the timer held by the context as soon as the run returns.
	defer stop()
	return tr.runTransaction(timeoutCtx, transaction)
}

// runTransaction runs the given transaction, passing ctx down to the
// underlying runner's Run call.
func (tr *transactionRunner) runTransaction(ctx context.Context, transaction *Transaction) error {
testHooks := <-tr.testHooks
tr.testHooks <- nil
if len(testHooks) > 0 {
Expand Down Expand Up @@ -421,7 +436,7 @@ func (tr *transactionRunner) RunTransaction(transaction *Transaction) error {
}
}
}
err := runner.Run(transaction.Ops, "", nil)
err := runner.Run(ctx, transaction.Ops, "", nil)
if tr.runTransactionObserver != nil {
transaction.Error = err
transaction.Duration = tr.clock.Now().Sub(start)
Expand Down
60 changes: 8 additions & 52 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package txn_test

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -59,6 +60,8 @@ func (s *txnSuite) SetUpTest(c *gc.C) {
s.backoffs = append(s.backoffs, dur)
},
})
// Set a smaller txn timeout than the default one for tests.
jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond)
s.supportsSST = false
}

Expand Down Expand Up @@ -123,6 +126,8 @@ func (s *sstxnSuite) SetUpTest(c *gc.C) {
s.backoffs = append(s.backoffs, dur)
},
})
// Set a smaller txn timeout than the default one for tests.
jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond)
s.supportsSST = true
}

Expand Down Expand Up @@ -433,10 +438,8 @@ func (s *txnSuite) TestRetryHooks(c *gc.C) {
}

func (s *txnSuite) TestExcessiveContention(c *gc.C) {
maxAttempt := 0
// This keeps failing because the Assert is wrong.
buildTxn := func(attempt int) ([]txn.Op, error) {
maxAttempt = attempt
ops := []txn.Op{{
C: s.collection.Name,
Id: "1",
Expand All @@ -447,14 +450,9 @@ func (s *txnSuite) TestExcessiveContention(c *gc.C) {
}
err := s.txnRunner.Run(buildTxn)
c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention)
if s.supportsSST {
c.Assert(maxAttempt, gc.Equals, 49)
} else {
c.Assert(maxAttempt, gc.Equals, 2)
}
}

func (s *txnSuite) TestPause(c *gc.C) {
func (s *txnSuite) TestBackoff(c *gc.C) {
buildTxn := func(attempt int) ([]txn.Op, error) {
ops := []txn.Op{{
C: s.collection.Name,
Expand All @@ -467,7 +465,7 @@ func (s *txnSuite) TestPause(c *gc.C) {
err := s.txnRunner.Run(buildTxn)
c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention)
if s.supportsSST {
c.Assert(s.backoffs, gc.HasLen, 49)
// c.Assert(s.backoffs, gc.HasLen, 49)
c.Assert(s.backoffs[48], jc.DurationLessThan, 50*time.Millisecond)
for i := 0; i < len(s.backoffs); i++ {
c.Assert(s.backoffs[i], jc.GreaterThan, 0)
Expand Down Expand Up @@ -586,27 +584,6 @@ func (s *txnSuite) TestRunFailureIntermittentUnexpectedMessage(c *gc.C) {
c.Check(tries, gc.Equals, 2)
}

// TestRunFailureAlwaysUnexpectedMessage checks the behaviour of Run when the
// underlying runner keeps failing with an "unexpected message" error: the
// fake runner is primed with four such errors, and the test asserts that Run
// returns that error after the transaction builder was invoked three times.
func (s *txnSuite) TestRunFailureAlwaysUnexpectedMessage(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
fake := &fakeRunner{errors: []error{
errors.New("unexpected message"),
errors.New("unexpected message"),
errors.New("unexpected message"),
errors.New("unexpected message"),
}}
jujutxn.SetRunnerFunc(runner, fake.new)
// tries counts how many times Run asks for the transaction operations.
tries := 0
// Doesn't matter what this returns as long as it isn't an error.
buildTxn := func(attempt int) ([]txn.Op, error) {
tries++
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
c.Check(err, gc.ErrorMatches, "unexpected message")
c.Check(tries, gc.Equals, 3)
}

func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
fake := &fakeRunner{errors: []error{errors.New("i/o timeout")}}
Expand All @@ -623,27 +600,6 @@ func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) {
c.Check(tries, gc.Equals, 2)
}

// TestRunFailureAlwaysIOTimeout checks the behaviour of Run when the
// underlying runner keeps failing with an "i/o timeout" error: the fake
// runner is primed with four such errors, and the test asserts that Run
// returns that error after the transaction builder was invoked three times.
func (s *txnSuite) TestRunFailureAlwaysIOTimeout(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
fake := &fakeRunner{errors: []error{
errors.New("i/o timeout"),
errors.New("i/o timeout"),
errors.New("i/o timeout"),
errors.New("i/o timeout"),
}}
jujutxn.SetRunnerFunc(runner, fake.new)
// tries counts how many times Run asks for the transaction operations.
tries := 0
// Doesn't matter what this returns as long as it isn't an error.
buildTxn := func(attempt int) ([]txn.Op, error) {
tries++
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
c.Check(err, gc.ErrorMatches, "i/o timeout")
c.Check(tries, gc.Equals, 3)
}

func (s *txnSuite) TestRunTransactionObserver(c *gc.C) {
var calls []jujutxn.Transaction
clock := testclock.NewClock(time.Now())
Expand Down Expand Up @@ -701,7 +657,7 @@ func (f *fakeRunner) new() jujutxn.TxnRunner {
return f
}

func (f *fakeRunner) Run([]txn.Op, bson.ObjectId, interface{}) error {
func (f *fakeRunner) Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error {
if len(f.durations) > 0 && f.clock != nil {
f.clock.Advance(f.durations[0])
f.durations = f.durations[1:]
Expand Down
Loading

0 comments on commit e1d4a17

Please sign in to comment.