Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #13 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
More elaborate stat API
  • Loading branch information
itzmeanjan committed Apr 18, 2021
2 parents 20e7368 + be6692c commit cc02779
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 12 deletions.
9 changes: 8 additions & 1 deletion app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
// & queued pool, so that when new tx gets added into pending pool
// queued pool also gets notified & gets to update state if required
alreadyInPendingPoolChan := make(chan *data.MemPoolTx, 4096)
lastSeenBlockChan := make(chan uint64, 1)

// initialising pending pool
pendingPool := &data.PendingPool{
Expand All @@ -116,6 +117,9 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
RemovedTxs: make(map[common.Hash]bool),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetPendingPoolSize()),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetPendingPoolSize()),
Done: 0,
LastSeenBlock: 0,
LastSeenAt: time.Now().UTC(),
AddTxChan: make(chan data.AddRequest, 1),
AddFromQueuedPoolChan: make(chan data.AddRequest, 1),
RemoveTxChan: make(chan data.RemoveRequest, 1),
Expand All @@ -125,6 +129,9 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
TxsFromAChan: make(chan data.TxsFromARequest, 1),
DoneChan: make(chan chan uint64, 1),
SetLastSeenBlockChan: lastSeenBlockChan,
LastSeenBlockChan: make(chan chan data.LastSeenBlock, 1),
PubSub: _redis,
RPC: client,
}
Expand Down Expand Up @@ -171,7 +178,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
go pool.Queued.Prune(ctx, confirmedTxsChan, alreadyInPendingPoolChan)
// Listens for new block headers & informs 👆 (a) for pruning
// txs which can be/ need to be
go listen.SubscribeHead(ctx, wsClient, caughtTxsChan)
go listen.SubscribeHead(ctx, wsClient, caughtTxsChan, lastSeenBlockChan)

