Skip to content

Commit

Permalink
feat: Saga (#63)
Browse files Browse the repository at this point in the history
* feat: add sagas

* fix(saga): check of compensate function

* fix(saga): check of compensate function

* fix(saga): check of compensate function

* fix(saga): check of compensate function

* feat: add sagas

* fix(ots3): missing trace in ots3

* feat(saga): checkpoint

* feat(saga): rewrite saga

* fix(saga): fix cases

* fix(saga): fix cases

* fix(clihttp): data race

* fix(clihttp): data race

* fix(clihttp): data race

* fix(sagas): build failure

* doc(sagas): add more docs
  • Loading branch information
Reasno committed Mar 10, 2021
1 parent 4229330 commit e1e6692
Show file tree
Hide file tree
Showing 25 changed files with 1,944 additions and 47 deletions.
44 changes: 25 additions & 19 deletions clihttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,66 +73,72 @@ func NewClient(tracer opentracing.Tracer, options ...Option) *Client {

// Do sends the request.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
req, tracer := nethttp.TraceRequest(c.tracer, req)
defer tracer.Finish()
clientSpan := c.tracer.StartSpan("HTTP Client")
defer clientSpan.Finish()

c.logRequest(req, tracer)
ext.SpanKindRPCClient.Set(clientSpan)
ext.HTTPUrl.Set(clientSpan, req.RequestURI)
ext.HTTPMethod.Set(clientSpan, req.Method)

// Inject the client span context into the headers
c.logRequest(req, clientSpan)

c.tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header))
response, err := c.underlying.Do(req)
if err != nil {
return response, err
}

c.logResponse(response, tracer)
c.logResponse(response, clientSpan)

return response, err
}

