Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JUJU-4504] Implement timeout-based retry method for txns #69

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ func SetRunnerFunc(r Runner, f func() TxnRunner) {
}
}

// Specify the transaction timeout for some tests.
func SetTxnTimeout(r Runner, t time.Duration) {
inner := r.(*transactionRunner)
inner.txnTimeout = t
}

var CheckMongoSupportsOut = checkMongoSupportsOut

// NewDBOracleNoOut is only used for testing. It forces the DBOracle to not ask
Expand Down
5 changes: 3 additions & 2 deletions incrementalprune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package txn

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -302,7 +303,7 @@ func (s *TxnSuite) TearDownTest(c *gc.C) {

func (s *TxnSuite) runTxn(c *gc.C, ops ...txn.Op) bson.ObjectId {
txnId := bson.NewObjectId()
err := s.runner.Run(ops, txnId, nil)
err := s.runner.Run(context.Background(), ops, txnId, nil)
c.Assert(err, jc.ErrorIsNil)
return txnId
}
Expand All @@ -314,7 +315,7 @@ func (s *TxnSuite) runInterruptedTxn(c *gc.C, breakpoint string, ops ...txn.Op)
KillChance: 1,
Breakpoint: breakpoint,
})
err := s.runner.Run(ops, txnId, nil)
err := s.runner.Run(context.Background(), ops, txnId, nil)
c.Assert(err, gc.Equals, txn.ErrChaos)
txn.SetChaos(txn.Chaos{})
return txnId
Expand Down
135 changes: 75 additions & 60 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package txn

