Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/network): Add warp sync request handler #4186

Open
wants to merge 14 commits into
base: development
Choose a base branch
from
1 change: 1 addition & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Config struct {
// Service interfaces
BlockState BlockState
Syncer Syncer
warpSyncProvider WarpSyncProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will probably need to be public right?

TransactionHandler TransactionHandler

// Used to specify the address broadcasted to other peers, and avoids using pubip.Get
Expand Down
51 changes: 51 additions & 0 deletions dot/network/messages/warp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package messages

import (
"bytes"
"fmt"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
)

// WarpProofRequest is a struct for p2p warp proof request
type WarpProofRequest struct {
Begin common.Hash
}

// Decode decodes the message into a WarpProofRequest
func (wsr *WarpProofRequest) Decode(in []byte) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (wsr *WarpProofRequest) Decode(in []byte) error {
func (wpr *WarpProofRequest) Decode(in []byte) error {

same for below receivers

reader := bytes.NewReader(in)
sd := scale.NewDecoder(reader)
err := sd.Decode(&wsr)
if err != nil {
return err
}

return nil
}

// Encode encodes the warp sync request
func (wsr *WarpProofRequest) Encode() ([]byte, error) {
buffer := bytes.NewBuffer(nil)
encoder := scale.NewEncoder(buffer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For both encoding and decoding here, do we need to create the encoder/decoder object? Can we not just call scale.Marshal and scale.Unmarshal?

err := encoder.Encode(wsr)
if err != nil {
return nil, err
}
return buffer.Bytes(), nil
}

// String returns the string representation of a WarpProofRequest
func (wsr *WarpProofRequest) String() string {
if wsr == nil {
return "WarpProofRequest=nil"
}

return fmt.Sprintf("WarpProofRequest begin=%v", wsr.Begin)
}

var _ P2PMessage = (*WarpProofRequest)(nil)
55 changes: 55 additions & 0 deletions dot/network/mock_warp_sync_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dot/network/mocks_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ package network
//go:generate mockgen -destination=mock_telemetry_test.go -package $GOPACKAGE . Telemetry
//go:generate mockgen -destination=mock_syncer_test.go -package $GOPACKAGE . Syncer
//go:generate mockgen -destination=mock_block_state_test.go -package $GOPACKAGE . BlockState
//go:generate mockgen -destination=mock_warp_sync_provider_test.go -package $GOPACKAGE . WarpSyncProvider
//go:generate mockgen -destination=mock_transaction_handler_test.go -package $GOPACKAGE . TransactionHandler
//go:generate mockgen -destination=mock_stream_test.go -package $GOPACKAGE github.com/libp2p/go-libp2p/core/network Stream
4 changes: 4 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (

// the following are sub-protocols used by the node
SyncID = "/sync/2"
WarpSyncID = "/sync/warp"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"
Expand Down Expand Up @@ -129,6 +130,7 @@ type Service struct {
blockState BlockState
syncer Syncer
transactionHandler TransactionHandler
warpSyncProvider WarpSyncProvider

// Configuration options
noBootstrap bool
Expand Down Expand Up @@ -215,6 +217,7 @@ func NewService(cfg *Config) (*Service, error) {
noBootstrap: cfg.NoBootstrap,
noMDNS: cfg.NoMDNS,
syncer: cfg.Syncer,
warpSyncProvider: cfg.warpSyncProvider,
notificationsProtocols: make(map[MessageType]*notificationsProtocol),
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
Expand Down Expand Up @@ -253,6 +256,7 @@ func (s *Service) Start() error {
}

s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+WarpSyncID, s.handleWarpSyncStream)
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)

// register block announce protocol
Expand Down
70 changes: 70 additions & 0 deletions dot/network/warp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/lib/common"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

// WarpSyncProvider is an interface for generating warp sync proofs
type WarpSyncProvider interface {
// Generate proof starting at given block hash. The proof is accumulated until maximum proof
// size is reached.
generate(start common.Hash) (encodedProof []byte, err error)
}

func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) {
// use the backend to generate the warp proof
proof, err := s.warpSyncProvider.generate(req.Begin)
if err != nil {
return nil, err
}
// send the response through pendingResponse channel
return proof, nil
}

func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
return
}

s.readStream(stream, decodeSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize)
dimartiro marked this conversation as resolved.
Show resolved Hide resolved
}

func decodeWarpSyncMessage(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) {
msg := new(messages.WarpProofRequest)
err := msg.Decode(in)
return msg, err
}

func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg messages.P2PMessage) error {
if msg == nil {
return nil
}

defer func() {
err := stream.Close()
if err != nil && err.Error() != ErrStreamReset.Error() {
dimartiro marked this conversation as resolved.
Show resolved Hide resolved
logger.Warnf("failed to close stream: %s", err)
}
}()

if req, ok := msg.(*messages.WarpProofRequest); ok {
resp, err := s.handleWarpSyncRequest(*req)
if err != nil {
logger.Debugf("cannot create response for request: %s", err)
return nil
}

if _, err = stream.Write(resp); err != nil {
logger.Debugf("failed to send WarpSyncResponse message to peer %s: %s", stream.Conn().RemotePeer(), err)
return err
}
dimartiro marked this conversation as resolved.
Show resolved Hide resolved
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a log or something for the !ok case?


return nil
}
103 changes: 103 additions & 0 deletions dot/network/warp_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"fmt"
"testing"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
)

