Skip to content

Commit

Permalink
Ingest Dataproc GCE instance metadata attributes as logging labels. (#…
Browse files Browse the repository at this point in the history
…1541)

Co-authored-by: Quentin Smith <[email protected]>

Adds default ingestion of Dataproc-specific GCE instance metadata attributes as logging labels, namely:
- dataproc-cluster-name
- dataproc-cluster-uuid
- dataproc-region
  • Loading branch information
igorpeshansky authored Dec 13, 2023
1 parent 4e0c2bb commit c568f4f
Show file tree
Hide file tree
Showing 19 changed files with 1,120 additions and 58 deletions.
58 changes: 58 additions & 0 deletions confgenerator/confgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"log"
"path"
"regexp"
"sort"
Expand All @@ -28,6 +29,7 @@ import (

"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/resourcedetector"
"github.com/GoogleCloudPlatform/ops-agent/internal/platform"
)

Expand Down Expand Up @@ -293,6 +295,57 @@ func processUserDefinedMultilineParser(i int, pID string, receiver LoggingReceiv
return nil
}

// sliceContains reports whether v is equal to at least one element of s.
// A nil or empty s never contains anything.
func sliceContains(s []string, v string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == v {
			return true
		}
	}
	return false
}

const (
// attributeLabelPrefix is prepended to a GCE metadata attribute name to form
// the logging label key that addGceMetadataAttributesComponents writes.
attributeLabelPrefix string = "compute.googleapis.com/attributes/"
)

// addGceMetadataAttributesComponents annotates logs with labels corresponding
// to instance attributes from the GCE metadata server.
//
// attributes lists the metadata attribute names to ingest. Every listed name
// that is present in the detected GCE resource's Metadata map becomes a static
// field modification on the label whose key is attributeLabelPrefix followed
// by the attribute name. tag and uid are combined into the processor name, and
// tag is also passed through to the generated components.
//
// It returns nil (no components) when the resource cannot be obtained, when
// the resource is not a resourcedetector.GCEResource, or when none of the
// requested attributes are set. Those conditions are logged, not returned.
func addGceMetadataAttributesComponents(ctx context.Context, attributes []string, tag, uid string) []fluentbit.Component {
	processorName := fmt.Sprintf("%s.%s.gce_metadata", tag, uid)
	resource, err := platform.FromContext(ctx).GetResource()
	if err != nil {
		log.Printf("can't get resource metadata: %v", err)
		return nil
	}
	gceMetadata, ok := resource.(resourcedetector.GCEResource)
	if !ok {
		// Not on GCE; no attributes to detect.
		log.Printf("ignoring the gce_metadata_attributes processor outside of GCE: %T", resource)
		return nil
	}
	// Look each requested attribute up directly. The results are keyed in a
	// map, so visiting order is irrelevant and there is no need to collect and
	// sort every metadata key first.
	modifications := map[string]*ModifyField{}
	for _, k := range attributes {
		// v is declared inside the loop body, so each entry points at its own copy.
		v, found := gceMetadata.Metadata[k]
		if !found {
			continue
		}
		modifications[fmt.Sprintf(`labels."%s%s"`, attributeLabelPrefix, k)] = &ModifyField{
			StaticValue: &v,
		}
	}
	if len(modifications) == 0 {
		return nil
	}
	return LoggingProcessorModifyFields{
		Fields: modifications,
	}.Components(ctx, tag, processorName)
}

// generateFluentbitComponents generates a slice of fluentbit config sections to represent l.
func (uc *UnifiedConfig) generateFluentbitComponents(ctx context.Context, userAgent string) ([]fluentbit.Component, error) {
l := uc.Logging
Expand Down Expand Up @@ -396,6 +449,11 @@ func (uc *UnifiedConfig) generateFluentbitComponents(ctx context.Context, userAg
out = append(out, stackdriverOutputComponent(ctx, strings.Join(tags, "|"), userAgent, "2G"))
}
out = append(out, uc.generateSelfLogsComponents(ctx, userAgent)...)
out = append(out, addGceMetadataAttributesComponents(ctx, []string{
"dataproc-cluster-name",
"dataproc-cluster-uuid",
"dataproc-region",
}, "*", "default-dataproc")...)
}
out = append(out, fluentbit.MetricsOutputComponent())

Expand Down
102 changes: 70 additions & 32 deletions confgenerator/confgenerator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,38 @@ var winlogv1channels = []string{
}

