Skip to content

Commit

Permalink
Liqonet: connchecker refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 authored and adamjensenbot committed Oct 12, 2023
1 parent ed54004 commit 9f72be9
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ docs/_build

# kubebuilder generated files
/config

# development files
/tmp
11 changes: 6 additions & 5 deletions internal/liqonet/tunnel-operator/tunnel-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func NewTunnelController(ctx context.Context, wg *sync.WaitGroup,
return fmt.Errorf("failed to create connchecker: %w", err)
}

go wg.Connchecker.RunReceiver()
go wg.Connchecker.RunReceiverDisconnectObserver()
go wg.Connchecker.RunReceiver(ctx)
go wg.Connchecker.RunReceiverDisconnectObserver(ctx)

return nil
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (tc *TunnelController) Reconcile(ctx context.Context, req ctrl.Request) (ct
if err = tc.EnsureIPTablesRulesPerCluster(tep); err != nil {
return err
}
con, err = tc.connectToPeer(tep, tc.forgeConncheckUpdateStatus(ctx, req))
con, err = tc.connectToPeer(ctx, tep, tc.forgeConncheckUpdateStatus(ctx, req))
if err != nil {
return err
}
Expand Down Expand Up @@ -295,14 +295,15 @@ func EnforceIP(link netlink.Link, ip string) error {
return nil
}

func (tc *TunnelController) connectToPeer(ep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
func (tc *TunnelController) connectToPeer(ctx context.Context, ep *netv1alpha1.TunnelEndpoint,
updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
// retrieve driver based on backend type
driver, ok := tc.drivers[ep.Spec.BackendType]
if !ok {
klog.Errorf("%s -> no registered driver of type %s found for resources %s", ep.Spec.ClusterIdentity, ep.Spec.BackendType, ep.Name)
return nil, fmt.Errorf("no registered driver of type %s found", ep.Spec.BackendType)
}
con, err := driver.ConnectToEndpoint(ep, updateStatus)
con, err := driver.ConnectToEndpoint(ctx, ep, updateStatus)
if err != nil {
tc.Eventf(ep, "Warning", "Processing", "unable to establish connection: %v", err)
klog.Errorf("%s -> an error occurred while establishing vpn connection: %v", ep.Spec.ClusterIdentity, err)
Expand Down
32 changes: 20 additions & 12 deletions pkg/liqonet/conncheck/conncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,45 +54,53 @@ func NewConnChecker() (*ConnChecker, error) {
}

// RunReceiver runs the receiver.
func (c *ConnChecker) RunReceiver() {
c.receiver.Run()
func (c *ConnChecker) RunReceiver(ctx context.Context) {
c.receiver.Run(ctx)
}

// RunReceiverDisconnectObserver runs the receiver disconnect observer.
func (c *ConnChecker) RunReceiverDisconnectObserver() {
c.receiver.RunDisconnectObserver()
func (c *ConnChecker) RunReceiverDisconnectObserver(ctx context.Context) {
c.receiver.RunDisconnectObserver(ctx)
}

// AddAndRunSender create a new sender and runs it.
func (c *ConnChecker) AddAndRunSender(clusterID, ip string, updateCallback UpdateFunc) {
func (c *ConnChecker) AddAndRunSender(ctx context.Context, clusterID, ip string, updateCallback UpdateFunc) {
var err error
c.sm.Lock()
if _, ok := c.senders[clusterID]; ok {
c.sm.Unlock()
klog.Infof("sender %s already exists", clusterID)
return
}

ctxSender, cancelSender := context.WithCancel(context.Background())
c.senders[clusterID] = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip)
ctxSender, cancelSender := context.WithCancel(ctx)
c.senders[clusterID], err = NewSender(clusterID, cancelSender, c.conn, ip)

err := c.receiver.InitPeer(clusterID, updateCallback)
if err != nil {
c.sm.Unlock()
klog.Errorf("failed to create sender: %w", err)
return
}

err = c.receiver.InitPeer(clusterID, updateCallback)
if err != nil {
c.sm.Unlock()
klog.Errorf("failed to add redirect chan: %w", err)
}

klog.Infof("conncheck sender %s starting", clusterID)
klog.Infof("conncheck sender %q starting against %q", clusterID, ip)
pingCallback := func(ctx context.Context) (done bool, err error) {
err = c.senders[clusterID].SendPing(ctx)
err = c.senders[clusterID].SendPing()
if err != nil {
klog.Warningf("failed to send ping: %s", err)
}
return false, nil
}
c.sm.Unlock()

// Ignore errors because only caused by context cancellation.
_ = wait.PollImmediateInfiniteWithContext(ctxSender, PingInterval, pingCallback)
if err := wait.PollUntilContextCancel(ctxSender, PingInterval, true, pingCallback); err != nil {
klog.Errorf("conncheck sender %s stopped for an error: %s", clusterID, err)
}

klog.Infof("conncheck sender %s stopped", clusterID)
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/liqonet/conncheck/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,19 @@ func (r *Receiver) InitPeer(clusterID string, updateCallback UpdateFunc) error {
}

// Run starts the receiver.
func (r *Receiver) Run() {
klog.V(8).Infof("conncheck receiver: starting")
for {
func (r *Receiver) Run(ctx context.Context) {
klog.Infof("conncheck receiver: started")
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
n, raddr, err := r.conn.ReadFromUDP(r.buff)
if err != nil {
klog.Errorf("conncheck receiver: failed to read from %s: %w", raddr.String(), err)
continue
return false, nil
}
msgr := &Msg{}
err = json.Unmarshal(r.buff[:n], msgr)
if err != nil {
klog.Errorf("conncheck receiver: failed to unmarshal msg: %w", err)
continue
return false, nil
}
klog.V(9).Infof("conncheck receiver: received a msg -> %s", msgr)
switch msgr.MsgType {
Expand All @@ -130,14 +130,18 @@ func (r *Receiver) Run() {
if err != nil {
klog.Errorf("conncheck receiver: %v", err)
}
return false, nil
})
if err != nil {
klog.Errorf("conncheck receiver: %v", err)
}
}

// RunDisconnectObserver starts the disconnect observer.
func (r *Receiver) RunDisconnectObserver() {
klog.V(9).Infof("conncheck receiver disconnect checker: starting")
func (r *Receiver) RunDisconnectObserver(ctx context.Context) {
klog.Infof("conncheck receiver disconnect checker: started")
// Ignore errors because only caused by context cancellation.
_ = wait.PollImmediateInfiniteWithContext(context.Background(), time.Duration(PingLossThreshold)*PingInterval/10,
err := wait.PollUntilContextCancel(ctx, time.Duration(PingLossThreshold)*PingInterval/10, true,
func(ctx context.Context) (done bool, err error) {
r.m.Lock()
defer r.m.Unlock()
Expand All @@ -155,4 +159,7 @@ func (r *Receiver) RunDisconnectObserver() {
}
return false, nil
})
if err != nil {
klog.Errorf("conncheck disconnect observer: %v", err)
}
}
13 changes: 8 additions & 5 deletions pkg/liqonet/conncheck/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package conncheck

import (
"context"
"encoding/json"
"fmt"
"net"
Expand All @@ -33,17 +32,21 @@ type Sender struct {
}

// NewSender creates a new conncheck sender.
func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) *Sender {
func NewSender(clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
pip := net.ParseIP(ip)
if pip == nil {
return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip)
}
return &Sender{
clusterID: clusterID,
cancel: cancel,
conn: conn,
raddr: net.UDPAddr{IP: net.ParseIP(ip), Port: port},
}
raddr: net.UDPAddr{IP: pip, Port: port},
}, nil
}

// SendPing sends a PING message to the given address.
func (s *Sender) SendPing(ctx context.Context) error {
func (s *Sender) SendPing() error {
msgOut := Msg{ClusterID: s.clusterID, MsgType: PING, TimeStamp: time.Now()}
b, err := json.Marshal(msgOut)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/liqonet/tunnel/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package tunnel

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netlink"
k8s "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -49,7 +51,7 @@ type Config struct {
type Driver interface {
Init() error

ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error)
ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error)

DisconnectFromEndpoint(tep *netv1alpha1.TunnelEndpoint) error

Expand Down
5 changes: 3 additions & 2 deletions pkg/liqonet/tunnel/wireguard/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func (w *Wireguard) Init() error {

// ConnectToEndpoint connects to a remote cluster described by the given tep.
// updateStatusCallback is a function used by conncheck to update TunnelEndpoint connected status.
func (w *Wireguard) ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
func (w *Wireguard) ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.TunnelEndpoint,
updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
// parse allowed IPs.
allowedIPs, stringAllowedIPs, err := getAllowedIPs(tep)
if err != nil {
Expand Down Expand Up @@ -281,7 +282,7 @@ func (w *Wireguard) ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateSta

klog.Infof("%s -> starting conncheck sender", tep.Spec.ClusterIdentity)

go w.Connchecker.AddAndRunSender(tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus)
go w.Connchecker.AddAndRunSender(ctx, tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus)

klog.V(4).Infof("Done connecting cluster peer %s@%s", tep.Spec.ClusterIdentity, endpoint.String())
return c, nil
Expand Down

0 comments on commit 9f72be9

Please sign in to comment.