Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/network): Add warp sync request handler #4186

Merged
merged 21 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Config struct {
// Service interfaces
BlockState BlockState
Syncer Syncer
WarpSyncProvider WarpSyncProvider
TransactionHandler TransactionHandler

// Used to specify the address broadcasted to other peers, and avoids using pubip.Get
Expand Down
40 changes: 40 additions & 0 deletions dot/network/messages/warp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package messages

import (
"fmt"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
)

// WarpProofRequest is a struct for p2p warp proof request
type WarpProofRequest struct {
Begin common.Hash
}

// Decode decodes the message into a WarpProofRequest
func (wpr *WarpProofRequest) Decode(in []byte) error {
return scale.Unmarshal(in, wpr)
}

// Encode encodes the warp sync request
func (wpr *WarpProofRequest) Encode() ([]byte, error) {
if wpr == nil {
return nil, fmt.Errorf("cannot encode nil WarpProofRequest")
}
return scale.Marshal(*wpr)
}

// String returns the string representation of a WarpProofRequest
func (wpr *WarpProofRequest) String() string {
if wpr == nil {
return "WarpProofRequest=nil"
}

return fmt.Sprintf("WarpProofRequest begin=%v", wpr.Begin)
}

var _ P2PMessage = (*WarpProofRequest)(nil)
55 changes: 55 additions & 0 deletions dot/network/mock_warp_sync_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dot/network/mocks_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ package network
//go:generate mockgen -destination=mock_telemetry_test.go -package $GOPACKAGE . Telemetry
//go:generate mockgen -destination=mock_syncer_test.go -package $GOPACKAGE . Syncer
//go:generate mockgen -destination=mock_block_state_test.go -package $GOPACKAGE . BlockState
//go:generate mockgen -destination=mock_warp_sync_provider_test.go -package $GOPACKAGE . WarpSyncProvider
//go:generate mockgen -destination=mock_transaction_handler_test.go -package $GOPACKAGE . TransactionHandler
//go:generate mockgen -destination=mock_stream_test.go -package $GOPACKAGE github.com/libp2p/go-libp2p/core/network Stream
6 changes: 6 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (

// the following are sub-protocols used by the node
SyncID = "/sync/2"
WarpSyncID = "/sync/warp"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"
Expand Down Expand Up @@ -129,6 +130,7 @@ type Service struct {
blockState BlockState
syncer Syncer
transactionHandler TransactionHandler
warpSyncProvider WarpSyncProvider

// Configuration options
noBootstrap bool
Expand Down Expand Up @@ -215,6 +217,7 @@ func NewService(cfg *Config) (*Service, error) {
noBootstrap: cfg.NoBootstrap,
noMDNS: cfg.NoMDNS,
syncer: cfg.Syncer,
warpSyncProvider: cfg.WarpSyncProvider,
notificationsProtocols: make(map[MessageType]*notificationsProtocol),
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
Expand Down Expand Up @@ -252,8 +255,11 @@ func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String())

s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)
s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream)

// register block announce protocol
err := s.RegisterNotificationsProtocol(
Expand Down
79 changes: 79 additions & 0 deletions dot/network/warp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"errors"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/lib/common"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

// WarpSyncProvider is an interface for generating warp sync proofs
type WarpSyncProvider interface {
// Generate proof starting at given block hash. The proof is accumulated until maximum proof
// size is reached.
generate(start common.Hash) (encodedProof []byte, err error)
}

func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) {
// use the backend to generate the warp proof
proof, err := s.warpSyncProvider.generate(req.Begin)
if err != nil {
return nil, err
}
// send the response through pendingResponse channel
return proof, nil
}

func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
return
}

s.readStream(stream, decodeWarpSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize)
}

func decodeWarpSyncMessage(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) {
msg := new(messages.WarpProofRequest)
err := msg.Decode(in)
return msg, err
}

