Skip to content

Commit

Permalink
psmon states
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Jun 6, 2024
1 parent dbd1c9e commit 64d5b00
Show file tree
Hide file tree
Showing 43 changed files with 1,808 additions and 669 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ prof.out
go-floodsub.test

.idea/
bench.env
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## fork

This is a fork of libp2p pubsub which replaces the internals to use [asyncmachine-go](http://github.com/pancsta/asyncmachine-go).

See [github.com/pancsta/**go-libp2p-pubsub-benchmark**](https://github.com/pancsta/go-libp2p-pubsub-benchmark/#libp2p-pubsub-benchmark) for more info.

# go-libp2p-pubsub

<p align="left">
Expand Down
2 changes: 1 addition & 1 deletion blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"

"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/pancsta/go-libp2p-pubsub/timecache"
)

// Blacklist is an interface for peer blacklisting.
Expand Down
45 changes: 21 additions & 24 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/gogo/protobuf/proto"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-varint"
am "github.com/pancsta/asyncmachine-go/pkg/machine"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/pancsta/go-libp2p-pubsub/pb"
ss "github.com/pancsta/go-libp2p-pubsub/states/pubsub"
)

// get the initial RPC containing all of our subscriptions to send to new peers
Expand All @@ -41,6 +43,16 @@ func (p *PubSub) getHelloPacket() *RPC {
return &rpc
}

func (p *PubSub) PeerCloseStreamState(e *am.Event) {
p.inboundStreamsMx.Lock()
pid := e.Args["pid"].(peer.ID)
if s, ok := p.inboundStreams[pid]; ok {
delete(p.inboundStreams, pid)
s.Reset()
}
p.inboundStreamsMx.Unlock()
}

func (p *PubSub) handleNewStream(s network.Stream) {
peer := s.Conn().RemotePeer()

Expand All @@ -53,16 +65,10 @@ func (p *PubSub) handleNewStream(s network.Stream) {
p.inboundStreams[peer] = s
p.inboundStreamsMx.Unlock()

defer func() {
p.inboundStreamsMx.Lock()
if p.inboundStreams[peer] == s {
delete(p.inboundStreams, peer)
}
p.inboundStreamsMx.Unlock()
}()
defer p.Mach.Remove1(ss.PeerCloseStream, am.A{"pid": peer})

r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
for {
for p.ctx.Err() == nil {
msgbytes, err := r.ReadMsg()
if err != nil {
r.ReleaseMsg(msgbytes)
Expand Down Expand Up @@ -107,32 +113,23 @@ func (p *PubSub) notifyPeerDead(pid peer.ID) {
p.peerDeadPend[pid] = struct{}{}
p.peerDeadMx.Unlock()
p.peerDeadPrioLk.RUnlock()

select {
case p.peerDead <- struct{}{}:
default:
}
p.Mach.Add(am.S{ss.PeersDead}, nil)
}

func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
// TODO fixes "protocol not supported" error, fix the timing issue
time.Sleep(10 * time.Millisecond)

s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
if err != nil {
log.Debug("opening new stream to peer: ", err, pid)

select {
case p.newPeerError <- pid:
case <-ctx.Done():
}

p.Mach.Add1(ss.PeerError, am.A{"pid": pid})
return
}

go p.handleSendingMessages(ctx, s, outgoing)
go p.handlePeerDead(s)
select {
case p.newPeerStream <- s:
case <-ctx.Done():
}
p.Mach.Add1(ss.PeerNewStream, am.A{"network.Stream": s})
}

func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
Expand Down
4 changes: 2 additions & 2 deletions compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package pubsub
import (
"testing"

compat_pb "github.com/libp2p/go-libp2p-pubsub/compat"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
compat_pb "github.com/pancsta/go-libp2p-pubsub/compat"
pb "github.com/pancsta/go-libp2p-pubsub/pb"
)

func TestMultitopicMessageCompatibility(t *testing.T) {
Expand Down
Loading

0 comments on commit 64d5b00

Please sign in to comment.