From 4de9f1a6c2bf6bbdf287f9942b2dfa76a4309122 Mon Sep 17 00:00:00 2001
From: Alexander Yastrebov
Date: Thu, 11 Jan 2024 15:28:29 +0100
Subject: [PATCH] dataclients/kubernetes: add metadata route

Adds a special route with the predefined id `kube__metadata` that
contains a `False` predicate, no filters and the endpoints metadata.

Endpoints metadata is encoded as JSON via the
[data URI scheme](https://en.wikipedia.org/wiki/Data_URI_scheme)
into the route backend address field.

Example eskip:
```
kube__metadata: False() -> "data:application/json;base64,eyJhZGRyZXNzZXMiO...";
...
```

A special route pre-processor detects and removes this route, decodes
the metadata and stores it as EndpointRegistry metrics.

The endpoint metrics are then used to obtain endpoint node and pod
names for tracing and logging.

TODO:
- [ ] add flag to enable metadata route
- [ ] tests

Fixes #1559

Signed-off-by: Alexander Yastrebov
---
 dataclients/kubernetes/clusterclient.go     | 13 +-
 dataclients/kubernetes/clusterstate_test.go | 22 +--
 dataclients/kubernetes/endpoints.go         |  5 +-
 dataclients/kubernetes/endpointslices.go    | 10 +-
 dataclients/kubernetes/kube.go              | 10 +-
 dataclients/kubernetes/kube_test.go         |  4 +-
 dataclients/kubernetes/metadataroute.go     | 187 ++++++++++++++++++++
 dataclients/kubernetes/resources.go         | 14 ++
 proxy/proxy.go                              | 32 +++-
 proxy/tracing.go                            |  1 +
 routing/endpointregistry.go                 | 18 +-
 skipper.go                                  |  5 +
 12 files changed, 284 insertions(+), 37 deletions(-)
 create mode 100644 dataclients/kubernetes/metadataroute.go

diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go
index f476f7ec26..5ca1e3a6cb 100644
--- a/dataclients/kubernetes/clusterclient.go
+++ b/dataclients/kubernetes/clusterclient.go
@@ -519,16 +519,13 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
 			// we should delete it, because of eventual consistency
 			// it is actually terminating
 			delete(resEps, address)
-		} else if ep.Conditions == nil {
+		} else if ep.Conditions == nil || ep.isReady() {
 			// if conditions are nil then we need to treat is as ready
 			resEps[address] = &skipperEndpoint{
-				Address: address,
-				Zone:    ep.Zone,
-			}
-		} else if ep.isReady() {
-			resEps[address] = &skipperEndpoint{
-				Address: address,
-				Zone:    ep.Zone,
+				Address:   address,
+				Zone:      ep.Zone,
+				NodeName:  ep.NodeName,
+				TargetRef: ep.TargetRef,
 			}
 		}
 	}
diff --git a/dataclients/kubernetes/clusterstate_test.go b/dataclients/kubernetes/clusterstate_test.go
index f7c55e96a8..69cc8bae34 100644
--- a/dataclients/kubernetes/clusterstate_test.go
+++ b/dataclients/kubernetes/clusterstate_test.go
@@ -23,17 +23,17 @@ func benchmarkCachedEndpoints(b *testing.B, n int) {
 		Subsets: []*subset{
 			{
 				Addresses: []*address{
-					{"192.168.0.1", "node1"},
-					{"192.168.0.2", "node2"},
-					{"192.168.0.3", "node3"},
-					{"192.168.0.4", "node4"},
-					{"192.168.0.5", "node5"},
-					{"192.168.0.6", "node6"},
-					{"192.168.0.7", "node7"},
-					{"192.168.0.8", "node8"},
-					{"192.168.0.9", "node9"},
-					{"192.168.0.10", "node10"},
-					{"192.168.0.11", "node11"},
+					{IP: "192.168.0.1", NodeName: "node1"},
+					{IP: "192.168.0.2", NodeName: "node2"},
+					{IP: "192.168.0.3", NodeName: "node3"},
+					{IP: "192.168.0.4", NodeName: "node4"},
+					{IP: "192.168.0.5", NodeName: "node5"},
+					{IP: "192.168.0.6", NodeName: "node6"},
+					{IP: "192.168.0.7", NodeName: "node7"},
+					{IP: "192.168.0.8", NodeName: "node8"},
+					{IP: "192.168.0.9", NodeName: "node9"},
+					{IP: "192.168.0.10", NodeName: "node10"},
+					{IP: "192.168.0.11", NodeName: "node11"},
 				},
 				Ports: []*port{
 					{"ssh", 22, "TCP"},
diff --git 
a/dataclients/kubernetes/endpoints.go b/dataclients/kubernetes/endpoints.go index b293b41fed..ed64870b5f 100644 --- a/dataclients/kubernetes/endpoints.go +++ b/dataclients/kubernetes/endpoints.go @@ -98,8 +98,9 @@ type subset struct { } type address struct { - IP string `json:"ip"` - Node string `json:"nodeName"` + IP string `json:"ip"` + NodeName string `json:"nodeName"` + TargetRef *objectReference `json:"targetRef"` } type port struct { diff --git a/dataclients/kubernetes/endpointslices.go b/dataclients/kubernetes/endpointslices.go index 18e460fb87..6e2d41c2af 100644 --- a/dataclients/kubernetes/endpointslices.go +++ b/dataclients/kubernetes/endpointslices.go @@ -16,8 +16,10 @@ type skipperEndpointSlice struct { // Conditions have to be evaluated before creation type skipperEndpoint struct { - Address string - Zone string + Address string + Zone string + NodeName string + TargetRef *objectReference } func (eps *skipperEndpointSlice) getPort(protocol, pName string, pValue int) int { @@ -125,6 +127,10 @@ type EndpointSliceEndpoints struct { // https://kubernetes.io/docs/concepts/services-networking/topology-aware-routing/#safeguards // Zone aware routing will be available if https://github.com/zalando/skipper/issues/1446 is closed. Zone string `json:"zone"` // "eu-central-1c" + // Node hosting this endpoint. This can be used to determine endpoints local to a node. + NodeName string `json:"nodeName"` + // TargetRef is a reference to a Kubernetes object that represents this endpoint. + TargetRef *objectReference `json:"targetRef"` } type endpointsliceCondition struct { diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go index 3500c71de0..0e3db7e16d 100644 --- a/dataclients/kubernetes/kube.go +++ b/dataclients/kubernetes/kube.go @@ -18,7 +18,11 @@ import ( "github.com/zalando/skipper/secrets/certregistry" ) -const DefaultLoadBalancerAlgorithm = "roundRobin" +const ( + DefaultLoadBalancerAlgorithm = "roundRobin" + MetadataRouteID = "kube__metadata" + EnableMetadataRoute = true // TODO: flag +) const ( defaultIngressClass = "skipper" @@ -437,6 +441,10 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) { r = append(r, globalRedirectRoute(c.httpsRedirectCode)) } + if EnableMetadataRoute { + r = append(r, metadataRoute(state)) + } + return r, nil } diff --git a/dataclients/kubernetes/kube_test.go b/dataclients/kubernetes/kube_test.go index c11a8d15a2..4aba035448 100644 --- a/dataclients/kubernetes/kube_test.go +++ b/dataclients/kubernetes/kube_test.go @@ -86,8 +86,8 @@ func testEndpoints(namespace, name string, labels map[string]string, base string for i := 0; i < n; i++ { adr := &address{ - IP: fmt.Sprintf("%s.%d", base, i), - Node: fmt.Sprintf("node-%d", i), + IP: fmt.Sprintf("%s.%d", base, i), + NodeName: fmt.Sprintf("node-%d", i), } s.Addresses = append(s.Addresses, adr) } diff --git a/dataclients/kubernetes/metadataroute.go b/dataclients/kubernetes/metadataroute.go new file mode 100644 index 0000000000..cf2f0f0eb4 --- /dev/null +++ b/dataclients/kubernetes/metadataroute.go @@ -0,0 +1,187 @@ +package kubernetes + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "net" + + log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/predicates" + "github.com/zalando/skipper/routing" + + snet "github.com/zalando/skipper/net" +) + +type MetadataPreProcessorOptions struct { + EndpointRegistry *routing.EndpointRegistry +} + +type metadataPreProcessor struct { + options MetadataPreProcessorOptions +} + +type 
kubeRouteMetadata struct {
+	Addresses map[string]*kubeRouteMetadataAddress `json:"addresses"`
+}
+
+type kubeRouteMetadataAddress struct {
+	Zone     string `json:"zone,omitempty"`
+	NodeName string `json:"nodeName,omitempty"`
+	PodName  string `json:"podName,omitempty"`
+}
+
+// NewMetadataPreProcessor creates a pre-processor for the metadata route.
+func NewMetadataPreProcessor(options MetadataPreProcessorOptions) routing.PreProcessor {
+	return &metadataPreProcessor{options: options}
+}
+
+func (pp *metadataPreProcessor) Do(routes []*eskip.Route) []*eskip.Route {
+	var metadataRoute *eskip.Route
+	filtered := make([]*eskip.Route, 0, len(routes))
+
+	for _, r := range routes {
+		if r.Id == MetadataRouteID {
+			if metadataRoute == nil {
+				metadataRoute = r
+			} else {
+				log.Errorf("Found multiple metadata routes, using the first one")
+			}
+		} else {
+			filtered = append(filtered, r)
+		}
+	}
+
+	if metadataRoute == nil {
+		log.Errorf("Metadata route not found")
+		return routes
+	}
+
+	metadata, err := decodeMetadata(metadataRoute)
+	if err != nil {
+		log.Errorf("Failed to decode metadata route: %v", err)
+		return filtered
+	}
+
+	for _, r := range filtered {
+		if r.BackendType == eskip.NetworkBackend {
+			pp.addMetadata(metadata, r.Backend)
+		} else if r.BackendType == eskip.LBBackend {
+			for _, ep := range r.LBEndpoints {
+				pp.addMetadata(metadata, ep)
+			}
+		}
+	}
+	return filtered
+}
+
+// metadataRoute creates a route with the [MetadataRouteID] id that matches no requests and
+// contains metadata for each endpoint address used by Ingresses and RouteGroups.
+func metadataRoute(s *clusterState) *eskip.Route {
+	metadata := kubeRouteMetadata{
+		Addresses: make(map[string]*kubeRouteMetadataAddress),
+	}
+
+	for id := range s.cachedEndpoints {
+		if s.enableEndpointSlices {
+			if eps, ok := s.endpointSlices[id.ResourceID]; ok {
+				for _, ep := range eps.Endpoints {
+					metadata.Addresses[ep.Address] = &kubeRouteMetadataAddress{
+						Zone:     ep.Zone,
+						NodeName: ep.NodeName,
+						PodName:  ep.TargetRef.getPodName(),
+					}
+				}
+			}
+		} else {
+			if ep, ok := s.endpoints[id.ResourceID]; ok {
+				for _, subset := range ep.Subsets {
+					for _, addr := range subset.Addresses {
+						metadata.Addresses[addr.IP] = &kubeRouteMetadataAddress{
+							// Endpoints do not provide zone
+							NodeName: addr.NodeName,
+							PodName:  addr.TargetRef.getPodName(),
+						}
+					}
+				}
+			}
+		}
+	}
+
+	return &eskip.Route{
+		Id:          MetadataRouteID,
+		Predicates:  []*eskip.Predicate{{Name: predicates.FalseName}},
+		BackendType: eskip.NetworkBackend,
+		Backend:     encodeDataURI(&metadata),
+	}
+}
+
+func decodeMetadata(r *eskip.Route) (map[string]*kubeRouteMetadataAddress, error) {
+	metadata, err := decodeDataURI(r.Backend)
+	if err != nil {
+		return nil, err
+	}
+	return metadata.Addresses, nil
+}
+
+const dataUriPrefix = "data:application/json;base64,"
+
+// encodeDataURI encodes metadata into a data URI.
+// Note that map keys are sorted and used as JSON object keys,
+// therefore encodeDataURI produces the same output for the same input.
+// See https://datatracker.ietf.org/doc/html/rfc2397
+func encodeDataURI(metadata *kubeRouteMetadata) string {
+	data, _ := json.Marshal(&metadata)
+
+	buf := make([]byte, len(dataUriPrefix)+base64.StdEncoding.EncodedLen(len(data)))
+
+	copy(buf, dataUriPrefix)
+	base64.StdEncoding.Encode(buf[len(dataUriPrefix):], data)
+
+	return string(buf)
+}
+
+// decodeDataURI decodes metadata from a data URI.
+// See https://datatracker.ietf.org/doc/html/rfc2397
+func decodeDataURI(uri string) (*kubeRouteMetadata, error) {
+	var metadata kubeRouteMetadata
+
+	data, err := base64.StdEncoding.DecodeString(uri[len(dataUriPrefix):])
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode base64: %w", err)
+	}
+
+	if err := json.Unmarshal(data, &metadata); err != nil {
+		return nil, fmt.Errorf("failed to decode json: %w", err)
+	}
+	return &metadata, nil
+}
+
+func (pp *metadataPreProcessor) addMetadata(metadata map[string]*kubeRouteMetadataAddress, endpoint string) {
+	_, hostPort, err := snet.SchemeHost(endpoint)
+	if err != nil {
+		return
+	}
+
+	// use the address without the port as lookup key,
+	// fall back to the full value if the endpoint has no port
+	host, _, err := net.SplitHostPort(hostPort)
+	if err != nil {
+		host = hostPort
+	}
+
+	addr, ok := metadata[host]
+	if !ok {
+		return
+	}
+
+	metrics := pp.options.EndpointRegistry.GetMetrics(hostPort)
+	setTag := func(name, value string) {
+		if value != "" {
+			metrics.SetTag(name, value)
+		}
+	}
+
+	setTag("zone", addr.Zone)
+	setTag("nodeName", addr.NodeName)
+	setTag("podName", addr.PodName)
+}
diff --git a/dataclients/kubernetes/resources.go b/dataclients/kubernetes/resources.go
index 3f35b27885..d509c2d5e7 100644
--- a/dataclients/kubernetes/resources.go
+++ b/dataclients/kubernetes/resources.go
@@ -24,3 +24,17 @@ type secret struct {
 }
 
 type secretList struct {
 	Items []*secret `json:"items"`
 }
+
+type objectReference struct {
+	Kind      string `json:"kind"`
+	Name      string `json:"name"`
+	Namespace string `json:"namespace"`
+	Uid       string `json:"uid"`
+}
+
+func (r *objectReference) getPodName() string {
+	if r != nil && r.Kind == "Pod" {
+		return r.Name
+	}
+	return ""
+}
diff --git a/proxy/proxy.go b/proxy/proxy.go
index 45b19812be..0db7cf7045 100644
--- a/proxy/proxy.go
+++ b/proxy/proxy.go
@@ -45,10 +45,10 @@ import (
 )
 
 const (
-	proxyBufferSize         = 8192
-	unknownRouteID          = "_unknownroute_"
-	unknownRouteBackendType = ""
-	unknownRouteBackend     = ""
+	proxyBufferSize     = 8192
+	unknownRouteID      = "_unknownroute_"
+	unknownRouteBackend = ""
+	endpointMetricsKey  = "proxy:endpointMetricsKey" // export for filters?
 
 	// Number of loops allowed by default.
DefaultMaxLoopbacks = 9 @@ -852,6 +852,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co if endpointMetrics != nil { endpointMetrics.IncInflightRequest() defer endpointMetrics.DecInflightRequest() + + ctx.stateBag[endpointMetricsKey] = endpointMetrics } if p.experimentalUpgrade && isUpgradeRequest(req) { @@ -881,6 +883,12 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co setTag(ctx.proxySpan, HTTPUrlTag, u.String()) p.setCommonSpanInfo(u, req, ctx.proxySpan) + if endpointMetrics != nil { + if podName := endpointMetrics.Tag("podName"); podName != "" { + p.tracing.setTag(ctx.proxySpan, ServiceInstanceIdTag, podName) + } + } + carrier := ot.HTTPHeadersCarrier(req.Header) _ = p.tracing.tracer.Inject(ctx.proxySpan.Context(), ot.HTTPHeaders, carrier) @@ -1260,12 +1268,19 @@ func (p *Proxy) errorResponse(ctx *context, err error) { flowIdLog = fmt.Sprintf(", flow id %s", flowId) } id := unknownRouteID - backendType := unknownRouteBackendType backend := unknownRouteBackend if ctx.route != nil { id = ctx.route.Id - backendType = ctx.route.BackendType.String() - backend = fmt.Sprintf("%s://%s", ctx.request.URL.Scheme, ctx.request.URL.Host) + + backend = fmt.Sprintf("%s %s://%s", ctx.route.BackendType.String(), ctx.request.URL.Scheme, ctx.request.URL.Host) + if m, ok := ctx.stateBag[endpointMetricsKey].(routing.Metrics); ok { + if podName := m.Tag("podName"); podName != "" { + backend += " " + podName + } + if nodeName := m.Tag("nodeName"); nodeName != "" { + backend += " " + nodeName + } + } } if err == errRouteLookupFailed { @@ -1308,11 +1323,10 @@ func (p *Proxy) errorResponse(ctx *context, err error) { uri = uri[:i] } logFunc( - `%s after %v, route %s with backend %s %s%s, status code %d: %v, remote host: %s, request: "%s %s %s", host: %s, user agent: "%s"`, + `%s after %v, route %s with backend %s%s, status code %d: %v, remote host: %s, request: "%s %s %s", host: %s, user agent: "%s"`, msgPrefix, time.Since(ctx.startServe), id, - backendType, backend, flowIdLog, ctx.response.StatusCode, diff --git a/proxy/tracing.go b/proxy/tracing.go index 66b481c8f3..cb610733fb 100644 --- a/proxy/tracing.go +++ b/proxy/tracing.go @@ -21,6 +21,7 @@ const ( HTTPStatusCodeTag = "http.status_code" SkipperRouteIDTag = "skipper.route_id" SpanKindTag = "span.kind" + ServiceInstanceIdTag = "service.instance.id" ClientRequestCanceled = "canceled" SpanKindClient = "client" diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index f2f5149ac6..7763d96c35 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -22,12 +22,16 @@ type Metrics interface { InflightRequests() int64 IncInflightRequest() DecInflightRequest() + + SetTag(name, value string) + Tag(name string) string } type entry struct { detected atomic.Value // time.Time lastSeen atomic.Value // time.Time inflightRequests atomic.Int64 + tags sync.Map // map[string]string } var _ Metrics = &entry{} @@ -60,6 +64,17 @@ func (e *entry) SetLastSeen(ts time.Time) { e.lastSeen.Store(ts) } +func (e *entry) SetTag(name string, value string) { + e.tags.Store(name, value) +} + +func (e *entry) Tag(name string) string { + if value, ok := e.tags.Load(name); ok { + return value.(string) + } + return "" +} + func newEntry() *entry { result := &entry{} result.SetDetected(time.Time{}) @@ -90,10 +105,9 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { if epi.Metrics.DetectedTime().IsZero() { epi.Metrics.SetDetected(now) } - epi.Metrics.SetLastSeen(now) } - } else 
{ + } else if route.BackendType == eskip.NetworkBackend { entry := r.GetMetrics(route.Host) if entry.DetectedTime().IsZero() { entry.SetDetected(now) diff --git a/skipper.go b/skipper.go index 3a6dc0344c..0c4a164cab 100644 --- a/skipper.go +++ b/skipper.go @@ -1922,6 +1922,11 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { ro.PostProcessors = append(ro.PostProcessors, failClosedRatelimitPostProcessor) } + if kubernetes.EnableMetadataRoute { + opts := kubernetes.MetadataPreProcessorOptions{EndpointRegistry: endpointRegistry} + ro.PreProcessors = append(ro.PreProcessors, kubernetes.NewMetadataPreProcessor(opts)) + } + if o.DefaultFilters != nil { ro.PreProcessors = append(ro.PreProcessors, o.DefaultFilters) }
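
A possible starting point for the `tests` TODO item above: a minimal
round-trip test for the data URI helpers in metadataroute.go. The test
name, placement and assertions are only a sketch and not part of this
patch.

```
package kubernetes

import "testing"

// Sketch only: verifies that decodeDataURI returns exactly what
// encodeDataURI produced for a single endpoint address.
func TestMetadataDataURIRoundTrip(t *testing.T) {
	in := &kubeRouteMetadata{
		Addresses: map[string]*kubeRouteMetadataAddress{
			"10.2.4.8": {Zone: "eu-central-1a", NodeName: "node1", PodName: "app-1"},
		},
	}

	out, err := decodeDataURI(encodeDataURI(in))
	if err != nil {
		t.Fatalf("failed to decode: %v", err)
	}

	got, ok := out.Addresses["10.2.4.8"]
	if !ok || *got != *in.Addresses["10.2.4.8"] {
		t.Errorf("unexpected metadata for 10.2.4.8: %+v", got)
	}
}
```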