diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index a9b6d0b..8c59ed0 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -107,6 +107,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { // & queued pool, so that when new tx gets added into pending pool // queued pool also gets notified & gets to update state if required alreadyInPendingPoolChan := make(chan *data.MemPoolTx, 4096) + lastSeenBlockChan := make(chan uint64, 1) // initialising pending pool pendingPool := &data.PendingPool{ @@ -116,6 +117,9 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { RemovedTxs: make(map[common.Hash]bool), AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetPendingPoolSize()), DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetPendingPoolSize()), + Done: 0, + LastSeenBlock: 0, + LastSeenAt: time.Now().UTC(), AddTxChan: make(chan data.AddRequest, 1), AddFromQueuedPoolChan: make(chan data.AddRequest, 1), RemoveTxChan: make(chan data.RemoveRequest, 1), @@ -125,6 +129,9 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { CountTxsChan: make(chan data.CountRequest, 1), ListTxsChan: make(chan data.ListRequest, 1), TxsFromAChan: make(chan data.TxsFromARequest, 1), + DoneChan: make(chan chan uint64, 1), + SetLastSeenBlockChan: lastSeenBlockChan, + LastSeenBlockChan: make(chan chan data.LastSeenBlock, 1), PubSub: _redis, RPC: client, } @@ -171,7 +178,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { go pool.Queued.Prune(ctx, confirmedTxsChan, alreadyInPendingPoolChan) // Listens for new block headers & informs 👆 (a) for pruning // txs which can be/ need to be - go listen.SubscribeHead(ctx, wsClient, caughtTxsChan) + go listen.SubscribeHead(ctx, wsClient, caughtTxsChan, lastSeenBlockChan) // Passed this mempool handle to graphql query resolver if err := graph.InitMemPool(pool); err != nil { diff --git a/app/data/interaction.go b/app/data/interaction.go index febd140..e8cedf8 100644 --- a/app/data/interaction.go +++ b/app/data/interaction.go @@ -1,6 +1,10 @@ package data -import "github.com/ethereum/go-ethereum/common" +import ( + "time" + + "github.com/ethereum/go-ethereum/common" +) // Sorting direction representation const ( @@ -80,3 +84,17 @@ type TxsFromARequest struct { From common.Address ResponseChan chan []*MemPoolTx } + +// NewSeenBlock - When new block is seen by header listener, concurrent-safe updation +// is sent to pending pool worker +type NewSeenBlock struct { + Number uint64 + ResponseChan chan bool +} + +// LastSeenBlock - Which block number was last seen by header subscriber +// along with time +type LastSeenBlock struct { + Number uint64 + At time.Time +} diff --git a/app/data/pending.go b/app/data/pending.go index b4000f9..c36cdaf 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -24,6 +24,9 @@ type PendingPool struct { RemovedTxs map[common.Hash]bool AscTxsByGasPrice TxList DescTxsByGasPrice TxList + Done uint64 + LastSeenBlock uint64 + LastSeenAt time.Time AddTxChan chan AddRequest AddFromQueuedPoolChan chan AddRequest RemoveTxChan chan RemoveRequest @@ -33,6 +36,9 @@ type PendingPool struct { CountTxsChan chan CountRequest ListTxsChan chan ListRequest TxsFromAChan chan TxsFromARequest + DoneChan chan chan uint64 + SetLastSeenBlockChan chan uint64 + LastSeenBlockChan chan chan LastSeenBlock PubSub *redis.Client RPC *rpc.Client } @@ -220,11 +226,15 @@ func (p *PendingPool) Start(ctx context.Context) { case req := <-p.RemoveTxChan: - req.ResponseChan <- txRemover(req.TxStat) + removed := txRemover(req.TxStat) + req.ResponseChan <- removed - // Marking that tx has been removed, so that - // it won't get picked up next time - p.RemovedTxs[req.TxStat.Hash] = true + if removed { + // Marking that tx has been removed, so that + // it won't get picked up next time + p.RemovedTxs[req.TxStat.Hash] = true + p.Done++ + } case req := <-p.TxExistsChan: @@ -297,6 +307,30 @@ func (p *PendingPool) Start(ctx context.Context) { req.ResponseChan <- nil + case req := <-p.DoneChan: + + // How many tx(s) are seen to be + // processed successfully & left mempool + // permanently, as seen by this node, during + // its lifetime + // + // Nothing but count of `dropped` & `confirmed` tx(s) + req <- p.Done + + case num := <-p.SetLastSeenBlockChan: + + // Only keep moving forward + if p.LastSeenBlock > num { + continue + } + + p.LastSeenBlock = num + p.LastSeenAt = time.Now().UTC() + + case req := <-p.LastSeenBlockChan: + + req <- LastSeenBlock{Number: p.LastSeenBlock, At: p.LastSeenAt} + } } @@ -476,6 +510,27 @@ func (p *PendingPool) Count() uint64 { } +// Processed - These many tx(s) have permanently left mempool +// as seen by this `harmony` instance during its life time +// +// This is nothing but count of `dropped` & `confirmed` tx(s) +func (p *PendingPool) Processed() uint64 { + respChan := make(chan uint64) + + p.DoneChan <- respChan + + return <-respChan +} + +// GetLastSeenBlock - Get last seen block & time, as reported +// by block header listener +func (p *PendingPool) GetLastSeenBlock() LastSeenBlock { + respChan := make(chan LastSeenBlock) + + p.LastSeenBlockChan <- respChan + return <-respChan +} + // Prunables - Given tx, we're attempting to find out all txs which are living // in pending pool now & having same sender address & same/ lower nonce, so that // pruner can update state while removing mined txs from mempool diff --git a/app/data/pool.go b/app/data/pool.go index 5f11f5d..c6e4673 100644 --- a/app/data/pool.go +++ b/app/data/pool.go @@ -68,6 +68,17 @@ func (m *MemPool) QueuedPoolLength() uint64 { return m.Queued.Count() } +// DoneTxCount - #-of tx(s) seen to processed during this node's life time +func (m *MemPool) DoneTxCount() uint64 { + return m.Pending.Processed() +} + +// LastSeenBlock - Last seen block by mempool & when it was seen, to be invoked +// by stat generator http request handler method +func (m *MemPool) LastSeenBlock() LastSeenBlock { + return m.Pending.GetLastSeenBlock() +} + // PendingForGTE - Returning list of tx(s), pending for more than // x time unit func (m *MemPool) PendingForGTE(x time.Duration) []*MemPoolTx { diff --git a/app/data/response.go b/app/data/response.go index db562fd..7c76e78 100644 --- a/app/data/response.go +++ b/app/data/response.go @@ -6,6 +6,9 @@ type Stat struct { PendingPoolSize uint64 `json:"pendingPoolSize"` QueuedPoolSize uint64 `json:"queuedPoolSize"` Uptime string `json:"uptime"` + Processed uint64 `json:"processed"` + LatestBlock uint64 `json:"latestBlock"` + SeenAgo string `json:"latestSeenAgo"` NetworkID uint64 `json:"networkID"` } diff --git a/app/listen/header.go b/app/listen/header.go index a04800d..3c46930 100644 --- a/app/listen/header.go +++ b/app/listen/header.go @@ -25,7 +25,7 @@ type CaughtTxs []*CaughtTx // SubscribeHead - Subscribe to block headers & as soon as new block gets mined // its txs are picked up & published on a go channel, which will be listened // to by pending pool watcher, so that it can prune its state -func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs) { +func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) { retryTable := make(map[*big.Int]bool) headerChan := make(chan *types.Header, 64) @@ -49,7 +49,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan case header := <-headerChan: - if !ProcessBlock(ctx, client, header.Number, commChan) { + if !ProcessBlock(ctx, client, header.Number, commChan, lastSeenBlockChan) { // Put entry in table that we failed to fetch this block, to be // attempted in some time future @@ -69,7 +69,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan success := make([]*big.Int, 0, pendingC) for num := range retryTable { - if ProcessBlock(ctx, client, num, commChan) { + if ProcessBlock(ctx, client, num, commChan, lastSeenBlockChan) { success = append(success, num) } @@ -93,7 +93,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan } // ProcessBlock - Fetches all txs present in mined block & passes those to pending pool pruning worker -func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs) bool { +func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) bool { block, err := client.BlockByNumber(ctx, number) if err != nil { @@ -123,6 +123,7 @@ func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int } commChan <- txs + lastSeenBlockChan <- number.Uint64() return true } diff --git a/app/server/server.go b/app/server/server.go index e84d13f..7926a43 100644 --- a/app/server/server.go +++ b/app/server/server.go @@ -71,10 +71,15 @@ func Start(ctx context.Context, res *data.Resource) { v1.GET("/stat", func(c echo.Context) error { + latestBlock := res.Pool.LastSeenBlock() + return c.JSON(http.StatusOK, &data.Stat{ PendingPoolSize: res.Pool.PendingPoolLength(), QueuedPoolSize: res.Pool.QueuedPoolLength(), Uptime: time.Now().UTC().Sub(res.StartedAt).String(), + Processed: res.Pool.DoneTxCount(), + LatestBlock: latestBlock.Number, + SeenAgo: time.Now().UTC().Sub(latestBlock.At).String(), NetworkID: res.NetworkID, }) @@ -83,7 +88,7 @@ func Start(ctx context.Context, res *data.Resource) { v1.GET("/graphql", func(c echo.Context) error { if !c.IsWebSocket() { - return errors.New("Only websocket transport allowed") + return errors.New("only websocket transport allowed") } graphql.ServeHTTP(c.Response().Writer, c.Request()) @@ -94,7 +99,7 @@ func Start(ctx context.Context, res *data.Resource) { v1.POST("/graphql", func(c echo.Context) error { if c.IsWebSocket() { - return errors.New("Only http transport allowed") + return errors.New("only http transport allowed") } graphql.ServeHTTP(c.Response().Writer, c.Request())