Skip to content

Commit

Permalink
Add configurable fields loki source cloudflare (#4765)
Browse files Browse the repository at this point in the history
* add custom_fields to cloudfare config

* refactor custom fields

* add tests

* update CHANGELOG.md

* compare to const instead of string literal

* add comment for FindInvalidFields func

* update loki source cloudflare doc

* fix loki source cloudflare doc

* minor refactoring/renaming following review

* improve doc and namings

* remove validation check for cloudfare additional fields

* improve handling of additional fields and original subsets

* use slices sort instead of map to remove duplicates

* Update component/loki/source/cloudflare/internal/cloudflaretarget/fields.go

Co-authored-by: Piotr <[email protected]>

* fix slice append

---------

Co-authored-by: Piotr <[email protected]>
  • Loading branch information
wildum and thampiotr authored Aug 10, 2023
1 parent 4507bfa commit 0bb4d12
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 31 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ Main (unreleased)

- `loki.write` WAL now exposes a last segment reclaimed metric. (@thepalbi)

- Flow: Users can now define `additional_fields` in `loki.source.cloudflare` (@wildum)

- New Grafana Agent Flow components:

- `prometheus.exporter.gcp` - scrape GCP metrics. (@tburgessdev)
Expand Down
32 changes: 17 additions & 15 deletions component/loki/source/cloudflare/cloudflare.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ func init() {
// Arguments holds values which are used to configure the
// loki.source.cloudflare component.
type Arguments struct {
APIToken rivertypes.Secret `river:"api_token,attr"`
ZoneID string `river:"zone_id,attr"`
Labels map[string]string `river:"labels,attr,optional"`
Workers int `river:"workers,attr,optional"`
PullRange time.Duration `river:"pull_range,attr,optional"`
FieldsType string `river:"fields_type,attr,optional"`
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
APIToken rivertypes.Secret `river:"api_token,attr"`
ZoneID string `river:"zone_id,attr"`
Labels map[string]string `river:"labels,attr,optional"`
Workers int `river:"workers,attr,optional"`
PullRange time.Duration `river:"pull_range,attr,optional"`
FieldsType string `river:"fields_type,attr,optional"`
AdditionalFields []string `river:"additional_fields,attr,optional"`
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
}

// Convert returns a cloudflaretarget Config struct from the Arguments.
Expand All @@ -47,12 +48,13 @@ func (c Arguments) Convert() *cft.Config {
lbls[model.LabelName(k)] = model.LabelValue(v)
}
return &cft.Config{
APIToken: string(c.APIToken),
ZoneID: c.ZoneID,
Labels: lbls,
Workers: c.Workers,
PullRange: model.Duration(c.PullRange),
FieldsType: c.FieldsType,
APIToken: string(c.APIToken),
ZoneID: c.ZoneID,
Labels: lbls,
Workers: c.Workers,
PullRange: model.Duration(c.PullRange),
FieldsType: c.FieldsType,
AdditionalFields: c.AdditionalFields,
}
}

Expand All @@ -73,9 +75,9 @@ func (c *Arguments) Validate() error {
if c.PullRange < 0 {
return fmt.Errorf("pull_range must be a positive duration")
}
_, err := cft.Fields(cft.FieldsType(c.FieldsType))
_, err := cft.Fields(cft.FieldsType(c.FieldsType), c.AdditionalFields)
if err != nil {
return fmt.Errorf("invalid fields_type set; the available values are 'default', 'minimal', 'extended' and 'all'")
return fmt.Errorf("invalid fields_type set; the available values are 'default', 'minimal', 'extended', 'custom' and 'all'")
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package cloudflaretarget

import (
"fmt"

"golang.org/x/exp/slices"
)

// FieldsType defines the set of fields to fetch alongside logs.
Expand All @@ -17,6 +19,7 @@ const (
FieldsTypeMinimal FieldsType = "minimal"
FieldsTypeExtended FieldsType = "extended"
FieldsTypeAll FieldsType = "all"
FieldsTypeCustom FieldsType = "custom"
)

var (
Expand All @@ -41,18 +44,24 @@ var (
}...)
)

// Fields returns the mapping of FieldsType to the set of fields it represents.
func Fields(t FieldsType) ([]string, error) {
// Fields returns the union of a set of fields represented by the Fieldtype and the given additional fields. The returned slice will contain no duplicates.
func Fields(t FieldsType, additionalFields []string) ([]string, error) {
var fields []string
switch t {
case FieldsTypeDefault:
return defaultFields, nil
fields = append(defaultFields, additionalFields...)
case FieldsTypeMinimal:
return minimalFields, nil
fields = append(minimalFields, additionalFields...)
case FieldsTypeExtended:
return extendedFields, nil
fields = append(extendedFields, additionalFields...)
case FieldsTypeAll:
return allFields, nil
fields = append(allFields, additionalFields...)
case FieldsTypeCustom:
fields = append(fields, additionalFields...)
default:
return nil, fmt.Errorf("unknown fields type: %s", t)
}
// remove duplicates
slices.Sort(fields)
return slices.Compact(fields), nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package cloudflaretarget

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestFields(t *testing.T) {
tests := []struct {
name string
fieldsType FieldsType
additionalFields []string
expected []string
}{
{
name: "Default fields",
fieldsType: FieldsTypeDefault,
additionalFields: []string{},
expected: defaultFields,
},
{
name: "Custom fields",
fieldsType: FieldsTypeCustom,
additionalFields: []string{"ClientIP", "OriginResponseBytes"},
expected: []string{"ClientIP", "OriginResponseBytes"},
},
{
name: "Default fields with added custom fields",
fieldsType: FieldsTypeDefault,
additionalFields: []string{"WAFFlags", "WAFMatchedVar"},
expected: append(defaultFields, "WAFFlags", "WAFMatchedVar"),
},
{
name: "Default fields with duplicated custom fields",
fieldsType: FieldsTypeDefault,
additionalFields: []string{"WAFFlags", "WAFFlags", "ClientIP"},
expected: append(defaultFields, "WAFFlags"), // clientIP is already part of defaultFields
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result, err := Fields(test.fieldsType, test.additionalFields)
assert.NoError(t, err)
assert.ElementsMatch(t, test.expected, result)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ var defaultBackoff = backoff.Config{

// Config defines how to connect to Cloudflare's Logpull API.
type Config struct {
APIToken string
ZoneID string
Labels model.LabelSet
Workers int
PullRange model.Duration
FieldsType string
APIToken string
ZoneID string
Labels model.LabelSet
Workers int
PullRange model.Duration
FieldsType string
AdditionalFields []string
}

// Target enables pulling HTTP log messages from Cloudflare using the Logpull
Expand All @@ -66,7 +67,7 @@ type Target struct {

// NewTarget creates and runs a Cloudflare target.
func NewTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, position positions.Positions, config *Config) (*Target, error) {
fields, err := Fields(FieldsType(config.FieldsType))
fields, err := Fields(FieldsType(config.FieldsType), config.AdditionalFields)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -221,7 +222,7 @@ func (t *Target) Ready() bool {

// Details returns debug details about the Cloudflare target.
func (t *Target) Details() map[string]string {
fields, _ := Fields(FieldsType(t.config.FieldsType))
fields, _ := Fields(FieldsType(t.config.FieldsType), t.config.AdditionalFields)
var errMsg string
if t.err != nil {
errMsg = t.err.Error()
Expand Down
11 changes: 9 additions & 2 deletions docs/sources/flow/reference/components/loki.source.cloudflare.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Name | Type | Description | Default | Requir
`workers` | `int` | The number of workers to use for parsing logs. | `3` | no
`pull_range` | `duration` | The timeframe to fetch for each pull request. | `"1m"` | no
`fields_type` | `string` | The set of fields to fetch for log entries. | `"default"` | no
`additional_fields` | `list(string)` | The additional list of fields to supplement those provided via `fields_type`. | | no


By default `loki.source.cloudflare` fetches logs with the `default` set of
Expand All @@ -49,21 +50,27 @@ and the fields they include:
```
"ClientIP", "ClientRequestHost", "ClientRequestMethod", "ClientRequestURI", "EdgeEndTimestamp", "EdgeResponseBytes", "EdgeRequestHost", "EdgeResponseStatus", "EdgeStartTimestamp", "RayID"
```
plus any extra fields provided via `additional_fields` argument.

* `minimal` includes all `default` fields and adds:
```
"ZoneID", "ClientSSLProtocol", "ClientRequestProtocol", "ClientRequestPath", "ClientRequestUserAgent", "ClientRequestReferer", "EdgeColoCode", "ClientCountry", "CacheCacheStatus", "CacheResponseStatus", "EdgeResponseContentType
"ZoneID", "ClientSSLProtocol", "ClientRequestProtocol", "ClientRequestPath", "ClientRequestUserAgent", "ClientRequestReferer", "EdgeColoCode", "ClientCountry", "CacheCacheStatus", "CacheResponseStatus", "EdgeResponseContentType"
```
plus any extra fields provided via `additional_fields` argument.

* `extended` includes all `minimal` fields and adds:
```
"ClientSSLCipher", "ClientASN", "ClientIPClass", "CacheResponseBytes", "EdgePathingOp", "EdgePathingSrc", "EdgePathingStatus", "ParentRayID", "WorkerCPUTime", "WorkerStatus", "WorkerSubrequest", "WorkerSubrequestCount", "OriginIP", "OriginResponseStatus", "OriginSSLProtocol", "OriginResponseHTTPExpires", "OriginResponseHTTPLastModified"
```
plus any extra fields provided via `additional_fields` argument.

* `all` includes all `extended` fields and adds:
```
"BotScore", "BotScoreSrc", "ClientRequestBytes", "ClientSrcPort", "ClientXRequestedWith", "CacheTieredFill", "EdgeResponseCompressionRatio", "EdgeServerIP", "FirewallMatchesSources", "FirewallMatchesActions", "FirewallMatchesRuleIDs", "OriginResponseBytes", "OriginResponseTime", "ClientDeviceType", "WAFFlags", "WAFMatchedVar", "EdgeColoID", "RequestHeaders", "ResponseHeaders"`k
"BotScore", "BotScoreSrc", "ClientRequestBytes", "ClientSrcPort", "ClientXRequestedWith", "CacheTieredFill", "EdgeResponseCompressionRatio", "EdgeServerIP", "FirewallMatchesSources", "FirewallMatchesActions", "FirewallMatchesRuleIDs", "OriginResponseBytes", "OriginResponseTime", "ClientDeviceType", "WAFFlags", "WAFMatchedVar", "EdgeColoID", "RequestHeaders", "ResponseHeaders"
```
plus any extra fields provided via `additional_fields` argument (this is still relevant in this case if new fields are made available via Cloudflare API but are not yet included in `all`).

* `custom` includes only the fields defined in `additional_fields`.

The component saves the last successfully-fetched timestamp in its positions
file. If a position is found in the file for a given zone ID, the component
Expand Down

0 comments on commit 0bb4d12

Please sign in to comment.