Skip to content

Commit 3f555bc

Browse files
authored
fix(p2p): do proper msg.ValidatorData handling (#281)
1 parent a7540c6 commit 3f555bc

File tree

2 files changed

+98
-24
lines changed

2 files changed

+98
-24
lines changed

p2p/subscriber.go

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package p2p
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"sync"
78

89
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -150,12 +151,6 @@ func (s *Subscriber[H]) verifyMessage(
150151
p peer.ID,
151152
msg *pubsub.Message,
152153
) (res pubsub.ValidationResult) {
153-
if msg.ValidatorData != nil {
154-
// means the message is local and was already validated
155-
// so simply accept it
156-
return pubsub.ValidationAccept
157-
}
158-
159154
defer func() {
160155
err := recover()
161156
if err != nil {
@@ -164,17 +159,7 @@ func (s *Subscriber[H]) verifyMessage(
164159
}
165160
}()
166161

167-
hdr := header.New[H]()
168-
err := hdr.UnmarshalBinary(msg.Data)
169-
if err != nil {
170-
log.Errorw("unmarshalling header",
171-
"from", p.ShortString(),
172-
"err", err)
173-
s.metrics.reject(ctx)
174-
return pubsub.ValidationReject
175-
}
176-
// ensure header validity
177-
err = hdr.Validate()
162+
hdr, err := s.extractHeader(msg)
178163
if err != nil {
179164
log.Errorw("invalid header",
180165
"from", p.ShortString(),
@@ -197,20 +182,37 @@ func (s *Subscriber[H]) verifyMessage(
197182
}
198183

199184
var verErr *header.VerifyError
200-
err = s.verifier(ctx, hdr)
201-
switch {
185+
switch err := s.verifier(ctx, hdr); {
202186
case errors.As(err, &verErr) && verErr.SoftFailure:
203187
s.metrics.ignore(ctx)
204188
return pubsub.ValidationIgnore
205189
case err != nil:
206190
s.metrics.reject(ctx)
207191
return pubsub.ValidationReject
208192
default:
193+
// keep the valid header in the msg so Subscriptions can access it without
194+
// additional unmarshalling
195+
msg.ValidatorData = hdr
196+
s.metrics.accept(ctx, len(msg.Data))
197+
return pubsub.ValidationAccept
198+
}
199+
}
200+
201+
func (s *Subscriber[H]) extractHeader(msg *pubsub.Message) (H, error) {
202+
if msg.ValidatorData != nil {
203+
hdr, ok := msg.ValidatorData.(H)
204+
if !ok {
205+
panic(fmt.Sprintf("msg ValidatorData is of type %T", msg.ValidatorData))
206+
}
207+
return hdr, nil
209208
}
210209

211-
// keep the valid header in the msg so Subscriptions can access it without
212-
// additional unmarshalling
213-
msg.ValidatorData = hdr
214-
s.metrics.accept(ctx, len(msg.Data))
215-
return pubsub.ValidationAccept
210+
hdr := header.New[H]()
211+
if err := hdr.UnmarshalBinary(msg.Data); err != nil {
212+
return hdr, err
213+
}
214+
if err := hdr.Validate(); err != nil {
215+
return hdr, err
216+
}
217+
return hdr, nil
216218
}

sync/syncsub_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
pubsub "github.com/libp2p/go-libp2p-pubsub"
9+
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/celestiaorg/go-header/headertest"
14+
"github.com/celestiaorg/go-header/p2p"
15+
)
16+
17+
func TestSyncerWithSubscriber(t *testing.T) {
18+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
19+
t.Cleanup(cancel)
20+
21+
suite := headertest.NewTestSuite(t)
22+
23+
netw, err := mocknet.FullMeshLinked(1)
24+
require.NoError(t, err)
25+
26+
gossipSub, err := pubsub.NewGossipSub(
27+
ctx,
28+
netw.Hosts()[0],
29+
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
30+
)
31+
require.NoError(t, err)
32+
33+
p2pSub, err := p2p.NewSubscriber[*headertest.DummyHeader](
34+
gossipSub,
35+
pubsub.DefaultMsgIdFn,
36+
)
37+
require.NoError(t, err)
38+
err = p2pSub.Start(context.Background())
39+
require.NoError(t, err)
40+
41+
sub, err := p2pSub.Subscribe()
42+
require.NoError(t, err)
43+
44+
head := suite.Head()
45+
syncer, err := NewSyncer(
46+
newTestStore(t, ctx, head),
47+
newTestStore(t, ctx, head),
48+
p2pSub,
49+
)
50+
require.NoError(t, err)
51+
err = syncer.Start(ctx)
52+
require.NoError(t, err)
53+
54+
t.Cleanup(func() {
55+
err = syncer.Stop(ctx)
56+
require.NoError(t, err)
57+
})
58+
59+
expectedHeader := suite.GenDummyHeaders(1)[0]
60+
61+
err = p2pSub.Broadcast(ctx, expectedHeader)
62+
require.NoError(t, err)
63+
64+
header, err := sub.NextHeader(ctx)
65+
require.NoError(t, err)
66+
assert.Equal(t, expectedHeader.Height(), header.Height())
67+
assert.Equal(t, expectedHeader.Hash(), header.Hash())
68+
69+
state := syncer.State()
70+
require.NoError(t, err)
71+
assert.Equal(t, expectedHeader.Height(), state.Height)
72+
}

0 commit comments

Comments
 (0)