Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NetworkTransformKubeConfig struct {
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"`
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"`
TrackedKinds []string `yaml:"trackedKinds,omitempty" json:"trackedKinds,omitempty" doc:"list of Kubernetes resource kinds to track for ownership chain (e.g., Deployment, Gateway, VirtualMachine). If a resource's owner is in this list, FLP will continue tracking up the ownership chain."`
}

type TransformNetworkOperationEnum string
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/transform/kubernetes/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Datasource struct {
Informers informers.Interface
}

func NewInformerDatasource(kubeconfig string, infConfig informers.Config, opMetrics *operational.Metrics) (*Datasource, error) {
func NewInformerDatasource(kubeconfig string, infConfig *informers.Config, opMetrics *operational.Metrics) (*Datasource, error) {
inf := &informers.Informers{}
if err := inf.InitFromConfig(kubeconfig, infConfig, opMetrics); err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/transform/kubernetes/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ var infConfig informers.Config

// For testing
func MockInformers() {
infConfig = informers.NewConfig(api.NetworkTransformKubeConfig{})
infConfig = informers.NewConfig(&api.NetworkTransformKubeConfig{})
ds = &datasource.Datasource{Informers: informers.NewInformersMock()}
}

func InitInformerDatasource(config api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error {
func InitInformerDatasource(config *api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error {
var err error
infConfig = informers.NewConfig(config)
if ds == nil {
ds, err = datasource.NewInformerDatasource(config.ConfigPath, infConfig, opMetrics)
ds, err = datasource.NewInformerDatasource(config.ConfigPath, &infConfig, opMetrics)
}
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/pipeline/transform/kubernetes/informers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ type Config struct {
secondaryNetworks []api.SecondaryNetwork
hasMultus bool
hasUDN bool
trackedKinds []string
}

func NewConfig(cfg api.NetworkTransformKubeConfig) Config {
func NewConfig(cfg *api.NetworkTransformKubeConfig) Config {
c := Config{
managedCNI: cfg.ManagedCNI,
secondaryNetworks: cfg.SecondaryNetworks,
trackedKinds: cfg.TrackedKinds,
}
if c.managedCNI == nil {
c.managedCNI = []string{api.OVN}
Expand Down
44 changes: 41 additions & 3 deletions pkg/pipeline/transform/kubernetes/informers/informers-mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewInformersMock() *Mock {
return inf
}

func (o *Mock) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error {
func (o *Mock) InitFromConfig(kubeconfig string, infConfig *Config, opMetrics *operational.Metrics) error {
args := o.Called(kubeconfig, infConfig, opMetrics)
return args.Error(0)
}
Expand Down Expand Up @@ -135,6 +135,24 @@ func (m *IndexerMock) MockReplicaSet(name, namespace, ownerName, ownerKind strin
}, true, nil)
}

func (m *IndexerMock) MockDeployment(name, namespace, ownerName, ownerKind string) {
if ownerName == "" {
// No owner
m.On("GetByKey", namespace+"/"+name).Return(&metav1.ObjectMeta{
Name: name,
OwnerReferences: []metav1.OwnerReference{},
}, true, nil)
} else {
m.On("GetByKey", namespace+"/"+name).Return(&metav1.ObjectMeta{
Name: name,
OwnerReferences: []metav1.OwnerReference{{
Kind: ownerKind,
Name: ownerName,
}},
}, true, nil)
}
}

func (m *IndexerMock) FallbackNotFound() {
m.On("ByIndex", IndexIP, mock.Anything).Return([]interface{}{}, nil)
}
Expand Down Expand Up @@ -163,6 +181,26 @@ func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) {
return
}

func SetupIndexerMocksWithTrackedKinds(kd *Informers, trackedKinds []string) (pods, nodes, svc, rs, deploy *IndexerMock) {
// Setup base informers
pods, nodes, svc, rs = SetupIndexerMocks(kd)

// Setup additional informers based on trackedKinds
for _, kind := range trackedKinds {
switch kind {
case "Deployment", "Gateway":
// Gateway requires Deployment informer, so we initialize it for both
if deploy == nil {
deploy = &IndexerMock{}
dim := InformerMock{}
dim.On("GetIndexer").Return(deploy)
kd.deployments = &dim
}
}
}
return
}

type FakeInformers struct {
Interface
ipInfo map[string]*model.ResourceMetaData
Expand All @@ -171,15 +209,15 @@ type FakeInformers struct {
}

func SetupStubs(ipInfo, customKeysInfo, nodes map[string]*model.ResourceMetaData) (Config, *FakeInformers) {
cfg := NewConfig(api.NetworkTransformKubeConfig{SecondaryNetworks: secondaryNetConfig})
cfg := NewConfig(&api.NetworkTransformKubeConfig{SecondaryNetworks: secondaryNetConfig})
return cfg, &FakeInformers{
ipInfo: ipInfo,
customKeysInfo: customKeysInfo,
nodes: nodes,
}
}

func (f *FakeInformers) InitFromConfig(_ string, _ Config, _ *operational.Metrics) error {
func (f *FakeInformers) InitFromConfig(_ string, _ *Config, _ *operational.Metrics) error {
return nil
}

Expand Down
172 changes: 156 additions & 16 deletions pkg/pipeline/transform/kubernetes/informers/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var (
type Interface interface {
IndexLookup([]cni.SecondaryNetKey, string) *model.ResourceMetaData
GetNodeByName(string) (*model.ResourceMetaData, error)
InitFromConfig(string, Config, *operational.Metrics) error
InitFromConfig(string, *Config, *operational.Metrics) error
}

type Informers struct {
Expand All @@ -65,7 +65,11 @@ type Informers struct {
nodes cache.SharedIndexInformer
services cache.SharedIndexInformer
// replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers
replicaSets cache.SharedIndexInformer
replicaSets cache.SharedIndexInformer
// New informers for ownership tracking
deployments cache.SharedIndexInformer
// Config and channels
config Config
stopChan chan struct{}
mdStopChan chan struct{}
indexerHitMetric *prometheus.CounterVec
Expand Down Expand Up @@ -178,19 +182,120 @@ func (k *Informers) GetNodeByName(name string) (*model.ResourceMetaData, error)
}

func (k *Informers) checkParent(info *model.ResourceMetaData) {
if info.OwnerKind == "ReplicaSet" {
item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + info.OwnerName)
if err != nil {
log.WithError(err).WithField("key", info.Namespace+"/"+info.OwnerName).
Debug("can't get ReplicaSet info from informer. Ignoring")
} else if ok {
rsInfo := item.(*metav1.ObjectMeta)
if len(rsInfo.OwnerReferences) > 0 {
info.OwnerKind = rsInfo.OwnerReferences[0].Kind
info.OwnerName = rsInfo.OwnerReferences[0].Name
// Maximum 3 ownership hops: Pod → ReplicaSet → Deployment → Gateway
// This allows tracking up to 3 levels beyond the initial resource
const maxHops = 3

// If trackedKinds is empty, use legacy behavior (stop after ReplicaSet resolution)
if len(k.config.trackedKinds) == 0 {
// Legacy behavior: only resolve ReplicaSet
if info.OwnerKind == "ReplicaSet" {
item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + info.OwnerName)
if err != nil {
log.WithError(err).WithField("key", info.Namespace+"/"+info.OwnerName).
Debug("can't get ReplicaSet info from informer. Ignoring")
return
}
if ok {
rsInfo := item.(*metav1.ObjectMeta)
if len(rsInfo.OwnerReferences) > 0 {
info.OwnerKind = rsInfo.OwnerReferences[0].Kind
info.OwnerName = rsInfo.OwnerReferences[0].Name
}
}
}
return
}

// New behavior with trackedKinds: traverse ownership chain until we find a tracked kind or hit max depth
for i := 0; i < maxHops; i++ {
// Check if current owner is in trackedKinds
if k.isTracked(info.OwnerKind) {
// This owner IS tracked. Try to get its parent to see if we can go higher.
parent := k.getOwnerFromInformer(info.OwnerKind, info.Namespace, info.OwnerName)
if parent == nil {
// No parent exists → STOP at current tracked kind
break
}
// Parent exists - check if parent is ALSO tracked
if k.isTracked(parent.Kind) {
// Parent is also tracked → update and continue (prefer higher level)
info.OwnerKind = parent.Kind
info.OwnerName = parent.Name
continue
}
// Parent exists but is NOT tracked → STOP at current tracked kind
break
}

// Current owner is NOT tracked → try to find a tracked parent
parent := k.getOwnerFromInformer(info.OwnerKind, info.Namespace, info.OwnerName)
if parent == nil {
// No parent found → STOP at current (untracked) owner
break
}

// Update to parent and continue
info.OwnerKind = parent.Kind
info.OwnerName = parent.Name
}
}

// isTracked returns true if the given kind is in the trackedKinds configuration
func (k *Informers) isTracked(kind string) bool {
for _, tracked := range k.config.trackedKinds {
if tracked == kind {
return true
}
}
return false
}

// OwnerInfo contains basic ownership information
type OwnerInfo struct {
Kind string
Name string
}

// getOwnerFromInformer retrieves the owner of a resource from the appropriate informer
func (k *Informers) getOwnerFromInformer(kind, namespace, name string) *OwnerInfo {
var informer cache.SharedIndexInformer

switch kind {
case "ReplicaSet":
informer = k.replicaSets
case "Deployment":
informer = k.deployments
default:
return nil
}

if informer == nil {
log.WithField("kind", kind).Debug("informer not initialized for this kind")
return nil
}

item, ok, err := informer.GetIndexer().GetByKey(namespace + "/" + name)
if err != nil {
log.WithError(err).
WithField("kind", kind).
WithField("key", namespace+"/"+name).
Debug("can't get resource info from informer")
return nil
}
if !ok {
return nil
}

meta := item.(*metav1.ObjectMeta)
if len(meta.OwnerReferences) == 0 {
return nil
}

return &OwnerInfo{
Kind: meta.OwnerReferences[0].Kind,
Name: meta.OwnerReferences[0].Name,
}
}

func (k *Informers) getHostName(hostIP string) string {
Expand All @@ -202,7 +307,7 @@ func (k *Informers) getHostName(hostIP string) string {
return ""
}

func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg Config) error {
func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg *Config) error {
nodes := informerFactory.Core().V1().Nodes().Informer()
// Transform any *v1.Node instance into a *Info instance to save space
// in the informer's cache
Expand Down Expand Up @@ -254,7 +359,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory,
return nil
}

func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config, dynClient *dynamic.DynamicClient) error {
func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg *Config, dynClient *dynamic.DynamicClient) error {
pods := informerFactory.Core().V1().Pods().Informer()
// Transform any *v1.Pod instance into a *Info instance to save space
// in the informer's cache
Expand Down Expand Up @@ -376,8 +481,32 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar
return nil
}

func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error {
func (k *Informers) initDeploymentInformer(informerFactory metadatainformer.SharedInformerFactory) error {
k.deployments = informerFactory.ForResource(
schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}).Informer()
if err := k.deployments.SetTransform(func(i interface{}) (interface{}, error) {
deploy, ok := i.(*metav1.PartialObjectMetadata)
if !ok {
return nil, fmt.Errorf("was expecting a Deployment. Got: %T", i)
}
return &metav1.ObjectMeta{
Name: deploy.Name,
Namespace: deploy.Namespace,
OwnerReferences: deploy.OwnerReferences,
}, nil
}); err != nil {
return fmt.Errorf("can't set Deployments transform: %w", err)
}
return nil
}

func (k *Informers) InitFromConfig(kubeconfig string, infConfig *Config, opMetrics *operational.Metrics) error {
// Initialization variables
k.config = *infConfig
k.stopChan = make(chan struct{})
k.mdStopChan = make(chan struct{})

Expand Down Expand Up @@ -410,7 +539,7 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric
return nil
}

func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg Config) error {
func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg *Config) error {
informerFactory := inf.NewSharedInformerFactory(client, syncTime)
metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
err := k.initNodeInformer(informerFactory, cfg)
Expand All @@ -430,6 +559,17 @@ func (k *Informers) initInformers(client kubernetes.Interface, metaClient metada
return err
}

// Initialize additional informers based on trackedKinds configuration
for _, kind := range cfg.trackedKinds {
if kind == "Deployment" {
// Gateway requires Deployment informer to navigate ownership chain
log.Debugf("initializing Deployment informer (trackedKinds)")
if err := k.initDeploymentInformer(metadataInformerFactory); err != nil {
return err
}
}
}

// Informers expose an indexer
log.Debugf("adding indexers")
byIP := cache.Indexers{IndexIP: ipIndexer}
Expand Down
Loading