From 888b2465f443ef8bacc9e7f9af8c0e255e62e1b1 Mon Sep 17 00:00:00 2001 From: Anjan Date: Fri, 16 Apr 2021 10:57:44 +0530 Subject: [PATCH 01/11] get count of how many tx(s) seen to be leaving mempool permanently during this node's life time --- app/bootup/bootup.go | 2 ++ app/data/pending.go | 36 ++++++++++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index a9b6d0b..4ff6a3f 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -116,6 +116,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { RemovedTxs: make(map[common.Hash]bool), AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetPendingPoolSize()), DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetPendingPoolSize()), + Done: 0, AddTxChan: make(chan data.AddRequest, 1), AddFromQueuedPoolChan: make(chan data.AddRequest, 1), RemoveTxChan: make(chan data.RemoveRequest, 1), @@ -125,6 +126,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { CountTxsChan: make(chan data.CountRequest, 1), ListTxsChan: make(chan data.ListRequest, 1), TxsFromAChan: make(chan data.TxsFromARequest, 1), + DoneChan: make(chan chan uint64, 1), PubSub: _redis, RPC: client, } diff --git a/app/data/pending.go b/app/data/pending.go index b4000f9..267e0a0 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -24,6 +24,7 @@ type PendingPool struct { RemovedTxs map[common.Hash]bool AscTxsByGasPrice TxList DescTxsByGasPrice TxList + Done uint64 AddTxChan chan AddRequest AddFromQueuedPoolChan chan AddRequest RemoveTxChan chan RemoveRequest @@ -33,6 +34,7 @@ type PendingPool struct { CountTxsChan chan CountRequest ListTxsChan chan ListRequest TxsFromAChan chan TxsFromARequest + DoneChan chan chan uint64 PubSub *redis.Client RPC *rpc.Client } @@ -220,11 +222,15 @@ func (p *PendingPool) Start(ctx context.Context) { case req := <-p.RemoveTxChan: - req.ResponseChan <- txRemover(req.TxStat) + removed := txRemover(req.TxStat) + if removed { + req.ResponseChan <- removed - // Marking that tx has been removed, so that - // it won't get picked up next time - p.RemovedTxs[req.TxStat.Hash] = true + // Marking that tx has been removed, so that + // it won't get picked up next time + p.RemovedTxs[req.TxStat.Hash] = true + p.Done++ + } case req := <-p.TxExistsChan: @@ -297,6 +303,16 @@ func (p *PendingPool) Start(ctx context.Context) { req.ResponseChan <- nil + case req := <-p.DoneChan: + + // How many tx(s) are seen to be + // processed successfully & left mempool + // permanently, as seen by this node, during + // its lifetime + // + // Nothing but count of `dropped` & `confirmed` tx(s) + req <- p.Done + } } @@ -476,6 +492,18 @@ func (p *PendingPool) Count() uint64 { } +// Processed - These many tx(s) have permanently left mempool +// as seen by this `harmony` instance during its life time +// +// This is nothing but count of `dropped` & `confirmed` tx(s) +func (p *PendingPool) Processed() uint64 { + respChan := make(chan uint64) + + p.DoneChan <- respChan + + return <-respChan +} + // Prunables - Given tx, we're attempting to find out all txs which are living // in pending pool now & having same sender address & same/ lower nonce, so that // pruner can update state while removing mined txs from mempool From 6975500219b21d7819be6ee3b5ec71282e001936 Mon Sep 17 00:00:00 2001 From: Anjan Date: Fri, 16 Apr 2021 11:00:37 +0530 Subject: [PATCH 02/11] done tx count in response --- app/data/pool.go | 5 +++++ app/data/response.go | 1 + app/server/server.go | 1 + 3 files changed, 7 insertions(+) diff --git a/app/data/pool.go b/app/data/pool.go index 5f11f5d..a947a32 100644 --- a/app/data/pool.go +++ b/app/data/pool.go @@ -68,6 +68,11 @@ func (m *MemPool) QueuedPoolLength() uint64 { return m.Queued.Count() } +// DoneTxCount - #-of tx(s) seen to processed during this node's life time +func (m *MemPool) DoneTxCount() uint64 { + return m.Pending.Processed() +} + // PendingForGTE - Returning list of tx(s), pending for more than // x time unit func (m *MemPool) PendingForGTE(x time.Duration) []*MemPoolTx { diff --git a/app/data/response.go b/app/data/response.go index db562fd..d153d35 100644 --- a/app/data/response.go +++ b/app/data/response.go @@ -6,6 +6,7 @@ type Stat struct { PendingPoolSize uint64 `json:"pendingPoolSize"` QueuedPoolSize uint64 `json:"queuedPoolSize"` Uptime string `json:"uptime"` + Processed uint64 `json:"processed"` NetworkID uint64 `json:"networkID"` } diff --git a/app/server/server.go b/app/server/server.go index e84d13f..8d9ead6 100644 --- a/app/server/server.go +++ b/app/server/server.go @@ -75,6 +75,7 @@ func Start(ctx context.Context, res *data.Resource) { PendingPoolSize: res.Pool.PendingPoolLength(), QueuedPoolSize: res.Pool.QueuedPoolLength(), Uptime: time.Now().UTC().Sub(res.StartedAt).String(), + Processed: res.Pool.DoneTxCount(), NetworkID: res.NetworkID, }) From c9344c5c9c10c1a603a0ac5c5bbc8cc3645f88ea Mon Sep 17 00:00:00 2001 From: Anjan Date: Fri, 16 Apr 2021 11:01:19 +0530 Subject: [PATCH 03/11] fixed error message format --- app/server/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/server/server.go b/app/server/server.go index 8d9ead6..6d76657 100644 --- a/app/server/server.go +++ b/app/server/server.go @@ -84,7 +84,7 @@ func Start(ctx context.Context, res *data.Resource) { v1.GET("/graphql", func(c echo.Context) error { if !c.IsWebSocket() { - return errors.New("Only websocket transport allowed") + return errors.New("only websocket transport allowed") } graphql.ServeHTTP(c.Response().Writer, c.Request()) @@ -95,7 +95,7 @@ func Start(ctx context.Context, res *data.Resource) { v1.POST("/graphql", func(c echo.Context) error { if c.IsWebSocket() { - return errors.New("Only http transport allowed") + return errors.New("only http transport allowed") } graphql.ServeHTTP(c.Response().Writer, c.Request()) From 0e3dadf41191a158f0284d3623730483b55997f8 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 16 Apr 2021 19:55:25 +0530 Subject: [PATCH 04/11] first notify that removed or not, then do other work --- app/data/pending.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/data/pending.go b/app/data/pending.go index 267e0a0..67b1799 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -223,9 +223,9 @@ func (p *PendingPool) Start(ctx context.Context) { case req := <-p.RemoveTxChan: removed := txRemover(req.TxStat) - if removed { - req.ResponseChan <- removed + req.ResponseChan <- removed + if removed { // Marking that tx has been removed, so that // it won't get picked up next time p.RemovedTxs[req.TxStat.Hash] = true From 0cd21f69c54fa0a4d51058bebbe854dae4c8a645 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 17 Apr 2021 10:32:55 +0530 Subject: [PATCH 05/11] last block seen, with time --- app/data/interaction.go | 20 +++++++++++++++++++- app/data/pending.go | 4 ++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/app/data/interaction.go b/app/data/interaction.go index febd140..e8cedf8 100644 --- a/app/data/interaction.go +++ b/app/data/interaction.go @@ -1,6 +1,10 @@ package data -import "github.com/ethereum/go-ethereum/common" +import ( + "time" + + "github.com/ethereum/go-ethereum/common" +) // Sorting direction representation const ( @@ -80,3 +84,17 @@ type TxsFromARequest struct { From common.Address ResponseChan chan []*MemPoolTx } + +// NewSeenBlock - When new block is seen by header listener, concurrent-safe updation +// is sent to pending pool worker +type NewSeenBlock struct { + Number uint64 + ResponseChan chan bool +} + +// LastSeenBlock - Which block number was last seen by header subscriber +// along with time +type LastSeenBlock struct { + Number uint64 + At time.Time +} diff --git a/app/data/pending.go b/app/data/pending.go index 67b1799..b35f8bd 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -25,6 +25,8 @@ type PendingPool struct { AscTxsByGasPrice TxList DescTxsByGasPrice TxList Done uint64 + LastSeenBlock uint64 + LastSeenAt time.Time AddTxChan chan AddRequest AddFromQueuedPoolChan chan AddRequest RemoveTxChan chan RemoveRequest @@ -35,6 +37,8 @@ type PendingPool struct { ListTxsChan chan ListRequest TxsFromAChan chan TxsFromARequest DoneChan chan chan uint64 + SetLastSeenBlockChan chan NewSeenBlock + LastSeenBlockChan chan chan LastSeenBlock PubSub *redis.Client RPC *rpc.Client } From d0ff530b9322b57e96b83ae3efaba08d346aec37 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 17 Apr 2021 10:37:17 +0530 Subject: [PATCH 06/11] internal op for channel comm --- app/data/pending.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/app/data/pending.go b/app/data/pending.go index b35f8bd..d3e5204 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -317,6 +317,17 @@ func (p *PendingPool) Start(ctx context.Context) { // Nothing but count of `dropped` & `confirmed` tx(s) req <- p.Done + case req := <-p.SetLastSeenBlockChan: + + p.LastSeenBlock = req.Number + p.LastSeenAt = time.Now().UTC() + + req.ResponseChan <- true + + case req := <-p.LastSeenBlockChan: + + req <- LastSeenBlock{Number: p.LastSeenBlock, At: p.LastSeenAt} + } } From 83f9e8ffc6efc08b8c83d80620ba63aa3000d695 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 17 Apr 2021 10:39:19 +0530 Subject: [PATCH 07/11] don't consider already processed block --- app/data/pending.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/app/data/pending.go b/app/data/pending.go index d3e5204..cffaa19 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -319,6 +319,12 @@ func (p *PendingPool) Start(ctx context.Context) { case req := <-p.SetLastSeenBlockChan: + // Never look behind, just keep moving forward + if p.LastSeenBlock > req.Number { + req.ResponseChan <- false + continue + } + p.LastSeenBlock = req.Number p.LastSeenAt = time.Now().UTC() From 4a06d8583d89ba1d2bba971581ca0fc2096b93e7 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 17 Apr 2021 10:55:57 +0530 Subject: [PATCH 08/11] utility methods for updating/ getting block number --- app/data/pending.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/app/data/pending.go b/app/data/pending.go index cffaa19..9924d3b 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -525,6 +525,24 @@ func (p *PendingPool) Processed() uint64 { return <-respChan } +// UpdateLastSeenBlock - Block header subscriber is supposed to be +// invoking this method, when it sees new block +func (p *PendingPool) UpdateLastSeenBlock(number uint64) bool { + respChan := make(chan bool) + + p.SetLastSeenBlockChan <- NewSeenBlock{Number: number, ResponseChan: respChan} + return <-respChan +} + +// GetLastSeenBlock - Get last seen block & time, as reported +// by block header listener +func (p *PendingPool) GetLastSeenBlock() LastSeenBlock { + respChan := make(chan LastSeenBlock) + + p.LastSeenBlockChan <- respChan + return <-respChan +} + // Prunables - Given tx, we're attempting to find out all txs which are living // in pending pool now & having same sender address & same/ lower nonce, so that // pruner can update state while removing mined txs from mempool From 67763419ead7e07d19bbfcec7d28e846aa93880e Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 17 Apr 2021 11:03:57 +0530 Subject: [PATCH 09/11] comm between workers over channel --- app/bootup/bootup.go | 7 ++++++- app/data/pending.go | 22 +++++----------------- app/listen/header.go | 9 +++++---- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 4ff6a3f..8c59ed0 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -107,6 +107,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { // & queued pool, so that when new tx gets added into pending pool // queued pool also gets notified & gets to update state if required alreadyInPendingPoolChan := make(chan *data.MemPoolTx, 4096) + lastSeenBlockChan := make(chan uint64, 1) // initialising pending pool pendingPool := &data.PendingPool{ @@ -117,6 +118,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetPendingPoolSize()), DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetPendingPoolSize()), Done: 0, + LastSeenBlock: 0, + LastSeenAt: time.Now().UTC(), AddTxChan: make(chan data.AddRequest, 1), AddFromQueuedPoolChan: make(chan data.AddRequest, 1), RemoveTxChan: make(chan data.RemoveRequest, 1), @@ -127,6 +130,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { ListTxsChan: make(chan data.ListRequest, 1), TxsFromAChan: make(chan data.TxsFromARequest, 1), DoneChan: make(chan chan uint64, 1), + SetLastSeenBlockChan: lastSeenBlockChan, + LastSeenBlockChan: make(chan chan data.LastSeenBlock, 1), PubSub: _redis, RPC: client, } @@ -173,7 +178,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { go pool.Queued.Prune(ctx, confirmedTxsChan, alreadyInPendingPoolChan) // Listens for new block headers & informs 👆 (a) for pruning // txs which can be/ need to be - go listen.SubscribeHead(ctx, wsClient, caughtTxsChan) + go listen.SubscribeHead(ctx, wsClient, caughtTxsChan, lastSeenBlockChan) // Passed this mempool handle to graphql query resolver if err := graph.InitMemPool(pool); err != nil { diff --git a/app/data/pending.go b/app/data/pending.go index 9924d3b..c36cdaf 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -37,7 +37,7 @@ type PendingPool struct { ListTxsChan chan ListRequest TxsFromAChan chan TxsFromARequest DoneChan chan chan uint64 - SetLastSeenBlockChan chan NewSeenBlock + SetLastSeenBlockChan chan uint64 LastSeenBlockChan chan chan LastSeenBlock PubSub *redis.Client RPC *rpc.Client @@ -317,19 +317,16 @@ func (p *PendingPool) Start(ctx context.Context) { // Nothing but count of `dropped` & `confirmed` tx(s) req <- p.Done - case req := <-p.SetLastSeenBlockChan: + case num := <-p.SetLastSeenBlockChan: - // Never look behind, just keep moving forward - if p.LastSeenBlock > req.Number { - req.ResponseChan <- false + // Only keep moving forward + if p.LastSeenBlock > num { continue } - p.LastSeenBlock = req.Number + p.LastSeenBlock = num p.LastSeenAt = time.Now().UTC() - req.ResponseChan <- true - case req := <-p.LastSeenBlockChan: req <- LastSeenBlock{Number: p.LastSeenBlock, At: p.LastSeenAt} @@ -525,15 +522,6 @@ func (p *PendingPool) Processed() uint64 { return <-respChan } -// UpdateLastSeenBlock - Block header subscriber is supposed to be -// invoking this method, when it sees new block -func (p *PendingPool) UpdateLastSeenBlock(number uint64) bool { - respChan := make(chan bool) - - p.SetLastSeenBlockChan <- NewSeenBlock{Number: number, ResponseChan: respChan} - return <-respChan -} - // GetLastSeenBlock - Get last seen block & time, as reported // by block header listener func (p *PendingPool) GetLastSeenBlock() LastSeenBlock { diff --git a/app/listen/header.go b/app/listen/header.go index a04800d..3c46930 100644 --- a/app/listen/header.go +++ b/app/listen/header.go @@ -25,7 +25,7 @@ type CaughtTxs []*CaughtTx // SubscribeHead - Subscribe to block headers & as soon as new block gets mined // its txs are picked up & published on a go channel, which will be listened // to by pending pool watcher, so that it can prune its state -func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs) { +func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) { retryTable := make(map[*big.Int]bool) headerChan := make(chan *types.Header, 64) @@ -49,7 +49,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan case header := <-headerChan: - if !ProcessBlock(ctx, client, header.Number, commChan) { + if !ProcessBlock(ctx, client, header.Number, commChan, lastSeenBlockChan) { // Put entry in table that we failed to fetch this block, to be // attempted in some time future @@ -69,7 +69,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan success := make([]*big.Int, 0, pendingC) for num := range retryTable { - if ProcessBlock(ctx, client, num, commChan) { + if ProcessBlock(ctx, client, num, commChan, lastSeenBlockChan) { success = append(success, num) } @@ -93,7 +93,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan } // ProcessBlock - Fetches all txs present in mined block & passes those to pending pool pruning worker -func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs) bool { +func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) bool { block, err := client.BlockByNumber(ctx, number) if err != nil { @@ -123,6 +123,7 @@ func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int } commChan <- txs + lastSeenBlockChan <- number.Uint64() return true } From 720f675c2e771b62dffe5fbe508c2fece631ac13 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 17 Apr 2021 11:09:02 +0530 Subject: [PATCH 10/11] better stat with when latest block was seen --- app/data/pool.go | 6 ++++++ app/data/response.go | 14 +++++++++----- app/server/server.go | 4 ++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/app/data/pool.go b/app/data/pool.go index a947a32..c6e4673 100644 --- a/app/data/pool.go +++ b/app/data/pool.go @@ -73,6 +73,12 @@ func (m *MemPool) DoneTxCount() uint64 { return m.Pending.Processed() } +// LastSeenBlock - Last seen block by mempool & when it was seen, to be invoked +// by stat generator http request handler method +func (m *MemPool) LastSeenBlock() LastSeenBlock { + return m.Pending.GetLastSeenBlock() +} + // PendingForGTE - Returning list of tx(s), pending for more than // x time unit func (m *MemPool) PendingForGTE(x time.Duration) []*MemPoolTx { diff --git a/app/data/response.go b/app/data/response.go index d153d35..1937d80 100644 --- a/app/data/response.go +++ b/app/data/response.go @@ -1,13 +1,17 @@ package data +import "time" + // Stat - Response to client queries for current mempool state // to be sent in this form type Stat struct { - PendingPoolSize uint64 `json:"pendingPoolSize"` - QueuedPoolSize uint64 `json:"queuedPoolSize"` - Uptime string `json:"uptime"` - Processed uint64 `json:"processed"` - NetworkID uint64 `json:"networkID"` + PendingPoolSize uint64 `json:"pendingPoolSize"` + QueuedPoolSize uint64 `json:"queuedPoolSize"` + Uptime string `json:"uptime"` + Processed uint64 `json:"processed"` + LatestBlock uint64 `json:"latestBlock"` + SeenAgo time.Duration `json:"latestSeenAgo"` + NetworkID uint64 `json:"networkID"` } // Msg - Response message sent to client diff --git a/app/server/server.go b/app/server/server.go index 6d76657..d851554 100644 --- a/app/server/server.go +++ b/app/server/server.go @@ -71,11 +71,15 @@ func Start(ctx context.Context, res *data.Resource) { v1.GET("/stat", func(c echo.Context) error { + latestBlock := res.Pool.LastSeenBlock() + return c.JSON(http.StatusOK, &data.Stat{ PendingPoolSize: res.Pool.PendingPoolLength(), QueuedPoolSize: res.Pool.QueuedPoolLength(), Uptime: time.Now().UTC().Sub(res.StartedAt).String(), Processed: res.Pool.DoneTxCount(), + LatestBlock: latestBlock.Number, + SeenAgo: time.Now().UTC().Sub(latestBlock.At), NetworkID: res.NetworkID, }) From be6692c081a0e382e14405ea74df037c8391da98 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 17 Apr 2021 11:12:23 +0530 Subject: [PATCH 11/11] corrected response time of duration --- app/data/response.go | 16 +++++++--------- app/server/server.go | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/app/data/response.go b/app/data/response.go index 1937d80..7c76e78 100644 --- a/app/data/response.go +++ b/app/data/response.go @@ -1,17 +1,15 @@ package data -import "time" - // Stat - Response to client queries for current mempool state // to be sent in this form type Stat struct { - PendingPoolSize uint64 `json:"pendingPoolSize"` - QueuedPoolSize uint64 `json:"queuedPoolSize"` - Uptime string `json:"uptime"` - Processed uint64 `json:"processed"` - LatestBlock uint64 `json:"latestBlock"` - SeenAgo time.Duration `json:"latestSeenAgo"` - NetworkID uint64 `json:"networkID"` + PendingPoolSize uint64 `json:"pendingPoolSize"` + QueuedPoolSize uint64 `json:"queuedPoolSize"` + Uptime string `json:"uptime"` + Processed uint64 `json:"processed"` + LatestBlock uint64 `json:"latestBlock"` + SeenAgo string `json:"latestSeenAgo"` + NetworkID uint64 `json:"networkID"` } // Msg - Response message sent to client diff --git a/app/server/server.go b/app/server/server.go index d851554..7926a43 100644 --- a/app/server/server.go +++ b/app/server/server.go @@ -79,7 +79,7 @@ func Start(ctx context.Context, res *data.Resource) { Uptime: time.Now().UTC().Sub(res.StartedAt).String(), Processed: res.Pool.DoneTxCount(), LatestBlock: latestBlock.Number, - SeenAgo: time.Now().UTC().Sub(latestBlock.At), + SeenAgo: time.Now().UTC().Sub(latestBlock.At).String(), NetworkID: res.NetworkID, })