Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #18 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Latest version of `pubsub`
  • Loading branch information
itzmeanjan authored May 30, 2021
2 parents 6debc51 + 5a344df commit 8a94b25
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 192 deletions.
7 changes: 2 additions & 5 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package bootup

import (
"context"
"errors"
"runtime"
"strconv"
"time"

Expand Down Expand Up @@ -54,10 +54,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
return nil, err
}

_pubsub := pubsub.New(ctx)
if !_pubsub.IsAlive() {
return nil, errors.New("failed to start pub/sub hub")
}
_pubsub := pubsub.New(uint64(runtime.NumCPU()))

// To be used when subscription requests are received from clients
if err := graph.InitPubSub(_pubsub); err != nil {
Expand Down
16 changes: 6 additions & 10 deletions app/data/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,12 +1030,10 @@ func (p *PendingPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {
return
}

if ok, _ := p.PubSub.Publish(&pubsub.Message{
Topics: []pubsub.String{pubsub.String(config.GetPendingTxEntryPublishTopic())},
p.PubSub.Publish(&pubsub.Message{
Topics: []string{config.GetPendingTxEntryPublishTopic()},
Data: _msg,
}); !ok {
log.Printf("[❗️] Failed to publish new pending tx\n")
}
})

}

Expand All @@ -1062,12 +1060,10 @@ func (p *PendingPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {
return
}

if ok, _ := p.PubSub.Publish(&pubsub.Message{
Topics: []pubsub.String{pubsub.String(config.GetPendingTxExitPublishTopic())},
p.PubSub.Publish(&pubsub.Message{
Topics: []string{config.GetPendingTxExitPublishTopic()},
Data: _msg,
}); !ok {
log.Printf("[❗️] Failed to publish confirmed/dropped tx\n")
}
})

}

Expand Down
16 changes: 6 additions & 10 deletions app/data/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,12 +782,10 @@ func (q *QueuedPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {
return
}

if ok, _ := q.PubSub.Publish(&pubsub.Message{
Topics: []pubsub.String{pubsub.String(config.GetQueuedTxEntryPublishTopic())},
q.PubSub.Publish(&pubsub.Message{
Topics: []string{config.GetQueuedTxEntryPublishTopic()},
Data: _msg,
}); !ok {
log.Printf("[❗️] Failed to publish new queued tx\n")
}
})

}

Expand Down Expand Up @@ -816,12 +814,10 @@ func (q *QueuedPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {
return
}

if ok, _ := q.PubSub.Publish(&pubsub.Message{
Topics: []pubsub.String{pubsub.String(config.GetQueuedTxExitPublishTopic())},
q.PubSub.Publish(&pubsub.Message{
Topics: []string{config.GetQueuedTxExitPublishTopic()},
Data: _msg,
}); !ok {
log.Printf("[❗️] Failed to publish unstuck tx\n")
}
})

}

Expand Down
128 changes: 22 additions & 106 deletions app/graph/schema.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"

"github.com/ethereum/go-ethereum/common"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/harmony/app/graph/generated"
"github.com/itzmeanjan/harmony/app/graph/model"
)
Expand Down Expand Up @@ -136,10 +135,7 @@ func (r *subscriptionResolver) NewPendingTx(ctx context.Context) (<-chan *model.
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetPendingTxEntryPublishTopic()}

// Because client wants to listen to any tx being published on this topic
go ListenToMessages(ctx, _pubsub, topics, comm, NoCriteria)
go ListenToMessages(ctx, _pubsub, comm, NoCriteria)

return comm, nil
}
Expand All @@ -151,10 +147,7 @@ func (r *subscriptionResolver) NewQueuedTx(ctx context.Context) (<-chan *model.M
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetQueuedTxEntryPublishTopic()}

// Because client wants to listen to any tx being published on this topic
go ListenToMessages(ctx, _pubsub, topics, comm, NoCriteria)
go ListenToMessages(ctx, _pubsub, comm, NoCriteria)

return comm, nil
}
Expand All @@ -166,10 +159,7 @@ func (r *subscriptionResolver) NewConfirmedTx(ctx context.Context) (<-chan *mode
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetPendingTxExitPublishTopic()}

