Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New components remote.kubernetes.secret and remote.kubernetes.configmap #4854

Merged
merged 40 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7811445
initial work
captncraig Aug 10, 2023
2ddf059
Merge branch 'main' into cmp_remote_k8s
captncraig Aug 17, 2023
da27c63
move to subdir
captncraig Aug 17, 2023
07c0c9d
cleanup
captncraig Aug 17, 2023
48588df
add docs
captncraig Aug 17, 2023
36d13b7
add all files
captncraig Aug 17, 2023
6bbcc7b
lint
captncraig Aug 17, 2023
13a25b8
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
cab2dfa
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
889c17f
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
8c3dbc5
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
183359e
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
854c260
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
1f15a59
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
b13bc6a
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
c15604a
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
7cf1e78
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
6812df9
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
6a2400a
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
6620e9b
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
ba13c0f
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Aug 28, 2023
8f65945
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
c7e7fa8
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Aug 28, 2023
28adfa1
Merge branch 'main' into cmp_remote_k8s
captncraig Sep 8, 2023
3fa70c4
Merge branch 'cmp_remote_k8s' of github.com:grafana/agent into cmp_re…
captncraig Sep 8, 2023
908606e
switch to correct package after river move
captncraig Sep 8, 2023
c91f4aa
changelog
captncraig Sep 8, 2023
1a972a9
Merge branch 'main' into cmp_remote_k8s
captncraig Sep 14, 2023
7458ded
Merge branch 'main' into cmp_remote_k8s
captncraig Sep 14, 2023
f7e3cde
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Sep 14, 2023
e0b7562
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Sep 14, 2023
bbe8651
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Sep 14, 2023
4a1a9bb
Update component/remote/kubernetes/kubernetes.go
captncraig Sep 14, 2023
f8a91bf
Update docs/sources/flow/reference/components/remote.kubernetes.confi…
captncraig Sep 14, 2023
9e1cb10
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Sep 14, 2023
31b92b4
Update component/remote/kubernetes/kubernetes.go
captncraig Sep 14, 2023
1cac49b
Update docs/sources/flow/reference/components/remote.kubernetes.secre…
captncraig Sep 14, 2023
9834560
add test, make timeout optional, and enforced
captncraig Sep 14, 2023
ecd414a
require positive timeout
captncraig Sep 14, 2023
f27bcf4
newline
captncraig Sep 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Main (unreleased)
- `discovery.serverset` discovers Serversets stored in Zookeeper. (@thampiotr)
- `discovery.scaleway` discovers scrape targets from Scaleway virtual
instances and bare-metal machines. (@rfratto)
- `remote.kubernetes.configmap` loads a configmap's data for use in other components (@captncraig)
- `remote.kubernetes.secret` loads a secret's data for use in other components (@captncraig)

- Flow: allow the HTTP server to be configured with TLS in the config file
using the new `http` config block. (@rfratto)
Expand Down
2 changes: 2 additions & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ import (
_ "github.com/grafana/agent/component/pyroscope/scrape" // Import pyroscope.scrape
_ "github.com/grafana/agent/component/pyroscope/write" // Import pyroscope.write
_ "github.com/grafana/agent/component/remote/http" // Import remote.http
_ "github.com/grafana/agent/component/remote/kubernetes/configmap" // Import remote.kubernetes.configmap
_ "github.com/grafana/agent/component/remote/kubernetes/secret" // Import remote.kubernetes.secret
_ "github.com/grafana/agent/component/remote/s3" // Import remote.s3
_ "github.com/grafana/agent/component/remote/vault" // Import remote.vault
)
17 changes: 17 additions & 0 deletions component/remote/kubernetes/configmap/configmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package configmap

import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/remote/kubernetes"
)

func init() {
component.Register(component.Registration{
Name: "remote.kubernetes.configmap",
Args: kubernetes.Arguments{},
Exports: kubernetes.Exports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return kubernetes.New(opts, args.(kubernetes.Arguments), kubernetes.TypeConfigMap)
},
})
}
251 changes: 251 additions & 0 deletions component/remote/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Package kubernetes implements the logic for remote.kubernetes.secret and remote.kubernetes.configmap component.
package kubernetes

import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/go-kit/log"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/kubernetes"
"github.com/grafana/river/rivertypes"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
client_go "k8s.io/client-go/kubernetes"
)

type ResourceType string

const (
TypeSecret ResourceType = "secret"
TypeConfigMap ResourceType = "configmap"
)

// Arguments control the component.
type Arguments struct {
Namespace string `river:"namespace,attr"`
Name string `river:"name,attr"`
PollFrequency time.Duration `river:"poll_frequency,attr,optional"`
PollTimeout time.Duration `river:"poll_timeout,attr,optional"`

// Client settings to connect to Kubernetes.
Client kubernetes.ClientArguments `river:"client,block,optional"`
}

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
PollFrequency: 1 * time.Minute,
PollTimeout: 15 * time.Second,
}

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
if args.PollFrequency <= 0 {
return fmt.Errorf("poll_frequency must be greater than 0")
}
if args.PollTimeout <= 0 {
return fmt.Errorf("poll_timeout must not be greater than 0")
}
return nil
}

// Exports holds settings exported by this component.
type Exports struct {
Data map[string]rivertypes.OptionalSecret `river:"data,attr"`
}

// Component implements the remote.kubernetes.* component.
type Component struct {
log log.Logger
opts component.Options

mut sync.Mutex
args Arguments

client *client_go.Clientset
kind ResourceType

lastPoll time.Time
lastExports Exports // Used for determining whether exports should be updated

healthMut sync.RWMutex
health component.Health
}

