Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #19 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Integrated `pub0sub`
  • Loading branch information
itzmeanjan committed Jul 23, 2021
2 parents 8a94b25 + cece1ee commit a5284d2
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 193 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ During my journey of exploring Ethereum MemPool, I found good initiative from [B

- Make sure you've _`Go ( >= 1.16)`_, _`make`_ installed
- Get one Ethereum Node up & running, with `txpool` RPC API enabled. You can always use SaaS Ethereum node.
- For leveraging pub/sub functionality `harmony` expects to get access to `0hub` - a Pub/Sub Hub, which can be deployed by following [this](https://github.com/itzmeanjan/pub0sub#hub) guide

> Note: You'll need to fill `0hub`'s host, port in .env file
## Installation

Expand Down Expand Up @@ -110,6 +113,8 @@ QueuedTxEntryTopic=queued_pool_entry
QueuedTxExitTopic=queued_pool_exit
ConcurrencyFactor=10
Port=7000
Pub0SubHost=127.0.0.1
Pub0SubPort=13000
```

Environment Variable | Interpretation
Expand All @@ -125,6 +130,8 @@ QueuedTxEntryTopic | Whenever tx enters queued pool, it'll be published on Pub/S
QueuedTxExitTopic | Whenever tx leaves queued pool, it'll be published on Pub/Sub topic `t`
ConcurrencyFactor | Whenever concurrency can be leveraged, `harmony` will create worker pool with `#-of logical CPUs x ConcurrencyFactor` go routines. **[ Can be float too ]**
Port | Starts HTTP server on this port ( > 1024 )
Pub0SubHost | Pub/Sub Hub i.e. `0hub` listening on address
Pub0SubPort | Pub/Sub Hub i.e. `0hub` listening on port

> Note : When pool size exceeds, tx with lowest gas price paid to be dropped. Consider setting pool sizes to higher values, if you've enough memory on machine, otherwise it'll crash.
Expand Down
25 changes: 7 additions & 18 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bootup

import (
"context"
"runtime"
"strconv"
"time"

Expand All @@ -14,14 +13,12 @@ import (
"github.com/itzmeanjan/harmony/app/graph"
"github.com/itzmeanjan/harmony/app/listen"
"github.com/itzmeanjan/harmony/app/networking"
"github.com/itzmeanjan/pubsub"
"github.com/itzmeanjan/pub0sub/publisher"
)

// GetNetwork - Make RPC call for reading network ID
func GetNetwork(ctx context.Context, rpc *rpc.Client) (uint64, error) {

var result string

if err := rpc.CallContext(ctx, &result, "net_version"); err != nil {
return 0, err
}
Expand All @@ -32,7 +29,6 @@ func GetNetwork(ctx context.Context, rpc *rpc.Client) (uint64, error) {
}

return _result, nil

}

// SetGround - This is to be called when starting application
Expand All @@ -54,16 +50,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
return nil, err
}

_pubsub := pubsub.New(uint64(runtime.NumCPU()))

// To be used when subscription requests are received from clients
if err := graph.InitPubSub(_pubsub); err != nil {
return nil, err
}

// Pubsub to be used in p2p networking handling section
// for letting clients know of some newly seen mempool tx
if err := networking.InitPubSub(_pubsub); err != nil {
publisher, err := publisher.New(ctx, "tcp", config.GetPub0SubAddress())
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -104,7 +92,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
DoneChan: make(chan chan uint64, 1),
SetLastSeenBlockChan: lastSeenBlockChan,
LastSeenBlockChan: make(chan chan data.LastSeenBlock, 1),
PubSub: _pubsub,
PubSub: publisher,
RPC: client,
}

Expand All @@ -123,7 +111,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
TxsFromAChan: make(chan data.TxsFromARequest, 1),
PubSub: _pubsub,
PubSub: publisher,
RPC: client,
PendingPool: pendingPool,
}
Expand Down Expand Up @@ -207,12 +195,13 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
// Passing parent context to graphQL subscribers, so that
// graceful system shutdown can be performed
graph.InitParentContext(ctx)
// Same for p2p networking stack
networking.InitParentContext(ctx)

return &data.Resource{
RPCClient: client,
WSClient: wsClient,
Pool: pool,
PubSub: _pubsub,
StartedAt: time.Now().UTC(),
NetworkID: network}, nil

Expand Down
11 changes: 11 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"fmt"
"log"
"math"
"runtime"
Expand Down Expand Up @@ -229,3 +230,13 @@ func GetNetworkingChoice() bool {
return GetBool("NetworkingEnabled")

}

// Pub0Sub's 0hub server running on address
// port, to be used for pub/sub message
// passing purpose
func GetPub0SubAddress() string {
host := Get("Pub0SubHost")
port := GetUint("Pub0SubPort")

return fmt.Sprintf("%s:%d", host, port)
}
25 changes: 15 additions & 10 deletions app/data/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/gammazero/workerpool"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/harmony/app/listen"
"github.com/itzmeanjan/pubsub"
"github.com/itzmeanjan/pub0sub/ops"
"github.com/itzmeanjan/pub0sub/publisher"
)