import (
"context"
"math/rand"
"strings"
"time"
Expand All @@ -29,14 +30,6 @@ import (
var logger = loggo.GetLogger("juju.txn")

const (
// defaultClientTxnRetries is the default number of times a transaction will be retried
// when there is an invariant assertion failure (for client side transactions).
defaultClientTxnRetries = 3

// defaultServerTxnRetries is the default number of times a transaction will be retried
// when there is an invariant assertion failure (for server side transactions).
defaultServerTxnRetries = 50

// defaultRetryBackoff is the default interval used to pause between
// unsuccessful transaction operations.
defaultRetryBackoff = 1 * time.Millisecond
Expand All @@ -51,6 +44,10 @@ const (

// defaultChangeLogName is the default mgo transaction runner change log.
defaultChangeLogName = "txns.log"

// defaultTxnTimeoutSeconds is the default time length for a
// transaction to finish before it gets cancelled.
defaultTxnTimeoutSeconds = 120
)

const (
Expand Down Expand Up @@ -139,7 +136,7 @@ type Runner interface {
}

type txnRunner interface {
Run([]txn.Op, bson.ObjectId, interface{}) error
Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error
ChangeLog(*mgo.Collection)
ResumeAll() error
}
Expand All @@ -160,12 +157,13 @@ type transactionRunner struct {
clock Clock

serverSideTransactions bool
nrRetries int
retryBackoff time.Duration
retryFuzzPercent int
pauseFunc func(duration time.Duration)

newRunner func() txnRunner

txnTimeout time.Duration
}

var _ Runner = (*transactionRunner)(nil)
Expand Down Expand Up @@ -248,7 +246,6 @@ func NewRunner(params RunnerParams) Runner {
runTransactionObserver: params.RunTransactionObserver,
clock: params.Clock,
serverSideTransactions: sstxn,
nrRetries: params.MaxRetryAttempts,
retryBackoff: params.RetryBackoff,
retryFuzzPercent: params.RetryFuzzPercent,
pauseFunc: params.PauseFunc,
Expand All @@ -261,12 +258,6 @@ func NewRunner(params RunnerParams) Runner {
} else if txnRunner.changeLogName == "" {
txnRunner.changeLogName = defaultChangeLogName
}
if txnRunner.nrRetries == 0 {
txnRunner.nrRetries = defaultClientTxnRetries
if txnRunner.serverSideTransactions {
txnRunner.nrRetries = defaultServerTxnRetries
}
}
if txnRunner.retryBackoff == 0 {
txnRunner.retryBackoff = defaultRetryBackoff
}
Expand All @@ -284,6 +275,7 @@ func NewRunner(params RunnerParams) Runner {
// they also specify a RunTransactionObserver.
txnRunner.clock = clock.WallClock
}
txnRunner.txnTimeout = defaultTxnTimeoutSeconds * time.Second
return txnRunner
}

Expand All @@ -301,53 +293,68 @@ func (tr *transactionRunner) newRunnerImpl() txnRunner {
return runner
}

// Run is defined on Runner.
// Run is defined on Runner. After timeout the transaction gets cancelled and
// the last returned error by the transaction will be returned.
func (tr *transactionRunner) Run(transactions TransactionSource) error {
var lastErr error
for i := 0; i < tr.nrRetries; i++ {
// If we are retrying, give other txns a chance to have a go.
if i > 0 && tr.serverSideTransactions {
tr.backoff(i)
}
ops, err := transactions(i)
if err == ErrTransientFailure {
continue
}
if err == ErrNoOperations {
return nil
}
if err != nil {
return err
}
if len(ops) == 0 {
// Treat this the same as ErrNoOperations but don't suppress other errors.
return nil
}
if err = tr.RunTransaction(&Transaction{
Ops: ops,
Attempt: i,
}); err == nil {
return nil
} else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) {
// Mongo very occasionally returns an intermittent
// "unexpected message" error. Retry those.
// Also mongo sometimes gets very busy and we get an
// i/o timeout. We retry those too.
// However if this is the last time, return that error
// rather than the excessive contention error.
msg := err.Error()
retryErr := strings.HasSuffix(msg, "unexpected message") ||
strings.HasSuffix(msg, "i/o timeout")
if !retryErr || i == (tr.nrRetries-1) {
ctx, cancel := context.WithTimeout(context.TODO(), tr.txnTimeout)
defer cancel()

var (
lastErr error
i int
)
for {
select {
case <-ctx.Done():
if lastErr == txn.ErrAborted {
return ErrExcessiveContention
}
return lastErr
default:
// If we are retrying, give other txns a chance to have a go.
if i > 0 && tr.serverSideTransactions {
tr.backoff(i)
}
ops, err := transactions(i)
if err == ErrTransientFailure {
i++
continue
}
if err == ErrNoOperations {
return nil
}
if err != nil {
return err
}
if len(ops) == 0 {
// Treat this the same as ErrNoOperations but don't suppress other errors.
return nil
}
if err = tr.runTransaction(
ctx,
&Transaction{
Ops: ops,
Attempt: i,
}); err == nil {
return nil
} else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) {
// Mongo very occasionally returns an intermittent
// "unexpected message" error. Retry those.
// Also mongo sometimes gets very busy and we get an
// i/o timeout. We retry those too.
// However if this is the last time, return that error
// rather than the excessive contention error.
msg := err.Error()
retryErr := strings.HasSuffix(msg, "unexpected message") ||
strings.HasSuffix(msg, "i/o timeout")
if !retryErr {
return err
}
}
lastErr = err
i++
}
lastErr = err
}
if lastErr == txn.ErrAborted {
return ErrExcessiveContention
}
return lastErr
}

func (tr *transactionRunner) backoff(attempt int) {
Expand All @@ -370,6 +377,14 @@ func (tr *transactionRunner) pause(dur time.Duration) {

// RunTransaction is defined on Runner.
func (tr *transactionRunner) RunTransaction(transaction *Transaction) error {
ctx, cancel := context.WithTimeout(context.TODO(), tr.txnTimeout)
defer cancel()

return tr.runTransaction(ctx, transaction)
}

// RunTransaction is defined on Runner.
func (tr *transactionRunner) runTransaction(ctx context.Context, transaction *Transaction) error {
testHooks := <-tr.testHooks
tr.testHooks <- nil
if len(testHooks) > 0 {
Expand Down Expand Up @@ -421,7 +436,7 @@ func (tr *transactionRunner) RunTransaction(transaction *Transaction) error {
}
}
}
err := runner.Run(transaction.Ops, "", nil)
err := runner.Run(ctx, transaction.Ops, "", nil)
if tr.runTransactionObserver != nil {
transaction.Error = err
transaction.Duration = tr.clock.Now().Sub(start)
Expand Down
60 changes: 8 additions & 52 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package txn_test

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -59,6 +60,8 @@ func (s *txnSuite) SetUpTest(c *gc.C) {
s.backoffs = append(s.backoffs, dur)
},
})
// Set a smaller txn timeout than the default one for tests.
jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond)
s.supportsSST = false
}

Expand Down Expand Up @@ -123,6 +126,8 @@ func (s *sstxnSuite) SetUpTest(c *gc.C) {
s.backoffs = append(s.backoffs, dur)
},
})
// Set a smaller txn timeout than the default one for tests.
jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond)
s.supportsSST = true
}

