From e21c5b78718b70576783dd1c16ee5138a41561e5 Mon Sep 17 00:00:00 2001 From: robinbryce Date: Fri, 20 Sep 2024 09:49:09 +0100 Subject: [PATCH] fix: handle 429 errors gracefully, with re-tries, in replicate-logs (#33) * fix: handle 429 errors gracefully, with re-tries, in replicate-logs AB#9913 * fix: take changes supporting better ratelimiting handling * fix: minor spelling nits * review: refactor a little to reduce cognative load * fix: missed an err check --------- Co-authored-by: Robin Bryce --- go.mod | 7 +- go.sum | 6 +- replicatelogs.go | 164 +++++++++++---- .../replicatelogs_azurite_test.go | 196 ++++++++++++++++++ 4 files changed, 322 insertions(+), 51 deletions(-) diff --git a/go.mod b/go.mod index 82ff733..6505437 100644 --- a/go.mod +++ b/go.mod @@ -6,18 +6,17 @@ require ( github.com/datatrails/go-datatrails-common v0.16.1 github.com/datatrails/go-datatrails-common-api-gen v0.4.6 github.com/datatrails/go-datatrails-logverification v0.1.7 - github.com/datatrails/go-datatrails-merklelog/massifs v0.1.0 + github.com/datatrails/go-datatrails-merklelog/massifs v0.1.1 github.com/datatrails/go-datatrails-merklelog/mmr v0.0.2 github.com/datatrails/go-datatrails-merklelog/mmrtesting v0.1.0 github.com/datatrails/go-datatrails-simplehash v0.0.5 github.com/gosuri/uiprogress v0.0.1 github.com/urfave/cli/v2 v2.27.1 github.com/zeebo/bencode v1.0.0 + golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa ) -// replace ( -// github.com/datatrails/go-datatrails-merklelog/massifs => ../go-datatrails-merklelog/massifs -// ) +// replace github.com/datatrails/go-datatrails-merklelog/massifs => ../go-datatrails-merklelog/massifs require ( github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect diff --git a/go.sum b/go.sum index 8ecc37e..812656d 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ github.com/datatrails/go-datatrails-common-api-gen v0.4.6 h1:yzviWC2jBOC3ItotQQl github.com/datatrails/go-datatrails-common-api-gen v0.4.6/go.mod h1:OQN91xvlW6xcWTFvwsM2Nn4PZwFAIOE52FG7yRl4QPQ= github.com/datatrails/go-datatrails-logverification v0.1.7 h1:HCZj3V2n2J7RzY39Ne2hyOCl2z5kjqm/D7ibOtRiSc4= github.com/datatrails/go-datatrails-logverification v0.1.7/go.mod h1:yCYT82iv95QGgvXTxQRb9vSkHF653cjiDXXwOAw3I4s= -github.com/datatrails/go-datatrails-merklelog/massifs v0.1.0 h1:NzukXz65iplfjHMf74A+b76rgALgShV2DO/EyZHfn1Y= -github.com/datatrails/go-datatrails-merklelog/massifs v0.1.0/go.mod h1:RT4xRDMMMzEXPaSg87Dl7ODWd5bNxJiPptxRDcTxcVk= +github.com/datatrails/go-datatrails-merklelog/massifs v0.1.1 h1:ZdAIM0ojp1lFeocOtjwVLWx8fa3ytUKAmLCj4KFq9MU= +github.com/datatrails/go-datatrails-merklelog/massifs v0.1.1/go.mod h1:RT4xRDMMMzEXPaSg87Dl7ODWd5bNxJiPptxRDcTxcVk= github.com/datatrails/go-datatrails-merklelog/mmr v0.0.2 h1:Jxov4/onoFiCISLQNSPy/nyt3USAEvUZpEjlScHJYKI= github.com/datatrails/go-datatrails-merklelog/mmr v0.0.2/go.mod h1:+Oz8O6bns0rF6gr03xJzKTBzUzyskZ8Gics8/qeNzYk= github.com/datatrails/go-datatrails-merklelog/mmrtesting v0.1.0 h1:q9RXtAGydXKSJjARnFObNu743cbfIOfERTXiiVa2tF4= @@ -159,6 +159,8 @@ golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod 
h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
diff --git a/replicatelogs.go b/replicatelogs.go
index 5dfea9e..820a433 100644
--- a/replicatelogs.go
+++ b/replicatelogs.go
@@ -6,12 +6,21 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"time"

 	"github.com/datatrails/go-datatrails-common/cbor"
 	"github.com/datatrails/go-datatrails-common/logger"
 	"github.com/datatrails/go-datatrails-merklelog/massifs"
 	"github.com/gosuri/uiprogress"
 	"github.com/urfave/cli/v2"
+	"golang.org/x/exp/rand"
+)
+
+const (
+	// baseDefaultRetryDelay is the base delay for retrying transient errors. A little jitter is added.
+	// For 429 errors that provide a valid Retry-After header, that header is honored rather than this default.
+	baseDefaultRetryDelay = 2 * time.Second
+	defaultConcurrency    = 5
 )

 var (
@@ -75,6 +84,21 @@ changes are read from standard input.`,
 				Value:   false,
 				Aliases: []string{"p"},
 			},
+			&cli.IntFlag{
+				Name:    "retries",
+				Aliases: []string{"r"},
+				Value:   -1, // -1 means no limit
+				Usage: `
+Set a maximum number of retries for transient error conditions. Set 0 to disable retries.
+By default transient errors are re-tried without limit, and if the error is 429, the Retry-After header is honored.`,
+			},
+			&cli.IntFlag{
+				Name:    "concurrency",
+				Value:   defaultConcurrency,
+				Aliases: []string{"c"},
+				Usage: fmt.Sprintf(
+					`The number of concurrent replication operations to run, defaults to %d. A high number is a sure way to get rate limited`, defaultConcurrency),
+			},
 		},
 		Action: func(cCtx *cli.Context) error {
 			cmd := &CmdCtx{}
@@ -99,58 +123,108 @@ changes are read from standard input.`,
 			}
 			progress := newProgressor(cCtx, "tenants", len(changes))