var (
testPlatforms = []platformConfig{
{
name: "linux",
defaultLogsDir: "/var/log/google-cloud-ops-agent",
defaultStateDir: "/var/lib/google-cloud-ops-agent/fluent-bit",
platform: platform.Platform{
Type: platform.Linux,
HostInfo: &host.InfoStat{
OS: "linux",
Platform: "linux_platform",
PlatformVersion: "linux_platform_version",
},
// Set up the test environment with mocked data.
testResource = resourcedetector.GCEResource{
Project: "test-project",
Zone: "test-zone",
Network: "test-network",
Subnetwork: "test-subnetwork",
PublicIP: "test-public-ip",
PrivateIP: "test-private-ip",
InstanceID: "test-instance-id",
InstanceName: "test-instance-name",
Tags: "test-tag",
MachineType: "test-machine-type",
Metadata: map[string]string{"test-key": "test-value", "test-escape": "${foo:bar}"},
Label: map[string]string{"test-label-key": "test-label-value"},
InterfaceIPv4: map[string]string{"test-interface": "test-interface-ipv4"},
}
linuxTestPlatform = platformConfig{
name: "linux",
defaultLogsDir: "/var/log/google-cloud-ops-agent",
defaultStateDir: "/var/lib/google-cloud-ops-agent/fluent-bit",
platform: platform.Platform{
Type: platform.Linux,
HostInfo: &host.InfoStat{
OS: "linux",
Platform: "linux_platform",
PlatformVersion: "linux_platform_version",
},
TestGCEResourceOverride: testResource,
},
}
testPlatforms = []platformConfig{
linuxTestPlatform,
{
name: "linux-gpu",
defaultLogsDir: "/var/log/google-cloud-ops-agent",
Expand All @@ -82,7 +100,8 @@ var (
Platform: "linux_platform",
PlatformVersion: "linux_platform_version",
},
HasNvidiaGpu: true,
TestGCEResourceOverride: testResource,
HasNvidiaGpu: true,
},
},
{
Expand All @@ -98,6 +117,7 @@ var (
Platform: "win_platform",
PlatformVersion: "win_platform_version",
},
TestGCEResourceOverride: testResource,
},
},
{
Expand All @@ -113,6 +133,7 @@ var (
Platform: "win_platform",
PlatformVersion: "win_platform_version",
},
TestGCEResourceOverride: testResource,
},
},
}
Expand Down Expand Up @@ -149,6 +170,42 @@ func TestGoldens(t *testing.T) {
}
}

func TestDataprocDefaults(t *testing.T) {
t.Parallel()

goldensDir := "goldens"
testName := "builtin"
dataprocMetadata := map[string]string{
"dataproc-cluster-name": "test-cluster",
"dataproc-cluster-uuid": "test-uuid",
"dataproc-region": "test-region",
}

t.Run(testName, func(t *testing.T) {
t.Parallel()
pc := linuxTestPlatform
// Update mocked resource to include Dataproc labels.
dataprocResource := testResource
newMetadata := map[string]string{}
for k, v := range testResource.Metadata {
newMetadata[k] = v
}
for k, v := range dataprocMetadata {
newMetadata[k] = v
}
dataprocResource.Metadata = newMetadata
pc.platform.TestGCEResourceOverride = dataprocResource
t.Run(pc.name, func(t *testing.T) {
testDir := filepath.Join(goldensDir, testName)
got, err := generateConfigs(pc, testDir)
assert.NilError(t, err, "Failed to generate configs: %v", err)
if err := testGeneratedFiles(t, got, filepath.Join(testDir, goldenDir, "linux-dataproc")); err != nil {
t.Errorf("Failed to check generated configs: %v", err)
}
})
})
}

func getTestsInDir(t *testing.T, testDir string) []string {
t.Helper()

Expand Down Expand Up @@ -310,25 +367,6 @@ func TestMain(m *testing.M) {
}

func init() {
testResource := resourcedetector.GCEResource{
Project: "test-project",
Zone: "test-zone",
Network: "test-network",
Subnetwork: "test-subnetwork",
PublicIP: "test-public-ip",
PrivateIP: "test-private-ip",
InstanceID: "test-instance-id",
InstanceName: "test-instance-name",
Tags: "test-tag",
MachineType: "test-machine-type",
Metadata: map[string]string{"test-key": "test-value", "test-escape": "${foo:bar}"},
Label: map[string]string{"test-label-key": "test-label-value"},
InterfaceIPv4: map[string]string{"test-interface": "test-interface-ipv4"},
}

// Set up the test environment with mocked data.
confgenerator.MetadataResource = testResource

// Enable experimental features here by calling:
// os.Setenv("EXPERIMENTAL_FEATURES", "...(comma-separated feature list)...")
}
9 changes: 0 additions & 9 deletions confgenerator/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"io/ioutil"
"os"
"path/filepath"

"github.com/GoogleCloudPlatform/ops-agent/confgenerator/resourcedetector"
)

// ReadUnifiedConfigFromFile reads the user config file and returns a UnifiedConfig.
Expand Down Expand Up @@ -62,13 +60,6 @@ func (uc *UnifiedConfig) GenerateFilesFromConfig(ctx context.Context, service, l
}
}
case "otel":
// Fetch resource information from the metadata server.
var err error
MetadataResource, err = resourcedetector.GetResource()
if err != nil {
return fmt.Errorf("can't get resource metadata: %w", err)
}

