Merge pull request #90 from anchore/next
feat: add node metadata collection
bradleyjones authored May 2, 2023
2 parents 71bdeae + dfa6f29 commit 8dc31ae
Showing 6 changed files with 209 additions and 11 deletions.
55 changes: 55 additions & 0 deletions pkg/inventory/nodes.go
@@ -0,0 +1,55 @@
package inventory

import (
	"context"
	"fmt"

	"github.com/anchore/k8s-inventory/internal/log"
	"github.com/anchore/k8s-inventory/pkg/client"
	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func FetchNodes(c client.Client, batchSize, timeout int64) (map[string]Node, error) {
	nodes := make(map[string]Node)

	cont := ""
	for {
		opts := metav1.ListOptions{
			Limit:          batchSize,
			Continue:       cont,
			TimeoutSeconds: &timeout,
		}

		list, err := c.Clientset.CoreV1().Nodes().List(context.Background(), opts)
		if err != nil {
			if k8sErrors.IsForbidden(err) {
				log.Warnf("failed to list nodes: %v", err)
				return nil, nil
			}
			return nil, fmt.Errorf("failed to list nodes: %w", err)
		}

		for _, n := range list.Items {
			nodes[n.ObjectMeta.Name] = Node{
				Name:                    n.ObjectMeta.Name,
				UID:                     string(n.UID),
				Annotations:             n.Annotations,
				Arch:                    n.Status.NodeInfo.Architecture,
				ContainerRuntimeVersion: n.Status.NodeInfo.ContainerRuntimeVersion,
				KernelVersion:           n.Status.NodeInfo.KernelVersion,
				KubeProxyVersion:        n.Status.NodeInfo.KubeProxyVersion,
				KubeletVersion:          n.Status.NodeInfo.KubeletVersion,
				Labels:                  n.Labels,
				OperatingSystem:         n.Status.NodeInfo.OperatingSystem,
			}
		}

		cont = list.GetListMeta().GetContinue()
		if cont == "" {
			break
		}
	}

	return nodes, nil
}
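For context beyond the diff: a minimal sketch of driving FetchNodes directly, assuming a kubeconfig on disk (the path and the batch/timeout values here are illustrative, not part of this commit):

package main

import (
	"fmt"

	"github.com/anchore/k8s-inventory/pkg/client"
	"github.com/anchore/k8s-inventory/pkg/inventory"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a clientset from a local kubeconfig (illustrative path).
	cfg, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Page through nodes 100 at a time with a 60s server-side timeout.
	nodes, err := inventory.FetchNodes(client.Client{Clientset: clientset}, 100, 60)
	if err != nil {
		panic(err)
	}
	for name, n := range nodes {
		fmt.Printf("%s: uid=%s arch=%s kubelet=%s\n", name, n.UID, n.Arch, n.KubeletVersion)
	}
}

Note the RBAC behavior in the function above: a forbidden node list is only logged as a warning, and FetchNodes returns a nil map instead of an error, so callers without node read permission keep working with reduced metadata.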
84 changes: 84 additions & 0 deletions pkg/inventory/nodes_test.go
@@ -0,0 +1,84 @@
package inventory

import (
	"testing"

	"github.com/anchore/k8s-inventory/pkg/client"
	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestFetchNodes(t *testing.T) {
	type args struct {
		c         client.Client
		batchSize int64
		timeout   int64
	}
	tests := []struct {
		name    string
		args    args
		want    map[string]Node
		wantErr bool
	}{
		{
			name: "successfully returns nodes",
			args: args{
				c: client.Client{
					Clientset: fake.NewSimpleClientset(&v1.Node{
						ObjectMeta: metav1.ObjectMeta{
							Name: "test-node",
							UID:  "test-uid",
							Annotations: map[string]string{
								"test-annotation": "test-value",
							},
							Labels: map[string]string{
								"test-label": "test-value",
							},
						},
						Status: v1.NodeStatus{
							NodeInfo: v1.NodeSystemInfo{
								Architecture:            "arm64",
								ContainerRuntimeVersion: "docker://20.10.23",
								KernelVersion:           "5.15.49-linuxkit",
								KubeProxyVersion:        "v1.26.1",
								KubeletVersion:          "v1.26.1",
								OperatingSystem:         "linux",
							},
						},
					}),
				},
				batchSize: 100,
				timeout:   100,
			},
			want: map[string]Node{
				"test-node": {
					Name: "test-node",
					UID:  "test-uid",
					Annotations: map[string]string{
						"test-annotation": "test-value",
					},
					Labels: map[string]string{
						"test-label": "test-value",
					},
					Arch:                    "arm64",
					ContainerRuntimeVersion: "docker://20.10.23",
					KernelVersion:           "5.15.49-linuxkit",
					KubeProxyVersion:        "v1.26.1",
					KubeletVersion:          "v1.26.1",
					OperatingSystem:         "linux",
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := FetchNodes(tt.args.c, tt.args.batchSize, tt.args.timeout)
			if (err != nil) != tt.wantErr {
				t.Errorf("FetchNodes() error = %v, wantErr %v", err, tt.wantErr)
			}
			assert.Equal(t, tt.want, got)
		})
	}
}
12 changes: 8 additions & 4 deletions pkg/inventory/pods.go
@@ -40,19 +40,23 @@ func FetchPodsInNamespace(c client.Client, batchSize, timeout int64, namespace string)
 	return podList, nil
 }
 
-func ProcessPods(pods []v1.Pod, namespaceUID string, metadata bool) []Pod {
+func ProcessPods(pods []v1.Pod, namespaceUID string, metadata bool, nodes map[string]Node) []Pod {
 	var podList []Pod
 
 	for _, p := range pods {
 		if metadata {
-			podList = append(podList, Pod{
+			pod := Pod{
 				Name:         p.ObjectMeta.Name,
 				UID:          string(p.UID),
 				Annotations:  p.Annotations,
 				Labels:       p.Labels,
 				NamespaceUID: namespaceUID,
-				// TODO NodeUID
-			})
+			}
+			node, ok := nodes[p.Spec.NodeName]
+			if ok {
+				pod.NodeUID = node.UID
+			}
+			podList = append(podList, pod)
 		} else {
 			podList = append(podList, Pod{
 				Name: p.ObjectMeta.Name,
16 changes: 15 additions & 1 deletion pkg/inventory/pods_test.go
@@ -76,6 +76,7 @@ func TestProcessPods(t *testing.T) {
 		pods         []v1.Pod
 		namespaceUID string
 		metadata     bool
+		nodes        map[string]Node
 	}
 	tests := []struct {
 		name string
@@ -98,10 +99,19 @@
 							},
 							Namespace: "test-namespace",
 						},
+						Spec: v1.PodSpec{
+							NodeName: "test-node",
+						},
 					},
 				},
 				namespaceUID: "namespace-uid-0000",
 				metadata:     true,
+				nodes: map[string]Node{
+					"test-node": {
+						Name: "test-node",
+						UID:  "test-node-uid",
+					},
+				},
 			},
 			want: []Pod{
 				{
@@ -114,6 +124,7 @@
 						"test-label": "test-value",
 					},
 					NamespaceUID: "namespace-uid-0000",
+					NodeUID:      "test-node-uid",
 				},
 			},
 		},
@@ -133,6 +144,9 @@
 							},
 							Namespace: "test-namespace",
 						},
+						Spec: v1.PodSpec{
+							NodeName: "test-node",
+						},
 					},
 				},
 				namespaceUID: "namespace-uid-0000",
@@ -149,7 +163,7 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := ProcessPods(tt.args.pods, tt.args.namespaceUID, tt.args.metadata)
+			got := ProcessPods(tt.args.pods, tt.args.namespaceUID, tt.args.metadata, tt.args.nodes)
 			assert.Equal(t, tt.want, got)
 		})
 	}
1 change: 0 additions & 1 deletion pkg/inventory/report.go
@@ -26,7 +26,6 @@ type Node struct {
 	KernelVersion     string            `json:"kernel_version,omitempty"`
 	KubeProxyVersion  string            `json:"kube_proxy_version,omitempty"`
 	KubeletVersion    string            `json:"kubelet_version,omitempty"`
-	KubernetesVersion string            `json:"kubernetes_version,omitempty"`
 	Labels            map[string]string `json:"labels,omitempty"`
 	Name              string            `json:"name"`
 	OperatingSystem   string            `json:"operating_system,omitempty"`
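With the cluster-wide KubernetesVersion field gone, the per-node kubelet and kube-proxy versions carry that information instead. A sketch of what a Node serializes to, assuming the fields outside this hunk carry analogous omitempty tags (the "uid" tag here is assumed for illustration):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/anchore/k8s-inventory/pkg/inventory"
)

func main() {
	n := inventory.Node{
		Name:           "test-node",
		UID:            "test-uid",
		KernelVersion:  "5.15.49-linuxkit",
		KubeletVersion: "v1.26.1",
	}
	out, _ := json.MarshalIndent(n, "", "  ")
	fmt.Println(string(out))
	// Roughly, given the tags above (unset omitempty fields are dropped):
	// {
	//   "kernel_version": "5.15.49-linuxkit",
	//   "kubelet_version": "v1.26.1",
	//   "name": "test-node",
	//   "uid": "test-uid"
	// }
}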
52 changes: 47 additions & 5 deletions pkg/lib.go
@@ -85,7 +85,13 @@ func PeriodicallyGetInventoryReport(cfg *config.Application) {

 // launchWorkerPool will create a worker pool of goroutines to grab pods/containers
 // from each namespace. This should alleviate the load on the api server
-func launchWorkerPool(cfg *config.Application, kubeconfig *rest.Config, ch channels, queue chan inventory.Namespace) {
+func launchWorkerPool(
+	cfg *config.Application,
+	kubeconfig *rest.Config,
+	ch channels,
+	queue chan inventory.Namespace,
+	nodes map[string]inventory.Node,
+) {
 	for i := 0; i < cfg.Kubernetes.WorkerPoolSize; i++ {
 		go func() {
 			// each worker needs its own clientset
@@ -100,14 +106,16 @@ func launchWorkerPool(cfg *config.Application, kubeconfig *rest.Config, ch channels, queue chan inventory.Namespace) {
 				case <-ch.stopper:
 					return
 				default:
-					processNamespace(clientset, cfg, namespace, ch)
+					processNamespace(clientset, cfg, namespace, ch, nodes)
 				}
 			}
 		}()
 	}
 }
 
 // GetInventoryReport is an atomic method for getting in-use image results, in parallel for multiple namespaces
+//
+//nolint:funlen
 func GetInventoryReport(cfg *config.Application) (inventory.Report, error) {
 	log.Info("Starting image inventory collection")
 
@@ -143,7 +151,19 @@ func GetInventoryReport(cfg *config.Application) (inventory.Report, error) {
 	}
 	close(queue)
 
-	launchWorkerPool(cfg, kubeconfig, ch, queue) // get pods/containers from namespaces using a worker pool pattern
+	var nodeMap map[string]inventory.Node
+	if cfg.Metadata {
+		nodeMap, err = inventory.FetchNodes(
+			client,
+			cfg.Kubernetes.RequestBatchSize,
+			cfg.Kubernetes.RequestTimeoutSeconds,
+		)
+		if err != nil {
+			return inventory.Report{}, err
+		}
+	}
+
+	launchWorkerPool(cfg, kubeconfig, ch, queue, nodeMap) // get pods/containers from namespaces using a worker pool pattern
 
 	results := make([]ReportItem, 0)
 	pods := make([]inventory.Pod, 0)
@@ -170,7 +190,23 @@ func GetInventoryReport(cfg *config.Application) (inventory.Report, error) {
 		return inventory.Report{}, fmt.Errorf("failed to get Cluster Server Version: %w", err)
 	}
 
+	var nodes []inventory.Node
+	for _, node := range nodeMap {
+		nodes = append(nodes, node)
+	}
+
 	log.Infof("Got Inventory Report with %d containers running across %d namespaces", len(containers), len(namespaces))
+	if cfg.Metadata {
+		return inventory.Report{
+			Timestamp:             time.Now().UTC().Format(time.RFC3339),
+			Containers:            containers,
+			Pods:                  pods,
+			Namespaces:            namespaces,
+			Nodes:                 nodes,
+			ServerVersionMetadata: serverVersion,
+			ClusterName:           cfg.KubeConfig.Cluster,
+		}, nil
+	}
 	return inventory.Report{
 		Timestamp:             time.Now().UTC().Format(time.RFC3339),
 		Containers:            containers,
Expand All @@ -181,7 +217,13 @@ func GetInventoryReport(cfg *config.Application) (inventory.Report, error) {
 	}, nil
 }
 
-func processNamespace(clientset *kubernetes.Clientset, cfg *config.Application, ns inventory.Namespace, ch channels) {
+func processNamespace(
+	clientset *kubernetes.Clientset,
+	cfg *config.Application,
+	ns inventory.Namespace,
+	ch channels,
+	nodes map[string]inventory.Node,
+) {
 	v1pods, err := inventory.FetchPodsInNamespace(
 		client.Client{Clientset: clientset},
 		cfg.Kubernetes.RequestBatchSize,
@@ -193,7 +235,7 @@ func processNamespace(clientset *kubernetes.Clientset, cfg *config.Application, ns inventory.Namespace, ch channels) {
 		return
 	}
 
-	pods := inventory.ProcessPods(v1pods, ns.UID, cfg.Metadata)
+	pods := inventory.ProcessPods(v1pods, ns.UID, cfg.Metadata, nodes)
 	containers := inventory.GetContainersFromPods(
 		v1pods,
 		cfg.IgnoreNotRunning,
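One subtlety in the wiring above: when metadata collection is disabled, or node access is forbidden, nodeMap stays nil, yet ProcessPods still behaves correctly because indexing a nil map in Go is a safe read. A minimal illustration of that property:

package main

import "fmt"

func main() {
	var nodes map[string]string // nil, as when node listing is skipped or forbidden

	// A nil-map lookup returns the zero value and ok=false instead of panicking,
	// so a pod on an unknown node simply ends up with an empty NodeUID.
	uid, ok := nodes["test-node"]
	fmt.Printf("uid=%q found=%v\n", uid, ok) // uid="" found=false
}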
