-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathagent.go
156 lines (142 loc) · 5.99 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"context"
"fmt"
"net"
"net/netip"
"os"
"os/signal"
"syscall"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/costela/wesher/cluster"
"github.com/costela/wesher/common"
"github.com/costela/wesher/etchosts"
"github.com/costela/wesher/wg"
"github.com/hashicorp/go-sockaddr"
"github.com/sirupsen/logrus"
)
// AgentCmd holds the options of the agent command: cluster membership
// settings, the address or interface to bind to, the ports in use, the
// overlay network and the wireguard interface name.
//
// Fields carry `env`, `help`, `default`, `name` and `hidden` struct tags.
// NOTE(review): these are presumably consumed by the command-line parser
// configured elsewhere in this package — confirm which one before changing
// tag syntax or field order.
//
// Validate checks the options and fills in BindAddr; Run consumes them.
type AgentCmd struct {
// ClusterKey uses the package-local key type; Validate checks the length of its bytes field.
ClusterKey key `env:"WESHER_CLUSTER_KEY" help:"shared key for cluster membership; must be 32 bytes base64 encoded; will be generated if not provided"`
Join []string `env:"WESHER_JOIN" help:"comma separated list of hostnames or IP addresses to existing cluster members; if not provided, will attempt resuming any known state or otherwise wait for further members."`
Init bool `env:"WESHER_INIT" help:"whether to explicitly (re)initialize the cluster; any known state from previous runs will be forgotten"`
// BindAddr and BindIface are mutually exclusive (enforced by Validate);
// Validate overwrites BindAddr with the resolved address when it is empty.
BindAddr string `env:"WESHER_BIND_ADDR" help:"IP address to bind to for cluster membership traffic (cannot be used with --bind-iface)"`
BindIface string `env:"WESHER_BIND_IFACE" help:"Interface to bind to for cluster membership traffic (cannot be used with --bind-addr)"`
ClusterPort int `env:"WESHER_CLUSTER_PORT" help:"port used for membership gossip traffic (both TCP and UDP); must be the same across cluster" default:"7946"`
WireguardPort int `env:"WESHER_WIREGUARD_PORT" help:"port used for wireguard traffic (UDP); must be the same across cluster" default:"51820"`
// Validate rejects prefix lengths that are not a multiple of 8.
OverlayNet netip.Prefix `env:"WESHER_OVERLAY_NET" help:"the network in which to allocate addresses for the overlay mesh network (CIDR format); smaller networks increase the chance of IP collision" default:"10.0.0.0/8"`
Interface string `env:"WESHER_INTERFACE" help:"name of the wireguard interface to create and manage" default:"wgoverlay"`
// NoEtcHosts makes Run skip every write to the hosts file.
NoEtcHosts bool `env:"WESHER_NO_ETC_HOSTS" help:"disable writing of entries to /etc/hosts"`
// for easier local testing; will break etchosts entry
UseIPAsName bool `name:"ip-as-name" default:"false" hidden:""`
}
// Validate checks the agent options for consistency and resolves the
// effective bind address, storing the result in a.BindAddr.
//
// It returns an error when:
//   - a cluster key was supplied whose length differs from cluster.KeyLen;
//   - the overlay network prefix length is not a multiple of 8;
//   - both BindAddr and BindIface are set;
//   - BindIface cannot be looked up, its addresses cannot be listed, or none
//     of its addresses is an IP network address that could be bound to;
//   - neither option is set and public IP detection fails.
//
// When neither BindAddr nor BindIface is set, BindAddr becomes the detected
// public IP, or "0.0.0.0" if no public IP was found.
func (a *AgentCmd) Validate() error {
	// An empty key is allowed here; only a key of the wrong size is rejected.
	if n := len(a.ClusterKey.bytes); n != 0 && n != cluster.KeyLen {
		return fmt.Errorf("unsupported cluster key length; expected %d, got %d", cluster.KeyLen, n)
	}

	if bits := a.OverlayNet.Bits(); bits%8 != 0 {
		return fmt.Errorf("unsupported overlay network size; net mask must be multiple of 8, got %d", bits)
	}

	switch {
	case a.BindAddr != "" && a.BindIface != "":
		return fmt.Errorf("setting both bind address and bind interface is not supported")

	case a.BindIface != "":
		// Compute the actual bind address based on the provided interface
		iface, err := net.InterfaceByName(a.BindIface)
		if err != nil {
			return fmt.Errorf("getting interface by name %s: %w", a.BindIface, err)
		}
		addrs, err := iface.Addrs()
		if err != nil {
			return fmt.Errorf("getting addresses for interface %s: %w", a.BindIface, err)
		}

		// Use the first IP network address on the interface. Failing loudly
		// when there is none avoids continuing with an empty bind address.
		resolved := ""
		for _, addr := range addrs {
			if ipNet, ok := addr.(*net.IPNet); ok {
				resolved = ipNet.IP.String()
				break
			}
		}
		if resolved == "" {
			return fmt.Errorf("interface %s has no usable IP address to bind to", a.BindIface)
		}
		a.BindAddr = resolved

	case a.BindAddr == "":
		// FIXME: this is a workaround for memberlist refusing to listen on public IPs if BindAddr==0.0.0.0
		detectedBindAddr, err := sockaddr.GetPublicIP()
		if err != nil {
			return fmt.Errorf("detecting public IP address: %w", err)
		}
		// if we cannot find a public IP, let memberlist do its thing
		if detectedBindAddr != "" {
			a.BindAddr = detectedBindAddr
		} else {
			a.BindAddr = "0.0.0.0"
		}
	}

	return nil
}
// Run executes the agent. It creates the cluster and wireguard state, joins
// the cluster (retrying with exponential backoff), and then reacts to two
// kinds of events until the process ends:
//
//   - a new member list: the wireguard interface is reconfigured and, unless
//     NoEtcHosts is set, the hosts entries are rewritten;
//   - SIGTERM or interrupt: the agent leaves the cluster, clears its hosts
//     entries, brings the interface down and exits with status 0.
//
// Setup and join failures are reported through logrus Fatal rather than
// returned, and shutdown ends in os.Exit, so Run does not return normally.
func (a *AgentCmd) Run(cli *cli) error {
	// Build the cluster membership handle and the wireguard controller.
	mesh, err := cluster.New(a.Interface, a.Init, a.ClusterKey.bytes, a.BindAddr, a.ClusterPort, a.UseIPAsName)
	if err != nil {
		logrus.WithError(err).Fatal("could not create cluster")
	}
	wgCtl, self, err := wg.New(a.Interface, a.WireguardPort, a.OverlayNet, mesh.LocalName)
	if err != nil {
		logrus.WithError(err).Fatal("could not instantiate wireguard controller")
	}

	// Writer for the managed section of the hosts file.
	hostsWriter := &etchosts.EtcHosts{
		Banner: "# ! managed automatically by wesher interface " + a.Interface,
		Logger: logrus.StandardLogger(),
	}
	manageHosts := !a.NoEtcHosts

	// Publish the local node, then subscribe to member updates *before*
	// joining: starting the subscription first avoids deadlocks.
	mesh.Update(self)
	memberUpdates := mesh.Members()

	attemptJoin := func() error { return mesh.Join(a.Join) }
	reportRetry := func(err error, dur time.Duration) {
		logrus.WithError(err).Errorf("could not join cluster, retrying in %s", dur)
	}
	if joinErr := backoff.RetryNotify(attemptJoin, backoff.NewExponentialBackOff(), reportRetry); joinErr != nil {
		logrus.WithError(joinErr).Fatal("could not join cluster")
	}

	ctx, cancelSignals := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
	defer cancelSignals()

	// Event loop: runs until a termination signal arrives.
	logrus.Debug("waiting for cluster events")
	for {
		select {
		case <-ctx.Done():
			// os.Exit below skips deferred calls, so release the signal
			// handler explicitly.
			cancelSignals()
			logrus.Info("terminating...")
			mesh.Leave()
			if manageHosts {
				// Writing an empty set removes the entries written earlier.
				if err := hostsWriter.WriteEntries(map[string][]string{}); err != nil {
					logrus.WithError(err).Error("could not remove stale hosts entries")
				}
			}
			if err := wgCtl.DownInterface(); err != nil {
				logrus.WithError(err).Error("could not down interface")
			}
			os.Exit(0)

		case members := <-memberUpdates:
			peers := make([]common.Node, 0, len(members))
			hostEntries := make(map[string][]string, len(members))

			logrus.Info("cluster members:\n")
			for _, member := range members {
				// Members whose metadata cannot be decoded are skipped.
				if err := member.DecodeMeta(); err != nil {
					logrus.Warnf("\t addr: %s, could not decode metadata", member.Addr)
					continue
				}
				logrus.Infof("\taddr: %s, overlay: %s, pubkey: %s", member.Addr, member.OverlayAddr, member.PubKey)
				peers = append(peers, member)
				hostEntries[member.OverlayAddr.String()] = []string{member.Name}
			}

			if err := wgCtl.SetUpInterface(peers); err != nil {
				logrus.WithError(err).Error("could not up interface")
				_ = wgCtl.DownInterface() // opportunistic; the setup error is already logged
			}

			if manageHosts {
				if err := hostsWriter.WriteEntries(hostEntries); err != nil {
					logrus.WithError(err).Error("could not write hosts entries")
				}
			}
		}
	}
}