-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(dot/network): Add warp sync request handler #4186
base: development
Are you sure you want to change the base?
Changes from all commits
5958859
ee261f5
f80ff45
a7e03f7
592876a
fef07c5
6cd9a60
6204490
c513695
8be071e
8e188bb
e008834
c046b53
f3258f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,51 @@ | ||||||
// Copyright 2024 ChainSafe Systems (ON) | ||||||
// SPDX-License-Identifier: LGPL-3.0-only | ||||||
|
||||||
package messages | ||||||
|
||||||
import ( | ||||||
"bytes" | ||||||
"fmt" | ||||||
|
||||||
"github.com/ChainSafe/gossamer/lib/common" | ||||||
"github.com/ChainSafe/gossamer/pkg/scale" | ||||||
) | ||||||
|
||||||
// WarpProofRequest is a struct for p2p warp proof request | ||||||
type WarpProofRequest struct { | ||||||
Begin common.Hash | ||||||
} | ||||||
|
||||||
// Decode decodes the message into a WarpProofRequest | ||||||
func (wsr *WarpProofRequest) Decode(in []byte) error { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
same for below receivers |
||||||
reader := bytes.NewReader(in) | ||||||
sd := scale.NewDecoder(reader) | ||||||
err := sd.Decode(&wsr) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
// Encode encodes the warp sync request | ||||||
func (wsr *WarpProofRequest) Encode() ([]byte, error) { | ||||||
buffer := bytes.NewBuffer(nil) | ||||||
encoder := scale.NewEncoder(buffer) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For both encoding and decoding here, do we need to create the encoder/decoder object? Can we not just call scale.Marshal and scale.Unmarshal? |
||||||
err := encoder.Encode(wsr) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
return buffer.Bytes(), nil | ||||||
} | ||||||
|
||||||
// String returns the string representation of a WarpProofRequest | ||||||
func (wsr *WarpProofRequest) String() string { | ||||||
if wsr == nil { | ||||||
return "WarpProofRequest=nil" | ||||||
} | ||||||
|
||||||
return fmt.Sprintf("WarpProofRequest begin=%v", wsr.Begin) | ||||||
} | ||||||
|
||||||
var _ P2PMessage = (*WarpProofRequest)(nil) |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// Copyright 2024 ChainSafe Systems (ON) | ||
// SPDX-License-Identifier: LGPL-3.0-only | ||
|
||
package network | ||
|
||
import ( | ||
"errors" | ||
|
||
"github.com/ChainSafe/gossamer/dot/network/messages" | ||
"github.com/ChainSafe/gossamer/lib/common" | ||
libp2pnetwork "github.com/libp2p/go-libp2p/core/network" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
) | ||
|
||
// WarpSyncProvider is an interface for generating warp sync proofs | ||
type WarpSyncProvider interface { | ||
// Generate proof starting at given block hash. The proof is accumulated until maximum proof | ||
// size is reached. | ||
generate(start common.Hash) (encodedProof []byte, err error) | ||
} | ||
|
||
func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) { | ||
// use the backend to generate the warp proof | ||
proof, err := s.warpSyncProvider.generate(req.Begin) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// send the response through pendingResponse channel | ||
return proof, nil | ||
} | ||
|
||
func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) { | ||
if stream == nil { | ||
return | ||
} | ||
|
||
s.readStream(stream, decodeWarpSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize) | ||
} | ||
|
||
func decodeWarpSyncMessage(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) { | ||
msg := new(messages.WarpProofRequest) | ||
err := msg.Decode(in) | ||
return msg, err | ||
} | ||
|
||
func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg messages.P2PMessage) error { | ||
if msg == nil { | ||
return nil | ||
} | ||
|
||
defer func() { | ||
err := stream.Close() | ||
if err != nil && errors.Is(err, ErrStreamReset) { | ||
logger.Warnf("failed to close stream: %s", err) | ||
} | ||
}() | ||
|
||
if req, ok := msg.(*messages.WarpProofRequest); ok { | ||
resp, err := s.handleWarpSyncRequest(*req) | ||
if err != nil { | ||
logger.Debugf("cannot create response for request: %s", err) | ||
return nil | ||
} | ||
|
||
if _, err = stream.Write(resp); err != nil { | ||
logger.Debugf("failed to send WarpSyncResponse message to peer %s: %s", stream.Conn().RemotePeer(), err) | ||
return err | ||
} | ||
dimartiro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
logger.Debugf("successfully respond with WarpSyncResponse message to peer %s with proof %v", | ||
stream.Conn().RemotePeer(), | ||
resp, | ||
) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add a log or something for the !ok case? |
||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
// Copyright 2024 ChainSafe Systems (ON) | ||
// SPDX-License-Identifier: LGPL-3.0-only | ||
|
||
package network | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/ChainSafe/gossamer/dot/network/messages" | ||
"github.com/ChainSafe/gossamer/lib/common" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
"github.com/stretchr/testify/require" | ||
gomock "go.uber.org/mock/gomock" | ||
) | ||
|
||
func TestDecodeWarpSyncMessage(t *testing.T) { | ||
t.Parallel() | ||
|
||
// Basic WarpProofRequest | ||
testWarpReqMessage := &messages.WarpProofRequest{ | ||
Begin: common.MustBlake2bHash([]byte("test")), | ||
} | ||
|
||
// Test encoding | ||
reqEnc, err := testWarpReqMessage.Encode() | ||
require.NoError(t, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any tests in substrate with hardcoded bytes that we could use here? We have run into a problem before in serialization tests where we can encode and decode to an expected type, but the actual byte representation is incorrect. However giving how simple this is, probably is not the end of the world. I just wanted to make a note of this |
||
|
||
// Test decoding | ||
testPeer := peer.ID("me") | ||
msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true) | ||
require.NoError(t, err) | ||
|
||
req, ok := msg.(*messages.WarpProofRequest) | ||
require.True(t, ok) | ||
require.Equal(t, testWarpReqMessage, req) | ||
} | ||
|
||
// createServiceWithWarpSyncHelper creates a basic service with warp sync handler support | ||
func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProvider) *Service { | ||
t.Helper() | ||
|
||
config := &Config{ | ||
BasePath: t.TempDir(), | ||
Port: availablePort(t), | ||
NoBootstrap: true, | ||
NoMDNS: true, | ||
warpSyncProvider: warpSyncProvider, | ||
} | ||
|
||
srvc := createTestService(t, config) | ||
srvc.noGossip = true | ||
handler := newTestStreamHandler(decodeSyncMessage) | ||
srvc.host.registerStreamHandler(srvc.host.protocolID, handler.handleStream) | ||
|
||
return srvc | ||
} | ||
|
||
func TestHandleWarpSyncRequestOk(t *testing.T) { | ||
t.Parallel() | ||
|
||
// Creates warp sync provider mock to generate proofs with the expected result | ||
expectedProof := []byte{0x01} | ||
|
||
ctrl := gomock.NewController(t) | ||
warpSyncProvider := NewMockWarpSyncProvider(ctrl) | ||
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(expectedProof, nil).Times(1) | ||
|
||
// Initiate service using the warp sync provider mock | ||
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) | ||
|
||
// Handle request and check resulting proof | ||
req := messages.WarpProofRequest{ | ||
Begin: common.EmptyHash, | ||
} | ||
|
||
resp, err := srvc.handleWarpSyncRequest(req) | ||
require.NoError(t, err) | ||
require.Equal(t, expectedProof, resp) | ||
} | ||
|
||
func TestHandleWarpSyncRequestError(t *testing.T) { | ||
t.Parallel() | ||
|
||
// Creates warp sync provider mock to generate proofs with the expected erro | ||
expectedError := fmt.Errorf("error generating proof") | ||
ctrl := gomock.NewController(t) | ||
|
||
warpSyncProvider := NewMockWarpSyncProvider(ctrl) | ||
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(nil, expectedError).Times(1) | ||
|
||
// Initiate service using the warp sync provider mock | ||
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) | ||
|
||
// Handle request and check resulting error | ||
req := messages.WarpProofRequest{ | ||
Begin: common.EmptyHash, | ||
} | ||
|
||
resp, err := srvc.handleWarpSyncRequest(req) | ||
require.Nil(t, resp) | ||
require.ErrorIs(t, err, expectedError) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will probably need to be public right?