Skip to content

Commit

Permalink
dataclients/kubernetes: add metadata route
Browse files Browse the repository at this point in the history
Adds a special route with a predefined id `kube__metadata`
that contains `False` predicate, no filters and endpoints metadata.

Endpoints metadata is encoded as JSON via [data URI scheme](https://en.wikipedia.org/wiki/Data_URI_scheme)
into the route backend address field.

Example eskip:
```
kube__metadata: False() -> "data:application/json;base64,eyJhZGRyZXNzZXMiO...";
...
```

A special route pre-processor detects and removes this route, decodes metadata
and stores it as EndpointRegistry metrics.

The endpoint metrics then used to obtain endpoint node and pod names
for tracing and logging.

TODO:
- [ ] add flag  to enable metadata route
- [ ] tests

Fixes #1559

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Jan 13, 2024
1 parent 14cec82 commit 4de9f1a
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 37 deletions.
13 changes: 5 additions & 8 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,13 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
// we should delete it, because of eventual consistency
// it is actually terminating
delete(resEps, address)
} else if ep.Conditions == nil {
} else if ep.Conditions == nil || ep.isReady() {
// if conditions are nil then we need to treat is as ready
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
}
} else if ep.isReady() {
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
Address: address,
Zone: ep.Zone,
NodeName: ep.NodeName,
TargetRef: ep.TargetRef,
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions dataclients/kubernetes/clusterstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ func benchmarkCachedEndpoints(b *testing.B, n int) {
Subsets: []*subset{
{
Addresses: []*address{
{"192.168.0.1", "node1"},
{"192.168.0.2", "node2"},
{"192.168.0.3", "node3"},
{"192.168.0.4", "node4"},
{"192.168.0.5", "node5"},
{"192.168.0.6", "node6"},
{"192.168.0.7", "node7"},
{"192.168.0.8", "node8"},
{"192.168.0.9", "node9"},
{"192.168.0.10", "node10"},
{"192.168.0.11", "node11"},
{IP: "192.168.0.1", NodeName: "node1"},
{IP: "192.168.0.2", NodeName: "node2"},
{IP: "192.168.0.3", NodeName: "node3"},
{IP: "192.168.0.4", NodeName: "node4"},
{IP: "192.168.0.5", NodeName: "node5"},
{IP: "192.168.0.6", NodeName: "node6"},
{IP: "192.168.0.7", NodeName: "node7"},
{IP: "192.168.0.8", NodeName: "node8"},
{IP: "192.168.0.9", NodeName: "node9"},
{IP: "192.168.0.10", NodeName: "node10"},
{IP: "192.168.0.11", NodeName: "node11"},
},
Ports: []*port{
{"ssh", 22, "TCP"},
Expand Down
5 changes: 3 additions & 2 deletions dataclients/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ type subset struct {
}

type address struct {
IP string `json:"ip"`
Node string `json:"nodeName"`
IP string `json:"ip"`
NodeName string `json:"nodeName"`
TargetRef *objectReference `json:"targetRef"`
}

type port struct {
Expand Down
10 changes: 8 additions & 2 deletions dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ type skipperEndpointSlice struct {

// Conditions have to be evaluated before creation
type skipperEndpoint struct {
Address string
Zone string
Address string
Zone string
NodeName string
TargetRef *objectReference
}

func (eps *skipperEndpointSlice) getPort(protocol, pName string, pValue int) int {
Expand Down Expand Up @@ -125,6 +127,10 @@ type EndpointSliceEndpoints struct {
// https://kubernetes.io/docs/concepts/services-networking/topology-aware-routing/#safeguards
// Zone aware routing will be available if https://github.com/zalando/skipper/issues/1446 is closed.
Zone string `json:"zone"` // "eu-central-1c"
// Node hosting this endpoint. This can be used to determine endpoints local to a node.
NodeName string `json:"nodeName"`
// TargetRef is a reference to a Kubernetes object that represents this endpoint.
TargetRef *objectReference `json:"targetRef"`
}

type endpointsliceCondition struct {
Expand Down
10 changes: 9 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"github.com/zalando/skipper/secrets/certregistry"
)

const DefaultLoadBalancerAlgorithm = "roundRobin"
const (
DefaultLoadBalancerAlgorithm = "roundRobin"
MetadataRouteID = "kube__metadata"
EnableMetadataRoute = true // TODO: flag
)

const (
defaultIngressClass = "skipper"
Expand Down Expand Up @@ -437,6 +441,10 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
r = append(r, globalRedirectRoute(c.httpsRedirectCode))
}

if EnableMetadataRoute {
r = append(r, metadataRoute(state))
}

return r, nil
}

Expand Down
4 changes: 2 additions & 2 deletions dataclients/kubernetes/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func testEndpoints(namespace, name string, labels map[string]string, base string

for i := 0; i < n; i++ {
adr := &address{
IP: fmt.Sprintf("%s.%d", base, i),
Node: fmt.Sprintf("node-%d", i),
IP: fmt.Sprintf("%s.%d", base, i),
NodeName: fmt.Sprintf("node-%d", i),
}
s.Addresses = append(s.Addresses, adr)
}
Expand Down
187 changes: 187 additions & 0 deletions dataclients/kubernetes/metadataroute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package kubernetes

import (
"encoding/base64"
"encoding/json"
"fmt"
"net"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/predicates"
"github.com/zalando/skipper/routing"

snet "github.com/zalando/skipper/net"
)

type MetadataPreProcessorOptions struct {
EndpointRegistry *routing.EndpointRegistry
}

type metadataPreProcessor struct {
options MetadataPreProcessorOptions
}

type kubeRouteMetadata struct {
Addresses map[string]*kubeRouteMetadataAddress `json:"addresses"`
}

type kubeRouteMetadataAddress struct {
Zone string `json:"zone,omitempty"`
NodeName string `json:"nodeName,omitempty"`
PodName string `json:"podName,omitempty"`
}

// NewMetadataPreProcessor creates pre-processor for metadata route.
func NewMetadataPreProcessor(options MetadataPreProcessorOptions) routing.PreProcessor {
return &metadataPreProcessor{options: options}
}

func (pp *metadataPreProcessor) Do(routes []*eskip.Route) []*eskip.Route {
var metadataRoute *eskip.Route
filtered := make([]*eskip.Route, 0, len(routes))

for _, r := range routes {
if r.Id == MetadataRouteID {
if metadataRoute == nil {
metadataRoute = r
} else {
log.Errorf("Found multiple metadata routes, using the first one")
}
} else {
filtered = append(filtered, r)
}
}

if metadataRoute == nil {
log.Errorf("Metadata route not found")
return routes
}

metadata, err := decodeMetadata(metadataRoute)
if err != nil {
log.Errorf("Failed to decode metadata route: %v", err)
return filtered
}

for _, r := range filtered {
if r.BackendType == eskip.NetworkBackend {
pp.addMetadata(metadata, r.Backend)
} else if r.BackendType == eskip.LBBackend {
for _, ep := range r.LBEndpoints {
pp.addMetadata(metadata, ep)
}
}
}
return filtered
}

// metadataRoute creates a route with [MetadataRouteID] id that matches no requests and
// contains metadata for each endpoint address used by Ingresses and RouteGroups.
func metadataRoute(s *clusterState) *eskip.Route {
metadata := kubeRouteMetadata{
Addresses: make(map[string]*kubeRouteMetadataAddress),
}

for id := range s.cachedEndpoints {
if s.enableEndpointSlices {
if eps, ok := s.endpointSlices[id.ResourceID]; ok {
for _, ep := range eps.Endpoints {
metadata.Addresses[ep.Address] = &kubeRouteMetadataAddress{
Zone: ep.Zone,
NodeName: ep.NodeName,
PodName: ep.TargetRef.getPodName(),
}
}
}
} else {
if ep, ok := s.endpoints[id.ResourceID]; ok {
for _, subset := range ep.Subsets {
for _, addr := range subset.Addresses {
metadata.Addresses[addr.IP] = &kubeRouteMetadataAddress{
// Endpoints do not provide zone
NodeName: addr.NodeName,
PodName: addr.TargetRef.getPodName(),
}
}
}
}
}
}

return &eskip.Route{
Id: MetadataRouteID,
Predicates: []*eskip.Predicate{{Name: predicates.FalseName}},
BackendType: eskip.NetworkBackend,
Backend: encodeDataURI(&metadata),
}
}

func decodeMetadata(r *eskip.Route) (map[string]*kubeRouteMetadataAddress, error) {
metadata, err := decodeDataURI(r.Backend)
if err != nil {
return nil, err
}
return metadata.Addresses, nil
}

const dataUriPrefix = "data:application/json;base64,"

// encodeDataURI encodes metadata into data URI.
// Note that map keys are sorted and used as JSON object keys
// therefore encodeDataURI produces the same output for the same input.
// See https://datatracker.ietf.org/doc/html/rfc2397
func encodeDataURI(metadata *kubeRouteMetadata) string {
data, _ := json.Marshal(&metadata)

buf := make([]byte, len(dataUriPrefix)+base64.StdEncoding.EncodedLen(len(data)))

copy(buf, dataUriPrefix)
base64.StdEncoding.Encode(buf[len(dataUriPrefix):], data)

return string(buf)
}

// encodeDataURI encodes metadata into data URI.
// See https://datatracker.ietf.org/doc/html/rfc2397
func decodeDataURI(uri string) (*kubeRouteMetadata, error) {
var metadata kubeRouteMetadata

data, err := base64.StdEncoding.DecodeString(uri[len(dataUriPrefix):])
if err != nil {
return nil, fmt.Errorf("failed to decode base64: %w", err)
}

if err := json.Unmarshal(data, &metadata); err != nil {
return nil, fmt.Errorf("failed to decode json: %w", err)
}
return &metadata, nil
}

func (pp *metadataPreProcessor) addMetadata(metadata map[string]*kubeRouteMetadataAddress, endpoint string) {
_, hostPort, err := snet.SchemeHost(endpoint)
if err != nil {
return
}

host, _, _ := net.SplitHostPort(hostPort)
if err != nil {
host = hostPort
}

addr, ok := metadata[host]
if !ok {
return
}

metrics := pp.options.EndpointRegistry.GetMetrics(hostPort)
setTag := func(name, value string) {
if value != "" {
metrics.SetTag(name, value)
}
}

setTag("zone", addr.Zone)
setTag("nodeName", addr.NodeName)
setTag("podName", addr.PodName)
}
14 changes: 14 additions & 0 deletions dataclients/kubernetes/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,17 @@ type secret struct {
type secretList struct {
Items []*secret `json:"items"`
}

type objectReference struct {
Kind string `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Uid string `json:"uid"`
}

func (r *objectReference) getPodName() string {
if r != nil && r.Kind == "Pod" {
return r.Name
}
return ""
}
Loading

0 comments on commit 4de9f1a

Please sign in to comment.