diff --git a/bundle/bundle.go b/bundle/bundle.go index 04fe72b..81e43df 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -20,9 +20,10 @@ type Bundle struct { userOpHash common.Hash ops *operation.BundleOperations - atlasTxHash common.Hash - relayErr *relayerror.Error + atlasTxHashes []common.Hash + relayErr *relayerror.Error + complete bool completionSubs []chan *Bundle createdAt time.Time @@ -34,28 +35,36 @@ func NewBundle(userOpHash common.Hash, bundleOps *operation.BundleOperations) *B return &Bundle{ userOpHash: userOpHash, ops: bundleOps, + atlasTxHashes: make([]common.Hash, 0), completionSubs: make([]chan *Bundle, 0), createdAt: time.Now(), } } -func (b *Bundle) GetResult() (common.Hash, *relayerror.Error) { +func (b *Bundle) GetResult() (interface{}, *relayerror.Error) { b.mu.RLock() defer b.mu.RUnlock() - return b.atlasTxHash, b.relayErr + if b.isMultiBundler() { + return b.atlasTxHashes, nil + } + + return b.atlasTxHashes[0], nil } -func (b *Bundle) SetAtlasTxHash(txHash common.Hash) *relayerror.Error { +func (b *Bundle) SetAtlasTxHash(txHash common.Hash, multiBundler bool) *relayerror.Error { b.mu.Lock() defer b.mu.Unlock() - if b.atlasTxHash != (common.Hash{}) { - log.Info("bundle has already been bundled", "bundledTxHash", b.atlasTxHash.Hex(), "newTxHash", txHash.Hex()) + if !multiBundler && len(b.atlasTxHashes) == 1 { + log.Info("bundle has already been bundled", "bundledTxHash", b.atlasTxHashes[0].Hex(), "newTxHash", txHash.Hex()) return ErrAlreadyBundled } - b.atlasTxHash = txHash - b.notifyCompletionSubs() + b.atlasTxHashes = append(b.atlasTxHashes, txHash) + + if !multiBundler { + b.notifyCompletionSubs() + } return nil } @@ -75,7 +84,12 @@ func (b *Bundle) SetRelayError(relayErr *relayerror.Error) *relayerror.Error { return nil } +func (b *Bundle) NotifyCompletionSubs() { + b.notifyCompletionSubs() +} + func (b *Bundle) notifyCompletionSubs() { + b.complete = true for _, subChan := range b.completionSubs { select { case subChan <- b: @@ -92,22 +106,26 @@ func (b *Bundle) addCompletionSub(subChan chan *Bundle) { b.completionSubs = append(b.completionSubs, subChan) } -func (b *Bundle) getAtlasTxHash(completionChan chan *Bundle) (common.Hash, *relayerror.Error) { - atlasTxHash, relayErr := b.GetResult() +// Return an array of hashes if the bundle is multi-bundler, otherwise return a single hash +func (b *Bundle) getAtlasTxHashes(completionChan chan *Bundle) (interface{}, *relayerror.Error) { + atlasTxHashes, relayErr := b.GetResult() if relayErr != nil { - return common.Hash{}, relayErr + return nil, relayErr } - bundled := atlasTxHash != (common.Hash{}) - if completionChan != nil && !bundled { + if completionChan != nil && !b.complete { b.addCompletionSub(completionChan) - return common.Hash{}, nil + return nil, nil } - if !bundled { - log.Info("bundle has not been bundled yet", "userOpHash", b.userOpHash.Hex()) - return common.Hash{}, ErrNotBundledYet + if !b.complete { + log.Info("bundle has not been completed yet", "userOpHash", b.userOpHash.Hex()) + return nil, ErrNotBundledYet } - return atlasTxHash, nil + return atlasTxHashes, nil +} + +func (b *Bundle) isMultiBundler() bool { + return b.ops.DAppOperation.Bundler == common.Address{} } diff --git a/bundle/manager.go b/bundle/manager.go index 7344136..327b6be 100644 --- a/bundle/manager.go +++ b/bundle/manager.go @@ -59,7 +59,7 @@ func (bm *Manager) bundlesCleaner() { for range time.Tick(10 * time.Minute) { bm.mu.Lock() for userOpHash, bundle := range bm.bundles { - if (bundle.atlasTxHash != (common.Hash{}) || bundle.relayErr != nil) && time.Since(bundle.createdAt) > time.Hour { + if (len(bundle.atlasTxHashes) > 0 || bundle.relayErr != nil) && time.Since(bundle.createdAt) > time.Hour { delete(bm.bundles, userOpHash) } } @@ -177,15 +177,15 @@ func (bm *Manager) UnregisterBundle(userOpHash common.Hash) { delete(bm.bundles, userOpHash) } -func (bm *Manager) GetBundleHash(userOpHash common.Hash, completionChan chan *Bundle) (common.Hash, *relayerror.Error) { +func (bm *Manager) GetBundleHash(userOpHash common.Hash, completionChan chan *Bundle) (interface{}, *relayerror.Error) { bm.mu.RLock() defer bm.mu.RUnlock() bundle, ok := bm.bundles[userOpHash] if !ok { log.Info("bundle not found", "userOpHash", userOpHash.Hex()) - return common.Hash{}, ErrBundleNotFound + return nil, ErrBundleNotFound } - return bundle.getAtlasTxHash(completionChan) + return bundle.getAtlasTxHashes(completionChan) } diff --git a/core/relay.go b/core/relay.go index 1a251c1..b1e5d9e 100644 --- a/core/relay.go +++ b/core/relay.go @@ -123,7 +123,7 @@ func (r *Relay) auctionCompleteCallback(bundleOps *operation.BundleOperations) { return } - callChainHash, err := bundleOps.CallChainHash(dAppConfig.CallConfig, dAppConfig.To) + callChainHash, err := bundleOps.CallChainHash() if err != nil { log.Info("failed to compute call chain hash", "err", err) return @@ -184,7 +184,7 @@ func (r *Relay) submitBundleOperations(bundleOps *operation.BundleOperations) (s return "", relayErr } - if err := r.server.ForwardBundle(bundleOps, bundle.SetAtlasTxHash, bundle.SetRelayError); err != nil { + if err := r.server.ForwardBundle(bundleOps, bundle.SetAtlasTxHash, bundle.SetRelayError, bundle.NotifyCompletionSubs); err != nil { r.bundleManager.UnregisterBundle(userOpHash) return "", ErrForwardBundle.AddError(err) } @@ -192,7 +192,7 @@ func (r *Relay) submitBundleOperations(bundleOps *operation.BundleOperations) (s return BundleSuccessfullySubmitted, nil } -func (r *Relay) getBundleHash(userOpHash common.Hash, completionChan chan *bundle.Bundle) (common.Hash, *relayerror.Error) { +func (r *Relay) getBundleHash(userOpHash common.Hash, completionChan chan *bundle.Bundle) (interface{}, *relayerror.Error) { return r.bundleManager.GetBundleHash(userOpHash, completionChan) } diff --git a/core/server.go b/core/server.go index 3b1e8aa..a243ca0 100644 --- a/core/server.go +++ b/core/server.go @@ -107,10 +107,11 @@ var ( type newSolverOperationFn func(*operation.SolverOperation) (common.Hash, *relayerror.Error) type getSolverOperationStatusFn func(common.Hash, chan *auction.SolverStatus) (*auction.SolverStatus, *relayerror.Error) type getDAppSignatoriesFn func(common.Address) ([]common.Address, *relayerror.Error) -type setAtlasTxHashFn func(common.Hash) *relayerror.Error +type setAtlasTxHashFn func(common.Hash, bool) *relayerror.Error type setRelayErrorFn func(*relayerror.Error) *relayerror.Error type submitBundleOperationsFn func(*operation.BundleOperations) (string, *relayerror.Error) type registerBundleErrorFn func(common.Hash, *relayerror.Error) +type notifyCompletionSubsFn func() type RequestParams struct { Topic string `json:"topic"` @@ -253,11 +254,14 @@ func (c *Conn) isSignatory() bool { } type bundlingRequest struct { - candidatesBundlers map[common.Address]*Conn - offlineBundlers map[common.Address]bool - setAtlasTxHash setAtlasTxHashFn - setRelayError setRelayErrorFn - doneChan chan struct{} + multiBundler bool + candidatesBundlers map[common.Address]*Conn + offlineBundlers map[common.Address]bool + setAtlasTxHash setAtlasTxHashFn + setRelayError setRelayErrorFn + notifyCompletionSubs notifyCompletionSubsFn + doneChan chan struct{} + multiBundlerDoneChans map[common.Address]chan struct{} } type SigningRequest struct { @@ -441,13 +445,16 @@ func (s *Server) BroadcastUserOperationPartial(userOperationPartialRaw *operatio s.publish(broadcast) } -func (s *Server) ForwardBundle(bundleOps *operation.BundleOperations, setAtlasTxHash setAtlasTxHashFn, setRelayError setRelayErrorFn) *relayerror.Error { +func (s *Server) ForwardBundle(bundleOps *operation.BundleOperations, setAtlasTxHash setAtlasTxHashFn, setRelayError setRelayErrorFn, notifyCompletionSubs notifyCompletionSubsFn) *relayerror.Error { bundlingRequest := &bundlingRequest{ - candidatesBundlers: make(map[common.Address]*Conn), - offlineBundlers: make(map[common.Address]bool), - setAtlasTxHash: setAtlasTxHash, - setRelayError: setRelayError, - doneChan: make(chan struct{}), + multiBundler: bundleOps.DAppOperation.Bundler == (common.Address{}), + candidatesBundlers: make(map[common.Address]*Conn), + offlineBundlers: make(map[common.Address]bool), + setAtlasTxHash: setAtlasTxHash, + setRelayError: setRelayError, + notifyCompletionSubs: notifyCompletionSubs, + multiBundlerDoneChans: make(map[common.Address]chan struct{}), + doneChan: make(chan struct{}), } var firstCandidate *Conn @@ -455,28 +462,35 @@ func (s *Server) ForwardBundle(bundleOps *operation.BundleOperations, setAtlasTx s.mu.Lock() defer s.mu.Unlock() - primaryBundler, primaryOnline := s.bundlers[bundleOps.DAppOperation.Bundler] - if primaryOnline { - firstCandidate = primaryBundler - bundlingRequest.candidatesBundlers[bundleOps.DAppOperation.Bundler] = primaryBundler - } - - // Retrieve dApp signatories, which are allowed bundlers - signatories, relayErr := s.getDAppSignatories(bundleOps.DAppOperation.Control) - if relayErr != nil { - log.Info("failed to get dApp signatories", "control", bundleOps.DAppOperation.Control.Hex(), "err", relayErr.Message) - if !primaryOnline { - return ErrBundlerOffline + if bundlingRequest.multiBundler { + for _, conn := range s.bundlers { + bundlingRequest.candidatesBundlers[conn.bundler] = conn + bundlingRequest.multiBundlerDoneChans[conn.bundler] = make(chan struct{}) + } + } else { + primaryBundler, primaryOnline := s.bundlers[bundleOps.DAppOperation.Bundler] + if primaryOnline { + firstCandidate = primaryBundler + bundlingRequest.candidatesBundlers[bundleOps.DAppOperation.Bundler] = primaryBundler } - } - for _, signatory := range signatories { - conn, online := s.bundlers[signatory] - if online { - bundlingRequest.candidatesBundlers[signatory] = conn + // Retrieve dApp signatories, which are allowed bundlers + signatories, relayErr := s.getDAppSignatories(bundleOps.DAppOperation.Control) + if relayErr != nil { + log.Info("failed to get dApp signatories", "control", bundleOps.DAppOperation.Control.Hex(), "err", relayErr.Message) + if !primaryOnline { + return ErrBundlerOffline + } } - if firstCandidate == nil { - firstCandidate = conn + + for _, signatory := range signatories { + conn, online := s.bundlers[signatory] + if online { + bundlingRequest.candidatesBundlers[signatory] = conn + } + if firstCandidate == nil { + firstCandidate = conn + } } } @@ -511,12 +525,57 @@ func (s *Server) runBundlingRequest(id string, bundleReq *BundleRequest, firstCa return nil } + deleteBundlingRequest := func() { + s.mu.Lock() + delete(s.bundlingRequests, id) + s.mu.Unlock() + } + + if bundlingRequest.multiBundler { + var ( + responseReceived bool + wg sync.WaitGroup + ) + + // Contacting all candidates simultaneously + for _, candidate := range bundlingRequest.candidatesBundlers { + wg.Add(1) + go func(candidate *Conn) { + if relayErr := candidate.send(bundleReq); relayErr != nil { + log.Info("failed to send bundle request", "bundler", nextCandidate.bundler.Hex(), "err", relayErr.Message) + return + } + + select { + case <-time.After(BundlerTimeout): + log.Info("bundler timed out", "bundler", nextCandidate.bundler.Hex()) + + case <-bundlingRequest.multiBundlerDoneChans[candidate.bundler]: + // Got a response + responseReceived = true + } + }(candidate) + } + + wg.Wait() + + deleteBundlingRequest() + + if !responseReceived { + // All candidates failed + bundlingRequest.setRelayError(ErrBundlerOffline) + return + } + + bundlingRequest.notifyCompletionSubs() + return + } + + // Not multi-bundler for { if nextCandidate == nil { // No more candidates - s.mu.Lock() - delete(s.bundlingRequests, id) - s.mu.Unlock() + deleteBundlingRequest() bundlingRequest.setRelayError(ErrBundlerOffline) return } @@ -690,7 +749,7 @@ func (s *Server) processSolverMessage(conn *Conn, msg []byte) { conn.send(resp) } -func (s *Server) processBundlerMessage(msg []byte) { +func (s *Server) processBundlerMessage(msg []byte, bundler common.Address) { var resp *BundleResponse if err := json.Unmarshal(msg, &resp); err != nil { log.Info("failed to unmarshal bundler message", "err", err) @@ -722,10 +781,19 @@ func (s *Server) processBundlerMessage(msg []byte) { return } - close(bundlingRequest.doneChan) - delete(s.bundlingRequests, resp.Id) + bundlingRequest.setAtlasTxHash(resp.Result, bundlingRequest.multiBundler) - bundlingRequest.setAtlasTxHash(resp.Result) + if bundlingRequest.multiBundler { + doneChan, exist := bundlingRequest.multiBundlerDoneChans[bundler] + if !exist { + log.Info("unexpected bundler response", "id", resp.Id, "bundler", bundler.Hex(), "message", string(msg)) + return + } + close(doneChan) + } else { + close(bundlingRequest.doneChan) + delete(s.bundlingRequests, resp.Id) + } } func (s *Server) processSignatoryMessage(msg []byte) { @@ -906,7 +974,7 @@ func (s *Server) readPump(conn *Conn, doneChan chan<- struct{}) { } if conn.isBundler() { - s.processBundlerMessage(msg) + s.processBundlerMessage(msg, conn.bundler) } else if conn.isSignatory() { s.processSignatoryMessage(msg) } else { diff --git a/operation/bundle.go b/operation/bundle.go index fd0d1e2..176a252 100644 --- a/operation/bundle.go +++ b/operation/bundle.go @@ -69,7 +69,7 @@ func (b *BundleOperations) Validate(ethClient *ethclient.Client, userOpHash comm ) if utils.FlagVerifyCallChainHash(dAppConfig.CallConfig) { - callChainHash, err = b.CallChainHash(dAppConfig.CallConfig, dAppConfig.To) + callChainHash, err = b.CallChainHash() if err != nil { log.Info("failed to compute call chain hash", "err", err) return relayerror.ErrServerInternal @@ -83,13 +83,7 @@ func (b *BundleOperations) Validate(ethClient *ethclient.Client, userOpHash comm return nil } -func (b *BundleOperations) CallChainHash(callConfig uint32, dAppControl common.Address) (common.Hash, error) { - callSequence := []byte{} - - if utils.FlagRequirePreOps(callConfig) { - callSequence = append(callSequence, dAppControl.Bytes()...) - } - +func (b *BundleOperations) CallChainHash() (common.Hash, error) { userOpAbiEncoded, err := b.UserOperation.AbiEncode() if err != nil { return common.Hash{}, err @@ -100,8 +94,8 @@ func (b *BundleOperations) CallChainHash(callConfig uint32, dAppControl common.A return common.Hash{}, err } - callSequence = append(callSequence, userOpAbiEncoded...) - callSequence = append(callSequence, solverOpsAbiEncoded...) - - return crypto.Keccak256Hash(callSequence), nil + return crypto.Keccak256Hash( + userOpAbiEncoded, + solverOpsAbiEncoded, + ), nil } diff --git a/operation/bundle_test.go b/operation/bundle_test.go index 30d7aa9..072a6ea 100644 --- a/operation/bundle_test.go +++ b/operation/bundle_test.go @@ -3,7 +3,6 @@ package operation import ( "testing" - "github.com/FastLane-Labs/atlas-operations-relay/utils" "github.com/ethereum/go-ethereum/common" ) @@ -20,66 +19,34 @@ func generateBundleOperations(solverOpsCount int) *BundleOperations { } } -func TestBundleOperationsCallChainHashWithRequiredPreOpsAndWithSolverOps(t *testing.T) { - t.Parallel() - - bundleOps := generateBundleOperations(3) - want := common.HexToHash("0xce835ea8087710762b1d392a3225f5cb50adb278093945e1835b3cc5f3033a82") - - result, err := bundleOps.CallChainHash(uint32(1)<