From 94302d9fe3bad6dff6c0c1b2f881fcd64ba2188e Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 14:22:51 -0300 Subject: [PATCH 01/21] feat(dot/network): Add warp sync request handler --- dot/network/messages/warp_sync.go | 45 ++++++++++++++++++++ dot/network/service.go | 3 ++ dot/network/warp_sync.go | 70 +++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 dot/network/messages/warp_sync.go create mode 100644 dot/network/warp_sync.go diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go new file mode 100644 index 0000000000..5db3f4ebe4 --- /dev/null +++ b/dot/network/messages/warp_sync.go @@ -0,0 +1,45 @@ +package messages + +import ( + "bytes" + "fmt" + + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/pkg/scale" +) + +type WarpProofRequest struct { + Begin common.Hash +} + +func (wsr *WarpProofRequest) Decode(in []byte) error { + reader := bytes.NewReader(in) + sd := scale.NewDecoder(reader) + reqProof := &WarpProofRequest{} + err := sd.Decode(&reqProof) + if err != nil { + return err + } + + return nil +} + +func (wsr *WarpProofRequest) Encode() ([]byte, error) { + buffer := bytes.NewBuffer(nil) + encoder := scale.NewEncoder(buffer) + err := encoder.Encode(wsr) + if err != nil { + return nil, err + } + return buffer.Bytes(), nil +} + +func (wsr *WarpProofRequest) String() string { + if wsr == nil { + return "WarpProofRequest=nil" + } + + return fmt.Sprintf("WarpProofRequest begin=%v", wsr.Begin) +} + +var _ P2PMessage = (*WarpProofRequest)(nil) diff --git a/dot/network/service.go b/dot/network/service.go index 829e9f0fd0..9a5fb26536 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -33,6 +33,7 @@ const ( // the following are sub-protocols used by the node SyncID = "/sync/2" + WarpSyncID = "/sync/warp" lightID = "/light/2" blockAnnounceID = "/block-announces/1" transactionsID = "/transactions/1" @@ -129,6 +130,7 @@ type Service struct { blockState BlockState syncer Syncer transactionHandler TransactionHandler + warpSyncHandler WarpSyncRequestHandler // Configuration options noBootstrap bool @@ -253,6 +255,7 @@ func (s *Service) Start() error { } s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream) + s.host.registerStreamHandler(s.host.protocolID+WarpSyncID, s.handleWarpSyncStream) s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream) // register block announce protocol diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go new file mode 100644 index 0000000000..dde6d863d1 --- /dev/null +++ b/dot/network/warp_sync.go @@ -0,0 +1,70 @@ +package network + +import ( + "github.com/ChainSafe/gossamer/dot/network/messages" + "github.com/ChainSafe/gossamer/lib/common" + libp2pnetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" +) + +type WarpSyncProvider interface { + // Generate proof starting at given block hash. The proof is accumulated until maximum proof + // size is reached. + generate(start common.Hash) (encodedProof []byte, err error) +} + +type WarpSyncRequestHandler struct { + backend WarpSyncProvider +} + +func (w *WarpSyncRequestHandler) handleRequest(req messages.WarpProofRequest) ([]byte, error) { + // use the backend to generate the warp proof + proof, err := w.backend.generate(req.Begin) + if err != nil { + return nil, err + } + // send the response through pendingResponse channel + return proof, nil +} + +func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) { + if stream == nil { + return + } + + s.readStream(stream, decodeSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize) +} + +func decodeWarpSyncMessage(in []byte, peer peer.ID, _ bool) (messages.P2PMessage, error) { + msg := new(messages.WarpProofRequest) + err := msg.Decode(in) + return msg, err +} + +func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg messages.P2PMessage) error { + if msg == nil { + return nil + } + + defer func() { + err := stream.Close() + if err != nil && err.Error() != ErrStreamReset.Error() { + logger.Warnf("failed to close stream: %s", err) + } + }() + + if req, ok := msg.(*messages.WarpProofRequest); ok { + resp, err := s.warpSyncHandler.handleRequest(*req) + if err != nil { + logger.Debugf("cannot create response for request: %s", err) + return nil + } + + if _, err = stream.Write(resp); err != nil { + logger.Debugf("failed to send WarpSyncResponse message to peer %s: %s", stream.Conn().RemotePeer(), err) + return err + } + } + + return nil +} From 900f9f3b2924c5a40243d2993941e19228773a4c Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 14:26:44 -0300 Subject: [PATCH 02/21] Add warp sync handler to service cfg --- dot/network/config.go | 1 + dot/network/service.go | 1 + 2 files changed, 2 insertions(+) diff --git a/dot/network/config.go b/dot/network/config.go index 96d379a75a..c041b91138 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -66,6 +66,7 @@ type Config struct { // Service interfaces BlockState BlockState Syncer Syncer + warpSyncHandler WarpSyncRequestHandler TransactionHandler TransactionHandler // Used to specify the address broadcasted to other peers, and avoids using pubip.Get diff --git a/dot/network/service.go b/dot/network/service.go index 9a5fb26536..3e6d32eca3 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -217,6 +217,7 @@ func NewService(cfg *Config) (*Service, error) { noBootstrap: cfg.NoBootstrap, noMDNS: cfg.NoMDNS, syncer: cfg.Syncer, + warpSyncHandler: cfg.warpSyncHandler, notificationsProtocols: make(map[MessageType]*notificationsProtocol), lightRequest: make(map[peer.ID]struct{}), telemetryInterval: cfg.telemetryInterval, From 38815dad139c404ce171f7d9f9c4e86dc03d1d7a Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 14:31:46 -0300 Subject: [PATCH 03/21] Ignore peer argument --- dot/network/warp_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index dde6d863d1..4ff277c6cb 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -35,7 +35,7 @@ func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) { s.readStream(stream, decodeSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize) } -func decodeWarpSyncMessage(in []byte, peer peer.ID, _ bool) (messages.P2PMessage, error) { +func decodeWarpSyncMessage(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) { msg := new(messages.WarpProofRequest) err := msg.Decode(in) return msg, err From 3220e598dd5e201005c2ae7587cded3490ae1910 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 14:31:56 -0300 Subject: [PATCH 04/21] Add message decoding test --- dot/network/warp_sync_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 dot/network/warp_sync_test.go diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go new file mode 100644 index 0000000000..b599563018 --- /dev/null +++ b/dot/network/warp_sync_test.go @@ -0,0 +1,28 @@ +package network + +import ( + "testing" + + "github.com/ChainSafe/gossamer/dot/network/messages" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestDecodeWarpSyncMessage(t *testing.T) { + t.Parallel() + testWarpReqMessage := &messages.WarpProofRequest{ + Begin: common.EmptyHash, + } + + testPeer := peer.ID("me") + reqEnc, err := testWarpReqMessage.Encode() + require.NoError(t, err) + + msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true) + require.NoError(t, err) + + req, ok := msg.(*messages.WarpProofRequest) + require.True(t, ok) + require.Equal(t, testWarpReqMessage, req) +} From b907cc194c85ef9ff0983cae507a2d2a6304cc51 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 16:36:45 -0300 Subject: [PATCH 05/21] Simplify handler --- dot/network/config.go | 2 +- dot/network/service.go | 4 ++-- dot/network/warp_sync.go | 10 +++------- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index c041b91138..1fd041e018 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -66,7 +66,7 @@ type Config struct { // Service interfaces BlockState BlockState Syncer Syncer - warpSyncHandler WarpSyncRequestHandler + warpSyncProvider WarpSyncProvider TransactionHandler TransactionHandler // Used to specify the address broadcasted to other peers, and avoids using pubip.Get diff --git a/dot/network/service.go b/dot/network/service.go index 3e6d32eca3..61c29f55f1 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -130,7 +130,7 @@ type Service struct { blockState BlockState syncer Syncer transactionHandler TransactionHandler - warpSyncHandler WarpSyncRequestHandler + warpSyncProvider WarpSyncProvider // Configuration options noBootstrap bool @@ -217,7 +217,7 @@ func NewService(cfg *Config) (*Service, error) { noBootstrap: cfg.NoBootstrap, noMDNS: cfg.NoMDNS, syncer: cfg.Syncer, - warpSyncHandler: cfg.warpSyncHandler, + warpSyncProvider: cfg.warpSyncProvider, notificationsProtocols: make(map[MessageType]*notificationsProtocol), lightRequest: make(map[peer.ID]struct{}), telemetryInterval: cfg.telemetryInterval, diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index 4ff277c6cb..8ae45c3e01 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -13,13 +13,9 @@ type WarpSyncProvider interface { generate(start common.Hash) (encodedProof []byte, err error) } -type WarpSyncRequestHandler struct { - backend WarpSyncProvider -} - -func (w *WarpSyncRequestHandler) handleRequest(req messages.WarpProofRequest) ([]byte, error) { +func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) { // use the backend to generate the warp proof - proof, err := w.backend.generate(req.Begin) + proof, err := s.warpSyncProvider.generate(req.Begin) if err != nil { return nil, err } @@ -54,7 +50,7 @@ func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg message }() if req, ok := msg.(*messages.WarpProofRequest); ok { - resp, err := s.warpSyncHandler.handleRequest(*req) + resp, err := s.handleWarpSyncRequest(*req) if err != nil { logger.Debugf("cannot create response for request: %s", err) return nil From ea6bf9aa898a423b3771724d42e701e2931d8564 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 17:01:26 -0300 Subject: [PATCH 06/21] Add missing license --- dot/network/messages/warp_sync.go | 3 +++ dot/network/warp_sync.go | 3 +++ dot/network/warp_sync_test.go | 3 +++ 3 files changed, 9 insertions(+) diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go index 5db3f4ebe4..6efe9dbaea 100644 --- a/dot/network/messages/warp_sync.go +++ b/dot/network/messages/warp_sync.go @@ -1,3 +1,6 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package messages import ( diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index 8ae45c3e01..a0cc883735 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -1,3 +1,6 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package network import ( diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go index b599563018..b961a1c3cc 100644 --- a/dot/network/warp_sync_test.go +++ b/dot/network/warp_sync_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package network import ( From c20454159a4812d0066fe8459024d62b2ff1f306 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 17:18:52 -0300 Subject: [PATCH 07/21] Add more tests --- dot/network/mock_warp_sync_provider_test.go | 55 ++++++++++++++++++ dot/network/mocks_generate_test.go | 1 + dot/network/warp_sync_test.go | 63 +++++++++++++++++++++ 3 files changed, 119 insertions(+) create mode 100644 dot/network/mock_warp_sync_provider_test.go diff --git a/dot/network/mock_warp_sync_provider_test.go b/dot/network/mock_warp_sync_provider_test.go new file mode 100644 index 0000000000..4b99b3a346 --- /dev/null +++ b/dot/network/mock_warp_sync_provider_test.go @@ -0,0 +1,55 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ChainSafe/gossamer/dot/network (interfaces: WarpSyncProvider) +// +// Generated by this command: +// +// mockgen -destination=mock_warp_sync_provider_test.go -package network . WarpSyncProvider +// + +// Package network is a generated GoMock package. +package network + +import ( + reflect "reflect" + + common "github.com/ChainSafe/gossamer/lib/common" + gomock "go.uber.org/mock/gomock" +) + +// MockWarpSyncProvider is a mock of WarpSyncProvider interface. +type MockWarpSyncProvider struct { + ctrl *gomock.Controller + recorder *MockWarpSyncProviderMockRecorder +} + +// MockWarpSyncProviderMockRecorder is the mock recorder for MockWarpSyncProvider. +type MockWarpSyncProviderMockRecorder struct { + mock *MockWarpSyncProvider +} + +// NewMockWarpSyncProvider creates a new mock instance. +func NewMockWarpSyncProvider(ctrl *gomock.Controller) *MockWarpSyncProvider { + mock := &MockWarpSyncProvider{ctrl: ctrl} + mock.recorder = &MockWarpSyncProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWarpSyncProvider) EXPECT() *MockWarpSyncProviderMockRecorder { + return m.recorder +} + +// generate mocks base method. +func (m *MockWarpSyncProvider) generate(arg0 common.Hash) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "generate", arg0) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// generate indicates an expected call of generate. +func (mr *MockWarpSyncProviderMockRecorder) generate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "generate", reflect.TypeOf((*MockWarpSyncProvider)(nil).generate), arg0) +} diff --git a/dot/network/mocks_generate_test.go b/dot/network/mocks_generate_test.go index d170388470..fd7f793e78 100644 --- a/dot/network/mocks_generate_test.go +++ b/dot/network/mocks_generate_test.go @@ -6,5 +6,6 @@ package network //go:generate mockgen -destination=mock_telemetry_test.go -package $GOPACKAGE . Telemetry //go:generate mockgen -destination=mock_syncer_test.go -package $GOPACKAGE . Syncer //go:generate mockgen -destination=mock_block_state_test.go -package $GOPACKAGE . BlockState +//go:generate mockgen -destination=mock_warp_sync_provider_test.go -package $GOPACKAGE . WarpSyncProvider //go:generate mockgen -destination=mock_transaction_handler_test.go -package $GOPACKAGE . TransactionHandler //go:generate mockgen -destination=mock_stream_test.go -package $GOPACKAGE github.com/libp2p/go-libp2p/core/network Stream diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go index b961a1c3cc..e80def0345 100644 --- a/dot/network/warp_sync_test.go +++ b/dot/network/warp_sync_test.go @@ -4,16 +4,19 @@ package network import ( + "fmt" "testing" "github.com/ChainSafe/gossamer/dot/network/messages" "github.com/ChainSafe/gossamer/lib/common" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" ) func TestDecodeWarpSyncMessage(t *testing.T) { t.Parallel() + testWarpReqMessage := &messages.WarpProofRequest{ Begin: common.EmptyHash, } @@ -29,3 +32,63 @@ func TestDecodeWarpSyncMessage(t *testing.T) { require.True(t, ok) require.Equal(t, testWarpReqMessage, req) } + +func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProvider) *Service { + t.Helper() + + config := &Config{ + BasePath: t.TempDir(), + Port: availablePort(t), + NoBootstrap: true, + NoMDNS: true, + warpSyncProvider: warpSyncProvider, + } + + srvc := createTestService(t, config) + srvc.noGossip = true + handler := newTestStreamHandler(decodeSyncMessage) + srvc.host.registerStreamHandler(srvc.host.protocolID, handler.handleStream) + + return srvc +} + +func TestHandleWarpSyncRequestOk(t *testing.T) { + t.Parallel() + + expectedProof := []byte{0x01} + + ctrl := gomock.NewController(t) + + warpSyncProvider := NewMockWarpSyncProvider(ctrl) + warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(expectedProof, nil).Times(1) + + srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) + + req := messages.WarpProofRequest{ + Begin: common.EmptyHash, + } + + resp, err := srvc.handleWarpSyncRequest(req) + require.NoError(t, err) + require.Equal(t, expectedProof, resp) +} + +func TestHandleWarpSyncRequestError(t *testing.T) { + t.Parallel() + + expectedError := fmt.Errorf("error generating proof") + ctrl := gomock.NewController(t) + + warpSyncProvider := NewMockWarpSyncProvider(ctrl) + warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(nil, expectedError).Times(1) + + srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) + + req := messages.WarpProofRequest{ + Begin: common.EmptyHash, + } + + resp, err := srvc.handleWarpSyncRequest(req) + require.Nil(t, resp) + require.ErrorIs(t, err, expectedError) +} From fc90804cd30fe26f48f870facd315999dac675ad Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 17:27:47 -0300 Subject: [PATCH 08/21] Fix warp sync message decode --- dot/network/messages/warp_sync.go | 3 +-- dot/network/warp_sync_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go index 6efe9dbaea..2754d2e9cd 100644 --- a/dot/network/messages/warp_sync.go +++ b/dot/network/messages/warp_sync.go @@ -18,8 +18,7 @@ type WarpProofRequest struct { func (wsr *WarpProofRequest) Decode(in []byte) error { reader := bytes.NewReader(in) sd := scale.NewDecoder(reader) - reqProof := &WarpProofRequest{} - err := sd.Decode(&reqProof) + err := sd.Decode(&wsr) if err != nil { return err } diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go index e80def0345..5332d2a40b 100644 --- a/dot/network/warp_sync_test.go +++ b/dot/network/warp_sync_test.go @@ -18,7 +18,7 @@ func TestDecodeWarpSyncMessage(t *testing.T) { t.Parallel() testWarpReqMessage := &messages.WarpProofRequest{ - Begin: common.EmptyHash, + Begin: common.MustBlake2bHash([]byte("test")), } testPeer := peer.ID("me") From 4dfdbadd9f4c405218040c4ba6e9c72fc04fc598 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 18 Sep 2024 17:32:28 -0300 Subject: [PATCH 09/21] Add more comments --- dot/network/messages/warp_sync.go | 4 ++++ dot/network/warp_sync.go | 1 + dot/network/warp_sync_test.go | 13 +++++++++++-- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go index 2754d2e9cd..941ae1ea5c 100644 --- a/dot/network/messages/warp_sync.go +++ b/dot/network/messages/warp_sync.go @@ -11,10 +11,12 @@ import ( "github.com/ChainSafe/gossamer/pkg/scale" ) +// WarpProofRequest is a struct for p2p warp proof request type WarpProofRequest struct { Begin common.Hash } +// Decode decodes the message into a WarpProofRequest func (wsr *WarpProofRequest) Decode(in []byte) error { reader := bytes.NewReader(in) sd := scale.NewDecoder(reader) @@ -26,6 +28,7 @@ func (wsr *WarpProofRequest) Decode(in []byte) error { return nil } +// Encode encodes the warp sync request func (wsr *WarpProofRequest) Encode() ([]byte, error) { buffer := bytes.NewBuffer(nil) encoder := scale.NewEncoder(buffer) @@ -36,6 +39,7 @@ func (wsr *WarpProofRequest) Encode() ([]byte, error) { return buffer.Bytes(), nil } +// String returns the string representation of a WarpProofRequest func (wsr *WarpProofRequest) String() string { if wsr == nil { return "WarpProofRequest=nil" diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index a0cc883735..0b215b3447 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) +// WarpSyncProvider is an interface for generating warp sync proofs type WarpSyncProvider interface { // Generate proof starting at given block hash. The proof is accumulated until maximum proof // size is reached. diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go index 5332d2a40b..5de0295c36 100644 --- a/dot/network/warp_sync_test.go +++ b/dot/network/warp_sync_test.go @@ -17,14 +17,17 @@ import ( func TestDecodeWarpSyncMessage(t *testing.T) { t.Parallel() + // Basic WarpProofRequest testWarpReqMessage := &messages.WarpProofRequest{ Begin: common.MustBlake2bHash([]byte("test")), } - testPeer := peer.ID("me") + // Test encoding reqEnc, err := testWarpReqMessage.Encode() require.NoError(t, err) + // Test decoding + testPeer := peer.ID("me") msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true) require.NoError(t, err) @@ -33,6 +36,7 @@ func TestDecodeWarpSyncMessage(t *testing.T) { require.Equal(t, testWarpReqMessage, req) } +// createServiceWithWarpSyncHelper creates a basic service with warp sync handler support func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProvider) *Service { t.Helper() @@ -55,15 +59,17 @@ func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProv func TestHandleWarpSyncRequestOk(t *testing.T) { t.Parallel() + // Creates warp sync provider mock to generate proofs with the expected result expectedProof := []byte{0x01} ctrl := gomock.NewController(t) - warpSyncProvider := NewMockWarpSyncProvider(ctrl) warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(expectedProof, nil).Times(1) + // Initiate service using the warp sync provider mock srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) + // Handle request and check resulting proof req := messages.WarpProofRequest{ Begin: common.EmptyHash, } @@ -76,14 +82,17 @@ func TestHandleWarpSyncRequestOk(t *testing.T) { func TestHandleWarpSyncRequestError(t *testing.T) { t.Parallel() + // Creates warp sync provider mock to generate proofs with the expected erro expectedError := fmt.Errorf("error generating proof") ctrl := gomock.NewController(t) warpSyncProvider := NewMockWarpSyncProvider(ctrl) warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(nil, expectedError).Times(1) + // Initiate service using the warp sync provider mock srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider) + // Handle request and check resulting error req := messages.WarpProofRequest{ Begin: common.EmptyHash, } From f93588310eb2662a732adad2b574183c96f667ce Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 19 Sep 2024 10:45:23 -0300 Subject: [PATCH 10/21] Fix use right decoder for warp sync --- dot/network/warp_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index 0b215b3447..f65dd16095 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -32,7 +32,7 @@ func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) { return } - s.readStream(stream, decodeSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize) + s.readStream(stream, decodeWarpSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize) } func decodeWarpSyncMessage(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) { From 6f468ba0ee2c30a1b9582a1ae820212ade48767d Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 19 Sep 2024 10:45:58 -0300 Subject: [PATCH 11/21] Change error comparission --- dot/network/warp_sync.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index f65dd16095..18d972dcf6 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -4,6 +4,8 @@ package network import ( + "errors" + "github.com/ChainSafe/gossamer/dot/network/messages" "github.com/ChainSafe/gossamer/lib/common" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" @@ -48,7 +50,7 @@ func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg message defer func() { err := stream.Close() - if err != nil && err.Error() != ErrStreamReset.Error() { + if err != nil && errors.Is(err, ErrStreamReset) { logger.Warnf("failed to close stream: %s", err) } }() From 96824f459fc5c8c9b4505210214fff101b3c36e3 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 19 Sep 2024 10:47:17 -0300 Subject: [PATCH 12/21] Add debug message for successful warp sync reply --- dot/network/warp_sync.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index 18d972dcf6..8e12fc7500 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -66,6 +66,8 @@ func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg message logger.Debugf("failed to send WarpSyncResponse message to peer %s: %s", stream.Conn().RemotePeer(), err) return err } + + logger.Debugf("successfully respond with WarpSyncResponse message to peer %s with proof %v", stream.Conn().RemotePeer(), resp) } return nil From af40d8a9e18f2b193619d2cb8f0047fec31ee7e0 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 19 Sep 2024 10:58:24 -0300 Subject: [PATCH 13/21] linting --- dot/network/warp_sync.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index 8e12fc7500..0c3cded3e9 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -67,7 +67,10 @@ func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg message return err } - logger.Debugf("successfully respond with WarpSyncResponse message to peer %s with proof %v", stream.Conn().RemotePeer(), resp) + logger.Debugf("successfully respond with WarpSyncResponse message to peer %s with proof %v", + stream.Conn().RemotePeer(), + resp, + ) } return nil From 5b82a9d68521c5f1cbf46a33355ff6ec14d88118 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 19 Sep 2024 20:58:23 -0300 Subject: [PATCH 14/21] Use genesis hash to register the warp sync handler --- dot/network/service.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dot/network/service.go b/dot/network/service.go index 61c29f55f1..5294e54b29 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -255,8 +255,10 @@ func (s *Service) Start() error { s.ctx, s.cancel = context.WithCancel(context.Background()) } + genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String()) + s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream) - s.host.registerStreamHandler(s.host.protocolID+WarpSyncID, s.handleWarpSyncStream) + s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream) s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream) // register block announce protocol From e15b167cb4281078e71b110c075d5d970b89f6b3 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 23 Sep 2024 10:06:03 -0300 Subject: [PATCH 15/21] Make WarpSyncProvider in config public --- dot/network/config.go | 2 +- dot/network/service.go | 2 +- dot/network/warp_sync_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index 1fd041e018..ca73afb7c6 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -66,7 +66,7 @@ type Config struct { // Service interfaces BlockState BlockState Syncer Syncer - warpSyncProvider WarpSyncProvider + WarpSyncProvider WarpSyncProvider TransactionHandler TransactionHandler // Used to specify the address broadcasted to other peers, and avoids using pubip.Get diff --git a/dot/network/service.go b/dot/network/service.go index 5294e54b29..4df54893ad 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -217,7 +217,7 @@ func NewService(cfg *Config) (*Service, error) { noBootstrap: cfg.NoBootstrap, noMDNS: cfg.NoMDNS, syncer: cfg.Syncer, - warpSyncProvider: cfg.warpSyncProvider, + warpSyncProvider: cfg.WarpSyncProvider, notificationsProtocols: make(map[MessageType]*notificationsProtocol), lightRequest: make(map[peer.ID]struct{}), telemetryInterval: cfg.telemetryInterval, diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go index 5de0295c36..0d665895c7 100644 --- a/dot/network/warp_sync_test.go +++ b/dot/network/warp_sync_test.go @@ -45,7 +45,7 @@ func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProv Port: availablePort(t), NoBootstrap: true, NoMDNS: true, - warpSyncProvider: warpSyncProvider, + WarpSyncProvider: warpSyncProvider, } srvc := createTestService(t, config) From 6386aa8f05ce92a0e66b8f63765a36b79b55b691 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 23 Sep 2024 10:06:54 -0300 Subject: [PATCH 16/21] Rename WarpProofRequest receiver --- dot/network/messages/warp_sync.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go index 941ae1ea5c..ad4a27811f 100644 --- a/dot/network/messages/warp_sync.go +++ b/dot/network/messages/warp_sync.go @@ -17,10 +17,10 @@ type WarpProofRequest struct { } // Decode decodes the message into a WarpProofRequest -func (wsr *WarpProofRequest) Decode(in []byte) error { +func (wpr *WarpProofRequest) Decode(in []byte) error { reader := bytes.NewReader(in) sd := scale.NewDecoder(reader) - err := sd.Decode(&wsr) + err := sd.Decode(&wpr) if err != nil { return err } @@ -29,10 +29,10 @@ func (wsr *WarpProofRequest) Decode(in []byte) error { } // Encode encodes the warp sync request -func (wsr *WarpProofRequest) Encode() ([]byte, error) { +func (wpr *WarpProofRequest) Encode() ([]byte, error) { buffer := bytes.NewBuffer(nil) encoder := scale.NewEncoder(buffer) - err := encoder.Encode(wsr) + err := encoder.Encode(wpr) if err != nil { return nil, err } @@ -40,12 +40,12 @@ func (wsr *WarpProofRequest) Encode() ([]byte, error) { } // String returns the string representation of a WarpProofRequest -func (wsr *WarpProofRequest) String() string { - if wsr == nil { +func (wpr *WarpProofRequest) String() string { + if wpr == nil { return "WarpProofRequest=nil" } - return fmt.Sprintf("WarpProofRequest begin=%v", wsr.Begin) + return fmt.Sprintf("WarpProofRequest begin=%v", wpr.Begin) } var _ P2PMessage = (*WarpProofRequest)(nil) From ce5cfec12a6380b6c5e999f4ebb5768dab13a54c Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 23 Sep 2024 10:49:51 -0300 Subject: [PATCH 17/21] Add invalid message log --- dot/network/warp_sync.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dot/network/warp_sync.go b/dot/network/warp_sync.go index 0c3cded3e9..ff14800951 100644 --- a/dot/network/warp_sync.go +++ b/dot/network/warp_sync.go @@ -71,6 +71,8 @@ func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg message stream.Conn().RemotePeer(), resp, ) + } else { + logger.Debugf("received invalid message in warp sync handler: %v", msg) } return nil From 0a9b1088e55fa40423cea9c6c0eca7563a0b1ac8 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 23 Sep 2024 11:00:30 -0300 Subject: [PATCH 18/21] Simplify WarpProofRequest encode decode --- dot/network/messages/warp_sync.go | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go index ad4a27811f..5a074ea045 100644 --- a/dot/network/messages/warp_sync.go +++ b/dot/network/messages/warp_sync.go @@ -4,7 +4,6 @@ package messages import ( - "bytes" "fmt" "github.com/ChainSafe/gossamer/lib/common" @@ -18,25 +17,12 @@ type WarpProofRequest struct { // Decode decodes the message into a WarpProofRequest func (wpr *WarpProofRequest) Decode(in []byte) error { - reader := bytes.NewReader(in) - sd := scale.NewDecoder(reader) - err := sd.Decode(&wpr) - if err != nil { - return err - } - - return nil + return scale.Unmarshal(in, &wpr) } // Encode encodes the warp sync request func (wpr *WarpProofRequest) Encode() ([]byte, error) { - buffer := bytes.NewBuffer(nil) - encoder := scale.NewEncoder(buffer) - err := encoder.Encode(wpr) - if err != nil { - return nil, err - } - return buffer.Bytes(), nil + return scale.Marshal(wpr) } // String returns the string representation of a WarpProofRequest From e26912cb1ce5c6216c551e3f94f0e6f27da5edc6 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 23 Sep 2024 11:34:58 -0300 Subject: [PATCH 19/21] Fix encoding --- dot/network/messages/warp_sync.go | 4 ++-- dot/network/warp_sync_test.go | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go index 5a074ea045..df3d5d106b 100644 --- a/dot/network/messages/warp_sync.go +++ b/dot/network/messages/warp_sync.go @@ -17,11 +17,11 @@ type WarpProofRequest struct { // Decode decodes the message into a WarpProofRequest func (wpr *WarpProofRequest) Decode(in []byte) error { - return scale.Unmarshal(in, &wpr) + return scale.Unmarshal(in, wpr) } // Encode encodes the warp sync request -func (wpr *WarpProofRequest) Encode() ([]byte, error) { +func (wpr WarpProofRequest) Encode() ([]byte, error) { return scale.Marshal(wpr) } diff --git a/dot/network/warp_sync_test.go b/dot/network/warp_sync_test.go index 0d665895c7..ee91f2e9e9 100644 --- a/dot/network/warp_sync_test.go +++ b/dot/network/warp_sync_test.go @@ -26,6 +26,13 @@ func TestDecodeWarpSyncMessage(t *testing.T) { reqEnc, err := testWarpReqMessage.Encode() require.NoError(t, err) + //Expected encoded message compared with substrate impl + require.Equal(t, []byte{ + 0x92, 0x8b, 0x20, 0x36, 0x69, 0x43, 0xe2, 0xaf, 0xd1, 0x1e, 0xbc, + 0xe, 0xae, 0x2e, 0x53, 0xa9, 0x3b, 0xf1, 0x77, 0xa4, 0xfc, 0xf3, 0x5b, + 0xcc, 0x64, 0xd5, 0x3, 0x70, 0x4e, 0x65, 0xe2, 0x2, + }, reqEnc) + // Test decoding testPeer := peer.ID("me") msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true) From c7e88b051e201ed29c7dc92d22677d795f8379ea Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 23 Sep 2024 22:42:16 -0300 Subject: [PATCH 20/21] Add missing mock for new GenesisHash call --- dot/network/service.go | 2 +- dot/rpc/modules/system_integration_test.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dot/network/service.go b/dot/network/service.go index 4df54893ad..864728f6fb 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -258,8 +258,8 @@ func (s *Service) Start() error { genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String()) s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream) - s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream) s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream) + s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream) // register block announce protocol err := s.RegisterNotificationsProtocol( diff --git a/dot/rpc/modules/system_integration_test.go b/dot/rpc/modules/system_integration_test.go index 275a741892..1296226ffe 100644 --- a/dot/rpc/modules/system_integration_test.go +++ b/dot/rpc/modules/system_integration_test.go @@ -52,6 +52,9 @@ func newNetworkService(t *testing.T) *network.Service { blockStateMock.EXPECT(). GetHighestFinalisedHeader(). Return(types.NewEmptyHeader(), nil).AnyTimes() + blockStateMock.EXPECT(). + GenesisHash(). + Return(common.MustBlake2bHash([]byte("genesis"))).AnyTimes() syncerMock := NewMockSyncer(ctrl) From f9e9e01121c58adb5d4d9a4ee2dd4e10defd519a Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 24 Sep 2024 10:30:25 -0300 Subject: [PATCH 21/21] Use pointer receiver --- dot/network/messages/warp_sync.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dot/network/messages/warp_sync.go b/dot/network/messages/warp_sync.go index df3d5d106b..2e2897b6a1 100644 --- a/dot/network/messages/warp_sync.go +++ b/dot/network/messages/warp_sync.go @@ -21,8 +21,11 @@ func (wpr *WarpProofRequest) Decode(in []byte) error { } // Encode encodes the warp sync request -func (wpr WarpProofRequest) Encode() ([]byte, error) { - return scale.Marshal(wpr) +func (wpr *WarpProofRequest) Encode() ([]byte, error) { + if wpr == nil { + return nil, fmt.Errorf("cannot encode nil WarpProofRequest") + } + return scale.Marshal(*wpr) } // String returns the string representation of a WarpProofRequest