// Because client wants to listen to any tx being published on this topic
go ListenToMessages(ctx, _pubsub, topics, comm, NoCriteria)
go ListenToMessages(ctx, _pubsub, comm, NoCriteria)

return comm, nil
}
Expand All @@ -181,10 +171,7 @@ func (r *subscriptionResolver) NewUnstuckTx(ctx context.Context) (<-chan *model.
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetQueuedTxExitPublishTopic()}

// Because client wants to listen to any tx being published on this topic
go ListenToMessages(ctx, _pubsub, topics, comm, NoCriteria)
go ListenToMessages(ctx, _pubsub, comm, NoCriteria)

return comm, nil
}
Expand All @@ -196,10 +183,7 @@ func (r *subscriptionResolver) PendingPool(ctx context.Context) (<-chan *model.M
}

comm := make(chan *model.MemPoolTx, 2)
topics := []string{config.GetPendingTxEntryPublishTopic(), config.GetPendingTxExitPublishTopic()}

// Because client wants to listen to any tx being published on these two topic
go ListenToMessages(ctx, _pubsub, topics, comm, NoCriteria)
go ListenToMessages(ctx, _pubsub, comm, NoCriteria)

return comm, nil
}
Expand All @@ -211,10 +195,7 @@ func (r *subscriptionResolver) QueuedPool(ctx context.Context) (<-chan *model.Me
}

comm := make(chan *model.MemPoolTx, 2)
topics := []string{config.GetQueuedTxEntryPublishTopic(), config.GetQueuedTxExitPublishTopic()}

// Because client wants to listen to any tx being published on these two topic
go ListenToMessages(ctx, _pubsub, topics, comm, NoCriteria)
go ListenToMessages(ctx, _pubsub, comm, NoCriteria)

return comm, nil
}
Expand All @@ -226,10 +207,7 @@ func (r *subscriptionResolver) MemPool(ctx context.Context) (<-chan *model.MemPo
}

comm := make(chan *model.MemPoolTx, 4)
topics := []string{config.GetQueuedTxEntryPublishTopic(), config.GetQueuedTxExitPublishTopic(), config.GetPendingTxEntryPublishTopic(), config.GetPendingTxExitPublishTopic()}

// Because client wants to listen to any tx being published on these two topic
go ListenToMessages(ctx, _pubsub, topics, comm, NoCriteria)
go ListenToMessages(ctx, _pubsub, comm, NoCriteria)