func (c *Client) logRequest(req *http.Request, tracer *nethttp.Tracer) {
func (c *Client) logRequest(req *http.Request, span opentracing.Span) {
if req.Body == nil {
return
}
body, err := req.GetBody()
if err != nil {
ext.Error.Set(tracer.Span(), true)
tracer.Span().LogKV("error", errors.Wrap(err, "cannot get request body"))
ext.Error.Set(span, true)
span.LogKV("error", errors.Wrap(err, "cannot get request body"))
return
}
length, _ := strconv.Atoi(req.Header.Get(http.CanonicalHeaderKey("Content-Length")))
if length > c.requestLogThreshold {
ext.Error.Set(tracer.Span(), true)
tracer.Span().LogKV("request", "elided: Content-Length too large")
ext.Error.Set(span, true)
span.LogKV("request", "elided: Content-Length too large")
return
}
byt, err := ioutil.ReadAll(body)
if err != nil {
ext.Error.Set(tracer.Span(), true)
tracer.Span().LogKV("error", errors.Wrap(err, "cannot read request body"))
ext.Error.Set(span, true)
span.LogKV("error", errors.Wrap(err, "cannot read request body"))
return
}
if tracer.Span() != nil {
tracer.Span().LogKV("request", string(byt))
if span != nil {
span.LogKV("request", string(byt))
}

}

func (c *Client) logResponse(response *http.Response, tracer *nethttp.Tracer) {
func (c *Client) logResponse(response *http.Response, span opentracing.Span) {
if response.Body == nil {
return
}
length, _ := strconv.Atoi(response.Header.Get(http.CanonicalHeaderKey("Content-Length")))
if length > c.responseLogThreshold {
tracer.Span().LogKV("response", "elided: Content-Length too large")
span.LogKV("response", "elided: Content-Length too large")
return
}
var buf bytes.Buffer
byt, err := ioutil.ReadAll(response.Body)
if err != nil {
ext.Error.Set(tracer.Span(), true)
tracer.Span().LogFields(log.Error(err))
ext.Error.Set(span, true)
span.LogFields(log.Error(err))
}
if tracer.Span() != nil {
tracer.Span().LogKV("response", string(byt))
if span != nil {
span.LogKV("response", string(byt))
}
buf.Write(byt)
response.Body = ioutil.NopCloser(&buf)
Expand Down
38 changes: 14 additions & 24 deletions clihttp/client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package clihttp

import (
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"net/http"
"strings"
"testing"

"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"

"github.com/opentracing/opentracing-go/mocktracer"
)

Expand Down Expand Up @@ -49,27 +50,16 @@ func TestClient_Do(t *testing.T) {
}

func TestClient_race(t *testing.T) {
cases := []struct {
name string
request *http.Request
Option []Option
}{
{
"normal",
func() *http.Request { r, _ := http.NewRequest("GET", "https://baidu.com", nil); return r }(),
[]Option{},
},
}
for _, c := range cases {
c := c
// the mock tracer is not concurrent safe.
tracer := opentracing.GlobalTracer()
client := NewClient(tracer, c.Option...)
for i := 0; i < 10; i++ {
t.Run(c.name, func(t *testing.T) {
t.Parallel()
_, _ = client.Do(c.request)
})
}
// the mock tracer is not concurrent safe.
//tracer := opentracing.GlobalTracer()
tracer := opentracing.NoopTracer{}
client := NewClient(tracer)
for i := 0; i < 100; i++ {
t.Run("", func(t *testing.T) {
t.Parallel()
r, _ := http.NewRequest("GET", "https://baidu.com", nil)
_, _ = client.Do(r)
})
}

}
6 changes: 6 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,22 @@ func TestKoanfAdapter_Watch(t *gotesting.T) {
assert.Equal(t, "baz", ka.String("foo"))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var ch = make(chan struct{})
go func() {
ka.watcher.Watch(ctx, func() error {
assert.NoError(t, ka.Reload(), "reload should be successful")
err := ka.Reload()
fmt.Println(err)
ch <- struct{}{}
return nil
})
}()
time.Sleep(time.Second)
ioutil.WriteFile(f.Name(), []byte("foo: bar"), 0644)
ioutil.WriteFile(f.Name(), []byte("foo: bar"), 0644)
<-ch


// The following test is flaky on CI. Unable to reproduce locally.
/*
Expand Down
6 changes: 6 additions & 0 deletions dtx/correlation_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dtx

type correlationIDType string

// CorrelationID is an identifier to correlate transactions in context.
const CorrelationID correlationIDType = "CorrelationID"
43 changes: 43 additions & 0 deletions dtx/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Package dtx contains common utilities in the context of distributed transaction.
Context Passing
It is curial for all parties in the distributed transaction to share an
transaction id. This package provides utility to pass this id across services.
HTTPToContext() http.RequestFunc
ContextToHTTP() http.RequestFunc
GRPCToContext() grpc.ServerRequestFunc
ContextToGRPC() grpc.ClientRequestFunc
Idempotency
Certain operations will be retried by the client more than once. A middleware is
provided for the server to shield against repeated request in the same
transaction.
func MakeIdempotence(s Oncer) endpoint.Middleware
Lock
Certain resource in transaction cannot be concurrently accessed. A middleware is
provided to lock such resources.
func MakeLock(l Locker) endpoint.Middleware
Allow Null Compensation and Prevent Resource Suspension
Transaction participants may receive the compensation
order before performing normal operations due to network exceptions. In this
case, null compensation is required.
If the forward operation arrives later than the compensating operation due to
network exceptions, the forward operation must be discarded. Otherwise, resource
suspension occurs.
func MakeAttempt(s Sequencer) endpoint.Middleware
func MakeCancel(s Sequencer) endpoint.Middleware
*/
package dtx
110 changes: 110 additions & 0 deletions dtx/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package dtx

import (
"context"

"github.com/go-kit/kit/endpoint"
"github.com/pkg/errors"
)

// ErrNonIdempotent is returned when an endpoint is requested more than once with the same CorrelationID.
var ErrNonIdempotent = errors.New("rejected repeated request")

// ErrNoLock is returned when the endpoint fail to fetch the distributed lock under the same CorrelationID.
var ErrNoLock = errors.New("failed to grab lock")

// Oncer should return true if the key has been observed before.
type Oncer interface {
Once(ctx context.Context, key string) bool
}

// MakeIdempotence returns a middleware that ensures the next endpoint can only be executed once per CorrelationID.
func MakeIdempotence(s Oncer) endpoint.Middleware {
return func(e endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
correlationID, ok := ctx.Value(CorrelationID).(string)
if !ok {
return e(ctx, request)
}
if s.Once(ctx, correlationID) {
return nil, ErrNonIdempotent
}
return e(ctx, request)
}
}
}

// Locker is an interface for the distributed lock.
type Locker interface {
// Lock should return true only when it successfully grabs the lock.
Lock(ctx context.Context, key string) bool
Unlock(ctx context.Context, key string)
}

// MakeLock returns a middleware that ensures the next endpoint is never concurrently accessed.
func MakeLock(l Locker) endpoint.Middleware {
return func(e endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
correlationID, ok := ctx.Value(CorrelationID).(string)
if !ok {
return e(ctx, request)
}
if l.Lock(ctx, correlationID) {
defer l.Unlock(ctx, correlationID)
return e(ctx, request)
}
return nil, ErrNoLock
}
}
}

// Sequencer is an interface that shields against the disordering of
// attempt and cancel in a transactional context.
type Sequencer interface {
MarkCancelledCheckAttempted(context.Context, string) bool
MarkAttemptedCheckCancelled(context.Context, string) bool
}

// MakeAttempt returns a middleware that wraps around an attempt endpoint. If the
// this segment of the distributed transaction is already cancelled, the next
// endpoint will never be executed.
//
// If the forward operation arrives later than the compensating operation due to
// network exceptions, the forward operation must be discarded. Otherwise,
// resource suspension occurs.
func MakeAttempt(s Sequencer) endpoint.Middleware {
return func(e endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
correlationID, ok := ctx.Value(CorrelationID).(string)
if !ok {
return e(ctx, request)
}
if s.MarkAttemptedCheckCancelled(ctx, correlationID) {
return nil, nil
}
return e(ctx, request)
}
}
}

// MakeCancel returns a middleware that wraps around the cancellation endpoint.
// It guarantees if this segment of the distributed transaction is never attempted,
// the cancellation endpoint will not be executed.
//
// Transaction participants may receive the compensation order before performing
// normal operations due to network exceptions. In this case, null compensation
// is required.
func MakeCancel(s Sequencer) endpoint.Middleware {
return func(e endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
correlationID, ok := ctx.Value(CorrelationID).(string)
if !ok {
return e(ctx, request)
}
if !s.MarkCancelledCheckAttempted(ctx, correlationID) {
return nil, nil
}
return e(ctx, request)
}
}
}
Loading

0 comments on commit e1e6692

Please sign in to comment.