Skip to content

Commit

Permalink
add helper to send with retry
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 16, 2024
1 parent 33e88be commit c707684
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 2 deletions.
84 changes: 84 additions & 0 deletions hedge.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pb "github.com/flowerinthenight/hedge/proto/v1"
"github.com/flowerinthenight/spindle"
"github.com/google/uuid"
gaxv2 "github.com/googleapis/gax-go/v2"
"github.com/hashicorp/memberlist"
"google.golang.org/api/iterator"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1377,3 +1378,86 @@ func New(client *spanner.Client, hostPort, lockTable, lockName, logTable string,

return op
}

type SendToLeaderArgs struct {
// Number of retry attempts to contact the leader.
// Defaults to 10. If set to a negative number, it
// will retry forever.
Retries int
}

// SendToLeader is a wrapper to hedge.Send() with builtin retry mechanisms.
func SendToLeader(ctx context.Context, op *Op, m []byte, args ...*SendToLeaderArgs) ([]byte, error) {
if op == nil {
return nil, fmt.Errorf("hedge: op cannot be nil")
}

retries := 10
if len(args) > 0 {
retries = args[0].Retries
}

if retries == 0 {
retries = 10
}

result := make(chan []byte, 1)
done := make(chan error, 1)
go func() {
var err error
var res []byte
defer func(b *[]byte, e *error) {
result <- *b
done <- *e
}(&res, &err)

bo := gaxv2.Backoff{
Max: time.Minute,
}

var i int
for {
if i >= retries {
break
}

if !op.IsRunning() {
time.Sleep(bo.Pause())
}

if retries > -1 {
i++
}
}

i = 0
for {
if i >= retries {
err = fmt.Errorf("hedge: retries exhausted")
return
}

var r []byte
r, err = op.Send(ctx, m)
if err != nil {
time.Sleep(bo.Pause())
} else {
res = r // to outside
return
}

if retries > -1 {
i++
}
}
}()

for {
select {
case e := <-done:
return <-result, e
case <-ctx.Done():
return nil, context.Canceled
}
}
}
5 changes: 3 additions & 2 deletions sos.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ type memT struct {
}

// SoS (Spillover-Store) represents an object for spill-over (or stitched)
// storage. Useful only for load-process-discard types of data processing.
// See limitation below.
// storage. Useful for load-process-discard types of data processing. The
// order of storage priority is local memory, local disk, other pod's
// memory, other pod's disk, and so on.
//
// Limitation: At the moment, it's not allowed to reuse a name for SOS
// once it's used and closed within hedge's lifetime.
Expand Down

0 comments on commit c707684

Please sign in to comment.