From 77a490e3e3214649c7f4b2da5e43262b8af1e07e Mon Sep 17 00:00:00 2001 From: Lars Bahner Date: Sun, 17 Mar 2024 01:19:02 +0100 Subject: [PATCH] Added gossipsub router, with not much success. --- p2p/pubsub/router.go | 35 ++++++++++++++++++++++++++++ ui/pubsub.go | 55 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 p2p/pubsub/router.go create mode 100644 ui/pubsub.go diff --git a/p2p/pubsub/router.go b/p2p/pubsub/router.go new file mode 100644 index 0000000..3411299 --- /dev/null +++ b/p2p/pubsub/router.go @@ -0,0 +1,35 @@ +package pubsub + +import ( + p2pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" +) + +const PUBSUB_PROTOCOL = "/meshsub/1.1.0" + +var r *pubsub.GossipSubRouter + +func newRouter(h host.Host) *pubsub.GossipSubRouter { + + r = p2pubsub.DefaultGossipSubRouter(h) + + return r +} + +// Adds a peer ID to the GossipSubRouter +func AddPeer(id peer.ID) { + r.AddPeer(id, PUBSUB_PROTOCOL) + r.AcceptFrom(id) + +} + +// Removes a peer ID from the GossipSubRouter +func RemovePeer(id peer.ID) { + r.RemovePeer(id) +} + +func SetEoughPeers(t string, peerno int) { + r.EnoughPeers(t, peerno) +} diff --git a/ui/pubsub.go b/ui/pubsub.go new file mode 100644 index 0000000..f8c922d --- /dev/null +++ b/ui/pubsub.go @@ -0,0 +1,55 @@ +package ui + +import ( + "context" + "time" + + "github.com/bahner/go-ma-actor/p2p/peer" + "github.com/bahner/go-ma-actor/p2p/pubsub" + p2peer "github.com/libp2p/go-libp2p/core/peer" + log "github.com/sirupsen/logrus" +) + +const DISCOVERY_INTERVAL = time.Minute + +func (ui *ChatUI) pubsubPeersLoop(ctx context.Context) { + + ticker := time.NewTicker(DISCOVERY_INTERVAL) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + log.Debugf("Checking for new peers to add to pubsub") + for _, p := range ui.p.ConnectedProtectedPeersAddrInfo() { + err := ui.addPeerToPubsub(p) + if err == peer.ErrAddrInfoAddrsEmpty || err == peer.ErrAlreadyConnected { + continue + } + log.Debugf("Added peer %s to pubsub", p.ID) + } + case <-ctx.Done(): + log.Warn("pubsubPeersLoop: context done") + return + } + } +} + +func (ui *ChatUI) addPeerToPubsub(pai p2peer.AddrInfo) error { + + if len(pai.Addrs) == 0 { + return peer.ErrAddrInfoAddrsEmpty + } + + // Don't add already connected peers + for _, p := range ui.e.Topic.ListPeers() { + if p == pai.ID { + return peer.ErrAlreadyConnected + } + } + + pubsub.AddPeer(pai.ID) + + return nil + +}