Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
using pub0sub πŸš€
Browse files Browse the repository at this point in the history
  • Loading branch information
itzmeanjan committed May 30, 2021
1 parent 5a344df commit 106b999
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 193 deletions.
25 changes: 7 additions & 18 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bootup

import (
"context"
"runtime"
"strconv"
"time"

Expand All @@ -14,14 +13,12 @@ import (
"github.com/itzmeanjan/harmony/app/graph"
"github.com/itzmeanjan/harmony/app/listen"
"github.com/itzmeanjan/harmony/app/networking"
"github.com/itzmeanjan/pubsub"
"github.com/itzmeanjan/pub0sub/publisher"
)

// GetNetwork - Make RPC call for reading network ID
func GetNetwork(ctx context.Context, rpc *rpc.Client) (uint64, error) {

var result string

if err := rpc.CallContext(ctx, &result, "net_version"); err != nil {
return 0, err
}
Expand All @@ -32,7 +29,6 @@ func GetNetwork(ctx context.Context, rpc *rpc.Client) (uint64, error) {
}

return _result, nil

}

// SetGround - This is to be called when starting application
Expand All @@ -54,16 +50,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
return nil, err
}

_pubsub := pubsub.New(uint64(runtime.NumCPU()))

// To be used when subscription requests are received from clients
if err := graph.InitPubSub(_pubsub); err != nil {
return nil, err
}

// Pubsub to be used in p2p networking handling section
// for letting clients know of some newly seen mempool tx
if err := networking.InitPubSub(_pubsub); err != nil {
publisher, err := publisher.New(ctx, "tcp", config.GetPub0SubAddress())
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -104,7 +92,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
DoneChan: make(chan chan uint64, 1),
SetLastSeenBlockChan: lastSeenBlockChan,
LastSeenBlockChan: make(chan chan data.LastSeenBlock, 1),
PubSub: _pubsub,
PubSub: publisher,
RPC: client,
}

Expand All @@ -123,7 +111,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
TxsFromAChan: make(chan data.TxsFromARequest, 1),
PubSub: _pubsub,
PubSub: publisher,
RPC: client,
PendingPool: pendingPool,
}
Expand Down Expand Up @@ -207,12 +195,13 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
// Passing parent context to graphQL subscribers, so that
// graceful system shutdown can be performed
graph.InitParentContext(ctx)
// Same for p2p networking stack
networking.InitParentContext(ctx)

return &data.Resource{
RPCClient: client,
WSClient: wsClient,
Pool: pool,
PubSub: _pubsub,
StartedAt: time.Now().UTC(),
NetworkID: network}, nil

Expand Down
11 changes: 11 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"fmt"
"log"
"math"
"runtime"
Expand Down Expand Up @@ -229,3 +230,13 @@ func GetNetworkingChoice() bool {
return GetBool("NetworkingEnabled")

}

// Pub0Sub's 0hub server running on address
// port, to be used for pub/sub message
// passing purpose
func GetPub0SubAddress() string {
host := Get("Pub0SubHost")
port := GetUint("Pub0SubPort")

return fmt.Sprintf("%s:%d", host, port)
}
25 changes: 15 additions & 10 deletions app/data/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/gammazero/workerpool"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/harmony/app/listen"
"github.com/itzmeanjan/pubsub"
"github.com/itzmeanjan/pub0sub/ops"
"github.com/itzmeanjan/pub0sub/publisher"
)

// PendingPool - Currently present pending tx(s) i.e. which are ready to
Expand Down Expand Up @@ -40,7 +41,7 @@ type PendingPool struct {
DoneChan chan chan uint64
SetLastSeenBlockChan chan uint64
LastSeenBlockChan chan chan LastSeenBlock
PubSub *pubsub.PubSub
PubSub *publisher.Publisher
RPC *rpc.Client
}

Expand Down Expand Up @@ -1024,16 +1025,18 @@ func (p *PendingPool) VerifiedAdd(ctx context.Context, tx *MemPoolTx) bool {
// to pubsub topic
func (p *PendingPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

p.PubSub.Publish(&pubsub.Message{
if _, err := p.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetPendingTxEntryPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx joining pending pool : %s\n", err.Error())
}

}

Expand All @@ -1054,16 +1057,18 @@ func (p *PendingPool) Remove(ctx context.Context, txStat *TxStatus) bool {
// These tx(s) are leaving pending pool i.e. they're confirmed now
func (p *PendingPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

p.PubSub.Publish(&pubsub.Message{
if _, err := p.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetPendingTxExitPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx leaving pending pool : %s\n", err.Error())
}

}

Expand Down
25 changes: 15 additions & 10 deletions app/data/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/gammazero/workerpool"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/pubsub"
"github.com/itzmeanjan/pub0sub/ops"
"github.com/itzmeanjan/pub0sub/publisher"
)

// QueuedPool - Currently present queued tx(s) i.e. these tx(s) are stuck
Expand All @@ -33,7 +34,7 @@ type QueuedPool struct {
CountTxsChan chan CountRequest
ListTxsChan chan ListRequest
TxsFromAChan chan TxsFromARequest
PubSub *pubsub.PubSub
PubSub *publisher.Publisher
RPC *rpc.Client
PendingPool *PendingPool
}
Expand Down Expand Up @@ -776,16 +777,18 @@ func (q *QueuedPool) Add(ctx context.Context, tx *MemPoolTx) bool {
// to pubsub topic
func (q *QueuedPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

q.PubSub.Publish(&pubsub.Message{
if _, err := q.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetQueuedTxEntryPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx joining queued pool : %s\n", err.Error())
}

}

Expand All @@ -808,16 +811,18 @@ func (q *QueuedPool) Remove(ctx context.Context, txHash common.Hash) *MemPoolTx
// failed to keep track of it
func (q *QueuedPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
data, err := msg.ToMessagePack()
if err != nil {
log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return
}

q.PubSub.Publish(&pubsub.Message{
if _, err := q.PubSub.Publish(&ops.Msg{
Topics: []string{config.GetQueuedTxExitPublishTopic()},
Data: _msg,
})
Data: data,
}); err != nil {
log.Printf("[❗️] Failed to publish tx leaving queued pool : %s\n", err.Error())
}

}

Expand Down
2 changes: 0 additions & 2 deletions app/data/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/itzmeanjan/pubsub"
)

// Resource - Shared resources among multiple go routines
Expand All @@ -15,7 +14,6 @@ type Resource struct {
RPCClient *rpc.Client
WSClient *ethclient.Client
Pool *MemPool
PubSub *pubsub.PubSub
StartedAt time.Time
NetworkID uint64
}
Expand Down
Loading

0 comments on commit 106b999

Please sign in to comment.