Skip to content

Commit

Permalink
Merge pull request #72 from FastLane-Labs/multi-bundler
Browse files Browse the repository at this point in the history
Multi bundler
  • Loading branch information
jj1980a authored Aug 9, 2024
2 parents 826fbff + adfe2bc commit e6ffaf9
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 131 deletions.
56 changes: 37 additions & 19 deletions bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ type Bundle struct {
userOpHash common.Hash
ops *operation.BundleOperations

atlasTxHash common.Hash
relayErr *relayerror.Error
atlasTxHashes []common.Hash
relayErr *relayerror.Error

complete bool
completionSubs []chan *Bundle

createdAt time.Time
Expand All @@ -34,28 +35,36 @@ func NewBundle(userOpHash common.Hash, bundleOps *operation.BundleOperations) *B
return &Bundle{
userOpHash: userOpHash,
ops: bundleOps,
atlasTxHashes: make([]common.Hash, 0),
completionSubs: make([]chan *Bundle, 0),
createdAt: time.Now(),
}
}

func (b *Bundle) GetResult() (common.Hash, *relayerror.Error) {
func (b *Bundle) GetResult() (interface{}, *relayerror.Error) {
b.mu.RLock()
defer b.mu.RUnlock()
return b.atlasTxHash, b.relayErr
if b.isMultiBundler() {
return b.atlasTxHashes, nil
}

return b.atlasTxHashes[0], nil
}

func (b *Bundle) SetAtlasTxHash(txHash common.Hash) *relayerror.Error {
func (b *Bundle) SetAtlasTxHash(txHash common.Hash, multiBundler bool) *relayerror.Error {
b.mu.Lock()
defer b.mu.Unlock()

if b.atlasTxHash != (common.Hash{}) {
log.Info("bundle has already been bundled", "bundledTxHash", b.atlasTxHash.Hex(), "newTxHash", txHash.Hex())
if !multiBundler && len(b.atlasTxHashes) == 1 {
log.Info("bundle has already been bundled", "bundledTxHash", b.atlasTxHashes[0].Hex(), "newTxHash", txHash.Hex())
return ErrAlreadyBundled
}

b.atlasTxHash = txHash
b.notifyCompletionSubs()
b.atlasTxHashes = append(b.atlasTxHashes, txHash)

if !multiBundler {
b.notifyCompletionSubs()
}

return nil
}
Expand All @@ -75,7 +84,12 @@ func (b *Bundle) SetRelayError(relayErr *relayerror.Error) *relayerror.Error {
return nil
}

func (b *Bundle) NotifyCompletionSubs() {
b.notifyCompletionSubs()
}

func (b *Bundle) notifyCompletionSubs() {
b.complete = true
for _, subChan := range b.completionSubs {
select {
case subChan <- b:
Expand All @@ -92,22 +106,26 @@ func (b *Bundle) addCompletionSub(subChan chan *Bundle) {
b.completionSubs = append(b.completionSubs, subChan)
}

func (b *Bundle) getAtlasTxHash(completionChan chan *Bundle) (common.Hash, *relayerror.Error) {
atlasTxHash, relayErr := b.GetResult()
// Return an array of hashes if the bundle is multi-bundler, otherwise return a single hash
func (b *Bundle) getAtlasTxHashes(completionChan chan *Bundle) (interface{}, *relayerror.Error) {
atlasTxHashes, relayErr := b.GetResult()
if relayErr != nil {
return common.Hash{}, relayErr
return nil, relayErr
}

bundled := atlasTxHash != (common.Hash{})
if completionChan != nil && !bundled {
if completionChan != nil && !b.complete {
b.addCompletionSub(completionChan)
return common.Hash{}, nil
return nil, nil
}

if !bundled {
log.Info("bundle has not been bundled yet", "userOpHash", b.userOpHash.Hex())
return common.Hash{}, ErrNotBundledYet
if !b.complete {
log.Info("bundle has not been completed yet", "userOpHash", b.userOpHash.Hex())
return nil, ErrNotBundledYet
}

return atlasTxHash, nil
return atlasTxHashes, nil
}

func (b *Bundle) isMultiBundler() bool {
return b.ops.DAppOperation.Bundler == common.Address{}
}
8 changes: 4 additions & 4 deletions bundle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (bm *Manager) bundlesCleaner() {
for range time.Tick(10 * time.Minute) {
bm.mu.Lock()
for userOpHash, bundle := range bm.bundles {
if (bundle.atlasTxHash != (common.Hash{}) || bundle.relayErr != nil) && time.Since(bundle.createdAt) > time.Hour {
if (len(bundle.atlasTxHashes) > 0 || bundle.relayErr != nil) && time.Since(bundle.createdAt) > time.Hour {
delete(bm.bundles, userOpHash)
}
}
Expand Down Expand Up @@ -177,15 +177,15 @@ func (bm *Manager) UnregisterBundle(userOpHash common.Hash) {
delete(bm.bundles, userOpHash)
}

func (bm *Manager) GetBundleHash(userOpHash common.Hash, completionChan chan *Bundle) (common.Hash, *relayerror.Error) {
func (bm *Manager) GetBundleHash(userOpHash common.Hash, completionChan chan *Bundle) (interface{}, *relayerror.Error) {
bm.mu.RLock()
defer bm.mu.RUnlock()

bundle, ok := bm.bundles[userOpHash]
if !ok {
log.Info("bundle not found", "userOpHash", userOpHash.Hex())
return common.Hash{}, ErrBundleNotFound
return nil, ErrBundleNotFound
}

return bundle.getAtlasTxHash(completionChan)
return bundle.getAtlasTxHashes(completionChan)
}
6 changes: 3 additions & 3 deletions core/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *Relay) auctionCompleteCallback(bundleOps *operation.BundleOperations) {
return
}

callChainHash, err := bundleOps.CallChainHash(dAppConfig.CallConfig, dAppConfig.To)
callChainHash, err := bundleOps.CallChainHash()
if err != nil {
log.Info("failed to compute call chain hash", "err", err)
return
Expand Down Expand Up @@ -184,15 +184,15 @@ func (r *Relay) submitBundleOperations(bundleOps *operation.BundleOperations) (s
return "", relayErr
}

if err := r.server.ForwardBundle(bundleOps, bundle.SetAtlasTxHash, bundle.SetRelayError); err != nil {
if err := r.server.ForwardBundle(bundleOps, bundle.SetAtlasTxHash, bundle.SetRelayError, bundle.NotifyCompletionSubs); err != nil {
r.bundleManager.UnregisterBundle(userOpHash)
return "", ErrForwardBundle.AddError(err)
}

return BundleSuccessfullySubmitted, nil
}

func (r *Relay) getBundleHash(userOpHash common.Hash, completionChan chan *bundle.Bundle) (common.Hash, *relayerror.Error) {
func (r *Relay) getBundleHash(userOpHash common.Hash, completionChan chan *bundle.Bundle) (interface{}, *relayerror.Error) {
return r.bundleManager.GetBundleHash(userOpHash, completionChan)
}

Expand Down
146 changes: 107 additions & 39 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ var (
type newSolverOperationFn func(*operation.SolverOperation) (common.Hash, *relayerror.Error)
type getSolverOperationStatusFn func(common.Hash, chan *auction.SolverStatus) (*auction.SolverStatus, *relayerror.Error)
type getDAppSignatoriesFn func(common.Address) ([]common.Address, *relayerror.Error)
type setAtlasTxHashFn func(common.Hash) *relayerror.Error
type setAtlasTxHashFn func(common.Hash, bool) *relayerror.Error
type setRelayErrorFn func(*relayerror.Error) *relayerror.Error
type submitBundleOperationsFn func(*operation.BundleOperations) (string, *relayerror.Error)
type registerBundleErrorFn func(common.Hash, *relayerror.Error)
type notifyCompletionSubsFn func()

type RequestParams struct {
Topic string `json:"topic"`
Expand Down Expand Up @@ -253,11 +254,14 @@ func (c *Conn) isSignatory() bool {
}

type bundlingRequest struct {
candidatesBundlers map[common.Address]*Conn
offlineBundlers map[common.Address]bool
setAtlasTxHash setAtlasTxHashFn
setRelayError setRelayErrorFn
doneChan chan struct{}
multiBundler bool
candidatesBundlers map[common.Address]*Conn
offlineBundlers map[common.Address]bool
setAtlasTxHash setAtlasTxHashFn
setRelayError setRelayErrorFn
notifyCompletionSubs notifyCompletionSubsFn
doneChan chan struct{}
multiBundlerDoneChans map[common.Address]chan struct{}
}

type SigningRequest struct {
Expand Down Expand Up @@ -441,42 +445,52 @@ func (s *Server) BroadcastUserOperationPartial(userOperationPartialRaw *operatio
s.publish(broadcast)
}

func (s *Server) ForwardBundle(bundleOps *operation.BundleOperations, setAtlasTxHash setAtlasTxHashFn, setRelayError setRelayErrorFn) *relayerror.Error {
func (s *Server) ForwardBundle(bundleOps *operation.BundleOperations, setAtlasTxHash setAtlasTxHashFn, setRelayError setRelayErrorFn, notifyCompletionSubs notifyCompletionSubsFn) *relayerror.Error {
bundlingRequest := &bundlingRequest{
candidatesBundlers: make(map[common.Address]*Conn),
offlineBundlers: make(map[common.Address]bool),
setAtlasTxHash: setAtlasTxHash,
setRelayError: setRelayError,
doneChan: make(chan struct{}),
multiBundler: bundleOps.DAppOperation.Bundler == (common.Address{}),
candidatesBundlers: make(map[common.Address]*Conn),
offlineBundlers: make(map[common.Address]bool),
setAtlasTxHash: setAtlasTxHash,
setRelayError: setRelayError,
notifyCompletionSubs: notifyCompletionSubs,
multiBundlerDoneChans: make(map[common.Address]chan struct{}),
doneChan: make(chan struct{}),
}

var firstCandidate *Conn

s.mu.Lock()
defer s.mu.Unlock()

primaryBundler, primaryOnline := s.bundlers[bundleOps.DAppOperation.Bundler]
if primaryOnline {
firstCandidate = primaryBundler
bundlingRequest.candidatesBundlers[bundleOps.DAppOperation.Bundler] = primaryBundler
}

// Retrieve dApp signatories, which are allowed bundlers
signatories, relayErr := s.getDAppSignatories(bundleOps.DAppOperation.Control)
if relayErr != nil {
log.Info("failed to get dApp signatories", "control", bundleOps.DAppOperation.Control.Hex(), "err", relayErr.Message)
if !primaryOnline {
return ErrBundlerOffline
if bundlingRequest.multiBundler {
for _, conn := range s.bundlers {
bundlingRequest.candidatesBundlers[conn.bundler] = conn
bundlingRequest.multiBundlerDoneChans[conn.bundler] = make(chan struct{})
}
} else {
primaryBundler, primaryOnline := s.bundlers[bundleOps.DAppOperation.Bundler]
if primaryOnline {
firstCandidate = primaryBundler
bundlingRequest.candidatesBundlers[bundleOps.DAppOperation.Bundler] = primaryBundler
}
}

for _, signatory := range signatories {
conn, online := s.bundlers[signatory]
if online {
bundlingRequest.candidatesBundlers[signatory] = conn
// Retrieve dApp signatories, which are allowed bundlers
signatories, relayErr := s.getDAppSignatories(bundleOps.DAppOperation.Control)
if relayErr != nil {
log.Info("failed to get dApp signatories", "control", bundleOps.DAppOperation.Control.Hex(), "err", relayErr.Message)
if !primaryOnline {
return ErrBundlerOffline
}
}
if firstCandidate == nil {
firstCandidate = conn

for _, signatory := range signatories {
conn, online := s.bundlers[signatory]
if online {
bundlingRequest.candidatesBundlers[signatory] = conn
}
if firstCandidate == nil {
firstCandidate = conn
}
}
}

Expand Down Expand Up @@ -511,12 +525,57 @@ func (s *Server) runBundlingRequest(id string, bundleReq *BundleRequest, firstCa
return nil
}

deleteBundlingRequest := func() {
s.mu.Lock()
delete(s.bundlingRequests, id)
s.mu.Unlock()
}

if bundlingRequest.multiBundler {
var (
responseReceived bool
wg sync.WaitGroup
)

// Contacting all candidates simultaneously
for _, candidate := range bundlingRequest.candidatesBundlers {
wg.Add(1)
go func(candidate *Conn) {
if relayErr := candidate.send(bundleReq); relayErr != nil {
log.Info("failed to send bundle request", "bundler", nextCandidate.bundler.Hex(), "err", relayErr.Message)
return
}

select {
case <-time.After(BundlerTimeout):
log.Info("bundler timed out", "bundler", nextCandidate.bundler.Hex())

case <-bundlingRequest.multiBundlerDoneChans[candidate.bundler]:
// Got a response
responseReceived = true
}
}(candidate)
}

wg.Wait()

deleteBundlingRequest()

if !responseReceived {
// All candidates failed
bundlingRequest.setRelayError(ErrBundlerOffline)
return
}

bundlingRequest.notifyCompletionSubs()
return
}

// Not multi-bundler
for {
if nextCandidate == nil {
// No more candidates
s.mu.Lock()
delete(s.bundlingRequests, id)
s.mu.Unlock()
deleteBundlingRequest()
bundlingRequest.setRelayError(ErrBundlerOffline)
return
}
Expand Down Expand Up @@ -690,7 +749,7 @@ func (s *Server) processSolverMessage(conn *Conn, msg []byte) {
conn.send(resp)
}

func (s *Server) processBundlerMessage(msg []byte) {
func (s *Server) processBundlerMessage(msg []byte, bundler common.Address) {
var resp *BundleResponse
if err := json.Unmarshal(msg, &resp); err != nil {
log.Info("failed to unmarshal bundler message", "err", err)
Expand Down Expand Up @@ -722,10 +781,19 @@ func (s *Server) processBundlerMessage(msg []byte) {
return
}

close(bundlingRequest.doneChan)
delete(s.bundlingRequests, resp.Id)
bundlingRequest.setAtlasTxHash(resp.Result, bundlingRequest.multiBundler)

bundlingRequest.setAtlasTxHash(resp.Result)
if bundlingRequest.multiBundler {
doneChan, exist := bundlingRequest.multiBundlerDoneChans[bundler]
if !exist {
log.Info("unexpected bundler response", "id", resp.Id, "bundler", bundler.Hex(), "message", string(msg))
return
}
close(doneChan)
} else {
close(bundlingRequest.doneChan)
delete(s.bundlingRequests, resp.Id)
}
}

func (s *Server) processSignatoryMessage(msg []byte) {
Expand Down Expand Up @@ -906,7 +974,7 @@ func (s *Server) readPump(conn *Conn, doneChan chan<- struct{}) {
}

if conn.isBundler() {
s.processBundlerMessage(msg)
s.processBundlerMessage(msg, conn.bundler)
} else if conn.isSignatory() {
s.processSignatoryMessage(msg)
} else {
Expand Down
Loading

0 comments on commit e6ffaf9

Please sign in to comment.