Skip to content

Commit

Permalink
dataclients/kubernetes: add metadata route
Browse files Browse the repository at this point in the history
Adds a special route with a predefined id `kube__metadata`
that contains `False` predicate, no filters and endpoints metadata.

Endpoints metadata is encoded as JSON via [data URI scheme](https://en.wikipedia.org/wiki/Data_URI_scheme)
into the route backend address field.

Example eskip:
```
kube__metadata: False() -> "data:application/json;base64,eyJhZGRyZXNzZXMiO...";
...
```

This route could be used to obtain zone, node and pod names for a given address.

A special route pre-processor detects and removes this route, decodes the metadata
and stores it as tags on the EndpointRegistry metrics.

TODO:
- [ ] add flag to enable metadata route
- [ ] tests

Fixes #1559

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Jan 12, 2024
1 parent d238294 commit 935cdd6
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 26 deletions.
13 changes: 5 additions & 8 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,13 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
// we should delete it, because of eventual consistency
// it is actually terminating
delete(resEps, address)
} else if ep.Conditions == nil {
} else if ep.Conditions == nil || ep.isReady() {
// if conditions are nil then we need to treat it as ready
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
}
} else if ep.isReady() {
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
Address: address,
Zone: ep.Zone,
NodeName: ep.NodeName,
TargetRef: ep.TargetRef,
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions dataclients/kubernetes/clusterstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ func benchmarkCachedEndpoints(b *testing.B, n int) {
Subsets: []*subset{
{
Addresses: []*address{
{"192.168.0.1", "node1"},
{"192.168.0.2", "node2"},
{"192.168.0.3", "node3"},
{"192.168.0.4", "node4"},
{"192.168.0.5", "node5"},
{"192.168.0.6", "node6"},
{"192.168.0.7", "node7"},
{"192.168.0.8", "node8"},
{"192.168.0.9", "node9"},
{"192.168.0.10", "node10"},
{"192.168.0.11", "node11"},
{IP: "192.168.0.1", NodeName: "node1"},
{IP: "192.168.0.2", NodeName: "node2"},
{IP: "192.168.0.3", NodeName: "node3"},
{IP: "192.168.0.4", NodeName: "node4"},
{IP: "192.168.0.5", NodeName: "node5"},
{IP: "192.168.0.6", NodeName: "node6"},
{IP: "192.168.0.7", NodeName: "node7"},
{IP: "192.168.0.8", NodeName: "node8"},
{IP: "192.168.0.9", NodeName: "node9"},
{IP: "192.168.0.10", NodeName: "node10"},
{IP: "192.168.0.11", NodeName: "node11"},
},
Ports: []*port{
{"ssh", 22, "TCP"},
Expand Down
5 changes: 3 additions & 2 deletions dataclients/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ type subset struct {
}

type address struct {
IP string `json:"ip"`
Node string `json:"nodeName"`
IP string `json:"ip"`
NodeName string `json:"nodeName"`
TargetRef *objectReference `json:"targetRef"`
}

type port struct {
Expand Down
10 changes: 8 additions & 2 deletions dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ type skipperEndpointSlice struct {

// Conditions have to be evaluated before creation
type skipperEndpoint struct {
Address string
Zone string
Address string
Zone string
NodeName string
TargetRef *objectReference
}

func (eps *skipperEndpointSlice) getPort(protocol, pName string, pValue int) int {
Expand Down Expand Up @@ -125,6 +127,10 @@ type EndpointSliceEndpoints struct {
// https://kubernetes.io/docs/concepts/services-networking/topology-aware-routing/#safeguards
// Zone aware routing will be available if https://github.com/zalando/skipper/issues/1446 is closed.
Zone string `json:"zone"` // "eu-central-1c"
// Node hosting this endpoint. This can be used to determine endpoints local to a node.
NodeName string `json:"nodeName"`
// TargetRef is a reference to a Kubernetes object that represents this endpoint.
TargetRef *objectReference `json:"targetRef"`
}

type endpointsliceCondition struct {
Expand Down
10 changes: 9 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"github.com/zalando/skipper/secrets/certregistry"
)

const DefaultLoadBalancerAlgorithm = "roundRobin"
const (
// DefaultLoadBalancerAlgorithm is the name of the default load balancer algorithm.
DefaultLoadBalancerAlgorithm = "roundRobin"
// MetadataRouteID is the predefined id of the special route that carries endpoints metadata.
MetadataRouteID = "kube__metadata"
// EnableMetadataRoute controls whether the metadata route is appended to the
// loaded routes and whether the metadata pre-processor is registered.
EnableMetadataRoute = true // TODO: flag
)

const (
defaultIngressClass = "skipper"
Expand Down Expand Up @@ -437,6 +441,10 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
r = append(r, globalRedirectRoute(c.httpsRedirectCode))
}

if EnableMetadataRoute {
r = append(r, metadataRoute(state))
}

return r, nil
}

Expand Down
4 changes: 2 additions & 2 deletions dataclients/kubernetes/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func testEndpoints(namespace, name string, labels map[string]string, base string

for i := 0; i < n; i++ {
adr := &address{
IP: fmt.Sprintf("%s.%d", base, i),
Node: fmt.Sprintf("node-%d", i),
IP: fmt.Sprintf("%s.%d", base, i),
NodeName: fmt.Sprintf("node-%d", i),
}
s.Addresses = append(s.Addresses, adr)
}
Expand Down
181 changes: 181 additions & 0 deletions dataclients/kubernetes/metadataroute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package kubernetes

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net"
	"sort"

	log "github.com/sirupsen/logrus"
	"github.com/zalando/skipper/eskip"
	"github.com/zalando/skipper/predicates"
	"github.com/zalando/skipper/routing"

	snet "github.com/zalando/skipper/net"
)

// MetadataPreProcessorOptions configures the metadata route pre-processor.
type MetadataPreProcessorOptions struct {
// EndpointRegistry provides the per-endpoint metrics that receive the
// decoded metadata as tags.
EndpointRegistry *routing.EndpointRegistry
}

// metadataPreProcessor removes the metadata route from the processed routes
// and records the metadata it carries as tags on the endpoint metrics.
type metadataPreProcessor struct {
options MetadataPreProcessorOptions
}

// kubeRouteMetadata is the JSON document encoded into the backend of the
// metadata route.
type kubeRouteMetadata struct {
Addresses []kubeRouteMetadataAddress `json:"addresses"`
}

// kubeRouteMetadataAddress holds the metadata of a single endpoint address.
type kubeRouteMetadataAddress struct {
// Address is the endpoint address; decoded metadata is indexed by it.
Address string `json:"address"`
// Zone is only filled in when the metadata is built from endpoint slices.
Zone string `json:"zone"`
NodeName string `json:"nodeName"`
// TargetRef may be nil.
TargetRef *objectReference `json:"targetRef"`
}

// NewMetadataPreProcessor returns a routing.PreProcessor that handles the
// metadata route using the given options.
func NewMetadataPreProcessor(options MetadataPreProcessorOptions) routing.PreProcessor {
	pp := new(metadataPreProcessor)
	pp.options = options
	return pp
}

// Do extracts the route with the [MetadataRouteID] id from routes, decodes the
// endpoints metadata it carries and records that metadata for the endpoints of
// the remaining network and load balanced routes.
//
// It returns the routes without the metadata route. When several metadata
// routes are present only the first one is used and the others are dropped.
// When no metadata route is present the input is returned as is.
func (pp *metadataPreProcessor) Do(routes []*eskip.Route) []*eskip.Route {
	var (
		found *eskip.Route
		rest  = make([]*eskip.Route, 0, len(routes))
	)

	for _, route := range routes {
		switch {
		case route.Id != MetadataRouteID:
			rest = append(rest, route)
		case found == nil:
			found = route
		default:
			log.Errorf("Found multiple metadata routes, using the first one")
		}
	}

	if found == nil {
		log.Errorf("Metadata route not found")
		return routes
	}

	metadata, err := decodeMetadata(found)
	if err != nil {
		log.Errorf("Failed to decode metadata route: %v", err)
		return rest
	}

	for _, route := range rest {
		switch route.BackendType {
		case eskip.NetworkBackend:
			pp.addMetadata(metadata, route.Backend)
		case eskip.LBBackend:
			for _, endpoint := range route.LBEndpoints {
				pp.addMetadata(metadata, endpoint)
			}
		}
	}
	return rest
}

// metadataRoute creates a route with [MetadataRouteID] id that matches no requests and
// contains metadata for each endpoint address used by Ingresses and RouteGroups.
//
// Every address is listed once and the addresses are sorted, so that the
// encoded backend does not depend on map iteration order. The zone is only
// available when endpoint slices are enabled.
func metadataRoute(s *clusterState) *eskip.Route {
	var metadata kubeRouteMetadata

	// The same resource can be reached through several cached ids and the same
	// address can appear in several resources, hence list each address only once.
	seen := make(map[string]struct{})
	add := func(a kubeRouteMetadataAddress) {
		if _, ok := seen[a.Address]; ok {
			return
		}
		seen[a.Address] = struct{}{}
		metadata.Addresses = append(metadata.Addresses, a)
	}

	for id := range s.cachedEndpoints {
		if s.enableEndpointSlices {
			if eps, ok := s.endpointSlices[id.ResourceID]; ok {
				for _, ep := range eps.Endpoints {
					add(kubeRouteMetadataAddress{
						Address:   ep.Address,
						Zone:      ep.Zone,
						NodeName:  ep.NodeName,
						TargetRef: ep.TargetRef,
					})
				}
			}
		} else if ep, ok := s.endpoints[id.ResourceID]; ok {
			for _, subset := range ep.Subsets {
				for _, addr := range subset.Addresses {
					add(kubeRouteMetadataAddress{
						Address:   addr.IP,
						NodeName:  addr.NodeName,
						TargetRef: addr.TargetRef,
					})
				}
			}
		}
	}

	// Map iteration order is random: sort the addresses so that an unchanged
	// cluster state yields the same backend string on every load.
	sort.Slice(metadata.Addresses, func(i, j int) bool {
		return metadata.Addresses[i].Address < metadata.Addresses[j].Address
	})

	return &eskip.Route{
		Id:          MetadataRouteID,
		Predicates:  []*eskip.Predicate{{Name: predicates.FalseName}},
		BackendType: eskip.NetworkBackend,
		Backend:     encodeDataURI(&metadata),
	}
}

// decodeMetadata decodes the metadata stored in the backend of r and indexes
// it by address. When an address is listed more than once the last entry wins.
func decodeMetadata(r *eskip.Route) (map[string]*kubeRouteMetadataAddress, error) {
	decoded, err := decodeDataURI(r.Backend)
	if err != nil {
		return nil, err
	}

	addresses := decoded.Addresses
	byAddress := make(map[string]*kubeRouteMetadataAddress, len(addresses))
	for i := range addresses {
		// Point into the decoded slice instead of copying the element.
		byAddress[addresses[i].Address] = &addresses[i]
	}
	return byAddress, nil
}

// dataUriPrefix is the data URI prefix that precedes the base64-encoded JSON
// metadata in the backend of the metadata route.
const dataUriPrefix = "data:application/json;base64,"

// encodeDataURI serializes metadata as JSON and returns it as a
// base64-encoded data URI that starts with dataUriPrefix.
// See https://datatracker.ietf.org/doc/html/rfc2397
func encodeDataURI(metadata *kubeRouteMetadata) string {
	// The marshalling error is discarded; on failure the payload is empty and
	// the result consists of the prefix only.
	payload, _ := json.Marshal(metadata)

	encodedLen := base64.StdEncoding.EncodedLen(len(payload))
	uri := make([]byte, len(dataUriPrefix)+encodedLen)

	// Write the prefix first, then encode the payload right after it.
	n := copy(uri, dataUriPrefix)
	base64.StdEncoding.Encode(uri[n:], payload)

	return string(uri)
}

// decodeDataURI decodes metadata from a data URI created by encodeDataURI.
// It returns an error if uri does not start with dataUriPrefix or if the
// remainder is not base64-encoded JSON.
// See https://datatracker.ietf.org/doc/html/rfc2397
func decodeDataURI(uri string) (*kubeRouteMetadata, error) {
	// Verify the prefix before slicing it off: slicing a shorter string would
	// panic, and a different prefix means the rest is not the expected payload.
	if len(uri) < len(dataUriPrefix) || uri[:len(dataUriPrefix)] != dataUriPrefix {
		return nil, fmt.Errorf("failed to decode data URI: missing %q prefix", dataUriPrefix)
	}

	data, err := base64.StdEncoding.DecodeString(uri[len(dataUriPrefix):])
	if err != nil {
		return nil, fmt.Errorf("failed to decode base64: %w", err)
	}

	var metadata kubeRouteMetadata
	if err := json.Unmarshal(data, &metadata); err != nil {
		return nil, fmt.Errorf("failed to decode json: %w", err)
	}
	return &metadata, nil
}

// addMetadata tags the EndpointRegistry metrics of endpoint with the zone and
// node name found in metadata for its address and, when the address refers to
// a Pod, with the pod name and namespace.
//
// Endpoints that cannot be parsed or that have no metadata are left untouched.
func (pp *metadataPreProcessor) addMetadata(metadata map[string]*kubeRouteMetadataAddress, endpoint string) {
	_, hostPort, err := snet.SchemeHost(endpoint)
	if err != nil {
		return
	}

	addr, ok := metadata[hostPort]
	if !ok {
		// The metadata is keyed by the bare endpoint address, while the host
		// returned by SchemeHost may carry a port: retry the lookup without it.
		host, _, splitErr := net.SplitHostPort(hostPort)
		if splitErr != nil {
			return
		}
		if addr, ok = metadata[host]; !ok {
			return
		}
	}

	// The registry is still addressed by the unmodified SchemeHost result.
	metrics := pp.options.EndpointRegistry.GetMetrics(hostPort)
	metrics.SetTag("zone", addr.Zone)
	metrics.SetTag("nodeName", addr.NodeName)
	if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
		metrics.SetTag("pod", addr.TargetRef.Name)
		metrics.SetTag("namespace", addr.TargetRef.Namespace)
	}
}
7 changes: 7 additions & 0 deletions dataclients/kubernetes/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ type secret struct {
type secretList struct {
Items []*secret `json:"items"`
}

// objectReference identifies a Kubernetes object, e.g. the Pod that an
// endpoint address refers to via its targetRef.
type objectReference struct {
Kind string `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Uid string `json:"uid"`
}
15 changes: 15 additions & 0 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ type Metrics interface {
InflightRequests() int64
IncInflightRequest()
DecInflightRequest()

SetTag(name, value string)
Tag(name string) string
}

// entry is the Metrics implementation of this package; its fields use atomic
// and sync types.
type entry struct {
detected atomic.Value // time.Time
lastSeen atomic.Value // time.Time
inflightRequests atomic.Int64
// tags holds the name/value pairs stored by SetTag and read by Tag.
tags sync.Map // map[string]string
}

var _ Metrics = &entry{}
Expand Down Expand Up @@ -60,6 +64,17 @@ func (e *entry) SetLastSeen(ts time.Time) {
e.lastSeen.Store(ts)
}

// SetTag stores value under the tag name, replacing any value stored before.
func (e *entry) SetTag(name, value string) {
	e.tags.Store(name, value)
}

// Tag returns the value stored under the tag name, or an empty string when
// no value has been stored for it.
func (e *entry) Tag(name string) string {
	stored, found := e.tags.Load(name)
	if !found {
		return ""
	}
	return stored.(string)
}

func newEntry() *entry {
result := &entry{}
result.SetDetected(time.Time{})
Expand Down
5 changes: 5 additions & 0 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1922,6 +1922,11 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
ro.PostProcessors = append(ro.PostProcessors, failClosedRatelimitPostProcessor)
}

if kubernetes.EnableMetadataRoute {
opts := kubernetes.MetadataPreProcessorOptions{EndpointRegistry: endpointRegistry}
ro.PreProcessors = append(ro.PreProcessors, kubernetes.NewMetadataPreProcessor(opts))
}

if o.DefaultFilters != nil {
ro.PreProcessors = append(ro.PreProcessors, o.DefaultFilters)
}
Expand Down

0 comments on commit 935cdd6

Please sign in to comment.