Expand Down Expand Up @@ -433,10 +438,8 @@ func (s *txnSuite) TestRetryHooks(c *gc.C) {
}

func (s *txnSuite) TestExcessiveContention(c *gc.C) {
maxAttempt := 0
// This keeps failing because the Assert is wrong.
buildTxn := func(attempt int) ([]txn.Op, error) {
maxAttempt = attempt
ops := []txn.Op{{
C: s.collection.Name,
Id: "1",
Expand All @@ -447,14 +450,9 @@ func (s *txnSuite) TestExcessiveContention(c *gc.C) {
}
err := s.txnRunner.Run(buildTxn)
c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention)
if s.supportsSST {
c.Assert(maxAttempt, gc.Equals, 49)
} else {
c.Assert(maxAttempt, gc.Equals, 2)
}
}

func (s *txnSuite) TestPause(c *gc.C) {
func (s *txnSuite) TestBackoff(c *gc.C) {
buildTxn := func(attempt int) ([]txn.Op, error) {
ops := []txn.Op{{
C: s.collection.Name,
Expand All @@ -467,7 +465,7 @@ func (s *txnSuite) TestPause(c *gc.C) {
err := s.txnRunner.Run(buildTxn)
c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention)
if s.supportsSST {
c.Assert(s.backoffs, gc.HasLen, 49)
// c.Assert(s.backoffs, gc.HasLen, 49)
c.Assert(s.backoffs[48], jc.DurationLessThan, 50*time.Millisecond)
for i := 0; i < len(s.backoffs); i++ {
c.Assert(s.backoffs[i], jc.GreaterThan, 0)
Expand Down Expand Up @@ -586,27 +584,6 @@ func (s *txnSuite) TestRunFailureIntermittentUnexpectedMessage(c *gc.C) {
c.Check(tries, gc.Equals, 2)
}

func (s *txnSuite) TestRunFailureAlwaysUnexpectedMessage(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
fake := &fakeRunner{errors: []error{
errors.New("unexpected message"),
errors.New("unexpected message"),
errors.New("unexpected message"),
errors.New("unexpected message"),
}}
jujutxn.SetRunnerFunc(runner, fake.new)
tries := 0
// Doesn't matter what this returns as long as it isn't an error.
buildTxn := func(attempt int) ([]txn.Op, error) {
tries++
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
c.Check(err, gc.ErrorMatches, "unexpected message")
c.Check(tries, gc.Equals, 3)
}

func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
fake := &fakeRunner{errors: []error{errors.New("i/o timeout")}}
Expand All @@ -623,27 +600,6 @@ func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) {
c.Check(tries, gc.Equals, 2)
}

func (s *txnSuite) TestRunFailureAlwaysIOTimeout(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
fake := &fakeRunner{errors: []error{
errors.New("i/o timeout"),
errors.New("i/o timeout"),
errors.New("i/o timeout"),
errors.New("i/o timeout"),
}}
jujutxn.SetRunnerFunc(runner, fake.new)
tries := 0
// Doesn't matter what this returns as long as it isn't an error.
buildTxn := func(attempt int) ([]txn.Op, error) {
tries++
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
c.Check(err, gc.ErrorMatches, "i/o timeout")
c.Check(tries, gc.Equals, 3)
}

func (s *txnSuite) TestRunTransactionObserver(c *gc.C) {
var calls []jujutxn.Transaction
clock := testclock.NewClock(time.Now())
Expand Down Expand Up @@ -701,7 +657,7 @@ func (f *fakeRunner) new() jujutxn.TxnRunner {
return f
}

func (f *fakeRunner) Run([]txn.Op, bson.ObjectId, interface{}) error {
func (f *fakeRunner) Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error {
if len(f.durations) > 0 && f.clock != nil {
f.clock.Advance(f.durations[0])
f.durations = f.durations[1:]
Expand Down
Loading