var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)

// New returns a new, unstarted remote.kubernetes.* component.
func New(opts component.Options, args Arguments, rType ResourceType) (*Component, error) {
c := &Component{
log: opts.Logger,
opts: opts,

kind: rType,
health: component.Health{
Health: component.HealthTypeUnknown,
Message: "component started",
UpdateTime: time.Now(),
},
}

if err := c.Update(args); err != nil {
return nil, err
}
return c, nil
}

// Run starts the remote.kubernetes.* component.
func (c *Component) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(c.nextPoll()):
c.poll()
}
}
}

// nextPoll returns how long to wait to poll given the last time a
// poll occurred. nextPoll returns 0 if a poll should occur immediately.
func (c *Component) nextPoll() time.Duration {
c.mut.Lock()
defer c.mut.Unlock()

nextPoll := c.lastPoll.Add(c.args.PollFrequency)
now := time.Now()

if now.After(nextPoll) {
// Poll immediately; next poll period was in the past.
return 0
}
return nextPoll.Sub(now)
}

// poll performs a HTTP GET for the component's configured URL. c.mut must
// not be held when calling. After polling, the component's health is updated
// with the success or failure status.
func (c *Component) poll() {
err := c.pollError()
c.updatePollHealth(err)
}

func (c *Component) updatePollHealth(err error) {
c.healthMut.Lock()
defer c.healthMut.Unlock()

if err == nil {
c.health = component.Health{
Health: component.HealthTypeHealthy,
Message: "got " + string(c.kind),
UpdateTime: time.Now(),
}
} else {
c.health = component.Health{
Health: component.HealthTypeUnhealthy,
Message: fmt.Sprintf("polling failed: %s", err),
UpdateTime: time.Now(),
}
}
}

// pollError is like poll but returns an error if one occurred.
func (c *Component) pollError() error {
c.mut.Lock()
defer c.mut.Unlock()

c.lastPoll = time.Now()

ctx, cancel := context.WithTimeout(context.Background(), c.args.PollTimeout)
defer cancel()

data := map[string]rivertypes.OptionalSecret{}
if c.kind == TypeSecret {
secret, err := c.client.CoreV1().Secrets(c.args.Namespace).Get(ctx, c.args.Name, v1.GetOptions{})
if err != nil {
return err
}
for k, v := range secret.Data {
data[k] = rivertypes.OptionalSecret{
Value: string(v),
IsSecret: true,
}
}
} else if c.kind == TypeConfigMap {
cmap, err := c.client.CoreV1().ConfigMaps(c.args.Namespace).Get(ctx, c.args.Name, v1.GetOptions{})
if err != nil {
return err
}
for k, v := range cmap.Data {
data[k] = rivertypes.OptionalSecret{
Value: v,
IsSecret: false,
}
}
}

newExports := Exports{
Data: data,
}

// Only send a state change event if the exports have changed from the
// previous poll.
if !reflect.DeepEqual(newExports.Data, c.lastExports.Data) {
c.opts.OnStateChange(newExports)
}

c.lastExports = newExports
return nil
}

// Update updates the remote.kubernetes.* component. After the update completes, a
// poll is forced.
func (c *Component) Update(args component.Arguments) (err error) {
// defer initial poll so the lock is released first
defer func() {
if err != nil {
return
}
// Poll after updating and propagate the error if the poll fails. If an error
// occurred during Update, we don't bother to do anything.
// It is important to set err and the health so startup works correctly
err = c.pollError()
c.updatePollHealth(err)
}()

c.mut.Lock()
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.args = newArgs

restConfig, err := c.args.Client.BuildRESTConfig(c.log)
if err != nil {
return err
}
c.client, err = client_go.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("creating kubernetes client: %w", err)
}

return err
}

// CurrentHealth returns the current health of the component.
func (c *Component) CurrentHealth() component.Health {
c.healthMut.RLock()
defer c.healthMut.RUnlock()
return c.health
}
43 changes: 43 additions & 0 deletions component/remote/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package kubernetes

import (
"testing"
"time"

"github.com/grafana/river"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
)

func TestRiverUnmarshal(t *testing.T) {
riverCfg := `
name = "foo"
namespace = "bar"
poll_frequency = "10m"
poll_timeout = "1s"`

var args Arguments
err := river.Unmarshal([]byte(riverCfg), &args)
require.NoError(t, err)

assert.Equal(t, 10*time.Minute, args.PollFrequency)
assert.Equal(t, time.Second, args.PollTimeout)
assert.Equal(t, "foo", args.Name)
assert.Equal(t, "bar", args.Namespace)
}
func TestValidate(t *testing.T) {
t.Run("0 Poll Freq", func(t *testing.T) {
args := Arguments{}
args.SetToDefault()
args.PollFrequency = 0
err := args.Validate()
require.ErrorContains(t, err, "poll_frequency must be greater than 0")
})
t.Run("negative Poll timeout", func(t *testing.T) {
args := Arguments{}
args.SetToDefault()
args.PollTimeout = 0
err := args.Validate()
require.ErrorContains(t, err, "poll_timeout must not be greater than 0")
})
}
17 changes: 17 additions & 0 deletions component/remote/kubernetes/secret/secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package secret

import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/remote/kubernetes"
)

func init() {
component.Register(component.Registration{
Name: "remote.kubernetes.secret",
Args: kubernetes.Arguments{},
Exports: kubernetes.Exports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return kubernetes.New(opts, args.(kubernetes.Arguments), kubernetes.TypeSecret)
},
})
}
Loading