diff --git a/dot/network/config.go b/dot/network/config.go index 96d379a75a..ca73afb7c6 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -66,6 +66,7 @@ type Config struct { // Service interfaces BlockState BlockState Syncer Syncer + WarpSyncProvider WarpSyncProvider TransactionHandler TransactionHandler // Used to specify the address broadcasted to other peers, and avoids using pubip.Get diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go new file mode 100644 index 0000000000..2e2897b6a1 --- /dev/null +++ b/dot/network/messages/warp_sync.go @@ -0,0 +1,40 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package messages + +import ( + "fmt" + + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/pkg/scale" +) + +// WarpProofRequest is a struct for p2p warp proof request +type WarpProofRequest struct { + Begin common.Hash +} + +// Decode decodes the message into a WarpProofRequest +func (wpr *WarpProofRequest) Decode(in []byte) error { + return scale.Unmarshal(in, wpr) +} + +// Encode encodes the warp sync request +func (wpr *WarpProofRequest) Encode() ([]byte, error) { + if wpr == nil { + return nil, fmt.Errorf("cannot encode nil WarpProofRequest") + } + return scale.Marshal(*wpr) +} + +// String returns the string representation of a WarpProofRequest +func (wpr *WarpProofRequest) String() string { + if wpr == nil { + return "WarpProofRequest=nil" + } + + return fmt.Sprintf("WarpProofRequest begin=%v", wpr.Begin) +} + +var _ P2PMessage = (*WarpProofRequest)(nil) diff --git a/dot/network/mock_warp_sync_provider_test.go b/dot/network/mock_warp_sync_provider_test.go new file mode 100644 index 0000000000..4b99b3a346 --- /dev/null +++ b/dot/network/mock_warp_sync_provider_test.go @@ -0,0 +1,55 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ChainSafe/gossamer/dot/network (interfaces: WarpSyncProvider) +// +// Generated by this command: +// +// mockgen -destination=mock_warp_sync_provider_test.go -package network . WarpSyncProvider +// + +// Package network is a generated GoMock package. +package network + +import ( + reflect "reflect" + + common "github.com/ChainSafe/gossamer/lib/common" + gomock "go.uber.org/mock/gomock" +) + +// MockWarpSyncProvider is a mock of WarpSyncProvider interface. +type MockWarpSyncProvider struct { + ctrl *gomock.Controller + recorder *MockWarpSyncProviderMockRecorder +} + +// MockWarpSyncProviderMockRecorder is the mock recorder for MockWarpSyncProvider. +type MockWarpSyncProviderMockRecorder struct { + mock *MockWarpSyncProvider +} + +// NewMockWarpSyncProvider creates a new mock instance. +func NewMockWarpSyncProvider(ctrl *gomock.Controller) *MockWarpSyncProvider { + mock := &MockWarpSyncProvider{ctrl: ctrl} + mock.recorder = &MockWarpSyncProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWarpSyncProvider) EXPECT() *MockWarpSyncProviderMockRecorder { + return m.recorder +} + +// generate mocks base method. +func (m *MockWarpSyncProvider) generate(arg0 common.Hash) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "generate", arg0) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// generate indicates an expected call of generate. +func (mr *MockWarpSyncProviderMockRecorder) generate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "generate", reflect.TypeOf((*MockWarpSyncProvider)(nil).generate), arg0) +} diff --git a/dot/network/mocks_generate_test.go b/dot/network/mocks_generate_test.go index d170388470..fd7f793e78 100644 --- a/dot/network/mocks_generate_test.go +++ b/dot/network/mocks_generate_test.go @@ -6,5 +6,6 @@ package network //go:generate mockgen -destination=mock_telemetry_test.go -package $GOPACKAGE . Telemetry //go:generate mockgen -destination=mock_syncer_test.go -package $GOPACKAGE . Syncer //go:generate mockgen -destination=mock_block_state_test.go -package $GOPACKAGE . BlockState +//go:generate mockgen -destination=mock_warp_sync_provider_test.go -package $GOPACKAGE . WarpSyncProvider //go:generate mockgen -destination=mock_transaction_handler_test.go -package $GOPACKAGE . TransactionHandler //go:generate mockgen -destination=mock_stream_test.go -package $GOPACKAGE github.com/libp2p/go-libp2p/core/network Stream diff --git a/dot/network/service.go b/dot/network/service.go index 829e9f0fd0..864728f6fb 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -33,6 +33,7 @@ const ( // the following are sub-protocols used by the node SyncID = "/sync/2" + WarpSyncID = "/sync/warp" lightID = "/light/2" blockAnnounceID = "/block-announces/1" transactionsID = "/transactions/1" @@ -129,6 +130,7 @@ type Service struct { blockState BlockState syncer Syncer transactionHandler TransactionHandler + warpSyncProvider WarpSyncProvider // Configuration options noBootstrap bool @@ -215,6 +217,7 @@ func NewService(cfg *Config) (*Service, error) { noBootstrap: cfg.NoBootstrap, noMDNS: cfg.NoMDNS, syncer: cfg.Syncer, + warpSyncProvider: cfg.WarpSyncProvider, notificationsProtocols: make(map[MessageType]*notificationsProtocol), lightRequest: make(map[peer.ID]struct{}), telemetryInterval: cfg.telemetryInterval, @@ -252,8 +255,11 @@ func (s *Service) Start() error { s.ctx, s.cancel = context.WithCancel(context.Background()) } + genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String()) + s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream) s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream) + s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream) // register block announce protocol err := s.RegisterNotificationsProtocol( diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go new file mode 100644 index 0000000000..ff14800951 --- /dev/null +++ b/dot/network/warp_sync.go @@ -0,0 +1,79 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package network + +import ( + "errors" + + "github.com/ChainSafe/gossamer/dot/network/messages" + "github.com/ChainSafe/gossamer/lib/common" + libp2pnetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" +) + +// WarpSyncProvider is an interface for generating warp sync proofs +type WarpSyncProvider interface { + // Generate proof starting at given block hash. The proof is accumulated until maximum proof + // size is reached. + generate(start common.Hash) (encodedProof []byte, err error) +} + +func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) { + // use the backend to generate the warp proof + proof, err := s.warpSyncProvider.generate(req.Begin) + if err != nil { + return nil, err + } + // send the response through pendingResponse channel + return proof, nil +} + +func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) { + if stream == nil { + return + } + + s.readStream(stream, decodeWarpSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize) +} + +func decodeWarpSyncMessage(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) { + msg := new(messages.WarpProofRequest) + err := msg.Decode(in) + return msg, err +} + +func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg messages.P2PMessage) error { + if msg == nil { + return nil + } + + defer func() { + err := stream.Close() + if err != nil && errors.Is(err, ErrStreamReset) { + logger.Warnf("failed to close stream: %s", err) + } + }() + + if req, ok := msg.(*messages.WarpProofRequest); ok { + resp, err := s.handleWarpSyncRequest(*req) + if err != nil { + logger.Debugf("cannot create response for request: %s", err) + return nil + } + + if _, err = stream.Write(resp); err != nil { + logger.Debugf("failed to send WarpSyncResponse message to peer %s: %s", stream.Conn().RemotePeer(), err) + return err + } + + logger.Debugf("successfully respond with WarpSyncResponse message to peer %s with proof %v", + stream.Conn().RemotePeer(), + resp, + ) + } else { + logger.Debugf("received invalid message in warp sync handler: %v", msg) + } + + return nil +} diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go new file mode 100644 index 0000000000..ee91f2e9e9 --- /dev/null +++ b/dot/network/warp_sync_test.go @@ -0,0 +1,110 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package network + +import ( + "fmt" + "testing" + + "github.com/ChainSafe/gossamer/dot/network/messages" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" +) + +func TestDecodeWarpSyncMessage(t *testing.T) { + t.Parallel() + + // Basic WarpProofRequest + testWarpReqMessage := &messages.WarpProofRequest{ + Begin: common.MustBlake2bHash([]byte("test")), + } + + // Test encoding + reqEnc, err := testWarpReqMessage.Encode() + require.NoError(t, err) + + //Expected encoded message compared with substrate impl + require.Equal(t, []byte{ + 0x92, 0x8b, 0x20, 0x36, 0x69, 0x43, 0xe2, 0xaf, 0xd1, 0x1e, 0xbc, + 0xe, 0xae, 0x2e, 0x53, 0xa9, 0x3b, 0xf1, 0x77, 0xa4, 0xfc, 0xf3, 0x5b, + 0xcc, 0x64, 0xd5, 0x3, 0x70, 0x4e, 0x65, 0xe2, 0x2, + }, reqEnc) + + // Test decoding + testPeer := peer.ID("me") + msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true) + require.NoError(t, err) + + req, ok := msg.(*messages.WarpProofRequest) + require.True(t, ok) + require.Equal(t, testWarpReqMessage, req) +} + +// createServiceWithWarpSyncHelper creates a basic service with warp sync handler support +func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProvider) *Service { + t.Helper() + + config := &Config{ + BasePath: t.TempDir(), + Port: availablePort(t), + NoBootstrap: true, + NoMDNS: true, + WarpSyncProvider: warpSyncProvider, + } + + srvc := createTestService(t, config) + srvc.noGossip = true + handler := newTestStreamHandler(decodeSyncMessage) + srvc.host.registerStreamHandler(srvc.host.protocolID, handler.handleStream) + + return srvc +} + +func TestHandleWarpSyncRequestOk(t *testing.T) { + t.Parallel() + + // Creates warp sync provider mock to generate proofs with the expected result + expectedProof := []byte{0x01} + + ctrl := gomock.NewController(t) + warpSyncProvider := NewMockWarpSyncProvider(ctrl) + warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(expectedProof, nil).Times(1) + + // Initiate service using the warp sync provider mock + srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) + + // Handle request and check resulting proof + req := messages.WarpProofRequest{ + Begin: common.EmptyHash, + } + + resp, err := srvc.handleWarpSyncRequest(req) + require.NoError(t, err) + require.Equal(t, expectedProof, resp) +} + +func TestHandleWarpSyncRequestError(t *testing.T) { + t.Parallel() + + // Creates warp sync provider mock to generate proofs with the expected erro + expectedError := fmt.Errorf("error generating proof") + ctrl := gomock.NewController(t) + + warpSyncProvider := NewMockWarpSyncProvider(ctrl) + warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(nil, expectedError).Times(1) + + // Initiate service using the warp sync provider mock + srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) + + // Handle request and check resulting error + req := messages.WarpProofRequest{ + Begin: common.EmptyHash, + } + + resp, err := srvc.handleWarpSyncRequest(req) + require.Nil(t, resp) + require.ErrorIs(t, err, expectedError) +} diff --git a/dot/rpc/modules/system_integration_test.go b/dot/rpc/modules/system_integration_test.go index 275a741892..1296226ffe 100644 --- a/dot/rpc/modules/system_integration_test.go +++ b/dot/rpc/modules/system_integration_test.go @@ -52,6 +52,9 @@ func newNetworkService(t *testing.T) *network.Service { blockStateMock.EXPECT(). GetHighestFinalisedHeader(). Return(types.NewEmptyHeader(), nil).AnyTimes() + blockStateMock.EXPECT(). + GenesisHash(). + Return(common.MustBlake2bHash([]byte("genesis"))).AnyTimes() syncerMock := NewMockSyncer(ctrl)