-			var wg sync.WaitGroup
-			errChan := make(chan error, len(changes)) // buffered so it doesn't block
-
-			for _, change := range changes {
-				wg.Add(1)
-				go func(change TenantMassif, errChan chan<- error) {
-					defer wg.Done()
-					defer progress.Completed()
-
-					replicator, err := NewVerifiedReplica(cCtx, cmd.Clone())
-					if err != nil {
-						errChan <- err
-						return
-					}
-					endMassif := uint32(change.Massif)
-					startMassif := uint32(0)
-					if cCtx.IsSet("ancestors") && uint32(cCtx.Int("ancestors")) < endMassif {
-						startMassif = endMassif - uint32(cCtx.Int("ancestors"))
-					}
-
-					err = replicator.ReplicateVerifiedUpdates(
-						context.Background(),
-						change.Tenant, startMassif, endMassif,
-					)
-					if err != nil {
-						errChan <- err
-					}
-				}(change, errChan)
+			concurency := min(len(changes), max(1, cCtx.Int("concurrency")))
+			for i := 0; i < len(changes); i += concurency {
+				err = replicateChanges(cCtx, cmd, changes[i:min(i+concurency, len(changes))], progress)
+				if err != nil {
+					return err
+				}
 			}
-			// the error channel is buffered enough for each tenant, so this will not get deadlocked
-			wg.Wait()
-			close(errChan)
-
-			var errs []error
-			for err := range errChan {
-				cmd.log.Infof(err.Error())
-				errs = append(errs, err)
-			}
-			if len(errs) > 0 {
-				return errs[0]
-			}
-			if len(changes) == 1 {
-				cmd.log.Infof("replication complete for tenant %s", changes[0].Tenant)
-			} else {
-				cmd.log.Infof("replication complete for %d tenants", len(changes))
-			}
 			return nil
 		},
 	}
 }
