From 56fa0e7d96f36f72b713fd1174c171b42032266f Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 29 Sep 2024 16:09:50 +0200 Subject: [PATCH 1/4] Fix bug in COPY command by differentiating between total reads vs. chunk reads --- network/client.go | 10 +++++----- network/proxy.go | 30 +++++++++++++++++------------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/network/client.go b/network/client.go index 267e1df7..d9738b8d 100644 --- a/network/client.go +++ b/network/client.go @@ -231,7 +231,7 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) { ctx = context.Background() } - var received int + total := 0 buffer := bytes.NewBuffer(nil) // Read the data in chunks. for ctx.Err() == nil { @@ -240,19 +240,19 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) { if err != nil { c.logger.Error().Err(err).Msg("Couldn't receive data from the server") span.RecordError(err) - return received, buffer.Bytes(), gerr.ErrClientReceiveFailed.Wrap(err) + return total, buffer.Bytes(), gerr.ErrClientReceiveFailed.Wrap(err) } - received += read + total += read buffer.Write(chunk[:read]) - if read == 0 || read < c.ReceiveChunkSize { + if total == 0 || read < c.ReceiveChunkSize { break } } span.AddEvent("Received data from server") - return received, buffer.Bytes(), nil + return total, buffer.Bytes(), nil } // Reconnect reconnects to the server. diff --git a/network/proxy.go b/network/proxy.go index 7d3e5aac..3ba19f1c 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -494,8 +494,8 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate received, response, err := pr.receiveTrafficFromServer(client) span.AddEvent("Received traffic from server") - // If the response is empty, don't send anything, instead just close the ingress connection. - if received == 0 || err != nil { + // If there is an error, close the ingress connection. + if err != nil { fields := map[string]interface{}{"function": "proxy.passthrough"} if client.LocalAddr() != "" { fields["localAddr"] = client.LocalAddr() @@ -517,7 +517,7 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate // Get the last request from the stack. lastRequest := stack.PopLastRequest() - request := make([]byte, 0) + request := []byte{} if lastRequest != nil { request = lastRequest.Data } @@ -553,9 +553,14 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate span.AddEvent("Plugin(s) modified the response") } - // Send the response to the client. - errVerdict := pr.sendTrafficToClient(conn.Conn(), response, received) - span.AddEvent("Sent traffic to client") + var errVerdict *gerr.GatewayDError + if received > 0 { + // Send the response to the client. + errVerdict = pr.sendTrafficToClient(conn.Conn(), response, received) + span.AddEvent("Sent traffic to client") + } else { + span.AddEvent("No data to send to client") + } // Run the OnTrafficToClient hooks. pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), pr.PluginTimeout) @@ -698,7 +703,7 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD defer span.End() // request contains the data from the client. - received := 0 + total := 0 buffer := bytes.NewBuffer(nil) for { chunk := make([]byte, pr.ClientConfig.ReceiveChunkSize) @@ -713,10 +718,10 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD return chunk[:read], gerr.ErrReadFailed.Wrap(err) } - received += read + total += read buffer.Write(chunk[:read]) - if received == 0 || received < pr.ClientConfig.ReceiveChunkSize { + if total == 0 || read < pr.ClientConfig.ReceiveChunkSize { break } @@ -725,10 +730,9 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD } } - length := len(buffer.Bytes()) pr.Logger.Debug().Fields( map[string]interface{}{ - "length": length, + "length": total, "local": LocalAddr(conn), "remote": RemoteAddr(conn), }, @@ -736,8 +740,8 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD span.AddEvent("Received data from client") - metrics.BytesReceivedFromClient.Observe(float64(length)) - metrics.TotalTrafficBytes.Observe(float64(length)) + metrics.BytesReceivedFromClient.Observe(float64(total)) + metrics.TotalTrafficBytes.Observe(float64(total)) return buffer.Bytes(), nil } From aee7b0612f3fa9343f8c45eee76554185759f4d1 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 29 Sep 2024 16:58:43 +0200 Subject: [PATCH 2/4] Fix bug in concurrent reads and writes --- network/server.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/network/server.go b/network/server.go index a1b50deb..210931a9 100644 --- a/network/server.go +++ b/network/server.go @@ -79,7 +79,7 @@ type Server struct { LoadbalancerStrategyName string LoadbalancerRules []config.LoadBalancingRule LoadbalancerConsistentHash *config.ConsistentHash - connectionToProxyMap map[*ConnWrapper]IProxy + connectionToProxyMap *sync.Map } var _ IServer = (*Server)(nil) @@ -181,7 +181,7 @@ func (s *Server) OnOpen(conn *ConnWrapper) ([]byte, Action) { } // Assign connection to proxy - s.connectionToProxyMap[conn] = proxy + s.connectionToProxyMap.Store(conn, proxy) // Run the OnOpened hooks. pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), s.PluginTimeout) @@ -696,7 +696,7 @@ func NewServer( connections: 0, running: &atomic.Bool{}, stopServer: make(chan struct{}), - connectionToProxyMap: make(map[*ConnWrapper]IProxy), + connectionToProxyMap: &sync.Map{}, LoadbalancerStrategyName: srv.LoadbalancerStrategyName, LoadbalancerRules: srv.LoadbalancerRules, LoadbalancerConsistentHash: srv.LoadbalancerConsistentHash, @@ -737,11 +737,19 @@ func (s *Server) CountConnections() int { // GetProxyForConnection returns the proxy associated with the given connection. func (s *Server) GetProxyForConnection(conn *ConnWrapper) (IProxy, bool) { - proxy, exists := s.connectionToProxyMap[conn] - return proxy, exists + proxy, exists := s.connectionToProxyMap.Load(conn) + if !exists { + return nil, false + } + + if proxy, ok := proxy.(IProxy); ok { + return proxy, true + } + + return nil, false } // RemoveConnectionFromMap removes the given connection from the connection-to-proxy map. func (s *Server) RemoveConnectionFromMap(conn *ConnWrapper) { - delete(s.connectionToProxyMap, conn) + s.connectionToProxyMap.Delete(conn) } From 4c19dbca6b33d2eba420fff06c6780c5e601249b Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 29 Sep 2024 17:21:53 +0200 Subject: [PATCH 3/4] Remove explicit condition for checking total being zero --- network/client.go | 2 +- network/proxy.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/network/client.go b/network/client.go index d9738b8d..3d859599 100644 --- a/network/client.go +++ b/network/client.go @@ -245,7 +245,7 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) { total += read buffer.Write(chunk[:read]) - if total == 0 || read < c.ReceiveChunkSize { + if read < c.ReceiveChunkSize { break } } diff --git a/network/proxy.go b/network/proxy.go index 3ba19f1c..21bd5f67 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -721,7 +721,7 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD total += read buffer.Write(chunk[:read]) - if total == 0 || read < pr.ClientConfig.ReceiveChunkSize { + if read < pr.ClientConfig.ReceiveChunkSize { break } From 15c3ad0cee46d91410b81e169c611c54f03eaf20 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 29 Sep 2024 20:12:32 +0200 Subject: [PATCH 4/4] Return early if there is no data from the server (to send to the client) --- network/proxy.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/network/proxy.go b/network/proxy.go index 21bd5f67..b0a0ddaf 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -494,6 +494,15 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate received, response, err := pr.receiveTrafficFromServer(client) span.AddEvent("Received traffic from server") + // If there is no data to send to the client, + // we don't need to run the hooks and + // we obviously have no data to send to the client. + if received == 0 { + span.AddEvent("No data to send to client") + stack.PopLastRequest() + return nil + } + // If there is an error, close the ingress connection. if err != nil { fields := map[string]interface{}{"function": "proxy.passthrough"} @@ -553,14 +562,9 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate span.AddEvent("Plugin(s) modified the response") } - var errVerdict *gerr.GatewayDError - if received > 0 { - // Send the response to the client. - errVerdict = pr.sendTrafficToClient(conn.Conn(), response, received) - span.AddEvent("Sent traffic to client") - } else { - span.AddEvent("No data to send to client") - } + // Send the response to the client. + errVerdict := pr.sendTrafficToClient(conn.Conn(), response, received) + span.AddEvent("Sent traffic to client") // Run the OnTrafficToClient hooks. pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), pr.PluginTimeout)