From 9aa06e5539c43da71b4a2fcd79d72c41c9dc5af2 Mon Sep 17 00:00:00 2001 From: "wupeixin@zhejianglab.com" Date: Fri, 2 Aug 2024 15:59:32 +0800 Subject: [PATCH] close p2p stream when error occur or this stream will be not used --- pkg/loadbalancer/loadbalancer.go | 3 ++- pkg/proxy/proxysocket.go | 1 + pkg/tunnel/tunnel.go | 16 +++++++++++++++- pkg/util/net/conn.go | 1 + pkg/util/tunutils/tun.go | 2 ++ 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/loadbalancer/loadbalancer.go b/pkg/loadbalancer/loadbalancer.go index a1d78bf7f..936c2a7c3 100644 --- a/pkg/loadbalancer/loadbalancer.go +++ b/pkg/loadbalancer/loadbalancer.go @@ -13,7 +13,7 @@ import ( istioapi "istio.io/client-go/pkg/apis/networking/v1alpha3" istio "istio.io/client-go/pkg/clientset/versioned" istioinformers "istio.io/client-go/pkg/informers/externalversions" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -927,6 +927,7 @@ func (lb *LoadBalancer) TryConnectEndpoints(service proxy.ServicePortName, srcAd if err == nil { _, err = outConn.Write(reqBytes) if err != nil { + outConn.Close() return nil, err } } diff --git a/pkg/proxy/proxysocket.go b/pkg/proxy/proxysocket.go index 3477689f6..8943e73c3 100644 --- a/pkg/proxy/proxysocket.go +++ b/pkg/proxy/proxysocket.go @@ -184,6 +184,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *userspace.ClientCache, return nil, err } if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil { + svrConn.Close() klog.ErrorS(err, "SetDeadline failed") return nil, err } diff --git a/pkg/tunnel/tunnel.go b/pkg/tunnel/tunnel.go index fcabf7005..eec6b8281 100644 --- a/pkg/tunnel/tunnel.go +++ b/pkg/tunnel/tunnel.go @@ -37,7 +37,7 @@ import ( discoverypb "github.com/kubeedge/edgemesh/pkg/tunnel/pb/discovery" proxypb "github.com/kubeedge/edgemesh/pkg/tunnel/pb/proxy" netutil "github.com/kubeedge/edgemesh/pkg/util/net" - "github.com/kubeedge/edgemesh/pkg/util/tunutils" + cni "github.com/kubeedge/edgemesh/pkg/util/tunutils" ) const ( @@ -216,10 +216,12 @@ func (t *EdgeTunnel) discoveryStreamHandler(stream network.Stream) { msg := new(discoverypb.Discovery) err := streamReader.ReadMsg(msg) if err != nil { + stream.Reset() klog.Errorf("Read msg from %s err: %v", remotePeer, err) return } if msg.GetType() != discoverypb.Discovery_CONNECT { + stream.Reset() klog.Errorf("Stream between %s, Type should be CONNECT", remotePeer) return } @@ -231,6 +233,7 @@ func (t *EdgeTunnel) discoveryStreamHandler(stream network.Stream) { msg.NodeName = &t.Config.NodeName err = streamWriter.WriteMsg(msg) if err != nil { + stream.Reset() klog.Errorf("[%s] Write msg to %s err: %v", protocol, remotePeer, err) return } @@ -238,6 +241,7 @@ func (t *EdgeTunnel) discoveryStreamHandler(stream network.Stream) { // (re)mapping nodeName and peerID klog.Infof("[%s] Discovery from %s : %s", protocol, nodeName, remotePeer) t.nodePeerMap[nodeName] = remotePeer.ID + stream.Reset() } type ProxyOptions struct { @@ -339,10 +343,12 @@ func (t *EdgeTunnel) proxyStreamHandler(stream network.Stream) { msg := new(proxypb.Proxy) err := streamReader.ReadMsg(msg) if err != nil { + stream.Reset() klog.Errorf("Read msg from %s err: %v", remotePeer, err) return } if msg.GetType() != proxypb.Proxy_CONNECT { + stream.Reset() klog.Errorf("Read msg from %s type should be CONNECT", remotePeer) return } @@ -358,9 +364,11 @@ func (t *EdgeTunnel) proxyStreamHandler(stream network.Stream) { msg.Reset() msg.Type = proxypb.Proxy_FAILED.Enum() if err = streamWriter.WriteMsg(msg); err != nil { + stream.Reset() klog.Errorf("Write msg to %s err: %v", remotePeer, err) return } + stream.Reset() return } @@ -368,6 +376,7 @@ func (t *EdgeTunnel) proxyStreamHandler(stream network.Stream) { msg.Type = proxypb.Proxy_SUCCESS.Enum() err = streamWriter.WriteMsg(msg) if err != nil { + stream.Reset() klog.Errorf("Write msg to %s err: %v", remotePeer, err) return } @@ -820,10 +829,12 @@ func (t *EdgeTunnel) CNIAdapterStreamHandler(stream network.Stream) { msg := new(proxypb.Proxy) err := streamReader.ReadMsg(msg) if err != nil { + stream.Reset() klog.Errorf("Read msg from %s err: %v", remotePeer, err) return } if msg.GetType() != proxypb.Proxy_CONNECT { + stream.Reset() klog.Errorf("Read msg from %s type should be CONNECT", remotePeer) return } @@ -839,9 +850,11 @@ func (t *EdgeTunnel) CNIAdapterStreamHandler(stream network.Stream) { msg.Reset() msg.Type = proxypb.Proxy_FAILED.Enum() if err = streamWriter.WriteMsg(msg); err != nil { + stream.Reset() klog.Errorf("Write msg to %s err: %v", remotePeer, err) return } + stream.Reset() return } @@ -849,6 +862,7 @@ func (t *EdgeTunnel) CNIAdapterStreamHandler(stream network.Stream) { msg.Type = proxypb.Proxy_SUCCESS.Enum() err = streamWriter.WriteMsg(msg) if err != nil { + stream.Reset() klog.Errorf("Write msg to %s err: %v", remotePeer, err) return } diff --git a/pkg/util/net/conn.go b/pkg/util/net/conn.go index e11714e9d..63f2dce84 100644 --- a/pkg/util/net/conn.go +++ b/pkg/util/net/conn.go @@ -55,6 +55,7 @@ func copyBytes(direction string, dest, src net.Conn, wg *sync.WaitGroup) { } func ProxyConnUDP(inConn net.Conn, udpConn *net.UDPConn) { + defer inConn.Close() var buffer [4096]byte for { n, err := inConn.Read(buffer[0:]) diff --git a/pkg/util/tunutils/tun.go b/pkg/util/tunutils/tun.go index 11257eeed..f8b3fbe3e 100644 --- a/pkg/util/tunutils/tun.go +++ b/pkg/util/tunutils/tun.go @@ -323,6 +323,7 @@ func Dial() (*TunConn, error) { return nil, nil } func DialTun(stream net.Conn, name string) { p2p2Tun, err := NewTunConn(name) if err != nil { + stream.Close() klog.Errorf("p2p handler create TunConn failed", err) return } @@ -331,6 +332,7 @@ func DialTun(stream net.Conn, name string) { buffer := NewRecycleByteBuffer(PacketSize) // TODO: separate below as P2P handler and add SetWriteDeadline go func() { + defer stream.Close() for { n, err := stream.Read(packet) if err != nil {