+
+// replicateChanges replicates the changes for the provided slice of tenants.
+// Parallelism is limited by breaking the total changes into smaller slices and calling this function for each slice.
+func replicateChanges(cCtx *cli.Context, cmd *CmdCtx, changes []TenantMassif, progress Progresser) error {
+
+	var wg sync.WaitGroup
+	errChan := make(chan error, len(changes)) // buffered so it doesn't block
+
+	for _, change := range changes {
+		wg.Add(1)
+		go func(change TenantMassif, errChan chan<- error) {
+			defer wg.Done()
+			defer progress.Completed()
+
+			retries := max(-1, cCtx.Int("retries"))
+			for {
+
+				replicator, startMassif, endMassif, err := initReplication(cCtx, cmd, change)
+				if err != nil {
+					errChan <- err
+					return
+				}
+
+				err = replicator.ReplicateVerifiedUpdates(
+					context.Background(),
+					change.Tenant, startMassif, endMassif,
+				)
+				if err == nil {
+					return
+				}
+
+				// 429 is the only transient error we currently re-try
+				var retryDelay time.Duration
+				retryDelay, ok := massifs.IsRateLimiting(err)
+				if !ok || retries == 0 {
+					// not transient
+					errChan <- err
+					return
+				}
+				if retryDelay == 0 {
+					retryDelay = defaultRetryDelay(err)
+				}
+
+				// underflow would eventually terminate the loop, but only after it had been running for an infeasible amount of time
+				retries--
+				// in the default case, remaining is always reported as -1
+				cmd.log.Infof("retrying in %s, remaining: %d", retryDelay, max(-1, retries))
+				// wait for the reported delay before the next attempt
+				time.Sleep(retryDelay)
+			}
+		}(change, errChan)
+	}
+
+	// the error channel is buffered enough for each tenant, so this will not get deadlocked
+	wg.Wait()
+	close(errChan)
+
+	var errs []error
+	for err := range errChan {
+		cmd.log.Infof(err.Error())
+		errs = append(errs, err)
+	}
+	if len(errs) > 0 {
+		return errs[0]
+	}
+	if len(changes) == 1 {
+		cmd.log.Infof("replication complete for tenant %s", changes[0].Tenant)
+	} else {
+		cmd.log.Infof("replication complete for %d tenants", len(changes))
+	}
+	return nil
+}
+
+func initReplication(cCtx *cli.Context, cmd *CmdCtx, change TenantMassif) (*VerifiedReplica, uint32, uint32, error) {
+
+	replicator, err := NewVerifiedReplica(cCtx, cmd.Clone())
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	endMassif := uint32(change.Massif)
+	startMassif := uint32(0)
+	if cCtx.IsSet("ancestors") && uint32(cCtx.Int("ancestors")) < endMassif {
+		startMassif = endMassif - uint32(cCtx.Int("ancestors"))
+	}
+	return replicator, startMassif, endMassif, nil
+}
+
+func defaultRetryDelay(_ error) time.Duration {
+	// give the delay some jitter, this is universally a good practice
+	return baseDefaultRetryDelay + time.Duration(rand.Intn(100))*time.Millisecond
+}
+
 func newProgressor(cCtx *cli.Context, barName string, increments int) Progresser {
 	if !cCtx.Bool("progress") {
diff --git a/tests/replicatelogs/replicatelogs_azurite_test.go b/tests/replicatelogs/replicatelogs_azurite_test.go
index 2af52c2..4723fac 100644
--- a/tests/replicatelogs/replicatelogs_azurite_test.go
+++ b/tests/replicatelogs/replicatelogs_azurite_test.go
@@ -571,6 +571,202 @@ func (s *ReplicateLogsCmdSuite) Test4MassifsForThreeTenants() {
 	}
 }

+// TestThreeTenantsOneAtATime uses --concurrency to force the replication to go one tenant at a time
+// The test just ensures the obvious boundary case works
+func (s *ReplicateLogsCmdSuite) TestThreeTenantsOneAtATime() {
+	logger.New("TestThreeTenantsOneAtATime")
+	defer logger.OnExit()
+
+	tc := massifs.NewLocalMassifReaderTestContext(
+		s.T(), logger.Sugar, "TestThreeTenantsOneAtATime")
+
+	massifCount := uint32(4)
+	massifHeight := uint8(8)
+
+	tenantId0 := tc.G.NewTenantIdentity()
+	// note: CreateLog both creates the massifs *and* populates them
+	tc.CreateLog(tenantId0, massifHeight, massifCount)
+	tenantId1 := tc.G.NewTenantIdentity()
+	tc.CreateLog(tenantId1, massifHeight, massifCount)
+	tenantId2 := tc.G.NewTenantIdentity()
+	tc.CreateLog(tenantId2, massifHeight, massifCount)
+
+	changes := []struct {
+		TenantIdentity string `json:"tenant"`
+		MassifIndex    int    `json:"massifindex"`
+	}{
+		{tenantId0, int(massifCount - 1)},
+		{tenantId1, int(massifCount - 1)},
+		{tenantId2, int(massifCount - 1)},
+	}
+
+	data, err := json.Marshal(changes)
+	s.NoError(err)
+	// note: the suite does a before & after pipe for Stdin
+	s.StdinWriteAndClose(data)
+
+	replicaDir := s.T().TempDir()
+
+	// note: VERACITY_IKWID is set in main, we need it to enable --envauth so we force it here
+	app := veracity.NewApp("tests", true)
+	veracity.AddCommands(app, true)
+
+	err = app.Run([]string{
+		"veracity",
+		"--envauth", // uses the emulator
+		"--container", tc.TestConfig.Container,
+		"--data-url", s.Env.AzuriteVerifiableDataURL,
+		"--height", fmt.Sprintf("%d", massifHeight),
+		"replicate-logs",
+		"--replicadir", replicaDir,
+		"--concurrency", "1",
+	})
+	s.NoError(err)
+
+	for _, change := range changes {
+		for i := range change.MassifIndex + 1 {
+			expectMassifFile := filepath.Join(
+				replicaDir, massifs.ReplicaRelativeMassifPath(change.TenantIdentity, uint32(i)))
+			s.FileExistsf(
+				expectMassifFile, "the replicated massif should exist")
+			expectSealFile := filepath.Join(
+				replicaDir, massifs.ReplicaRelativeSealPath(change.TenantIdentity, uint32(i)))
+			s.FileExistsf(expectSealFile, "the replicated seal should exist")
+		}
+	}
+}
+
+// TestConcurrencyZero sets --concurrency to 0 and shows it is clamped so replication still proceeds one tenant at a time
+// The test just ensures the obvious boundary case works
+func (s *ReplicateLogsCmdSuite) TestConcurrencyZero() {
+	logger.New("TestConcurrencyZero")
+	defer logger.OnExit()
+
+	tc := massifs.NewLocalMassifReaderTestContext(
+		s.T(), logger.Sugar, "TestConcurrencyZero")
+
+	massifCount := uint32(4)
+	massifHeight := uint8(8)
+
+	tenantId0 := tc.G.NewTenantIdentity()
+	// note: CreateLog both creates the massifs *and* populates them
+	tc.CreateLog(tenantId0, massifHeight, massifCount)
+	tenantId1 := tc.G.NewTenantIdentity()
+	tc.CreateLog(tenantId1, massifHeight, massifCount)
+	tenantId2 := tc.G.NewTenantIdentity()
+	tc.CreateLog(tenantId2, massifHeight, massifCount)
+
+	changes := []struct {
+		TenantIdentity string `json:"tenant"`
+		MassifIndex    int    `json:"massifindex"`
+	}{
+		{tenantId0, int(massifCount - 1)},
+		{tenantId1, int(massifCount - 1)},
+		{tenantId2, int(massifCount - 1)},
+	}
+
+	data, err := json.Marshal(changes)
+	s.NoError(err)
+	// note: the suite does a before & after pipe for Stdin
+	s.StdinWriteAndClose(data)
+
+	replicaDir := s.T().TempDir()
+
+	// note: VERACITY_IKWID is set in main, we need it to enable --envauth so we force it here
+	app := veracity.NewApp("tests", true)
+	veracity.AddCommands(app, true)
+
+	err = app.Run([]string{
+		"veracity",
+		"--envauth", // uses the emulator
+		"--container", tc.TestConfig.Container,
+		"--data-url", s.Env.AzuriteVerifiableDataURL,
+		"--height", fmt.Sprintf("%d", massifHeight),
+		"replicate-logs",
+		"--replicadir", replicaDir,
+		"--concurrency", "0",
+	})
+	s.NoError(err)
+
+	for _, change := range changes {
+		for i := range change.MassifIndex + 1 {
+			expectMassifFile := filepath.Join(
+				replicaDir, massifs.ReplicaRelativeMassifPath(change.TenantIdentity, uint32(i)))
+			s.FileExistsf(
+				expectMassifFile, "the replicated massif should exist")
+			expectSealFile := filepath.Join(
+				replicaDir, massifs.ReplicaRelativeSealPath(change.TenantIdentity, uint32(i)))
+			s.FileExistsf(expectSealFile, "the replicated seal should exist")
+		}
+	}
+}
+
+// TestConcurrencyCappedToTenantCount sets --concurrency greater than the number of tenants
+// and shows all tenants are replicated
+func (s *ReplicateLogsCmdSuite) TestConcurrencyCappedToTenantCount() {
+	logger.New("TestConcurrencyCappedToTenantCount")
+	defer logger.OnExit()
+
+	tc := massifs.NewLocalMassifReaderTestContext(
+		s.T(), logger.Sugar, "TestConcurrencyCappedToTenantCount")
+
+	massifCount := uint32(4)
+	massifHeight := uint8(8)
+
+	tenantId0 := tc.G.NewTenantIdentity()
+	// note: CreateLog both creates the massifs *and* populates them
+	tc.CreateLog(tenantId0, massifHeight, massifCount)
+	tenantId1 := tc.G.NewTenantIdentity()
+	tc.CreateLog(tenantId1, massifHeight, massifCount)
+	tenantId2 := tc.G.NewTenantIdentity()
+	tc.CreateLog(tenantId2, massifHeight, massifCount)
+
+	changes := []struct {
+		TenantIdentity string `json:"tenant"`
+		MassifIndex    int    `json:"massifindex"`
+	}{
+		{tenantId0, int(massifCount - 1)},
+		{tenantId1, int(massifCount - 1)},
+		{tenantId2, int(massifCount - 1)},
+	}
+
+	data, err := json.Marshal(changes)
+	s.NoError(err)
+	// note: the suite does a before & after pipe for Stdin
+	s.StdinWriteAndClose(data)
+
+	replicaDir := s.T().TempDir()
+
+	// note: VERACITY_IKWID is set in main, we need it to enable --envauth so we force it here
+	app := veracity.NewApp("tests", true)
+	veracity.AddCommands(app, true)
+
+	err = app.Run([]string{
+		"veracity",
+		"--envauth", // uses the emulator
+		"--container", tc.TestConfig.Container,
+		"--data-url", s.Env.AzuriteVerifiableDataURL,
+		"--height", fmt.Sprintf("%d", massifHeight),
+		"replicate-logs",
+		"--replicadir", replicaDir,
+		"--concurrency", "30000",
+	})
+	s.NoError(err)
+
+	for _, change := range changes {
+		for i := range change.MassifIndex + 1 {
+			expectMassifFile := filepath.Join(
+				replicaDir, massifs.ReplicaRelativeMassifPath(change.TenantIdentity, uint32(i)))
+			s.FileExistsf(
+				expectMassifFile, "the replicated massif should exist")
+			expectSealFile := filepath.Join(
+				replicaDir, massifs.ReplicaRelativeSealPath(change.TenantIdentity, uint32(i)))
+			s.FileExistsf(expectSealFile, "the replicated seal should exist")
+		}
+	}
+
+}
+
 // newTestLocalReader creates a new LocalReader
 // This provides a convenient way to interact with the massifs locally replicated by integration tests.
 func newTestLocalReader(