Skip to content

Commit

Permalink
refac: update to AM 0.6.0
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Jul 9, 2024
1 parent 9349f09 commit f767d23
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 71 deletions.
2 changes: 1 addition & 1 deletion discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (b *bootstrapFlow) BootstrapCheckingState(e *am.Event) {
}

// run on the main pubsubs queue
ps.Mach.Eval(nil, readyFn, "bootstrapFlow.BootstrapCheckingState")
ps.Mach.Eval("bootstrapFlow.BootstrapCheckingState", readyFn, nil)
if ready {
b.mach.Add1(ss.TopicBootstrapped, nil)
return
Expand Down
6 changes: 3 additions & 3 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestSimpleDiscovery(t *testing.T) {
owner := rand.Intn(len(psubs))
psubs[owner].SetLogLevelAM(psmon.AMLogLevelVerbose)
// TODO debug
//psmon.Log.Print("sending msg ", i, " from ", psubs[owner].Mach.ID)
// psmon.Log.Print("sending msg ", i, " from ", psubs[owner].Mach.ID)

if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(1))); err != nil {
t.Fatal(err)
Expand All @@ -241,7 +241,7 @@ func TestSimpleDiscovery(t *testing.T) {
for ii := range msgs {
sub := msgs[ii]
// TODO debug
//psmon.Log.Print("waiting for msg ", i, " with sub ", ii+1)
// psmon.Log.Print("waiting for msg ", i, " with sub ", ii+1)
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
Expand Down Expand Up @@ -342,7 +342,7 @@ func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) {
checkLen := func() {
done = len(rt.mesh[topic]) == count
}
ps.Mach.Eval(nil, checkLen, "waitUntilGossipsubMeshCount")
ps.Mach.Eval("waitUntilGossipsubMeshCount", checkLen, nil)
if !done {
time.Sleep(100 * time.Millisecond)
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/pancsta/go-libp2p-pubsub

replace github.com/pancsta/asyncmachine-go => ../asyncmachine-go

go 1.22.3

require (
Expand Down
4 changes: 2 additions & 2 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,15 +1392,15 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {

func (gs *GossipSubRouter) heartbeatTimer() {
time.Sleep(gs.params.HeartbeatInitialDelay)
gs.p.Mach.Eval(nil, gs.heartbeat, "heartbeatTimer")
gs.p.Mach.Eval("heartbeatTimer", gs.heartbeat, nil)

ticker := time.NewTicker(gs.params.HeartbeatInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
gs.p.Mach.Eval(nil, gs.heartbeat, "heartbeatTimer")
gs.p.Mach.Eval("heartbeatTimer", gs.heartbeat, nil)
case <-gs.p.ctx.Done():
return
}
Expand Down
8 changes: 4 additions & 4 deletions gossipsub_feat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestGossipSubCustomProtocols(t *testing.T) {

// check the meshes of the gsubs, the gossipsub meshes should include each other but not the
// floddsub peer
gsubs[0].Mach.Eval(nil, func() {
gsubs[0].Mach.Eval("gsubs[0]", func() {
gs := gsubs[0].rt.(*GossipSubRouter)

_, ok := gs.mesh[topic][hosts[1].ID()]
Expand All @@ -78,9 +78,9 @@ func TestGossipSubCustomProtocols(t *testing.T) {
if ok {
t.Fatal("expected gs0 to not have fs in its mesh")
}
}, "gsubs[0]")
}, nil)

gsubs[1].Mach.Eval(nil, func() {
gsubs[1].Mach.Eval("gsubs[1]", func() {
gs := gsubs[1].rt.(*GossipSubRouter)

_, ok := gs.mesh[topic][hosts[0].ID()]
Expand All @@ -92,7 +92,7 @@ func TestGossipSubCustomProtocols(t *testing.T) {
if ok {
t.Fatal("expected gs1 to not have fs in its mesh")
}
}, "gsubs[1]")
}, nil)

// send some messages
for i := 0; i < 10; i++ {
Expand Down
4 changes: 2 additions & 2 deletions gossipsub_spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,10 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {

// make sure we are _not_ in the mesh
var inMesh bool
ps.Mach.Eval(nil, func() {
ps.Mach.Eval("TestGossipsubAttackGRAFTDuringBackoff", func() {
mesh := ps.rt.(*GossipSubRouter).mesh[mytopic]
_, inMesh = mesh[attacker.ID()]
}, "TestGossipsubAttackGRAFTDuringBackoff")
}, nil)

if inMesh {
t.Error("Expected to not be in the mesh of the legitimate host")
Expand Down
72 changes: 36 additions & 36 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,20 +326,20 @@ func TestGossipsubFanoutExpiry(t *testing.T) {
}
}

psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubFanoutExpiry", func() {
if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 {
t.Fatal("owner has no fanout")
}
}, "TestGossipsubFanoutExpiry")
}, nil)

// wait for TTL to expire fanout peers in owner
time.Sleep(time.Second * 2)

psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubFanoutExpiry", func() {
if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 {
t.Fatal("fanout hasn't expired")
}
}, "TestGossipsubFanoutExpiry")
}, nil)

// wait for it to run in the event loop
time.Sleep(10 * time.Millisecond)
Expand Down Expand Up @@ -649,15 +649,15 @@ func TestGossipsubPruneBackoffTime(t *testing.T) {
// Copy i so this func keeps the correct value in the closure.
var idx = i
// Run this check in the eval thunk so that we don't step over the heartbeat goroutine and trigger a race.
psubs[idx].rt.(*GossipSubRouter).p.Mach.Eval(nil, func() {
psubs[idx].rt.(*GossipSubRouter).p.Mach.Eval("TestGossipsubPruneBackoffTime", func() {
backoff, ok := psubs[idx].rt.(*GossipSubRouter).backoff["foobar"][hosts[0].ID()]
if !ok {
atomic.AddUint32(&missingBackoffs, 1)
}
if ok && backoff.Sub(pruneTime)-params.PruneBackoff > time.Second {
t.Errorf("backoff time should be equal to prune backoff (with some slack) was %v", backoff.Sub(pruneTime)-params.PruneBackoff)
}
}, "TestGossipsubPruneBackoffTime")
}, nil)
}

// Sometimes not all the peers will have updated their backoffs by this point. If the majority haven't we'll fail this test.
Expand Down Expand Up @@ -1069,13 +1069,13 @@ func TestGossipsubStarTopology(t *testing.T) {
psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true))

// configure the center of the star with a very low D
psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubStarTopology", func() {
gs := psubs[0].rt.(*GossipSubRouter)
gs.params.D = 0
gs.params.Dlo = 0
gs.params.Dhi = 0
gs.params.Dscore = 0
}, "TestGossipsubStarTopology")
}, nil)

