Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #11 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Lock free, concurrent safe state management data structure
  • Loading branch information
itzmeanjan authored Apr 10, 2021
2 parents 20b9625 + c2b5e71 commit b130077
Show file tree
Hide file tree
Showing 12 changed files with 1,582 additions and 570 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ touch .env

```bash
RPCUrl=https://<rpc-node>
WSUrl=wss://<rpc-node>
MemPoolPollingPeriod=1000
PendingTxEntryTopic=pending_pool_entry
PendingTxExitTopic=pending_pool_exit
Expand All @@ -116,6 +117,7 @@ Port=7000
Environment Variable | Interpretation
--- | ---
RPCUrl | `txpool` RPC API enabled Ethereum Node's URI
WSUrl | To be used for listening to newly mined block headers
MemPoolPollingPeriod | RPC node's mempool to be checked every `X` milliseconds
PendingTxEntryTopic | Whenever tx enters pending pool, it'll be published on Redis topic `t`
PendingTxExitTopic | Whenever tx leaves pending pool, it'll be published on Redis topic `t`
Expand Down
70 changes: 58 additions & 12 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/harmony/app/data"
"github.com/itzmeanjan/harmony/app/graph"
"github.com/itzmeanjan/harmony/app/listen"
"github.com/itzmeanjan/harmony/app/networking"
)

Expand Down Expand Up @@ -43,7 +45,11 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
}

client, err := rpc.DialContext(ctx, config.Get("RPCUrl"))
if err != nil {
return nil, err
}

wsClient, err := ethclient.DialContext(ctx, config.Get("WSUrl"))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -98,21 +104,60 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
return nil, err
}

// initialising pending pool
pendingPool := &data.PendingPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024),
Lock: &sync.RWMutex{},
IsPruning: false,
AddTxChan: make(chan data.AddRequest, 1),
RemoveTxChan: make(chan data.RemoveRequest, 1),
TxExistsChan: make(chan data.ExistsRequest, 1),
GetTxChan: make(chan data.GetRequest, 1),
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
PubSub: _redis,
RPC: client,
}

// initialising queued pool
queuedPool := &data.QueuedPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024),
Lock: &sync.RWMutex{},
IsPruning: false,
AddTxChan: make(chan data.AddRequest, 1),
RemoveTxChan: make(chan data.RemovedUnstuckTx, 1),
TxExistsChan: make(chan data.ExistsRequest, 1),
GetTxChan: make(chan data.GetRequest, 1),
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
PubSub: _redis,
RPC: client,
PendingPool: pendingPool,
}

pool := &data.MemPool{
Pending: &data.PendingPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024),
Lock: &sync.RWMutex{},
},
Queued: &data.QueuedPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024),
Lock: &sync.RWMutex{},
},
Pending: pendingPool,
Queued: queuedPool,
}

// Block head listener & pending pool pruner
// talks over this buffered channel
commChan := make(chan listen.CaughtTxs, 1024)

// Starting pool life cycle manager go routine
go pool.Pending.Start(ctx)
// (a)
go pool.Pending.Prune(ctx, commChan)
go pool.Queued.Start(ctx)
go pool.Queued.Prune(ctx)
// Listens for new block headers & informs 👆 (a) for pruning
// txs which can be/ need to be
go listen.SubscribeHead(ctx, wsClient, commChan)

// Passed this mempool handle to graphql query resolver
if err := graph.InitMemPool(pool); err != nil {
return nil, err
Expand All @@ -130,6 +175,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {

return &data.Resource{
RPCClient: client,
WSClient: wsClient,
Pool: pool,
Redis: _redis,
StartedAt: time.Now().UTC(),
Expand Down
67 changes: 59 additions & 8 deletions app/data/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gammazero/workerpool"
)

// IsPresentInCurrentPool - Given tx hash, which was previously present in pending/ queued pool
Expand All @@ -17,24 +19,73 @@ import (
// RPC interface
func IsPresentInCurrentPool(txs map[string]map[string]*MemPoolTx, txHash common.Hash) bool {

wp := workerpool.New(runtime.NumCPU())
workCount := len(txs)
resultChan := make(chan bool, workCount)
stopChan := make(chan struct{})

var present bool

{
OUTER:
for _, vOuter := range txs {
for _, vInner := range vOuter {
// @note ⭐️
//
// Don't copy value reference here, directly pass it during
// function invokation, while accessing value using field `k`
for k := range txs {

func(txs map[string]*MemPoolTx) {

wp.Submit(func() {

// Same as ⭐️
for k := range txs {

if vInner.Hash == txHash {
select {

present = true
break OUTER
case <-stopChan:
return

default:
if txs[k].Hash == txHash {
resultChan <- true
return
}

}

}

}
// If this worker couldnn't find anything of interest
resultChan <- false

})

}(txs[k])

}

// How many responses received from workers
var received int

for v := range resultChan {
if v {
present = true

// No other worker will send anything here
// which is exactly why we're fleeing
close(stopChan)
break
}

received++
if received >= workCount {
// We're done receiving all responses
// from all works we submitted
break
}
}

wp.Stop()

return present

}
Expand Down
75 changes: 75 additions & 0 deletions app/data/interaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package data

import "github.com/ethereum/go-ethereum/common"

// Sorting direction representation
const (
ASC = iota
DESC
)

// When submitting async request for pruning pending/ queued
// pool, immediate response to be sent to client in any of these form(s)
const (
EMPTY = iota
PRUNING
SCHEDULED
)

// AddRequest - For adding new tx into pool
type AddRequest struct {
Tx *MemPoolTx
ResponseChan chan bool
}

// RemoveRequest - For removing existing tx into pool
type RemoveRequest struct {
TxStat *TxStatus
ResponseChan chan bool
}

// RemovedUnstuckTx - Remove unstuck tx from queued pool, request to be
// sent in this form
type RemovedUnstuckTx struct {
Hash common.Hash
ResponseChan chan *MemPoolTx
}

// RemoveTxsRequest - For checking which txs can be removed
// from pending pool, this request to be sent to pending pool manager
type RemoveTxsFromPendingPool struct {
Txs map[string]map[string]*MemPoolTx
ResponseChan chan int
}

// RemoveTxsFromQueuedPool - For updating local queued pool state, request of
// this form to be sent to pool manager over channel, where it'll check which txs
// are likely to be unstuck & has moved to pending pool
type RemoveTxsFromQueuedPool struct {
Pending map[string]map[string]*MemPoolTx
Queued map[string]map[string]*MemPoolTx
ResponseChan chan int
}

// ExistsRequest - Checking whether tx is present in pool or not
type ExistsRequest struct {
Tx common.Hash
ResponseChan chan bool
}

// GetRequest - Obtaining reference to existing tx in pool
type GetRequest struct {
Tx common.Hash
ResponseChan chan *MemPoolTx
}

// CountRequest - Getting #-of txs present in pool
type CountRequest struct {
ResponseChan chan uint64
}

// ListRequest - Listing all txs in pool
type ListRequest struct {
Order int
ResponseChan chan []*MemPoolTx
}
Loading

0 comments on commit b130077

Please sign in to comment.