// Passed this mempool handle to graphql query resolver
if err := graph.InitMemPool(pool); err != nil {
Expand Down
20 changes: 19 additions & 1 deletion app/data/interaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package data

import "github.com/ethereum/go-ethereum/common"
import (
"time"

"github.com/ethereum/go-ethereum/common"
)

// Sorting direction representation
const (
Expand Down Expand Up @@ -80,3 +84,17 @@ type TxsFromARequest struct {
From common.Address
ResponseChan chan []*MemPoolTx
}

// NewSeenBlock - When new block is seen by header listener, concurrent-safe updation
// is sent to pending pool worker
type NewSeenBlock struct {
Number uint64
ResponseChan chan bool
}

// LastSeenBlock - Which block number was last seen by header subscriber
// along with time
type LastSeenBlock struct {
Number uint64
At time.Time
}
63 changes: 59 additions & 4 deletions app/data/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type PendingPool struct {
RemovedTxs map[common.Hash]bool
AscTxsByGasPrice TxList
DescTxsByGasPrice TxList
Done uint64
LastSeenBlock uint64
LastSeenAt time.Time
AddTxChan chan AddRequest
AddFromQueuedPoolChan chan AddRequest
RemoveTxChan chan RemoveRequest
Expand All @@ -33,6 +36,9 @@ type PendingPool struct {
CountTxsChan chan CountRequest
ListTxsChan chan ListRequest
TxsFromAChan chan TxsFromARequest
DoneChan chan chan uint64
SetLastSeenBlockChan chan uint64
LastSeenBlockChan chan chan LastSeenBlock
PubSub *redis.Client
RPC *rpc.Client
}
Expand Down Expand Up @@ -220,11 +226,15 @@ func (p *PendingPool) Start(ctx context.Context) {

case req := <-p.RemoveTxChan:

req.ResponseChan <- txRemover(req.TxStat)
removed := txRemover(req.TxStat)
req.ResponseChan <- removed

// Marking that tx has been removed, so that
// it won't get picked up next time
p.RemovedTxs[req.TxStat.Hash] = true
if removed {
// Marking that tx has been removed, so that
// it won't get picked up next time
p.RemovedTxs[req.TxStat.Hash] = true
p.Done++
}

case req := <-p.TxExistsChan:

Expand Down Expand Up @@ -297,6 +307,30 @@ func (p *PendingPool) Start(ctx context.Context) {

req.ResponseChan <- nil

case req := <-p.DoneChan:

// How many tx(s) are seen to be
// processed successfully & left mempool
// permanently, as seen by this node, during
// its lifetime
//
// Nothing but count of `dropped` & `confirmed` tx(s)
req <- p.Done

case num := <-p.SetLastSeenBlockChan:

// Only keep moving forward
if p.LastSeenBlock > num {
continue
}

p.LastSeenBlock = num
p.LastSeenAt = time.Now().UTC()

case req := <-p.LastSeenBlockChan:

req <- LastSeenBlock{Number: p.LastSeenBlock, At: p.LastSeenAt}

}

}
Expand Down Expand Up @@ -476,6 +510,27 @@ func (p *PendingPool) Count() uint64 {

}

// Processed - These many tx(s) have permanently left mempool
// as seen by this `harmony` instance during its life time
//
// This is nothing but count of `dropped` & `confirmed` tx(s)
func (p *PendingPool) Processed() uint64 {
respChan := make(chan uint64)

p.DoneChan <- respChan

return <-respChan
}

// GetLastSeenBlock - Get last seen block & time, as reported
// by block header listener
func (p *PendingPool) GetLastSeenBlock() LastSeenBlock {
respChan := make(chan LastSeenBlock)

p.LastSeenBlockChan <- respChan
return <-respChan
}

// Prunables - Given tx, we're attempting to find out all txs which are living
// in pending pool now & having same sender address & same/ lower nonce, so that
// pruner can update state while removing mined txs from mempool
Expand Down
11 changes: 11 additions & 0 deletions app/data/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ func (m *MemPool) QueuedPoolLength() uint64 {
return m.Queued.Count()
}

// DoneTxCount - #-of tx(s) seen to processed during this node's life time
func (m *MemPool) DoneTxCount() uint64 {
return m.Pending.Processed()
}

// LastSeenBlock - Last seen block by mempool & when it was seen, to be invoked
// by stat generator http request handler method
func (m *MemPool) LastSeenBlock() LastSeenBlock {
return m.Pending.GetLastSeenBlock()
}

// PendingForGTE - Returning list of tx(s), pending for more than
// x time unit
func (m *MemPool) PendingForGTE(x time.Duration) []*MemPoolTx {
Expand Down
3 changes: 3 additions & 0 deletions app/data/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ type Stat struct {
PendingPoolSize uint64 `json:"pendingPoolSize"`
QueuedPoolSize uint64 `json:"queuedPoolSize"`
Uptime string `json:"uptime"`
Processed uint64 `json:"processed"`
LatestBlock uint64 `json:"latestBlock"`
SeenAgo string `json:"latestSeenAgo"`
NetworkID uint64 `json:"networkID"`
}

Expand Down
9 changes: 5 additions & 4 deletions app/listen/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type CaughtTxs []*CaughtTx
// SubscribeHead - Subscribe to block headers & as soon as new block gets mined
// its txs are picked up & published on a go channel, which will be listened
// to by pending pool watcher, so that it can prune its state
func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs) {
func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) {

retryTable := make(map[*big.Int]bool)
headerChan := make(chan *types.Header, 64)
Expand All @@ -49,7 +49,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan

case header := <-headerChan:

if !ProcessBlock(ctx, client, header.Number, commChan) {
if !ProcessBlock(ctx, client, header.Number, commChan, lastSeenBlockChan) {

// Put entry in table that we failed to fetch this block, to be
// attempted in some time future
Expand All @@ -69,7 +69,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan
success := make([]*big.Int, 0, pendingC)
for num := range retryTable {

if ProcessBlock(ctx, client, num, commChan) {
if ProcessBlock(ctx, client, num, commChan, lastSeenBlockChan) {
success = append(success, num)
}

Expand All @@ -93,7 +93,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan
}

// ProcessBlock - Fetches all txs present in mined block & passes those to pending pool pruning worker
func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs) bool {
func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) bool {

block, err := client.BlockByNumber(ctx, number)
if err != nil {
Expand Down Expand Up @@ -123,6 +123,7 @@ func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int
}

commChan <- txs
lastSeenBlockChan <- number.Uint64()
return true

}
9 changes: 7 additions & 2 deletions app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ func Start(ctx context.Context, res *data.Resource) {

v1.GET("/stat", func(c echo.Context) error {

latestBlock := res.Pool.LastSeenBlock()

return c.JSON(http.StatusOK, &data.Stat{
PendingPoolSize: res.Pool.PendingPoolLength(),
QueuedPoolSize: res.Pool.QueuedPoolLength(),
Uptime: time.Now().UTC().Sub(res.StartedAt).String(),
Processed: res.Pool.DoneTxCount(),
LatestBlock: latestBlock.Number,
SeenAgo: time.Now().UTC().Sub(latestBlock.At).String(),
NetworkID: res.NetworkID,
})

Expand All @@ -83,7 +88,7 @@ func Start(ctx context.Context, res *data.Resource) {
v1.GET("/graphql", func(c echo.Context) error {

if !c.IsWebSocket() {
return errors.New("Only websocket transport allowed")
return errors.New("only websocket transport allowed")
}

graphql.ServeHTTP(c.Response().Writer, c.Request())
Expand All @@ -94,7 +99,7 @@ func Start(ctx context.Context, res *data.Resource) {
v1.POST("/graphql", func(c echo.Context) error {

if c.IsWebSocket() {
return errors.New("Only http transport allowed")
return errors.New("only http transport allowed")
}

graphql.ServeHTTP(c.Response().Writer, c.Request())
Expand Down

0 comments on commit cc02779

Please sign in to comment.