Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Liqonet: connchecker refactoring #2076

Merged
merged 1 commit into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ docs/_build

# kubebuilder generated files
/config

# development files
/tmp
11 changes: 6 additions & 5 deletions internal/liqonet/tunnel-operator/tunnel-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func NewTunnelController(ctx context.Context, wg *sync.WaitGroup,
return fmt.Errorf("failed to create connchecker: %w", err)
}

go wg.Connchecker.RunReceiver()
go wg.Connchecker.RunReceiverDisconnectObserver()
go wg.Connchecker.RunReceiver(ctx)
go wg.Connchecker.RunReceiverDisconnectObserver(ctx)

return nil
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (tc *TunnelController) Reconcile(ctx context.Context, req ctrl.Request) (ct
if err = tc.EnsureIPTablesRulesPerCluster(tep); err != nil {
return err
}
con, err = tc.connectToPeer(tep, tc.forgeConncheckUpdateStatus(ctx, req))
con, err = tc.connectToPeer(ctx, tep, tc.forgeConncheckUpdateStatus(ctx, req))
if err != nil {
return err
}
Expand Down Expand Up @@ -295,14 +295,15 @@ func EnforceIP(link netlink.Link, ip string) error {
return nil
}

func (tc *TunnelController) connectToPeer(ep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
func (tc *TunnelController) connectToPeer(ctx context.Context, ep *netv1alpha1.TunnelEndpoint,
updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
// retrieve driver based on backend type
driver, ok := tc.drivers[ep.Spec.BackendType]
if !ok {
klog.Errorf("%s -> no registered driver of type %s found for resources %s", ep.Spec.ClusterIdentity, ep.Spec.BackendType, ep.Name)
return nil, fmt.Errorf("no registered driver of type %s found", ep.Spec.BackendType)
}
con, err := driver.ConnectToEndpoint(ep, updateStatus)
con, err := driver.ConnectToEndpoint(ctx, ep, updateStatus)
if err != nil {
tc.Eventf(ep, "Warning", "Processing", "unable to establish connection: %v", err)
klog.Errorf("%s -> an error occurred while establishing vpn connection: %v", ep.Spec.ClusterIdentity, err)
Expand Down
32 changes: 20 additions & 12 deletions pkg/liqonet/conncheck/conncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,45 +54,53 @@ func NewConnChecker() (*ConnChecker, error) {
}

// RunReceiver runs the receiver.
func (c *ConnChecker) RunReceiver() {
c.receiver.Run()
func (c *ConnChecker) RunReceiver(ctx context.Context) {
c.receiver.Run(ctx)
}

// RunReceiverDisconnectObserver runs the receiver disconnect observer.
func (c *ConnChecker) RunReceiverDisconnectObserver() {
c.receiver.RunDisconnectObserver()
func (c *ConnChecker) RunReceiverDisconnectObserver(ctx context.Context) {
c.receiver.RunDisconnectObserver(ctx)
}

// AddAndRunSender creates a new sender and runs it.
func (c *ConnChecker) AddAndRunSender(clusterID, ip string, updateCallback UpdateFunc) {
func (c *ConnChecker) AddAndRunSender(ctx context.Context, clusterID, ip string, updateCallback UpdateFunc) {
var err error
c.sm.Lock()
if _, ok := c.senders[clusterID]; ok {
c.sm.Unlock()
klog.Infof("sender %s already exists", clusterID)
return
}

ctxSender, cancelSender := context.WithCancel(context.Background())
c.senders[clusterID] = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip)
ctxSender, cancelSender := context.WithCancel(ctx)
c.senders[clusterID], err = NewSender(clusterID, cancelSender, c.conn, ip)

err := c.receiver.InitPeer(clusterID, updateCallback)
if err != nil {
c.sm.Unlock()
klog.Errorf("failed to create sender: %w", err)
return
}

err = c.receiver.InitPeer(clusterID, updateCallback)
if err != nil {
c.sm.Unlock()
klog.Errorf("failed to add redirect chan: %w", err)
}

klog.Infof("conncheck sender %s starting", clusterID)
klog.Infof("conncheck sender %q starting against %q", clusterID, ip)
pingCallback := func(ctx context.Context) (done bool, err error) {
err = c.senders[clusterID].SendPing(ctx)
err = c.senders[clusterID].SendPing()
if err != nil {
klog.Warningf("failed to send ping: %s", err)
}
return false, nil
}
c.sm.Unlock()

// Ignore errors because only caused by context cancellation.
_ = wait.PollImmediateInfiniteWithContext(ctxSender, PingInterval, pingCallback)
if err := wait.PollUntilContextCancel(ctxSender, PingInterval, true, pingCallback); err != nil {
klog.Errorf("conncheck sender %s stopped for an error: %s", clusterID, err)
}

klog.Infof("conncheck sender %s stopped", clusterID)
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/liqonet/conncheck/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,19 @@ func (r *Receiver) InitPeer(clusterID string, updateCallback UpdateFunc) error {
}

// Run starts the receiver.
func (r *Receiver) Run() {
klog.V(8).Infof("conncheck receiver: starting")
for {
func (r *Receiver) Run(ctx context.Context) {
klog.Infof("conncheck receiver: started")
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
n, raddr, err := r.conn.ReadFromUDP(r.buff)
if err != nil {
klog.Errorf("conncheck receiver: failed to read from %s: %w", raddr.String(), err)
continue
return false, nil
}
msgr := &Msg{}
err = json.Unmarshal(r.buff[:n], msgr)
if err != nil {
klog.Errorf("conncheck receiver: failed to unmarshal msg: %w", err)
continue
return false, nil
}
klog.V(9).Infof("conncheck receiver: received a msg -> %s", msgr)
switch msgr.MsgType {
Expand All @@ -130,14 +130,18 @@ func (r *Receiver) Run() {
if err != nil {
klog.Errorf("conncheck receiver: %v", err)
}
return false, nil
})
if err != nil {
klog.Errorf("conncheck receiver: %v", err)
}
}

// RunDisconnectObserver starts the disconnect observer.
func (r *Receiver) RunDisconnectObserver() {
klog.V(9).Infof("conncheck receiver disconnect checker: starting")
func (r *Receiver) RunDisconnectObserver(ctx context.Context) {
klog.Infof("conncheck receiver disconnect checker: started")
// Errors can only be caused by context cancellation.
_ = wait.PollImmediateInfiniteWithContext(context.Background(), time.Duration(PingLossThreshold)*PingInterval/10,
err := wait.PollUntilContextCancel(ctx, time.Duration(PingLossThreshold)*PingInterval/10, true,
func(ctx context.Context) (done bool, err error) {
r.m.Lock()
defer r.m.Unlock()
Expand All @@ -155,4 +159,7 @@ func (r *Receiver) RunDisconnectObserver() {
}
return false, nil
})
if err != nil {
klog.Errorf("conncheck disconnect observer: %v", err)
}
}
13 changes: 8 additions & 5 deletions pkg/liqonet/conncheck/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package conncheck

import (
"context"
"encoding/json"
"fmt"
"net"
Expand All @@ -33,17 +32,21 @@ type Sender struct {
}

// NewSender creates a new conncheck sender.
func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) *Sender {
func NewSender(clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
pip := net.ParseIP(ip)
if pip == nil {
return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip)
}
return &Sender{
clusterID: clusterID,
cancel: cancel,
conn: conn,
raddr: net.UDPAddr{IP: net.ParseIP(ip), Port: port},
}
raddr: net.UDPAddr{IP: pip, Port: port},
}, nil
}

// SendPing sends a PING message to the given address.
func (s *Sender) SendPing(ctx context.Context) error {
func (s *Sender) SendPing() error {
msgOut := Msg{ClusterID: s.clusterID, MsgType: PING, TimeStamp: time.Now()}
b, err := json.Marshal(msgOut)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/liqonet/tunnel/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package tunnel

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netlink"
k8s "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -49,7 +51,7 @@ type Config struct {
type Driver interface {
Init() error

ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error)
ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error)

DisconnectFromEndpoint(tep *netv1alpha1.TunnelEndpoint) error

Expand Down
5 changes: 3 additions & 2 deletions pkg/liqonet/tunnel/wireguard/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func (w *Wireguard) Init() error {

// ConnectToEndpoint connects to a remote cluster described by the given tep.
// updateStatus is a callback used by conncheck to update the TunnelEndpoint connected status.
func (w *Wireguard) ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
func (w *Wireguard) ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.TunnelEndpoint,
updateStatus conncheck.UpdateFunc) (*netv1alpha1.Connection, error) {
// parse allowed IPs.
allowedIPs, stringAllowedIPs, err := getAllowedIPs(tep)
if err != nil {
Expand Down Expand Up @@ -281,7 +282,7 @@ func (w *Wireguard) ConnectToEndpoint(tep *netv1alpha1.TunnelEndpoint, updateSta

klog.Infof("%s -> starting conncheck sender", tep.Spec.ClusterIdentity)

go w.Connchecker.AddAndRunSender(tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus)
go w.Connchecker.AddAndRunSender(ctx, tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus)

klog.V(4).Infof("Done connecting cluster peer %s@%s", tep.Spec.ClusterIdentity, endpoint.String())
return c, nil
Expand Down