From 9f72be98e7fc0caa53f5cfb510cdc3fd3d2b1d79 Mon Sep 17 00:00:00 2001 From: Francesco Cheinasso Date: Wed, 11 Oct 2023 08:16:27 +0000 Subject: [PATCH] Liqonet: connchecker refactoring --- .gitignore | 3 ++ .../tunnel-operator/tunnel-operator.go | 11 ++++--- pkg/liqonet/conncheck/conncheck.go | 32 ++++++++++++------- pkg/liqonet/conncheck/receiver.go | 23 ++++++++----- pkg/liqonet/conncheck/sender.go | 13 +++++--- pkg/liqonet/tunnel/driver.go | 4 ++- pkg/liqonet/tunnel/wireguard/driver.go | 5 +-- 7 files changed, 58 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 581c701f2c..d8e7036f12 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,6 @@ docs/_build # kubebuilder generated files /config + +# development files +/tmp diff --git a/internal/liqonet/tunnel-operator/tunnel-operator.go b/internal/liqonet/tunnel-operator/tunnel-operator.go index c8347a7724..9c7e34e208 100644 --- a/internal/liqonet/tunnel-operator/tunnel-operator.go +++ b/internal/liqonet/tunnel-operator/tunnel-operator.go @@ -146,8 +146,8 @@ func NewTunnelController(ctx context.Context, wg *sync.WaitGroup, return fmt.Errorf("failed to create connchecker: %w", err) } - go wg.Connchecker.RunReceiver() - go wg.Connchecker.RunReceiverDisconnectObserver() + go wg.Connchecker.RunReceiver(ctx) + go wg.Connchecker.RunReceiverDisconnectObserver(ctx) return nil } @@ -180,7 +180,7 @@ func (tc *TunnelController) Reconcile(ctx context.Context, req ctrl.Request) (ct if err = tc.EnsureIPTablesRulesPerCluster(tep); err != nil { return err } - con, err = tc.connectToPeer(tep, tc.forgeConncheckUpdateStatus(ctx, req)) + con, err = tc.connectToPeer(ctx, tep, tc.forgeConncheckUpdateStatus(ctx, req)) if err != nil { return err } @@ -295,14 +295,15 @@ func EnforceIP(link netlink.Link, ip string) error { return nil } -func (tc *TunnelController) connectToPeer(ep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) { +func (tc *TunnelController) connectToPeer(ctx context.Context, ep *netv1alpha1.TunnelEndpoint, + updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) { // retrieve driver based on backend type driver, ok := tc.drivers[ep.Spec.BackendType] if !ok { klog.Errorf("%s -> no registered driver of type %s found for resources %s", ep.Spec.ClusterIdentity, ep.Spec.BackendType, ep.Name) return nil, fmt.Errorf("no registered driver of type %s found", ep.Spec.BackendType) } - con, err := driver.ConnectToEndpoint(ep, updateStatus) + con, err := driver.ConnectToEndpoint(ctx, ep, updateStatus) if err != nil { tc.Eventf(ep, "Warning", "Processing", "unable to establish connection: %v", err) klog.Errorf("%s -> an error occurred while establishing vpn connection: %v", ep.Spec.ClusterIdentity, err) diff --git a/pkg/liqonet/conncheck/conncheck.go b/pkg/liqonet/conncheck/conncheck.go index 24cc017499..543392c76e 100644 --- a/pkg/liqonet/conncheck/conncheck.go +++ b/pkg/liqonet/conncheck/conncheck.go @@ -54,17 +54,18 @@ func NewConnChecker() (*ConnChecker, error) { } // RunReceiver runs the receiver. -func (c *ConnChecker) RunReceiver() { - c.receiver.Run() +func (c *ConnChecker) RunReceiver(ctx context.Context) { + c.receiver.Run(ctx) } // RunReceiverDisconnectObserver runs the receiver disconnect observer. -func (c *ConnChecker) RunReceiverDisconnectObserver() { - c.receiver.RunDisconnectObserver() +func (c *ConnChecker) RunReceiverDisconnectObserver(ctx context.Context) { + c.receiver.RunDisconnectObserver(ctx) } // AddAndRunSender create a new sender and runs it. -func (c *ConnChecker) AddAndRunSender(clusterID, ip string, updateCallback UpdateFunc) { +func (c *ConnChecker) AddAndRunSender(ctx context.Context, clusterID, ip string, updateCallback UpdateFunc) { + var err error c.sm.Lock() if _, ok := c.senders[clusterID]; ok { c.sm.Unlock() @@ -72,18 +73,24 @@ func (c *ConnChecker) AddAndRunSender(clusterID, ip string, updateCallback Updat return } - ctxSender, cancelSender := context.WithCancel(context.Background()) - c.senders[clusterID] = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip) + ctxSender, cancelSender := context.WithCancel(ctx) + c.senders[clusterID], err = NewSender(clusterID, cancelSender, c.conn, ip) - err := c.receiver.InitPeer(clusterID, updateCallback) + if err != nil { + c.sm.Unlock() + klog.Errorf("failed to create sender: %w", err) + return + } + + err = c.receiver.InitPeer(clusterID, updateCallback) if err != nil { c.sm.Unlock() klog.Errorf("failed to add redirect chan: %w", err) } - klog.Infof("conncheck sender %s starting", clusterID) + klog.Infof("conncheck sender %q starting against %q", clusterID, ip) pingCallback := func(ctx context.Context) (done bool, err error) { - err = c.senders[clusterID].SendPing(ctx) + err = c.senders[clusterID].SendPing() if err != nil { klog.Warningf("failed to send ping: %s", err) } @@ -91,8 +98,9 @@ func (c *ConnChecker) AddAndRunSender(clusterID, ip string, updateCallback Updat } c.sm.Unlock() - // Ignore errors because only caused by context cancellation. - _ = wait.PollImmediateInfiniteWithContext(ctxSender, PingInterval, pingCallback) + if err := wait.PollUntilContextCancel(ctxSender, PingInterval, true, pingCallback); err != nil { + klog.Errorf("conncheck sender %s stopped for an error: %s", clusterID, err) + } klog.Infof("conncheck sender %s stopped", clusterID) } diff --git a/pkg/liqonet/conncheck/receiver.go b/pkg/liqonet/conncheck/receiver.go index f955154ffa..72a877b432 100644 --- a/pkg/liqonet/conncheck/receiver.go +++ b/pkg/liqonet/conncheck/receiver.go @@ -104,19 +104,19 @@ func (r *Receiver) InitPeer(clusterID string, updateCallback UpdateFunc) error { } // Run starts the receiver. -func (r *Receiver) Run() { - klog.V(8).Infof("conncheck receiver: starting") - for { +func (r *Receiver) Run(ctx context.Context) { + klog.Infof("conncheck receiver: started") + err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) { n, raddr, err := r.conn.ReadFromUDP(r.buff) if err != nil { klog.Errorf("conncheck receiver: failed to read from %s: %w", raddr.String(), err) - continue + return false, nil } msgr := &Msg{} err = json.Unmarshal(r.buff[:n], msgr) if err != nil { klog.Errorf("conncheck receiver: failed to unmarshal msg: %w", err) - continue + return false, nil } klog.V(9).Infof("conncheck receiver: received a msg -> %s", msgr) switch msgr.MsgType { @@ -130,14 +130,18 @@ func (r *Receiver) Run() { if err != nil { klog.Errorf("conncheck receiver: %v", err) } + return false, nil + }) + if err != nil { + klog.Errorf("conncheck receiver: %v", err) } } // RunDisconnectObserver starts the disconnect observer. -func (r *Receiver) RunDisconnectObserver() { - klog.V(9).Infof("conncheck receiver disconnect checker: starting") +func (r *Receiver) RunDisconnectObserver(ctx context.Context) { + klog.Infof("conncheck receiver disconnect checker: started") // Ignore errors because only caused by context cancellation. - _ = wait.PollImmediateInfiniteWithContext(context.Background(), time.Duration(PingLossThreshold)*PingInterval/10, + err := wait.PollUntilContextCancel(ctx, time.Duration(PingLossThreshold)*PingInterval/10, true, func(ctx context.Context) (done bool, err error) { r.m.Lock() defer r.m.Unlock() @@ -155,4 +159,7 @@ func (r *Receiver) RunDisconnectObserver() { } return false, nil }) + if err != nil { + klog.Errorf("conncheck disconnect observer: %v", err) + } } diff --git a/pkg/liqonet/conncheck/sender.go b/pkg/liqonet/conncheck/sender.go index dd82198b32..eedbf22e43 100644 --- a/pkg/liqonet/conncheck/sender.go +++ b/pkg/liqonet/conncheck/sender.go @@ -15,7 +15,6 @@ package conncheck import ( - "context" "encoding/json" "fmt" "net" @@ -33,17 +32,21 @@ type Sender struct { } // NewSender creates a new conncheck sender. -func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) *Sender { +func NewSender(clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) { + pip := net.ParseIP(ip) + if pip == nil { + return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip) + } return &Sender{ clusterID: clusterID, cancel: cancel, conn: conn, - raddr: net.UDPAddr{IP: net.ParseIP(ip), Port: port}, - } + raddr: net.UDPAddr{IP: pip, Port: port}, + }, nil } // SendPing sends a PING message to the given address. -func (s *Sender) SendPing(ctx context.Context) error { +func (s *Sender) SendPing() error { msgOut := Msg{ClusterID: s.clusterID, MsgType: PING, TimeStamp: time.Now()} b, err := json.Marshal(msgOut) if err != nil { diff --git a/pkg/liqonet/tunnel/driver.go b/pkg/liqonet/tunnel/driver.go index 3bce59c83d..9a9bc8b4e5 100644 --- a/pkg/liqonet/tunnel/driver.go +++ b/pkg/liqonet/tunnel/driver.go @@ -15,6 +15,8 @@ package tunnel import ( + "context" + "github.com/prometheus/client_golang/prometheus" "github.com/vishvananda/netlink" k8s "k8s.io/client-go/kubernetes" @@ -49,7 +51,7 @@ type Config struct { type Driver interface { Init() error - ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) + ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) DisconnectFromEndpoint(tep *netv1alpha1.TunnelEndpoint) error diff --git a/pkg/liqonet/tunnel/wireguard/driver.go b/pkg/liqonet/tunnel/wireguard/driver.go index 16e872e9de..d582830ea7 100644 --- a/pkg/liqonet/tunnel/wireguard/driver.go +++ b/pkg/liqonet/tunnel/wireguard/driver.go @@ -185,7 +185,8 @@ func (w *Wireguard) Init() error { // ConnectToEndpoint connects to a remote cluster described by the given tep. // updateStatusCallback is a function used by conncheck to update TunnelEndpoint connected status. -func (w *Wireguard) ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) { +func (w *Wireguard) ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.TunnelEndpoint, + updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) { // parse allowed IPs. allowedIPs, stringAllowedIPs, err := getAllowedIPs(tep) if err != nil { @@ -281,7 +282,7 @@ func (w *Wireguard) ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateSta klog.Infof("%s -> starting conncheck sender", tep.Spec.ClusterIdentity) - go w.Connchecker.AddAndRunSender(tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus) + go w.Connchecker.AddAndRunSender(ctx, tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus) klog.V(4).Infof("Done connecting cluster peer %s@%s", tep.Spec.ClusterIdentity, endpoint.String()) return c, nil