Skip to content

Commit

Permalink
dataclients/kubernetes: add metadata route
Browse files Browse the repository at this point in the history
Adds a special route with a predefined id `kube__metadata`
that contains `False` predicate, no filters and endpoints metadata.

Endpoints metadata is encoded as JSON via [data URI scheme](https://en.wikipedia.org/wiki/Data_URI_scheme)
into the route backend address field.

Example eskip:
```
kube__metadata: False() -> "data:application/json;base64,eyJhZGRyZXNzZXMiO...";
...
```

A special route pre-processor detects and removes this route, decodes metadata
and stores it as EndpointRegistry metrics.

The endpoint metrics then used to obtain endpoint zone, node and pod names
for tracing and logging.

TODO:
- [ ] add flag  to enable metadata route
- [ ] tests

Fixes #1559

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Jan 12, 2024
1 parent 7a440ef commit d214c11
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 37 deletions.
13 changes: 5 additions & 8 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,13 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
// we should delete it, because of eventual consistency
// it is actually terminating
delete(resEps, address)
} else if ep.Conditions == nil {
} else if ep.Conditions == nil || ep.isReady() {
// if conditions are nil then we need to treat is as ready
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
}
} else if ep.isReady() {
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
Address: address,
Zone: ep.Zone,
NodeName: ep.NodeName,
TargetRef: ep.TargetRef,
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions dataclients/kubernetes/clusterstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ func benchmarkCachedEndpoints(b *testing.B, n int) {
Subsets: []*subset{
{
Addresses: []*address{
{"192.168.0.1", "node1"},
{"192.168.0.2", "node2"},
{"192.168.0.3", "node3"},
{"192.168.0.4", "node4"},
{"192.168.0.5", "node5"},
{"192.168.0.6", "node6"},
{"192.168.0.7", "node7"},
{"192.168.0.8", "node8"},
{"192.168.0.9", "node9"},
{"192.168.0.10", "node10"},
{"192.168.0.11", "node11"},
{IP: "192.168.0.1", NodeName: "node1"},
{IP: "192.168.0.2", NodeName: "node2"},
{IP: "192.168.0.3", NodeName: "node3"},
{IP: "192.168.0.4", NodeName: "node4"},
{IP: "192.168.0.5", NodeName: "node5"},
{IP: "192.168.0.6", NodeName: "node6"},
{IP: "192.168.0.7", NodeName: "node7"},
{IP: "192.168.0.8", NodeName: "node8"},
{IP: "192.168.0.9", NodeName: "node9"},
{IP: "192.168.0.10", NodeName: "node10"},
{IP: "192.168.0.11", NodeName: "node11"},
},
Ports: []*port{
{"ssh", 22, "TCP"},
Expand Down
5 changes: 3 additions & 2 deletions dataclients/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ type subset struct {
}

type address struct {
IP string `json:"ip"`
Node string `json:"nodeName"`
IP string `json:"ip"`
NodeName string `json:"nodeName"`
TargetRef *objectReference `json:"targetRef"`
}

type port struct {
Expand Down
10 changes: 8 additions & 2 deletions dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ type skipperEndpointSlice struct {

// Conditions have to be evaluated before creation
type skipperEndpoint struct {
Address string
Zone string
Address string
Zone string
NodeName string
TargetRef *objectReference
}

func (eps *skipperEndpointSlice) getPort(protocol, pName string, pValue int) int {
Expand Down Expand Up @@ -125,6 +127,10 @@ type EndpointSliceEndpoints struct {
// https://kubernetes.io/docs/concepts/services-networking/topology-aware-routing/#safeguards
// Zone aware routing will be available if https://github.com/zalando/skipper/issues/1446 is closed.
Zone string `json:"zone"` // "eu-central-1c"
// Node hosting this endpoint. This can be used to determine endpoints local to a node.
NodeName string `json:"nodeName"`
// TargetRef is a reference to a Kubernetes object that represents this endpoint.
TargetRef *objectReference `json:"targetRef"`
}

type endpointsliceCondition struct {
Expand Down
10 changes: 9 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"github.com/zalando/skipper/secrets/certregistry"
)

const DefaultLoadBalancerAlgorithm = "roundRobin"
const (
DefaultLoadBalancerAlgorithm = "roundRobin"
MetadataRouteID = "kube__metadata"
EnableMetadataRoute = true // TODO: flag
)

const (
defaultIngressClass = "skipper"
Expand Down Expand Up @@ -437,6 +441,10 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
r = append(r, globalRedirectRoute(c.httpsRedirectCode))
}

if EnableMetadataRoute {
r = append(r, metadataRoute(state))
}

return r, nil
}

Expand Down
4 changes: 2 additions & 2 deletions dataclients/kubernetes/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func testEndpoints(namespace, name string, labels map[string]string, base string

for i := 0; i < n; i++ {
adr := &address{
IP: fmt.Sprintf("%s.%d", base, i),
Node: fmt.Sprintf("node-%d", i),
IP: fmt.Sprintf("%s.%d", base, i),
NodeName: fmt.Sprintf("node-%d", i),
}
s.Addresses = append(s.Addresses, adr)
}
Expand Down
188 changes: 188 additions & 0 deletions dataclients/kubernetes/metadataroute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package kubernetes

import (
"encoding/base64"
"encoding/json"
"fmt"
"net"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/predicates"
"github.com/zalando/skipper/routing"

snet "github.com/zalando/skipper/net"
)

type MetadataPreProcessorOptions struct {
EndpointRegistry *routing.EndpointRegistry
}

type metadataPreProcessor struct {
options MetadataPreProcessorOptions
}

type kubeRouteMetadata struct {
Addresses []kubeRouteMetadataAddress `json:"addresses"`
}

type kubeRouteMetadataAddress struct {
Address string `json:"address"`
Zone string `json:"zone"`
NodeName string `json:"nodeName"`
TargetRef *objectReference `json:"targetRef"`
}

// NewMetadataPreProcessor creates pre-processor for metadata route.
func NewMetadataPreProcessor(options MetadataPreProcessorOptions) routing.PreProcessor {
return &metadataPreProcessor{options: options}
}

func (pp *metadataPreProcessor) Do(routes []*eskip.Route) []*eskip.Route {
var metadataRoute *eskip.Route
filtered := make([]*eskip.Route, 0, len(routes))

for _, r := range routes {
if r.Id == MetadataRouteID {
if metadataRoute == nil {
metadataRoute = r
} else {
log.Errorf("Found multiple metadata routes, using the first one")
}
} else {
filtered = append(filtered, r)
}
}

if metadataRoute == nil {
log.Errorf("Metadata route not found")
return routes
}

metadata, err := decodeMetadata(metadataRoute)
if err != nil {
log.Errorf("Failed to decode metadata route: %v", err)
return filtered
}

for _, r := range filtered {
if r.BackendType == eskip.NetworkBackend {
pp.addMetadata(metadata, r.Backend)
} else if r.BackendType == eskip.LBBackend {
for _, ep := range r.LBEndpoints {
pp.addMetadata(metadata, ep)
}
}
}
return filtered
}

// metadataRoute creates a route with [MetadataRouteID] id that matches no requests and
// contains metadata for each endpoint address used by Ingresses and RouteGroups.
func metadataRoute(s *clusterState) *eskip.Route {
var metadata kubeRouteMetadata
for id := range s.cachedEndpoints {
if s.enableEndpointSlices {
if eps, ok := s.endpointSlices[id.ResourceID]; ok {
for _, ep := range eps.Endpoints {
metadata.Addresses = append(metadata.Addresses, kubeRouteMetadataAddress{
Address: ep.Address,
Zone: ep.Zone,
NodeName: ep.NodeName,
TargetRef: ep.TargetRef,
})
}
}
} else {
if ep, ok := s.endpoints[id.ResourceID]; ok {
for _, subset := range ep.Subsets {
for _, addr := range subset.Addresses {
metadata.Addresses = append(metadata.Addresses, kubeRouteMetadataAddress{
Address: addr.IP,
NodeName: addr.NodeName,
TargetRef: addr.TargetRef,
})
}
}
}
}
}

return &eskip.Route{
Id: MetadataRouteID,
Predicates: []*eskip.Predicate{{Name: predicates.FalseName}},
BackendType: eskip.NetworkBackend,
Backend: encodeDataURI(&metadata),
}
}

func decodeMetadata(r *eskip.Route) (map[string]*kubeRouteMetadataAddress, error) {
metadata, err := decodeDataURI(r.Backend)
if err != nil {
return nil, err
}

result := make(map[string]*kubeRouteMetadataAddress, len(metadata.Addresses))
for i := range metadata.Addresses {
addr := &metadata.Addresses[i]
result[addr.Address] = addr
}
return result, nil
}

const dataUriPrefix = "data:application/json;base64,"

// encodeDataURI encodes metadata into data URI.
// See https://datatracker.ietf.org/doc/html/rfc2397
func encodeDataURI(metadata *kubeRouteMetadata) string {
data, _ := json.Marshal(&metadata)

buf := make([]byte, len(dataUriPrefix)+base64.StdEncoding.EncodedLen(len(data)))

copy(buf, dataUriPrefix)
base64.StdEncoding.Encode(buf[len(dataUriPrefix):], data)

return string(buf)
}

// encodeDataURI encodes metadata into data URI.
// See https://datatracker.ietf.org/doc/html/rfc2397
func decodeDataURI(uri string) (*kubeRouteMetadata, error) {
var metadata kubeRouteMetadata

data, err := base64.StdEncoding.DecodeString(uri[len(dataUriPrefix):])
if err != nil {
return nil, fmt.Errorf("failed to decode base64: %w", err)
}

if err := json.Unmarshal(data, &metadata); err != nil {
return nil, fmt.Errorf("failed to decode json: %w", err)
}
return &metadata, nil
}

func (pp *metadataPreProcessor) addMetadata(metadata map[string]*kubeRouteMetadataAddress, endpoint string) {
_, hostPort, err := snet.SchemeHost(endpoint)
if err != nil {
return
}

host, _, _ := net.SplitHostPort(hostPort)
if err != nil {
host = hostPort
}

addr, ok := metadata[host]
if !ok {
return
}

metrics := pp.options.EndpointRegistry.GetMetrics(hostPort)

metrics.SetTag("zone", addr.Zone)
metrics.SetTag("nodeName", addr.NodeName)
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
metrics.SetTag("podName", addr.TargetRef.Name)
metrics.SetTag("namespace", addr.TargetRef.Namespace)
}
}
7 changes: 7 additions & 0 deletions dataclients/kubernetes/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ type secret struct {
type secretList struct {
Items []*secret `json:"items"`
}

type objectReference struct {
Kind string `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Uid string `json:"uid"`
}
32 changes: 23 additions & 9 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ import (
)

const (
proxyBufferSize = 8192
unknownRouteID = "_unknownroute_"
unknownRouteBackendType = "<unknown>"
unknownRouteBackend = "<unknown>"
proxyBufferSize = 8192
unknownRouteID = "_unknownroute_"
unknownRouteBackend = "<unknown>"
endpointMetricsKey = "proxy:endpointMetricsKey" // TODO: export

// Number of loops allowed by default.
DefaultMaxLoopbacks = 9
Expand Down Expand Up @@ -852,6 +852,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
if endpointMetrics != nil {
endpointMetrics.IncInflightRequest()
defer endpointMetrics.DecInflightRequest()

ctx.stateBag[endpointMetricsKey] = endpointMetrics
}

if p.experimentalUpgrade && isUpgradeRequest(req) {
Expand Down Expand Up @@ -881,6 +883,12 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
setTag(ctx.proxySpan, HTTPUrlTag, u.String())
p.setCommonSpanInfo(u, req, ctx.proxySpan)

if endpointMetrics != nil {
if podName := endpointMetrics.Tag("podName"); podName != "" {
p.tracing.setTag(ctx.proxySpan, ServiceInstanceIdTag, podName)
}
}

carrier := ot.HTTPHeadersCarrier(req.Header)
_ = p.tracing.tracer.Inject(ctx.proxySpan.Context(), ot.HTTPHeaders, carrier)

Expand Down Expand Up @@ -1260,12 +1268,19 @@ func (p *Proxy) errorResponse(ctx *context, err error) {
flowIdLog = fmt.Sprintf(", flow id %s", flowId)
}
id := unknownRouteID
backendType := unknownRouteBackendType
backend := unknownRouteBackend
if ctx.route != nil {
id = ctx.route.Id
backendType = ctx.route.BackendType.String()
backend = fmt.Sprintf("%s://%s", ctx.request.URL.Scheme, ctx.request.URL.Host)

backend = fmt.Sprintf("%s %s://%s", ctx.route.BackendType.String(), ctx.request.URL.Scheme, ctx.request.URL.Host)
if m, ok := ctx.stateBag[endpointMetricsKey].(routing.Metrics); ok {
if podName := m.Tag("podName"); podName != "" {
backend += " " + podName
}
if nodeName := m.Tag("nodeName"); nodeName != "" {
backend += " " + nodeName
}
}
}

if err == errRouteLookupFailed {
Expand Down Expand Up @@ -1308,11 +1323,10 @@ func (p *Proxy) errorResponse(ctx *context, err error) {
uri = uri[:i]
}
logFunc(
`%s after %v, route %s with backend %s %s%s, status code %d: %v, remote host: %s, request: "%s %s %s", host: %s, user agent: "%s"`,
`%s after %v, route %s with backend %s%s, status code %d: %v, remote host: %s, request: "%s %s %s", host: %s, user agent: "%s"`,
msgPrefix,
time.Since(ctx.startServe),
id,
backendType,
backend,
flowIdLog,
ctx.response.StatusCode,
Expand Down
Loading

0 comments on commit d214c11

Please sign in to comment.