diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index cb902d829..1efb887ba 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -193,5 +193,5 @@ func composeAndExecute() { func init() { rootCmd.AddCommand(composeAndExecuteCmd) composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") - composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs") + composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs") } diff --git a/cmd/execute.go b/cmd/execute.go index ecccdee03..6579ffdea 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -139,7 +139,7 @@ func execute() { func init() { rootCmd.AddCommand(executeCmd) executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") - executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs") + executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs") } type Exporter interface { diff --git a/cmd/root.go b/cmd/root.go index dba91864b..ef2475db7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -36,22 +36,22 @@ import ( ) var ( - cfgFile string - databaseConfig config.Database - genConfig config.Plugin - ipc string - levelDbPath string + cfgFile string + databaseConfig config.Database + genConfig config.Plugin + ipc string + levelDbPath string queueRecheckInterval time.Duration - startingBlockNumber int64 - storageDiffsPath string - syncAll bool - endingBlockNumber int64 - recheckHeadersArg bool + startingBlockNumber int64 + storageDiffsPath string + syncAll bool + endingBlockNumber int64 + recheckHeadersArg bool ) const ( - pollingInterval = 7 * time.Second - validationWindow = 15 + pollingInterval = 7 * time.Second + validationWindow = 15 ) var rootCmd = &cobra.Command{ diff --git a/libraries/shared/storage/utils/errors.go b/libraries/shared/storage/utils/errors.go index 0c87c6424..bf2bcc4a2 100644 --- a/libraries/shared/storage/utils/errors.go +++ b/libraries/shared/storage/utils/errors.go @@ -17,9 +17,12 @@ package utils import ( + "errors" "fmt" ) +var ErrRowExists = errors.New("parsed row for storage diff already exists") + type ErrContractNotFound struct { Contract string } diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 7b2c5362e..ac5b8a8d7 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -78,7 +78,7 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { return } executeErr := storageTransformer.Execute(row) - if executeErr != nil { + if executeErr != nil && executeErr != utils.ErrRowExists { logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) queueErr := storageWatcher.Queue.Add(row) if queueErr != nil { @@ -100,7 +100,7 @@ func (storageWatcher StorageWatcher) processQueue() { continue } executeErr := storageTransformer.Execute(row) - if executeErr == nil { + if executeErr == nil || executeErr == utils.ErrRowExists { storageWatcher.deleteRow(row.Id) } } diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index 6c12358da..e3d804d96 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -109,6 +109,18 @@ var _ = Describe("Storage Watcher", func() { close(done) }) + It("does not queue row if transformer execution fails because row already exists", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrRowExists + + go storageWatcher.Execute(rows, errs, time.Hour) + + Expect(<-errs).To(BeNil()) + Consistently(func() bool { + return mockQueue.AddCalled + }).Should(BeFalse()) + close(done) + }) + It("queues row for later processing if transformer execution fails", func(done Done) { mockTransformer.ExecuteErr = fakes.FakeError @@ -187,6 +199,17 @@ var _ = Describe("Storage Watcher", func() { close(done) }) + It("deletes row from queue if transformer execution errors because row already exists", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrRowExists + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(row.Id)) + close(done) + }) + It("logs error if deleting persisted row fails", func(done Done) { mockQueue.DeleteErr = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log")