Skip to content

Commit

Permalink
optimize code (#462)
Browse files Browse the repository at this point in the history
  • Loading branch information
godeamon committed May 10, 2024
1 parent b2d87a2 commit 56f3d1a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
14 changes: 8 additions & 6 deletions bcs/network/p2pv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package p2pv2

import (
"fmt"
"testing"

xctx "github.com/xuperchain/xupercore/kernel/common/xcontext"
"github.com/xuperchain/xupercore/kernel/mock"
nctx "github.com/xuperchain/xupercore/kernel/network/context"
"github.com/xuperchain/xupercore/kernel/network/p2p"
pb "github.com/xuperchain/xupercore/protos"
"testing"
)

func Handler(ctx xctx.XContext, msg *pb.XuperMessage) (*pb.XuperMessage, error) {
Expand Down Expand Up @@ -52,10 +53,8 @@ func startNode1(t *testing.T) {
}

go func(t *testing.T) {
select {
case msg := <-ch:
t.Logf("recv msg: log_id=%v, msgType=%s\n", msg.GetHeader().GetLogid(), msg.GetHeader().GetType())
}
msg := <-ch
t.Logf("recv msg: log_id=%v, msgType=%s\n", msg.GetHeader().GetLogid(), msg.GetHeader().GetType())
}(t)
}

Expand Down Expand Up @@ -106,7 +105,10 @@ func startNode3(t *testing.T) {
}

func TestP2PServerV2(t *testing.T) {
mock.InitLogForTest()
err := mock.InitLogForTest()
if err != nil {
t.Errorf("init log error: %v", err)
}

go startNode1(t)
startNode2(t)
Expand Down
11 changes: 5 additions & 6 deletions bcs/network/p2pv2/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ type Stream struct {
rc ggio.ReadCloser

valid bool

grpcPort string
}

// NewStream create Stream instance
Expand Down Expand Up @@ -100,7 +98,7 @@ func (s *Stream) reset() {
func (s *Stream) resetLockFree() {
if s.Valid() {
if s.stream != nil {
s.stream.Reset()
_ = s.stream.Reset()
}
s.stream = nil
s.valid = false
Expand All @@ -122,7 +120,7 @@ func (s *Stream) Send(msg *pb.XuperMessage) error {
}

deadline := time.Now().Add(time.Duration(s.config.Timeout) * time.Second)
s.stream.SetWriteDeadline(deadline)
_ = s.stream.SetWriteDeadline(deadline)
msg.Header.From = s.srv.PeerID()
if err := s.wc.WriteMsg(msg); err != nil {
s.resetLockFree()
Expand Down Expand Up @@ -152,7 +150,6 @@ func (s *Stream) Recv() {
s.reset()
return
}
msg = nil
}
}

Expand Down Expand Up @@ -184,7 +181,9 @@ func (s *Stream) SendMessageWithResponse(ctx xctx.XContext,
s.log.Error("sendMessageWithResponse register error", "error", err)
return nil, err
}
defer s.srv.dispatcher.UnRegister(sub)
defer func() {
_ = s.srv.dispatcher.UnRegister(sub)
}()

errCh := make(chan error, 1)
respCh := make(chan *pb.XuperMessage, 1)
Expand Down
15 changes: 12 additions & 3 deletions bcs/network/p2pv2/stream_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ func (sp *StreamPool) Get(ctx xctx.XContext, peerId peer.ID) (*Stream, error) {
if stream.Valid() {
return stream, nil
} else {
sp.DelStream(stream)
err := sp.DelStream(stream)
if err != nil {
return nil, err
}
ctx.GetLog().Warn("stream not valid, create new stream", "peerId", peerId)
}
}
Expand All @@ -68,7 +71,10 @@ func (sp *StreamPool) Get(ctx xctx.XContext, peerId peer.ID) (*Stream, error) {
if stream.Valid() {
return stream, nil
} else {
sp.DelStream(stream)
err := sp.DelStream(stream)
if err != nil {
return nil, err
}
ctx.GetLog().Warn("stream not valid, create new stream", "peerId", peerId)
}
}
Expand Down Expand Up @@ -117,7 +123,10 @@ func (sp *StreamPool) AddStream(ctx xctx.XContext, stream *Stream) error {
if v, ok := sp.streams.Get(stream.id.Pretty()); ok {
ctx.GetLog().Warn("replace stream", "peerID", peerID, "multiAddr", multiAddr)
if s, ok := v.(*Stream); ok {
sp.DelStream(s)
err := sp.DelStream(s)
if err != nil {
return err
}
}
}

Expand Down

0 comments on commit 56f3d1a

Please sign in to comment.