// add all peer addresses to the peerstores
// this is necessary because we can't have signed address records witout identify
Expand Down Expand Up @@ -1153,13 +1153,13 @@ func TestGossipsubStarTopologyWithSignedPeerRecords(t *testing.T) {
psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true))

// configure the center of the star with a very low D
psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubStarTopologyWithSignedPeerRecords", func() {
gs := psubs[0].rt.(*GossipSubRouter)
gs.params.D = 0
gs.params.Dlo = 0
gs.params.Dhi = 0
gs.params.Dscore = 0
}, "TestGossipsubStarTopologyWithSignedPeerRecords")
}, nil)

// manually create signed peer records for each host and add them to the
// peerstore of the center of the star, which is doing the bootstrapping
Expand Down Expand Up @@ -1368,14 +1368,14 @@ func TestGossipsubDirectPeersFanout(t *testing.T) {

// verify that h0 is in the fanout of h2, but not h1 who is a direct peer
result := make(chan bool, 2)
psubs[2].Mach.Eval(nil, func() {
psubs[2].Mach.Eval("TestGossipsubDirectPeersFanout", func() {
rt := psubs[2].rt.(*GossipSubRouter)
fanout := rt.fanout["test"]
_, ok := fanout[h[0].ID()]
result <- ok
_, ok = fanout[h[1].ID()]
result <- ok
}, "TestGossipsubDirectPeersFanout")
}, nil)

inFanout := <-result
if !inFanout {
Expand All @@ -1395,14 +1395,14 @@ func TestGossipsubDirectPeersFanout(t *testing.T) {

time.Sleep(2 * time.Second)

psubs[2].Mach.Eval(nil, func() {
psubs[2].Mach.Eval("TestGossipsubDirectPeersFanout", func() {
rt := psubs[2].rt.(*GossipSubRouter)
mesh := rt.mesh["test"]
_, ok := mesh[h[0].ID()]
result <- ok
_, ok = mesh[h[1].ID()]
result <- ok
}, "TestGossipsubDirectPeersFanout")
}, nil)

inMesh := <-result
if !inMesh {
Expand Down Expand Up @@ -1468,9 +1468,9 @@ func TestGossipsubEnoughPeers(t *testing.T) {

// at this point we have no connections and no mesh, so EnoughPeers should return false
res := make(chan bool, 1)
psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubEnoughPeers", func() {
res <- psubs[0].rt.EnoughPeers("test", 0)
}, "TestGossipsubEnoughPeers")
}, nil)
enough := <-res
if enough {
t.Fatal("should not have enough peers")
Expand All @@ -1481,9 +1481,9 @@ func TestGossipsubEnoughPeers(t *testing.T) {

time.Sleep(3 * time.Second)

psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubEnoughPeers", func() {
res <- psubs[0].rt.EnoughPeers("test", 0)
}, "TestGossipsubEnoughPeers")
}, nil)
enough = <-res
if !enough {
t.Fatal("should have enough peers")
Expand Down Expand Up @@ -1683,17 +1683,17 @@ func TestGossipsubScoreValidatorEx(t *testing.T) {
// assert that peer1's score is still 0 (its message was ignored) while peer2 should have
// a negative score (its message got rejected)
res := make(chan float64, 1)
psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubScoreValidatorEx", func() {
res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[1].ID())
}, "TestGossipsubScoreValidatorEx")
}, nil)
score := <-res
if score != 0 {
t.Fatalf("expected 0 score for peer1, but got %f", score)
}

psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipsubScoreValidatorEx", func() {
res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[2].ID())
}, "TestGossipsubScoreValidatorEx")
}, nil)
score = <-res
if score >= 0 {
t.Fatalf("expected negative score for peer2, but got %f", score)
Expand All @@ -1713,7 +1713,7 @@ func TestGossipsubPiggybackControl(t *testing.T) {
blah := peer.ID("bogotr0n")

res := make(chan *RPC, 1)
ps.Mach.Eval(nil, func() {
ps.Mach.Eval("TestGossipsubPiggybackControl", func() {
gs := ps.rt.(*GossipSubRouter)
test1 := "test1"
test2 := "test2"
Expand All @@ -1728,7 +1728,7 @@ func TestGossipsubPiggybackControl(t *testing.T) {
Prune: []*pb.ControlPrune{{TopicID: &test1}, {TopicID: &test2}, {TopicID: &test3}},
})
res <- rpc
}, "TestGossipsubPiggybackControl")
}, nil)

rpc := <-res
if rpc.Control == nil {
Expand Down Expand Up @@ -1772,12 +1772,12 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) {
p1Router := psubs[0].rt.(*GossipSubRouter)
p2Router := psubs[1].rt.(*GossipSubRouter)

p2Sub.Mach.Eval(nil, func() {
p2Sub.Mach.Eval("TestGossipsubMultipleGraftTopics", func() {
// Add topics to second peer
p2Router.mesh[firstTopic] = map[peer.ID]struct{}{}
p2Router.mesh[secondTopic] = map[peer.ID]struct{}{}
p2Router.mesh[thirdTopic] = map[peer.ID]struct{}{}
}, "TestGossipsubMultipleGraftTopics")
}, nil)

// Send multiple GRAFT messages to second peer from
// 1st peer
Expand All @@ -1787,7 +1787,7 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) {

time.Sleep(time.Second * 1)

p2Sub.Mach.Eval(nil, func() {
p2Sub.Mach.Eval("TestGossipsubMultipleGraftTopics", func() {
if _, ok := p2Router.mesh[firstTopic][firstPeer]; !ok {
t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", firstTopic)
}
Expand All @@ -1797,7 +1797,7 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) {
if _, ok := p2Router.mesh[thirdTopic][firstPeer]; !ok {
t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", thirdTopic)
}
}, "TestGossipsubMultipleGraftTopics")
}, nil)
}

// TODO asyncmachine fix
Expand Down Expand Up @@ -1896,15 +1896,15 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) {
// check the honest peer meshes, they should have at least 3 honest peers each
for _, ps := range psubs {
count := 0
ps.Mach.Eval(nil, func() {
ps.Mach.Eval("TestGossipsubOpportunisticGrafting", func() {
gs := ps.rt.(*GossipSubRouter)
for _, h := range hosts[:10] {
_, ok := gs.mesh["test"][h.ID()]
if ok {
count++
}
}
}, "TestGossipsubOpportunisticGrafting")
}, nil)

if count < 3 {
t.Fatalf("expected at least 3 honest peers, got %d", count)
Expand Down Expand Up @@ -1937,7 +1937,7 @@ func TestGossipSubLeaveTopic(t *testing.T) {

leaveTime := time.Now()

psubs[0].rt.(*GossipSubRouter).p.Mach.Eval(nil, func() {
psubs[0].rt.(*GossipSubRouter).p.Mach.Eval("TestGossipSubLeaveTopic", func() {
psubs[0].rt.Leave("test")
time.Sleep(time.Second)
peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
Expand All @@ -1954,11 +1954,11 @@ func TestGossipSubLeaveTopic(t *testing.T) {
if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
}
}, "TestGossipSubLeaveTopic")
}, nil)

// Ensure that remote peer 1 also applies the backoff appropriately
// for peer 0.
psubs[1].rt.(*GossipSubRouter).p.Mach.Eval(nil, func() {
psubs[1].rt.(*GossipSubRouter).p.Mach.Eval("TestGossipSubLeaveTopic", func() {
peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap2) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 1")
Expand All @@ -1973,7 +1973,7 @@ func TestGossipSubLeaveTopic(t *testing.T) {
if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
}
}, "TestGossipSubLeaveTopic")
}, nil)
}

func TestGossipSubJoinTopic(t *testing.T) {
Expand Down Expand Up @@ -2011,9 +2011,9 @@ func TestGossipSubJoinTopic(t *testing.T) {
time.Sleep(time.Second)

var meshMap map[peer.ID]struct{}
psubs[0].Mach.Eval(nil, func() {
psubs[0].Mach.Eval("TestGossipSubJoinTopic", func() {
meshMap = router0.mesh["test"]
}, "TestGossipSubJoinTopic")
}, nil)
if len(meshMap) != 1 {
t.Fatalf("Unexpect peer included in the mesh")
}
Expand Down
Loading

0 comments on commit f767d23

Please sign in to comment.