func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg messages.P2PMessage) error {
if msg == nil {
return nil
}

defer func() {
err := stream.Close()
if err != nil && errors.Is(err, ErrStreamReset) {
logger.Warnf("failed to close stream: %s", err)
}
}()

if req, ok := msg.(*messages.WarpProofRequest); ok {
resp, err := s.handleWarpSyncRequest(*req)
if err != nil {
logger.Debugf("cannot create response for request: %s", err)
return nil
}

if _, err = stream.Write(resp); err != nil {
logger.Debugf("failed to send WarpSyncResponse message to peer %s: %s", stream.Conn().RemotePeer(), err)
return err
}
dimartiro marked this conversation as resolved.
Show resolved Hide resolved

logger.Debugf("successfully respond with WarpSyncResponse message to peer %s with proof %v",
stream.Conn().RemotePeer(),
resp,
)
} else {
logger.Debugf("received invalid message in warp sync handler: %v", msg)
}
dimartiro marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
110 changes: 110 additions & 0 deletions dot/network/warp_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"fmt"
"testing"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
)

func TestDecodeWarpSyncMessage(t *testing.T) {
t.Parallel()

// Basic WarpProofRequest
testWarpReqMessage := &messages.WarpProofRequest{
Begin: common.MustBlake2bHash([]byte("test")),
}

// Test encoding
reqEnc, err := testWarpReqMessage.Encode()
require.NoError(t, err)
dimartiro marked this conversation as resolved.
Show resolved Hide resolved

//Expected encoded message compared with substrate impl
require.Equal(t, []byte{
0x92, 0x8b, 0x20, 0x36, 0x69, 0x43, 0xe2, 0xaf, 0xd1, 0x1e, 0xbc,
0xe, 0xae, 0x2e, 0x53, 0xa9, 0x3b, 0xf1, 0x77, 0xa4, 0xfc, 0xf3, 0x5b,
0xcc, 0x64, 0xd5, 0x3, 0x70, 0x4e, 0x65, 0xe2, 0x2,
}, reqEnc)

// Test decoding
testPeer := peer.ID("me")
msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true)
require.NoError(t, err)

req, ok := msg.(*messages.WarpProofRequest)
require.True(t, ok)
require.Equal(t, testWarpReqMessage, req)
}

// createServiceWithWarpSyncHelper creates a basic service with warp sync handler support
func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProvider) *Service {
t.Helper()

config := &Config{
BasePath: t.TempDir(),
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
WarpSyncProvider: warpSyncProvider,
}

srvc := createTestService(t, config)
srvc.noGossip = true
handler := newTestStreamHandler(decodeSyncMessage)
srvc.host.registerStreamHandler(srvc.host.protocolID, handler.handleStream)

return srvc
}

func TestHandleWarpSyncRequestOk(t *testing.T) {
t.Parallel()

// Creates warp sync provider mock to generate proofs with the expected result
expectedProof := []byte{0x01}

ctrl := gomock.NewController(t)
warpSyncProvider := NewMockWarpSyncProvider(ctrl)
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(expectedProof, nil).Times(1)

// Initiate service using the warp sync provider mock
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider)

// Handle request and check resulting proof
req := messages.WarpProofRequest{
Begin: common.EmptyHash,
}

resp, err := srvc.handleWarpSyncRequest(req)
require.NoError(t, err)
require.Equal(t, expectedProof, resp)
}

func TestHandleWarpSyncRequestError(t *testing.T) {
t.Parallel()

// Creates warp sync provider mock to generate proofs with the expected erro
expectedError := fmt.Errorf("error generating proof")
ctrl := gomock.NewController(t)

warpSyncProvider := NewMockWarpSyncProvider(ctrl)
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(nil, expectedError).Times(1)

// Initiate service using the warp sync provider mock
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider)

// Handle request and check resulting error
req := messages.WarpProofRequest{
Begin: common.EmptyHash,
}

resp, err := srvc.handleWarpSyncRequest(req)
require.Nil(t, resp)
require.ErrorIs(t, err, expectedError)
}
3 changes: 3 additions & 0 deletions dot/rpc/modules/system_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func newNetworkService(t *testing.T) *network.Service {
blockStateMock.EXPECT().
GetHighestFinalisedHeader().
Return(types.NewEmptyHeader(), nil).AnyTimes()
blockStateMock.EXPECT().
GenesisHash().
Return(common.MustBlake2bHash([]byte("genesis"))).AnyTimes()

syncerMock := NewMockSyncer(ctrl)

Expand Down
Loading