return comm, nil
}
Expand All @@ -245,10 +223,7 @@ func (r *subscriptionResolver) NewPendingTxFrom(ctx context.Context, address str
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetPendingTxEntryPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckFromAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckFromAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -264,10 +239,7 @@ func (r *subscriptionResolver) NewQueuedTxFrom(ctx context.Context, address stri
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetQueuedTxEntryPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckFromAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckFromAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -283,10 +255,7 @@ func (r *subscriptionResolver) NewConfirmedTxFrom(ctx context.Context, address s
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetPendingTxExitPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckFromAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckFromAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -302,10 +271,7 @@ func (r *subscriptionResolver) NewUnstuckTxFrom(ctx context.Context, address str
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetQueuedTxExitPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckFromAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckFromAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -321,11 +287,7 @@ func (r *subscriptionResolver) NewTxFromAInPendingPool(ctx context.Context, addr
}

comm := make(chan *model.MemPoolTx, 2)
topics := []string{config.GetPendingTxEntryPublishTopic(), config.GetPendingTxExitPublishTopic()}

// Because client wants to get notified only when tx from certain address is detected
// to be entering/ leaving pending pool
go ListenToMessages(ctx, _pubsub, topics, comm, CheckFromAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckFromAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -341,11 +303,7 @@ func (r *subscriptionResolver) NewTxFromAInQueuedPool(ctx context.Context, addre
}

comm := make(chan *model.MemPoolTx, 2)
topics := []string{config.GetQueuedTxEntryPublishTopic(), config.GetQueuedTxExitPublishTopic()}

// Because client wants to get notified only when tx from certain address is detected
// to be entering/ leaving queued pool
go ListenToMessages(ctx, _pubsub, topics, comm, CheckFromAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckFromAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -361,17 +319,11 @@ func (r *subscriptionResolver) NewTxFromAInMemPool(ctx context.Context, address
}

comm := make(chan *model.MemPoolTx, 4)
topics := []string{
config.GetQueuedTxEntryPublishTopic(),
config.GetQueuedTxExitPublishTopic(),
config.GetPendingTxEntryPublishTopic(),
config.GetPendingTxExitPublishTopic()}

// Because client wants to get notified only when tx from certain address is detected
// to be entering/ leaving mem pool
//
// @note Mempool includes both pending & queued pool
go ListenToMessages(ctx, _pubsub, topics, comm, CheckFromAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckFromAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -387,10 +339,7 @@ func (r *subscriptionResolver) NewPendingTxTo(ctx context.Context, address strin
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetPendingTxEntryPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckToAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckToAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -406,10 +355,7 @@ func (r *subscriptionResolver) NewQueuedTxTo(ctx context.Context, address string
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetQueuedTxEntryPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckToAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckToAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -425,10 +371,7 @@ func (r *subscriptionResolver) NewConfirmedTxTo(ctx context.Context, address str
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetPendingTxExitPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckToAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckToAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -444,10 +387,7 @@ func (r *subscriptionResolver) NewUnstuckTxTo(ctx context.Context, address strin
}

comm := make(chan *model.MemPoolTx, 1)
topics := []string{config.GetQueuedTxExitPublishTopic()}

// Because client wants to get notified only when tx of certain address is detected
go ListenToMessages(ctx, _pubsub, topics, comm, CheckToAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckToAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -463,11 +403,7 @@ func (r *subscriptionResolver) NewTxToAInPendingPool(ctx context.Context, addres
}

comm := make(chan *model.MemPoolTx, 2)
topics := []string{config.GetPendingTxEntryPublishTopic(), config.GetPendingTxExitPublishTopic()}

// Because client wants to get notified only when tx to certain address is detected
// to be entering/ leaving pending pool
go ListenToMessages(ctx, _pubsub, topics, comm, CheckToAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckToAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -483,11 +419,7 @@ func (r *subscriptionResolver) NewTxToAInQueuedPool(ctx context.Context, address
}

comm := make(chan *model.MemPoolTx, 2)
topics := []string{config.GetQueuedTxEntryPublishTopic(), config.GetQueuedTxExitPublishTopic()}

// Because client wants to get notified only when tx to certain address is detected
// to be entering/ leaving queued pool
go ListenToMessages(ctx, _pubsub, topics, comm, CheckToAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckToAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -503,17 +435,7 @@ func (r *subscriptionResolver) NewTxToAInMemPool(ctx context.Context, address st
}

comm := make(chan *model.MemPoolTx, 4)
topics := []string{
config.GetQueuedTxEntryPublishTopic(),
config.GetQueuedTxExitPublishTopic(),
config.GetPendingTxEntryPublishTopic(),
config.GetPendingTxExitPublishTopic()}

// Because client wants to get notified only when tx to certain address is detected
// to be entering/ leaving mem pool
//
// @note Mempool denotes both pending & queued pool
go ListenToMessages(ctx, _pubsub, topics, comm, CheckToAddress, common.HexToAddress(address))
go ListenToMessages(ctx, _pubsub, comm, CheckToAddress, common.HexToAddress(address))

return comm, nil
}
Expand All @@ -534,13 +456,7 @@ func (r *subscriptionResolver) WatchTx(ctx context.Context, hash string) (<-chan
}

comm := make(chan *model.MemPoolTx, 4)
topics := []string{
config.GetQueuedTxEntryPublishTopic(),
config.GetQueuedTxExitPublishTopic(),
config.GetPendingTxEntryPublishTopic(),
config.GetPendingTxExitPublishTopic()}

go ListenToMessages(ctx, _pubsub, topics, comm, LinkedTx, tx)
go ListenToMessages(ctx, _pubsub, comm, LinkedTx, tx)

return comm, nil
}
Expand Down
Loading

0 comments on commit 8a94b25

Please sign in to comment.