diff --git a/README.md b/README.md
index 88f045d..e30ae77 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,9 @@ During my journey of exploring Ethereum MemPool, I found good initiative from [B
 - Make sure you've _`Go ( >= 1.16)`_, _`make`_ installed
 - Get one Ethereum Node up & running, with `txpool` RPC API enabled. You can always use SaaS Ethereum node.
+- For its pub/sub functionality, `harmony` needs access to `0hub`, a Pub/Sub Hub, which can be deployed by following [this](https://github.com/itzmeanjan/pub0sub#hub) guide
+
+> Note : You'll need to fill in `0hub`'s host & port in the `.env` file
 
 ## Installation
 
@@ -110,6 +113,8 @@ QueuedTxEntryTopic=queued_pool_entry
 QueuedTxExitTopic=queued_pool_exit
 ConcurrencyFactor=10
 Port=7000
+Pub0SubHost=127.0.0.1
+Pub0SubPort=13000
 ```
 
 Environment Variable | Interpretation
@@ -125,6 +130,8 @@ QueuedTxEntryTopic | Whenever tx enters queued pool, it'll be published on Pub/S
 QueuedTxExitTopic | Whenever tx leaves queued pool, it'll be published on Pub/Sub topic `t`
 ConcurrencyFactor | Whenever concurrency can be leveraged, `harmony` will create worker pool with `#-of logical CPUs x ConcurrencyFactor` go routines. **[ Can be float too ]**
 Port | Starts HTTP server on this port ( > 1024 )
+Pub0SubHost | Address on which Pub/Sub Hub i.e. `0hub` is listening
+Pub0SubPort | Port on which Pub/Sub Hub i.e. `0hub` is listening
 
 > Note : When pool size exceeds, tx with lowest gas price paid to be dropped. Consider setting pool sizes to higher values, if you've enough memory on machine, otherwise it'll crash.
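If you're new to `pub0sub`, here's a minimal, self-contained sketch of talking to a running `0hub`. It is not part of `harmony`; it assumes a hub at the `.env` defaults above and uses only the calls this patch itself relies on (`publisher.New`, `Publish`, `subscriber.New`, `Watch`, `Next`). The topic name & buffer capacity are illustrative:

```go
package main

import (
	"context"
	"log"

	"github.com/itzmeanjan/pub0sub/ops"
	"github.com/itzmeanjan/pub0sub/publisher"
	"github.com/itzmeanjan/pub0sub/subscriber"
)

func main() {
	ctx := context.Background()

	// `0hub` assumed to be listening at the `.env` defaults i.e. 127.0.0.1:13000
	pub, err := publisher.New(ctx, "tcp", "127.0.0.1:13000")
	if err != nil {
		log.Fatalf("failed to connect publisher : %s\n", err.Error())
	}

	// Subscriber with a 16-message inbox, registered on one hypothetical topic
	sub, err := subscriber.New(ctx, "tcp", "127.0.0.1:13000", 16, "pending_pool_entry")
	if err != nil {
		log.Fatalf("failed to connect subscriber : %s\n", err.Error())
	}

	if _, err := pub.Publish(&ops.Msg{
		Topics: []string{"pending_pool_entry"},
		Data:   []byte("hello mempool"),
	}); err != nil {
		log.Fatalf("failed to publish : %s\n", err.Error())
	}

	// Block until hub signals a message is ready, then drain it;
	// assumes the hub relays to our already-registered subscriber
	<-sub.Watch()
	if msg := sub.Next(); msg != nil {
		log.Printf("received %d bytes\n", len(msg.Data))
	}
}
```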
diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go
index 70c3217..8585b1b 100644
--- a/app/bootup/bootup.go
+++ b/app/bootup/bootup.go
@@ -2,7 +2,6 @@ package bootup
 
 import (
 	"context"
-	"runtime"
 	"strconv"
 	"time"
 
@@ -14,14 +13,12 @@ import (
 	"github.com/itzmeanjan/harmony/app/graph"
 	"github.com/itzmeanjan/harmony/app/listen"
 	"github.com/itzmeanjan/harmony/app/networking"
-	"github.com/itzmeanjan/pubsub"
+	"github.com/itzmeanjan/pub0sub/publisher"
 )
 
 // GetNetwork - Make RPC call for reading network ID
 func GetNetwork(ctx context.Context, rpc *rpc.Client) (uint64, error) {
-
 	var result string
-
 	if err := rpc.CallContext(ctx, &result, "net_version"); err != nil {
 		return 0, err
 	}
@@ -32,7 +29,6 @@ func GetNetwork(ctx context.Context, rpc *rpc.Client) (uint64, error) {
 	}
 
 	return _result, nil
-
 }
 
 // SetGround - This is to be called when starting application
@@ -54,16 +50,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
 		return nil, err
 	}
 
-	_pubsub := pubsub.New(uint64(runtime.NumCPU()))
-
-	// To be used when subscription requests are received from clients
-	if err := graph.InitPubSub(_pubsub); err != nil {
-		return nil, err
-	}
-
-	// Pubsub to be used in p2p networking handling section
-	// for letting clients know of some newly seen mempool tx
-	if err := networking.InitPubSub(_pubsub); err != nil {
+	_publisher, err := publisher.New(ctx, "tcp", config.GetPub0SubAddress())
+	if err != nil {
 		return nil, err
 	}
 
@@ -104,7 +92,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
 		DoneChan:             make(chan chan uint64, 1),
 		SetLastSeenBlockChan: lastSeenBlockChan,
 		LastSeenBlockChan:    make(chan chan data.LastSeenBlock, 1),
-		PubSub:               _pubsub,
+		PubSub:               _publisher,
 		RPC:                  client,
 	}
 
@@ -123,7 +111,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
 		CountTxsChan: make(chan data.CountRequest, 1),
 		ListTxsChan:  make(chan data.ListRequest, 1),
 		TxsFromAChan: make(chan data.TxsFromARequest, 1),
-		PubSub:       _pubsub,
+		PubSub:       _publisher,
 		RPC:          client,
 		PendingPool:  pendingPool,
 	}
@@ -207,12 +195,13 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
 	// Passing parent context to graphQL subscribers, so that
 	// graceful system shutdown can be performed
 	graph.InitParentContext(ctx)
+	// Same for p2p networking stack
+	networking.InitParentContext(ctx)
 
 	return &data.Resource{
 		RPCClient: client,
 		WSClient:  wsClient,
 		Pool:      pool,
-		PubSub:    _pubsub,
 		StartedAt: time.Now().UTC(),
 		NetworkID: network}, nil
diff --git a/app/config/config.go b/app/config/config.go
index 501855b..8f56c27 100644
--- a/app/config/config.go
+++ b/app/config/config.go
@@ -1,6 +1,7 @@
 package config
 
 import (
+	"fmt"
 	"log"
 	"math"
 	"runtime"
@@ -229,3 +230,13 @@ func GetNetworkingChoice() bool {
 	return GetBool("NetworkingEnabled")
 
 }
+
+// GetPub0SubAddress - Address i.e. `host:port` pair on which
+// Pub0Sub's 0hub server is listening, to be used for pub/sub
+// message passing purpose
+func GetPub0SubAddress() string {
+	host := Get("Pub0SubHost")
+	port := GetUint("Pub0SubPort")
+
+	return fmt.Sprintf("%s:%d", host, port)
+}
diff --git a/app/data/pending.go b/app/data/pending.go
index 1a8e2c9..38ee6af 100644
--- a/app/data/pending.go
+++ b/app/data/pending.go
@@ -12,7 +12,8 @@ import (
 	"github.com/gammazero/workerpool"
 	"github.com/itzmeanjan/harmony/app/config"
 	"github.com/itzmeanjan/harmony/app/listen"
-	"github.com/itzmeanjan/pubsub"
+	"github.com/itzmeanjan/pub0sub/ops"
+	"github.com/itzmeanjan/pub0sub/publisher"
 )
 
 // PendingPool - Currently present pending tx(s) i.e. which are ready to
@@ -40,7 +41,7 @@ type PendingPool struct {
 	DoneChan             chan chan uint64
 	SetLastSeenBlockChan chan uint64
 	LastSeenBlockChan    chan chan LastSeenBlock
-	PubSub               *pubsub.PubSub
+	PubSub               *publisher.Publisher
 	RPC                  *rpc.Client
 }
 
@@ -1024,16 +1025,18 @@ func (p *PendingPool) VerifiedAdd(ctx context.Context, tx *MemPoolTx) bool {
 // to pubsub topic
 func (p *PendingPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {
 
-	_msg, err := msg.ToMessagePack()
+	data, err := msg.ToMessagePack()
 	if err != nil {
 		log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
 		return
 	}
 
-	p.PubSub.Publish(&pubsub.Message{
+	if _, err := p.PubSub.Publish(&ops.Msg{
 		Topics: []string{config.GetPendingTxEntryPublishTopic()},
-		Data:   _msg,
-	})
+		Data:   data,
+	}); err != nil {
+		log.Printf("[❗️] Failed to publish tx joining pending pool : %s\n", err.Error())
+	}
 
 }
 
@@ -1054,16 +1057,18 @@ func (p *PendingPool) Remove(ctx context.Context, txStat *TxStatus) bool {
 // These tx(s) are leaving pending pool i.e. they're confirmed now
 func (p *PendingPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {
 
-	_msg, err := msg.ToMessagePack()
+	data, err := msg.ToMessagePack()
 	if err != nil {
 		log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
 		return
 	}
 
-	p.PubSub.Publish(&pubsub.Message{
+	if _, err := p.PubSub.Publish(&ops.Msg{
 		Topics: []string{config.GetPendingTxExitPublishTopic()},
-		Data:   _msg,
-	})
+		Data:   data,
+	}); err != nil {
+		log.Printf("[❗️] Failed to publish tx leaving pending pool : %s\n", err.Error())
+	}
 
 }
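The pools serialize each `MemPoolTx` with MessagePack before handing it to `Publish`. As a rough sketch of what a consumer on the other side of `0hub` does with `msg.Data`, assuming the `vmihailenco/msgpack/v5` package and an illustrative payload shape (harmony's real `MemPoolTx` schema has many more fields):

```go
package main

import (
	"log"

	"github.com/vmihailenco/msgpack/v5"
)

// payload - Illustrative shape only, not harmony's actual `MemPoolTx`
type payload struct {
	Hash     string `msgpack:"hash"`
	GasPrice uint64 `msgpack:"gasPrice"`
}

func main() {
	// What a publisher does before calling Publish(...)
	serialized, err := msgpack.Marshal(&payload{Hash: "0xabc", GasPrice: 42})
	if err != nil {
		log.Fatalf("serialize failed : %s\n", err.Error())
	}

	// What a subscriber does with the `msg.Data` it pulled off a topic
	var decoded payload
	if err := msgpack.Unmarshal(serialized, &decoded); err != nil {
		log.Fatalf("deserialize failed : %s\n", err.Error())
	}

	log.Printf("round-tripped tx %s with gas price %d\n", decoded.Hash, decoded.GasPrice)
}
```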
diff --git a/app/data/queued.go b/app/data/queued.go
index bc76b83..ebcda9e 100644
--- a/app/data/queued.go
+++ b/app/data/queued.go
@@ -10,7 +10,8 @@ import (
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/gammazero/workerpool"
 	"github.com/itzmeanjan/harmony/app/config"
-	"github.com/itzmeanjan/pubsub"
+	"github.com/itzmeanjan/pub0sub/ops"
+	"github.com/itzmeanjan/pub0sub/publisher"
 )
 
 // QueuedPool - Currently present queued tx(s) i.e. these tx(s) are stuck
@@ -33,7 +34,7 @@ type QueuedPool struct {
 	CountTxsChan chan CountRequest
 	ListTxsChan  chan ListRequest
 	TxsFromAChan chan TxsFromARequest
-	PubSub       *pubsub.PubSub
+	PubSub       *publisher.Publisher
 	RPC          *rpc.Client
 	PendingPool  *PendingPool
 }
 
@@ -776,16 +777,18 @@ func (q *QueuedPool) Add(ctx context.Context, tx *MemPoolTx) bool {
 // to pubsub topic
 func (q *QueuedPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {
 
-	_msg, err := msg.ToMessagePack()
+	data, err := msg.ToMessagePack()
 	if err != nil {
 		log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
 		return
 	}
 
-	q.PubSub.Publish(&pubsub.Message{
+	if _, err := q.PubSub.Publish(&ops.Msg{
 		Topics: []string{config.GetQueuedTxEntryPublishTopic()},
-		Data:   _msg,
-	})
+		Data:   data,
+	}); err != nil {
+		log.Printf("[❗️] Failed to publish tx joining queued pool : %s\n", err.Error())
+	}
 
 }
 
@@ -808,16 +811,18 @@ func (q *QueuedPool) Remove(ctx context.Context, txHash common.Hash) *MemPoolTx
 // failed to keep track of it
 func (q *QueuedPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {
 
-	_msg, err := msg.ToMessagePack()
+	data, err := msg.ToMessagePack()
 	if err != nil {
 		log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
 		return
 	}
 
-	q.PubSub.Publish(&pubsub.Message{
+	if _, err := q.PubSub.Publish(&ops.Msg{
 		Topics: []string{config.GetQueuedTxExitPublishTopic()},
-		Data:   _msg,
-	})
+		Data:   data,
+	}); err != nil {
+		log.Printf("[❗️] Failed to publish tx leaving queued pool : %s\n", err.Error())
+	}
 
 }
diff --git a/app/data/resource.go b/app/data/resource.go
index 419b3e7..679f2cf 100644
--- a/app/data/resource.go
+++ b/app/data/resource.go
@@ -5,7 +5,6 @@ import (
 
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/ethereum/go-ethereum/rpc"
-	"github.com/itzmeanjan/pubsub"
 )
 
 // Resource - Shared resources among multiple go routines
@@ -15,7 +14,6 @@ type Resource struct {
 	RPCClient *rpc.Client
 	WSClient  *ethclient.Client
 	Pool      *MemPool
-	PubSub    *pubsub.PubSub
 	StartedAt time.Time
 	NetworkID uint64
 }
diff --git a/app/graph/util.go b/app/graph/util.go
index f703fab..b6ba373 100644
--- a/app/graph/util.go
+++ b/app/graph/util.go
@@ -10,36 +10,22 @@ import (
 	"github.com/itzmeanjan/harmony/app/config"
 	"github.com/itzmeanjan/harmony/app/data"
 	"github.com/itzmeanjan/harmony/app/graph/model"
-	"github.com/itzmeanjan/pubsub"
+	"github.com/itzmeanjan/pub0sub/ops"
+	"github.com/itzmeanjan/pub0sub/subscriber"
 )
 
 var memPool *data.MemPool
-var pubsubHub *pubsub.PubSub
 var parentCtx context.Context
 
 // InitMemPool - Initializing mempool handle, in this module
 // so that it can be used before responding back to graphql queries
 func InitMemPool(pool *data.MemPool) error {
-
 	if pool != nil {
 		memPool = pool
 		return nil
 	}
 
 	return errors.New("bad mempool received in graphQL handler")
-
-}
-
-// InitPubSub - Initializing pubsub handle, for managing subscriptions
-func InitPubSub(client *pubsub.PubSub) error {
-
-	if client != nil {
-		pubsubHub = client
-		return nil
-	}
-
-	return errors.New("bad pub/sub received in graphQL handler")
-
 }
 
 // InitParentContext - Initializing parent context, to be listened by all
@@ -108,25 +94,21 @@ func checkHash(hash string) bool {
 
 // SubscribeToTopic - Subscribes to PubSub topic(s), while configuring subscription such
-// that at max 256 messages can be kept in buffer at a time. If client is consuming slowly
+// that at max 64 messages can be kept in buffer at a time. If client is consuming slowly
 // buffer size will be extended.
-func SubscribeToTopic(ctx context.Context, topic ...string) (*pubsub.Subscriber, error) {
-
-	_sub := pubsubHub.Subscribe(256, topic...)
-	if _sub == nil {
+func SubscribeToTopic(ctx context.Context, topic ...string) (*subscriber.Subscriber, error) {
+	_sub, err := subscriber.New(ctx, "tcp", config.GetPub0SubAddress(), 64, topic...)
+	if err != nil {
 		return nil, errors.New("topic subscription failed")
 	}
 
 	return _sub, nil
-
 }
 
 // SubscribeToPendingPool - Subscribes to both topics, associated with changes
 // happening in pending tx pool
 //
 // When tx joins/ leaves pending pool, subscribers will receive notification
-func SubscribeToPendingPool(ctx context.Context) (*pubsub.Subscriber, error) {
-
+func SubscribeToPendingPool(ctx context.Context) (*subscriber.Subscriber, error) {
 	return SubscribeToTopic(ctx, config.GetPendingTxEntryPublishTopic(), config.GetPendingTxExitPublishTopic())
-
 }
 
 // SubscribeToQueuedPool - Subscribes to both topics, associated with changes
@@ -136,10 +118,8 @@ func SubscribeToPendingPool(ctx context.Context) (*pubsub.Subscriber, error) {
 //
 // @note Tx(s) generally join queued pool, when there's nonce gap & this tx can't be
 // processed until some lower nonce tx(s) get(s) processed
-func SubscribeToQueuedPool(ctx context.Context) (*pubsub.Subscriber, error) {
-
+func SubscribeToQueuedPool(ctx context.Context) (*subscriber.Subscriber, error) {
 	return SubscribeToTopic(ctx, config.GetQueuedTxEntryPublishTopic(), config.GetQueuedTxExitPublishTopic())
-
 }
 
 // SubscribeToMemPool - Subscribes to any changes happening in mempool
@@ -150,46 +130,36 @@ func SubscribeToQueuedPool(ctx context.Context) (*pubsub.Subscriber, error) {
 //
 // It'll subscribe to all 4 topics for listening
 // to tx(s) entering/ leaving any portion of mempool
-func SubscribeToMemPool(ctx context.Context) (*pubsub.Subscriber, error) {
-
+func SubscribeToMemPool(ctx context.Context) (*subscriber.Subscriber, error) {
 	return SubscribeToTopic(ctx,
 		config.GetQueuedTxEntryPublishTopic(),
 		config.GetQueuedTxExitPublishTopic(),
 		config.GetPendingTxEntryPublishTopic(),
 		config.GetPendingTxExitPublishTopic())
-
 }
 
 // SubscribeToPendingTxEntry - Subscribe to topic where new pending tx(s)
 // are published
-func SubscribeToPendingTxEntry(ctx context.Context) (*pubsub.Subscriber, error) {
-
+func SubscribeToPendingTxEntry(ctx context.Context) (*subscriber.Subscriber, error) {
 	return SubscribeToTopic(ctx, config.GetPendingTxEntryPublishTopic())
-
 }
 
 // SubscribeToQueuedTxEntry - Subscribe to topic where new queued tx(s)
// are published
-func SubscribeToQueuedTxEntry(ctx context.Context) (*pubsub.Subscriber, error) {
-
+func SubscribeToQueuedTxEntry(ctx context.Context) (*subscriber.Subscriber, error) {
 	return SubscribeToTopic(ctx, config.GetQueuedTxEntryPublishTopic())
-
 }
 
 // SubscribeToPendingTxExit - Subscribe to topic where pending tx(s), getting
 // confirmed are published
-func SubscribeToPendingTxExit(ctx context.Context) (*pubsub.Subscriber, error) {
-
+func SubscribeToPendingTxExit(ctx context.Context) (*subscriber.Subscriber, error) {
 	return SubscribeToTopic(ctx, config.GetPendingTxExitPublishTopic())
-
 }
 
 // SubscribeToQueuedTxExit - Subscribe to topic where queued tx(s), getting
 // unstuck are published
-func SubscribeToQueuedTxExit(ctx context.Context) (*pubsub.Subscriber, error) {
-
+func SubscribeToQueuedTxExit(ctx context.Context) (*subscriber.Subscriber, error) {
 	return SubscribeToTopic(ctx, config.GetQueuedTxExitPublishTopic())
-
 }
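Every GraphQL subscription now holds its own TCP connection to `0hub`, so tear-down is part of the contract. Below is a sketch of one subscription's full lifecycle under the same `pub0sub` API this patch uses (`UnsubscribeAll`, `Disconnect`, `Watch`, `Next`); the address & topic are hypothetical, and the buffer capacity mirrors `SubscribeToTopic`'s 64:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/itzmeanjan/pub0sub/subscriber"
)

// consumeOnce - Sketch of one subscription's lifecycle against `0hub`
func consumeOnce(ctx context.Context, addr string, topics ...string) error {
	sub, err := subscriber.New(ctx, "tcp", addr, 64, topics...)
	if err != nil {
		return err
	}

	// Detach from hub topics first, then drop the TCP connection
	defer func() {
		if _, err := sub.UnsubscribeAll(); err != nil {
			log.Printf("unsubscribe failed : %s\n", err.Error())
		}
		if err := sub.Disconnect(); err != nil {
			log.Printf("disconnect failed : %s\n", err.Error())
		}
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()

	case <-sub.Watch():
		if msg := sub.Next(); msg != nil {
			log.Printf("got %d bytes\n", len(msg.Data))
		}
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := consumeOnce(ctx, "127.0.0.1:13000", "pending_pool_entry"); err != nil {
		log.Fatalf("subscription failed : %s\n", err.Error())
	}
}
```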
 
 // ListenToMessages - Attempts to listen to messages being published
@@ -203,13 +173,16 @@ func SubscribeToQueuedTxExit(ctx context.Context) (*pubsub.Subscriber, error) {
 //
 // You can always blindly return `true` in your `evaluationCriteria` function,
 // so that you get to receive any tx being published on topic of your interest
-func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, comm chan<- *model.MemPoolTx, pubCriteria PublishingCriteria, params ...interface{}) {
+func ListenToMessages(ctx context.Context, subscriber *subscriber.Subscriber, comm chan<- *model.MemPoolTx, pubCriteria PublishingCriteria, params ...interface{}) {
 
 	defer func() {
+		if err := subscriber.Disconnect(); err != nil {
+			log.Printf("[❗️] Failed to destroy subscriber : %s\n", err.Error())
+		}
 		close(comm)
 	}()
 
-	consume := func(msg *pubsub.PublishedMessage) {
+	consume := func(msg *ops.PushedMessage) {
 		unmarshalled := UnmarshalPubSubMessage(msg.Data)
 		if unmarshalled == nil || !pubCriteria(unmarshalled, params...) {
 			return
@@ -250,7 +223,7 @@ func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, comm c
 			subscriber.UnsubscribeAll()
 			break OUTER
 
-		case <-subscriber.Listener():
+		case <-subscriber.Watch():
 			// Listening for message availablity
 			// signal
 			received := subscriber.Next()
@@ -261,7 +234,7 @@ func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, comm c
 
 		case <-time.After(duration):
 
-			if !subscriber.Consumable() {
+			if !subscriber.Queued() {
 				break
 			}
 
@@ -280,11 +253,6 @@ func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, comm c
 
 }
 
-// UnsubscribeFromTopic - Unsubscribes subscriber from all topics
-func UnsubscribeFromTopic(ctx context.Context, subscriber *pubsub.Subscriber) {
-	subscriber.UnsubscribeAll()
-}
-
 // UnmarshalPubSubMessage - Attempts to unmarshal message pack serialized
 // pubsub message as structured tx data, which is to be sent to subscriber
 func UnmarshalPubSubMessage(message []byte) *data.MemPoolTx {
diff --git a/app/networking/bootstrap.go b/app/networking/bootstrap.go
index 555c456..18581ad 100644
--- a/app/networking/bootstrap.go
+++ b/app/networking/bootstrap.go
@@ -5,45 +5,35 @@ import (
 	"context"
 	"errors"
 
 	"github.com/itzmeanjan/harmony/app/data"
-	"github.com/itzmeanjan/pubsub"
 )
 
 var memPool *data.MemPool
-var pubsubHub *pubsub.PubSub
+var parentCtx context.Context
 var connectionManager *ConnectionManager
 
 // InitMemPool - Initializing mempool handle, in this module
 // so that it can be used updating local mempool state, when new
 // deserialisable tx chunk is received from any peer, over p2p network
 func InitMemPool(pool *data.MemPool) error {
-
 	if pool != nil {
 		memPool = pool
 		return nil
 	}
 
 	return errors.New("bad mempool received in p2p networking handler")
-
 }
 
-// InitPubSub - Initializing pubsub handle, so that all
-// subscriptions can be managed using it
-func InitPubSub(client *pubsub.PubSub) error {
-
-	if client != nil {
-		pubsubHub = client
-		return nil
-	}
-
-	return errors.New("bad pub/sub received in p2p networking handler")
-
+// InitParentContext - To be used for listening to event when `harmony`
+// asks its workers to stop gracefully
+func InitParentContext(ctx context.Context) {
+	parentCtx = ctx
 }
 
 // Setup - Bootstraps `harmony`'s p2p networking stack
 func Setup(ctx context.Context, comm chan struct{}) error {
 
-	if !(memPool != nil && pubsubHub != nil) {
-		return errors.New("mempool/ pubsubHub instance not initialised")
+	if memPool == nil {
+		return errors.New("mempool instance not initialised")
 	}
 
 	// Attempt to create a new `harmony` node
@@ -66,5 +56,4 @@ func Setup(ctx context.Context, comm chan struct{}) error {
 	go connectionManager.Start(ctx)
 
 	return nil
-
 }
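`InitParentContext` above, together with the health channels introduced below in `listen.go`, forms one shutdown pattern: each worker closes a channel when it dies, and the supervisor cancels a shared context to stop its sibling. A self-contained sketch of that pattern, with illustrative names only:

```go
package main

import (
	"context"
	"log"
	"time"
)

// worker - Signals its own death by closing `health`, stops when the
// parent context gets cancelled
func worker(ctx context.Context, name string, health chan struct{}) {
	defer close(health)

	for {
		select {
		case <-ctx.Done():
			log.Printf("%s : asked to stop\n", name)
			return

		case <-time.After(100 * time.Millisecond):
			// ... one unit of work ...
		}
	}
}

func main() {
	parent, cancel := context.WithCancel(context.Background())

	readerHealth := make(chan struct{})
	writerHealth := make(chan struct{})

	go worker(parent, "reader", readerHealth)
	go worker(parent, "writer", writerHealth)

	// If either worker dies, tear the other one down too
	go func() {
		select {
		case <-readerHealth:
		case <-writerHealth:
		}
		cancel()
	}()

	// Simulated application shutdown
	time.Sleep(300 * time.Millisecond)
	cancel()
	<-readerHealth
	<-writerHealth
}
```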
diff --git a/app/networking/listen.go b/app/networking/listen.go
index 2fe89ed..b5f3c2e 100644
--- a/app/networking/listen.go
+++ b/app/networking/listen.go
@@ -6,9 +6,11 @@ import (
 	"encoding/binary"
 	"io"
 	"log"
+	"time"
 
 	"github.com/itzmeanjan/harmony/app/config"
 	"github.com/itzmeanjan/harmony/app/graph"
+	"github.com/itzmeanjan/pub0sub/ops"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/protocol"
@@ -17,63 +19,69 @@ import (
 
 // ReadFrom - Read from stream & attempt to deserialize length prefixed
 // tx data received from peer, which will be acted upon
-func ReadFrom(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWriter, peerId string, remote multiaddr.Multiaddr) {
-
-	defer cancel()
-
-	for {
-
-		buf := make([]byte, 4)
-
-		if _, err := io.ReadFull(rw.Reader, buf); err != nil {
-			if err == io.EOF {
-				break
-			}
-
-			log.Printf("[❗️] Failed to read size of next chunk : %s | %s\n", err.Error(), remote)
-			break
-		}
-
-		size := binary.LittleEndian.Uint32(buf)
-		chunk := make([]byte, size)
-
-		if _, err := io.ReadFull(rw.Reader, chunk); err != nil {
-			if err == io.EOF {
-				break
-			}
-
-			log.Printf("[❗️] Failed to read chunk from peer : %s | %s\n", err.Error(), remote)
-			break
-		}
-
-		tx := graph.UnmarshalPubSubMessage(chunk)
-		if tx == nil {
-			log.Printf("[❗️] Failed to deserialise message from peer | %s\n", remote)
-			continue
-		}
-
-		// Keeping entry of from which peer we received this tx
-		// so that we don't end up sending them again same tx
-		// when it'll be published on Pub/Sub topic
-		tx.ReceivedFrom = peerId
-
-		if memPool.HandleTxFromPeer(ctx, tx) {
-			log.Printf("✅ New tx from peer : %d bytes | %s\n", len(chunk), remote)
-			continue
-		}
-
-		log.Printf("👍 Seen tx from peer : %d bytes | %s\n", len(chunk), remote)
-
-	}
-
-}
+func ReadFrom(ctx context.Context, healthChan chan struct{}, rw *bufio.ReadWriter, peerId string, remote multiaddr.Multiaddr) {
+	defer func() {
+		close(healthChan)
+	}()
+
+OUT:
+	for {
+		select {
+		case <-ctx.Done():
+			break OUT
+
+		default:
+			buf := make([]byte, 4)
+
+			if _, err := io.ReadFull(rw.Reader, buf); err != nil {
+				if err == io.EOF {
+					break OUT
+				}
+
+				log.Printf("[❗️] Failed to read size of next chunk : %s | %s\n", err.Error(), remote)
+				break OUT
+			}
+
+			size := binary.LittleEndian.Uint32(buf)
+			chunk := make([]byte, size)
+
+			if _, err := io.ReadFull(rw.Reader, chunk); err != nil {
+				if err == io.EOF {
+					break OUT
+				}
+
+				log.Printf("[❗️] Failed to read chunk from peer : %s | %s\n", err.Error(), remote)
+				break OUT
+			}
+
+			tx := graph.UnmarshalPubSubMessage(chunk)
+			if tx == nil {
+				log.Printf("[❗️] Failed to deserialise message from peer | %s\n", remote)
+				continue
+			}
+
+			// Keeping entry of from which peer we received this tx
+			// so that we don't end up sending them again same tx
+			// when it'll be published on Pub/Sub topic
+			tx.ReceivedFrom = peerId
+
+			if memPool.HandleTxFromPeer(ctx, tx) {
+				log.Printf("✅ New tx from peer : %d bytes | %s\n", len(chunk), remote)
+				continue
+			}
+
+			log.Printf("👍 Seen tx from peer : %d bytes | %s\n", len(chunk), remote)
+		}
+	}
+}
 
 // WriteTo - Write to mempool changes into stream i.e. connection
 // with some remote peer
-func WriteTo(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWriter, peerId string, remote multiaddr.Multiaddr) {
-
-	// @note Deferred functions are executed in LIFO order
-	defer cancel()
+func WriteTo(ctx context.Context, healthChan chan struct{}, rw *bufio.ReadWriter, peerId string, remote multiaddr.Multiaddr) {
+	defer func() {
+		close(healthChan)
+	}()
 
 	subscriber, err := graph.SubscribeToMemPool(ctx)
 	if err != nil {
@@ -81,50 +89,85 @@ func WriteTo(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWrite
 		return
 	}
 
-	for {
-
-		received := subscriber.Next()
-		if received == nil {
-			continue
-		}
-
-		msg := graph.UnmarshalPubSubMessage(received.Data)
-		// Failed to deserialise message, we don't need
-		// to send it to remote
-		if msg == nil {
-			continue
-		}
-
-		// We found this tx from this peer, so we're
-		// not sending it back
-		if msg.ReceivedFrom == peerId {
-			continue
-		}
-
-		chunk := make([]byte, 4+len(received.Data))
-
-		binary.LittleEndian.PutUint32(chunk[:4], uint32(len(received.Data)))
-		n := copy(chunk[4:], received.Data)
-
-		if n != len(received.Data) {
-			log.Printf("[❗️] Failed to prepare chunk for peer | %s\n", remote)
-			continue
-		}
-
-		if _, err := rw.Write(chunk); err != nil {
-			log.Printf("[❗️] Failed to write chunk on stream : %s | %s\n", err.Error(), remote)
-			break
-		}
-
-		if err := rw.Flush(); err != nil {
-			log.Printf("[❗️] Failed to flush stream buffer : %s | %s\n", err.Error(), remote)
-			break
-		}
-
-	}
-
-	graph.UnsubscribeFromTopic(ctx, subscriber)
+	defer func() {
+		if _, err := subscriber.UnsubscribeAll(); err != nil {
+			log.Printf("[❗️] Failed to unsubscribe : %s\n", err.Error())
+		}
+		if err := subscriber.Disconnect(); err != nil {
+			log.Printf("[❗️] Failed to destroy subscriber : %s\n", err.Error())
+		}
+	}()
+
+	process := func(msg *ops.PushedMessage) error {
+		unmarshalled := graph.UnmarshalPubSubMessage(msg.Data)
+		if unmarshalled == nil {
+			return nil
+		}
+
+		// Received from same peer, no need to let them
+		// know again
+		if unmarshalled.ReceivedFrom == peerId {
+			return nil
+		}
+
+		chunk := make([]byte, 4+len(msg.Data))
+		binary.LittleEndian.PutUint32(chunk[:4], uint32(len(msg.Data)))
+		n := copy(chunk[4:], msg.Data)
+
+		if n != len(msg.Data) {
+			return nil
+		}
+
+		if _, err := rw.Write(chunk); err != nil {
+			return err
+		}
+
+		if err := rw.Flush(); err != nil {
+			return err
+		}
+
+		return nil
+	}
+	duration := time.Duration(256) * time.Millisecond
+
+OUT:
+	for {
+		select {
+		case <-ctx.Done():
+			break OUT
+
+		case <-subscriber.Watch():
+			// Listening for message availability signal
+			received := subscriber.Next()
+			if received == nil {
+				break
+			}
+
+			if err := process(received); err != nil {
+				log.Printf("[❗️] Failed to notify peer : %s\n", err.Error())
+				break OUT
+			}
+
+		case <-time.After(duration):
+			// Explicitly checking for message availability in queue
+			if !subscriber.Queued() {
+				break
+			}
+
+			started := time.Now()
+			for received := subscriber.Next(); received != nil; received = subscriber.Next() {
+				if err := process(received); err != nil {
+					log.Printf("[❗️] Failed to notify peer : %s\n", err.Error())
+					break OUT
+				}
+
+				if time.Since(started) > duration {
+					break
+				}
+			}
+		}
+	}
+}
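The wire format `ReadFrom` & `WriteTo` agree on is a 4-byte little-endian length prefix followed by the serialized payload. A standalone sketch of that framing, runnable as-is (the in-memory buffer stands in for the libp2p stream):

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"io"
	"log"
)

// writeFrame - Frames `data` the way WriteTo does : 4 little-endian
// length bytes, then the payload itself
func writeFrame(w *bufio.Writer, data []byte) error {
	chunk := make([]byte, 4+len(data))
	binary.LittleEndian.PutUint32(chunk[:4], uint32(len(data)))
	copy(chunk[4:], data)

	if _, err := w.Write(chunk); err != nil {
		return err
	}
	return w.Flush()
}

// readFrame - Undoes the framing the way ReadFrom does
func readFrame(r *bufio.Reader) ([]byte, error) {
	buf := make([]byte, 4)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}

	chunk := make([]byte, binary.LittleEndian.Uint32(buf))
	if _, err := io.ReadFull(r, chunk); err != nil {
		return nil, err
	}
	return chunk, nil
}

func main() {
	var wire bytes.Buffer

	if err := writeFrame(bufio.NewWriter(&wire), []byte("serialized tx")); err != nil {
		log.Fatalln(err)
	}

	frame, err := readFrame(bufio.NewReader(&wire))
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("recovered %d byte frame\n", len(frame))
}
```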
 
 // HandleStream - Attepts new stream & handles it through out its life time
@@ -155,28 +198,31 @@ func HandleStream(stream network.Stream) {
 	// connect to them again
 	connectionManager.Added(peerId)
 
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(parentCtx)
+	readerHealth := make(chan struct{})
+	writerHealth := make(chan struct{})
 
 	rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
-	go ReadFrom(ctx, cancel, rw, peerId.String(), remote)
-	go WriteTo(ctx, cancel, rw, peerId.String(), remote)
+	go ReadFrom(ctx, readerHealth, rw, peerId.String(), remote)
+	go WriteTo(ctx, writerHealth, rw, peerId.String(), remote)
 
 	log.Printf("🤩 Got new stream from peer : %s\n", remote)
 
 	// @note This is a blocking call
-	<-ctx.Done()
+	select {
+	case <-readerHealth:
+	case <-writerHealth:
+	}
+	cancel()
 
 	// Closing stream, may be it's already closed
 	if err := stream.Close(); err != nil {
-
 		log.Printf("[❗️] Failed to close stream : %s\n", err.Error())
-
 	}
 
 	// Connection manager also knows this peer can be attempted to be
 	// reconnected, if founded via discovery service
 	connectionManager.Dropped(peerId)
-
 	log.Printf("🙂 Dropped peer connection : %s\n", remote)
 
 }
 
@@ -184,7 +230,5 @@ func HandleStream(stream network.Stream) {
 // Listen - Handle incoming connection of other harmony peer for certain supported
 // protocol(s)
 func Listen(_host host.Host) {
-
 	_host.SetStreamHandler(protocol.ID(config.GetNetworkingStream()), HandleStream)
-
 }
diff --git a/go.mod b/go.mod
index fe9e88d..65c874a 100644
--- a/go.mod
+++ b/go.mod
@@ -8,12 +8,12 @@ require (
 	github.com/agnivade/levenshtein v1.1.0 // indirect
 	github.com/deckarep/golang-set v1.7.1 // indirect
 	github.com/ethereum/go-ethereum v1.10.1
-	github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc // indirect
-	github.com/gammazero/workerpool v1.1.1
+	github.com/gammazero/workerpool v1.1.2
 	github.com/go-ole/go-ole v1.2.5 // indirect
 	github.com/google/go-cmp v0.5.4 // indirect
 	github.com/gorilla/websocket v1.4.2
-	github.com/itzmeanjan/pubsub v0.1.7
+	github.com/itzmeanjan/pub0sub v0.2.1
+	github.com/itzmeanjan/pubsub v0.1.7 // indirect
 	github.com/labstack/echo/v4 v4.2.0
 	github.com/libp2p/go-libp2p v0.13.0
 	github.com/libp2p/go-libp2p-connmgr v0.2.4
diff --git a/go.sum b/go.sum
index 0d121d8..5285bcf 100644
--- a/go.sum
+++ b/go.sum
@@ -169,11 +169,10 @@ github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
-github.com/gammazero/deque v0.0.0-20200721202602-07291166fe33/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w=
-github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc h1:F7BbnLACph7UYiz9ZHi6npcROwKaZUyviDjsNERsoMM=
-github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc/go.mod h1:IlBLfYXnuw9sspy1XS6ctu5exGb6WHGKQsyo4s7bOEA=
-github.com/gammazero/workerpool v1.1.1 h1:MN29GcZtZZAgzTU+Zk54Y+J9XkE54MoXON/NCZvNulo=
-github.com/gammazero/workerpool v1.1.1/go.mod h1:5BN0IJVRjSFAypo9QTJCaWdijjNz9Jjl6VFS1PRjCeg=
+github.com/gammazero/deque v0.1.0 h1:f9LnNmq66VDeuAlSAapemq/U7hJ2jpIWa4c09q8Dlik=
+github.com/gammazero/deque v0.1.0/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
+github.com/gammazero/workerpool v1.1.2 h1:vuioDQbgrz4HoaCi2q1HLlOXdpbap5AET7xu5/qj87g=
+github.com/gammazero/workerpool v1.1.2/go.mod h1:UelbXcO0zCIGFcufcirHhq2/xtLXJdQ29qZNlXG9OjQ=
 github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI=
 github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -357,10 +356,9 @@ github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW
 github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
 github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0=
 github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
-github.com/itzmeanjan/pubsub v0.1.4 h1:cKhELtYkPKW6RV60wzrwWKe/qxv1emJ24uNxy/AYXq4=
-github.com/itzmeanjan/pubsub v0.1.4/go.mod h1:SaVrEsAQmxY70qWQIYQ9rWLXLElsicMcxPddiuXKfLU=
-github.com/itzmeanjan/pubsub v0.1.5 h1:L2jPpw6bxnt1SGxfEfZ3Uy+Bs2EhZ9uf0OeqHONJGBE=
-github.com/itzmeanjan/pubsub v0.1.5/go.mod h1:SaVrEsAQmxY70qWQIYQ9rWLXLElsicMcxPddiuXKfLU=
+github.com/itzmeanjan/pub0sub v0.2.1 h1:YMGluw+XnMCvQs85JePbbwpzsYijEsqQrIHZyh1wDIY=
+github.com/itzmeanjan/pub0sub v0.2.1/go.mod h1:7gQRFtM4BH1tH26MDEQktPSqUuD20jpkUpCfHwp+FL0=
+github.com/itzmeanjan/pubsub v0.1.6/go.mod h1:SaVrEsAQmxY70qWQIYQ9rWLXLElsicMcxPddiuXKfLU=
 github.com/itzmeanjan/pubsub v0.1.7 h1:98GLeBrEkLLynkkm7kkgBYIqjzTSqKgCU5Kal89dKsM=
 github.com/itzmeanjan/pubsub v0.1.7/go.mod h1:SaVrEsAQmxY70qWQIYQ9rWLXLElsicMcxPddiuXKfLU=
 github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
@@ -924,6 +922,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
 github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
 github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
+github.com/xtaci/gaio v1.2.10 h1:AjZy43b3ZdlaCnyu0fPaUrAVXu/SV1n8zWmNlKQQuXw=
+github.com/xtaci/gaio v1.2.10/go.mod h1:rJMerwiLCLnKa14YTM/sRggTPrnBZrlCg9U3DnV5VBE=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=