// PendingPool - Currently present pending tx(s) i.e. which are ready to
Expand Down Expand Up @@ -40,7 +41,7 @@ type PendingPool struct {
DoneChan chan chan uint64
SetLastSeenBlockChan chan uint64
LastSeenBlockChan chan chan LastSeenBlock
PubSub *pubsub.PubSub
PubSub *publisher.Publisher
RPC *rpc.Client
}

Expand Down Expand Up @@ -1024,16 +1025,18 @@ func (p *PendingPool) VerifiedAdd(ctx context.Context, tx *MemPoolTx) bool {
// to pubsub topic
func (p *PendingPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

p.PubSub.Publish(&pubsub.Message{
if _, err := p.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetPendingTxEntryPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx joining pending pool : %s\n", err.Error())
}

}

Expand All @@ -1054,16 +1057,18 @@ func (p *PendingPool) Remove(ctx context.Context, txStat *TxStatus) bool {
// These tx(s) are leaving pending pool i.e. they're confirmed now
func (p *PendingPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

p.PubSub.Publish(&pubsub.Message{
if _, err := p.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetPendingTxExitPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx leaving pending pool : %s\n", err.Error())
}

}

Expand Down
25 changes: 15 additions & 10 deletions app/data/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/gammazero/workerpool"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/pubsub"
"github.com/itzmeanjan/pub0sub/ops"
"github.com/itzmeanjan/pub0sub/publisher"
)

// QueuedPool - Currently present queued tx(s) i.e. these tx(s) are stuck
Expand All @@ -33,7 +34,7 @@ type QueuedPool struct {
CountTxsChan chan CountRequest
ListTxsChan chan ListRequest
TxsFromAChan chan TxsFromARequest
PubSub *pubsub.PubSub
PubSub *publisher.Publisher
RPC *rpc.Client
PendingPool *PendingPool
}
Expand Down Expand Up @@ -776,16 +777,18 @@ func (q *QueuedPool) Add(ctx context.Context, tx *MemPoolTx) bool {
// to pubsub topic
func (q *QueuedPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

q.PubSub.Publish(&pubsub.Message{
if _, err := q.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetQueuedTxEntryPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx joining queued pool : %s\n", err.Error())
}

}

Expand All @@ -808,16 +811,18 @@ func (q *QueuedPool) Remove(ctx context.Context, txHash common.Hash) *MemPoolTx
// failed to keep track of it
func (q *QueuedPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

q.PubSub.Publish(&pubsub.Message{
if _, err := q.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetQueuedTxExitPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx leaving queued pool : %s\n", err.Error())
}

}

Expand Down
2 changes: 0 additions & 2 deletions app/data/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/itzmeanjan/pubsub"
)

// Resource - Shared resources among multiple go routines
Expand All @@ -15,7 +14,6 @@ type Resource struct {
RPCClient *rpc.Client
WSClient *ethclient.Client
Pool *MemPool
PubSub *pubsub.PubSub
StartedAt time.Time
NetworkID uint64
}
Expand Down
Loading

0 comments on commit a5284d2

Please sign in to comment.