func TestDecodeWarpSyncMessage(t *testing.T) {
t.Parallel()

// Basic WarpProofRequest
testWarpReqMessage := &messages.WarpProofRequest{
Begin: common.MustBlake2bHash([]byte("test")),
}

// Test encoding
reqEnc, err := testWarpReqMessage.Encode()
require.NoError(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any tests in substrate with hardcoded bytes that we could use here? We have run into a problem before in serialization tests where we can encode and decode to an expected type, but the actual byte representation is incorrect.

However giving how simple this is, probably is not the end of the world. I just wanted to make a note of this


// Test decoding
testPeer := peer.ID("me")
msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true)
require.NoError(t, err)

req, ok := msg.(*messages.WarpProofRequest)
require.True(t, ok)
require.Equal(t, testWarpReqMessage, req)
}

// createServiceWithWarpSyncHelper creates a basic service with warp sync handler support
func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProvider) *Service {
t.Helper()

config := &Config{
BasePath: t.TempDir(),
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
warpSyncProvider: warpSyncProvider,
}

srvc := createTestService(t, config)
srvc.noGossip = true
handler := newTestStreamHandler(decodeSyncMessage)
srvc.host.registerStreamHandler(srvc.host.protocolID, handler.handleStream)

return srvc
}

func TestHandleWarpSyncRequestOk(t *testing.T) {
t.Parallel()

// Creates warp sync provider mock to generate proofs with the expected result
expectedProof := []byte{0x01}

ctrl := gomock.NewController(t)
warpSyncProvider := NewMockWarpSyncProvider(ctrl)
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(expectedProof, nil).Times(1)

// Initiate service using the warp sync provider mock
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider)

// Handle request and check resulting proof
req := messages.WarpProofRequest{
Begin: common.EmptyHash,
}

resp, err := srvc.handleWarpSyncRequest(req)
require.NoError(t, err)
require.Equal(t, expectedProof, resp)
}

func TestHandleWarpSyncRequestError(t *testing.T) {
t.Parallel()

// Creates warp sync provider mock to generate proofs with the expected erro
expectedError := fmt.Errorf("error generating proof")
ctrl := gomock.NewController(t)

warpSyncProvider := NewMockWarpSyncProvider(ctrl)
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(nil, expectedError).Times(1)

// Initiate service using the warp sync provider mock
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider)

// Handle request and check resulting error
req := messages.WarpProofRequest{
Begin: common.EmptyHash,
}

resp, err := srvc.handleWarpSyncRequest(req)
require.Nil(t, resp)
require.ErrorIs(t, err, expectedError)
}
Loading