otelConfig, err := uc.GenerateOtelConfig(ctx)
if err != nil {
return fmt.Errorf("can't parse configuration: %w", err)
Expand Down
15 changes: 5 additions & 10 deletions confgenerator/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"reflect"
"sort"
"strconv"
"strings"

"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/resourcedetector"
"github.com/GoogleCloudPlatform/ops-agent/internal/platform"
"github.com/go-playground/validator/v10"
yaml "github.com/goccy/go-yaml"
Expand All @@ -36,12 +36,6 @@ import (
_ "github.com/prometheus/prometheus/discovery/install" // init() of this package registers service discovery impl.
)

var (
// MetadataResource is the resource metadata for the instance we're running on.
// Note: This is a global variable so that it can be set in tests.
MetadataResource resourcedetector.Resource
)

type PrometheusMetrics struct {
ConfigComponent `yaml:",inline"`

Expand All @@ -61,9 +55,10 @@ func (r PrometheusMetrics) Type() string {
}

func (r PrometheusMetrics) Pipelines(ctx context.Context) []otel.ReceiverPipeline {
resource := MetadataResource
if p := platform.FromContext(ctx).ResourceOverride; p != nil {
resource = p
resource, err := platform.FromContext(ctx).GetResource()
if err != nil {
log.Printf("can't get resource metadata: %v", err)
return nil
}
if resource != nil {
// Get the resource metadata for the instance we're running on.
Expand Down
29 changes: 24 additions & 5 deletions confgenerator/resourcedetector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,36 @@ import (
// An implementation of the Resource interface will have fields represent
// available attributes about the current monitoring resource.
type Resource interface {
ProjectName() string
MonitoredResource() *monitoredres.MonitoredResource
OTelResourceAttributes() map[string]string
ProjectName() string
PrometheusStyleMetadata() map[string]string
}

// resourceCache holds the outcome of one resource detection: the detected
// Resource together with the error that was returned alongside it.
type resourceCache struct {
Resource Resource
Error error
}

// cachedResourceAndError memoizes the result of the first GetResource call.
// It is nil until GetResource has run once.
var cachedResourceAndError *resourceCache

// GetResource returns the Resource describing the current environment.
//
// Detection runs at most once: the first call stores both the Resource and the
// error in cachedResourceAndError, and every later call returns that stored
// pair, including a stored error.
//
// To access the attributes of a specific kind of resource, type-assert the
// returned value to its underlying type:
//
//	actual, ok := resource.(GCEResource)
//
// NOTE(review): no locking is visible around the cache; confirm that callers
// do not invoke this concurrently.
func GetResource() (Resource, error) {
	if cachedResourceAndError == nil {
		detected, detectErr := getUncachedResource()
		cachedResourceAndError = &resourceCache{Resource: detected, Error: detectErr}
	}
	return cachedResourceAndError.Resource, cachedResourceAndError.Error
}

func getUncachedResource() (Resource, error) {
switch {
case gcp_metadata.OnGCE():
return GetGCEResource()
Expand All @@ -46,10 +65,6 @@ func GetResource() (Resource, error) {
type UnrecognizedPlatformResource struct {
}

func (UnrecognizedPlatformResource) OTelResourceAttributes() map[string]string {
return nil
}

// ProjectName always returns the empty string.
func (UnrecognizedPlatformResource) ProjectName() string {
return ""
}
Expand All @@ -58,6 +73,10 @@ func (UnrecognizedPlatformResource) MonitoredResource() *monitoredres.MonitoredR
return nil
}

// OTelResourceAttributes always returns nil.
func (UnrecognizedPlatformResource) OTelResourceAttributes() map[string]string {
return nil
}

// PrometheusStyleMetadata always returns nil.
func (UnrecognizedPlatformResource) PrometheusStyleMetadata() map[string]string {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

-- Record field that holds the table of logging labels.
local LABELS_FIELD = "logging.googleapis.com/labels"

-- Static labels applied to every record, as { label key, value } pairs.
local DATAPROC_LABELS = {
  { "compute.googleapis.com/attributes/dataproc-cluster-name", "test-cluster" },
  { "compute.googleapis.com/attributes/dataproc-cluster-uuid", "test-uuid" },
  { "compute.googleapis.com/attributes/dataproc-region", "test-region" },
}

--- Stamp the static Dataproc labels onto a record.
-- Creates the labels table on the record when it is missing and leaves any
-- labels that are already present under other keys untouched.
-- NOTE(review): presumably invoked as a Fluent Bit Lua filter callback, where
-- the leading return code 2 has a filter-defined meaning -- confirm against
-- the Fluent Bit Lua filter documentation.
-- @param tag passed in by the caller; not used here
-- @param timestamp returned unchanged as the second result
-- @param record table that is modified in place and returned as the third result
-- @return 2, timestamp, record
function process(tag, timestamp, record)
  for i = 1, #DATAPROC_LABELS do
    local entry = DATAPROC_LABELS[i]
    local labels = record[LABELS_FIELD]
    if labels == nil then
      labels = {}
      record[LABELS_FIELD] = labels
    end
    labels[entry[1]] = entry[2]
  end
  return 2, timestamp, record
end
Loading

0 comments on commit c568f4f

Please sign in to comment.