Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a5b1ad3
Add log type include filter
jorbaum Jan 5, 2026
a2088ec
Support exclude and include
jorbaum Jan 5, 2026
a17e86b
Error when using both include and exclude types
jorbaum Jan 5, 2026
8da9a70
Introduce logging to binding config
jorbaum Jan 8, 2026
7fc1b84
Add some more unit tests
jorbaum Jan 8, 2026
b961719
Fix wrong word
jorbaum Jan 12, 2026
110fb27
Create LogType as string, move to package
jorbaum Jan 12, 2026
1820deb
Prefix log type constants with LOG_
jorbaum Jan 12, 2026
3447483
Rename value to sourceType
jorbaum Jan 13, 2026
1e96b21
Only emit one warning on wrong drain config
jorbaum Jan 13, 2026
bb0368a
Rename DrainParameterParser in test
jorbaum Jan 13, 2026
2dfc90f
Add tests for the include and exclude filters
jorbaum Jan 13, 2026
95ff14f
Move new log filter test to table test
jorbaum Jan 19, 2026
6cda574
Move next test to table
jorbaum Jan 19, 2026
d0ae8a3
Move drain data for old parameter test to table
jorbaum Jan 19, 2026
db8afe4
Move log filter validation to binding_fetcher
jorbaum Feb 3, 2026
31b2c90
Dereference pointer to make an actual copy
jorbaum Feb 2, 2026
91407bd
Rename log type to source type where possible
jorbaum Feb 11, 2026
1f98d37
Only exclude filter shall pass unknown source type
jorbaum Feb 11, 2026
e9f1e1b
Add SourceType PROXY, HEALTH, SYS and STATS
jorbaum Feb 11, 2026
a3eda41
Make missing source_type tests clearer
jorbaum Feb 20, 2026
31b219b
Add tests for no filtering value configured
jorbaum Feb 20, 2026
d7e830c
Rename source to log source where sensible
jorbaum Feb 20, 2026
0775898
Remove unused method
jorbaum Feb 20, 2026
3ba2a27
Add -log to uri query param
jorbaum Feb 20, 2026
29c6830
No longer accept URI drains with space in values
jorbaum Feb 20, 2026
ecab32a
Move log source type parsing into common function
jorbaum Feb 20, 2026
a312cd4
Group tests where source_type is present
jorbaum Mar 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions src/pkg/egress/syslog/filtering_drain_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (w *FilteringDrainWriter) Write(env *loggregator_v2.Envelope) error {
}
}
if env.GetLog() != nil {
if sendsLogs(w.binding.DrainData) {
sourceType := env.GetTags()["source_type"]
if sendsLogs(w.binding.DrainData, w.binding.LogFilter, sourceType) {
return w.writer.Write(env)
}
}
Expand All @@ -63,17 +64,12 @@ func (w *FilteringDrainWriter) Write(env *loggregator_v2.Envelope) error {
return nil
}

func sendsLogs(drainData DrainData) bool {
switch drainData {
case LOGS:
return true
case LOGS_AND_METRICS:
return true
case LOGS_NO_EVENTS:
return true
default:
func sendsLogs(drainData DrainData, logFilter *LogFilter, sourceTypeTag string) bool {
if drainData != LOGS && drainData != LOGS_AND_METRICS && drainData != LOGS_NO_EVENTS {
return false
}

return logFilter.ShouldInclude(sourceTypeTag)
}

func sendsMetrics(drainData DrainData) bool {
Expand Down
172 changes: 172 additions & 0 deletions src/pkg/egress/syslog/filtering_drain_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,178 @@ var _ = Describe("Filtering Drain Writer", func() {
_, err := syslog.NewFilteringDrainWriter(binding, &fakeWriter{})
Expect(err).To(HaveOccurred())
})

Context("when source_type tag is missing", func() {
var envelope *loggregator_v2.Envelope

BeforeEach(func() {
envelope = &loggregator_v2.Envelope{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{
Payload: []byte("test log"),
},
},
Tags: map[string]string{
// source_type tag is intentionally missing
},
}
})

It("filters out the log with missing source_type when include filter is configured", func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filter works based on the values of the source_type tag and here the code defines the behavior of what should happen.

Is this behavior documented somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not. The best place for that is likely https://docs.cloudfoundry.org/devguide/services/log-management.html ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the place. Please open a PR. Btw, why is the description of the parameters already in the docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the place. Please open a PR.

Opened up a new PR at cloudfoundry/docs-dev-guide#584

Btw, why is the description of the parameters already in the docs?

The previous PR has been merged a bit sooner than expected.

binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: syslog.NewLogFilter(syslog.LogSourceTypeSet{syslog.LOG_SOURCE_APP: struct{}{}}, syslog.LogFilterModeInclude),
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

err = drainWriter.Write(envelope)
Expect(err).NotTo(HaveOccurred())
Expect(fakeWriter.received).To(Equal(0))

appEnvelope := &loggregator_v2.Envelope{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("app log")},
},
Tags: map[string]string{"source_type": "APP/PROC/WEB/0"},
}
err = drainWriter.Write(appEnvelope)
Expect(err).NotTo(HaveOccurred())
Expect(fakeWriter.received).To(Equal(1))
})

It("sends the log with missing source_type when exclude filter is configured", func() {
binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: syslog.NewLogFilter(syslog.LogSourceTypeSet{syslog.LOG_SOURCE_RTR: struct{}{}}, syslog.LogFilterModeExclude),
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

err = drainWriter.Write(envelope)
Expect(err).NotTo(HaveOccurred())
Expect(fakeWriter.received).To(Equal(1))

rtrEnvelope := &loggregator_v2.Envelope{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("rtr log")},
},
Tags: map[string]string{"source_type": "RTR/1"},
}
err = drainWriter.Write(rtrEnvelope)
Expect(err).NotTo(HaveOccurred())
Expect(fakeWriter.received).To(Equal(1))
})
})

Context("when source_type tag is present", func() {
It("filters logs based on include filter - includes only APP logs", func() {
binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: syslog.NewLogFilter(syslog.LogSourceTypeSet{syslog.LOG_SOURCE_APP: struct{}{}}, syslog.LogFilterModeInclude),
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

envelopes := []*loggregator_v2.Envelope{
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("app log")},
},
Tags: map[string]string{"source_type": "APP/PROC/WEB/0"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("rtr log")},
},
Tags: map[string]string{"source_type": "RTR/1"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("stg log")},
},
Tags: map[string]string{"source_type": "STG/0"},
},
}

for _, envelope := range envelopes {
err = drainWriter.Write(envelope)
Expect(err).NotTo(HaveOccurred())
}

// Only APP log should be sent
Expect(fakeWriter.received).To(Equal(1))
})

It("filters logs based on exclude filter - excludes RTR logs", func() {
binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: syslog.NewLogFilter(syslog.LogSourceTypeSet{syslog.LOG_SOURCE_RTR: struct{}{}}, syslog.LogFilterModeExclude),
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

envelopes := []*loggregator_v2.Envelope{
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("app log")},
},
Tags: map[string]string{"source_type": "APP/PROC/WEB/0"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("rtr log")},
},
Tags: map[string]string{"source_type": "RTR/1"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("stg log")},
},
Tags: map[string]string{"source_type": "STG/0"},
},
}

for _, envelope := range envelopes {
err = drainWriter.Write(envelope)
Expect(err).NotTo(HaveOccurred())
}

// APP and STG logs should be sent, RTR should be filtered out
Expect(fakeWriter.received).To(Equal(2))
})

It("sends logs with unknown source_type prefix when filter is set", func() {
binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: syslog.NewLogFilter(syslog.LogSourceTypeSet{syslog.LOG_SOURCE_APP: struct{}{}}, syslog.LogFilterModeExclude),
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

envelope := &loggregator_v2.Envelope{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{
Payload: []byte("test log"),
},
},
Tags: map[string]string{
"source_type": "UNKNOWN/some/path",
},
}

err = drainWriter.Write(envelope)

// Should send the log because unknown types default to being included for exclude filter
Expect(err).NotTo(HaveOccurred())
Expect(fakeWriter.received).To(Equal(1))
})
})
})

type fakeWriter struct {
Expand Down
133 changes: 133 additions & 0 deletions src/pkg/egress/syslog/log_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package syslog

import "strings"

// LogSourceType defines the source types used within Cloud Foundry
// Their order in the code is as documented in https://docs.cloudfoundry.org/devguide/deploy-apps/streaming-logs.html#format
type LogSourceType string

const (
LOG_SOURCE_API LogSourceType = "API"
LOG_SOURCE_STG LogSourceType = "STG"
LOG_SOURCE_RTR LogSourceType = "RTR"
LOG_SOURCE_LGR LogSourceType = "LGR"
LOG_SOURCE_APP LogSourceType = "APP"
LOG_SOURCE_SSH LogSourceType = "SSH"
LOG_SOURCE_CELL LogSourceType = "CELL"
LOG_SOURCE_PROXY LogSourceType = "PROXY"
LOG_SOURCE_HEALTH LogSourceType = "HEALTH"
LOG_SOURCE_SYS LogSourceType = "SYS"
LOG_SOURCE_STATS LogSourceType = "STATS"
)

// validSourceTypes contains SourceType prefixes for efficient lookup
var validSourceTypes = map[LogSourceType]struct{}{
LOG_SOURCE_API: {},
LOG_SOURCE_STG: {},
LOG_SOURCE_RTR: {},
LOG_SOURCE_LGR: {},
LOG_SOURCE_APP: {},
LOG_SOURCE_SSH: {},
LOG_SOURCE_CELL: {},
}

// IsValid checks if the provided SourceType is valid
func (lt LogSourceType) IsValid() bool {
_, ok := validSourceTypes[lt]
return ok
}

// ParseSourceType parses a string into a SourceType value
func ParseSourceType(s string) (LogSourceType, bool) {
lt := LogSourceType(strings.ToUpper(s))
return lt, lt.IsValid()
}

// ParseSourceTypeList parses a comma-separated list of source types and returns
// the valid types as a set and any unknown types as a slice.
func ParseSourceTypeList(sourceTypeList string) (LogSourceTypeSet, []string) {
sourceTypes := strings.Split(sourceTypeList, ",")
set := make(LogSourceTypeSet, len(sourceTypes))
var unknownTypes []string

for _, sourceType := range sourceTypes {
t, ok := ParseSourceType(sourceType)
if !ok {
unknownTypes = append(unknownTypes, sourceType)
continue
}
set.Add(t)
}

return set, unknownTypes
}

// ExtractPrefix extracts the prefix from a source_type tag (e.g., "APP/PROC/WEB/0" -> "APP")
func ExtractPrefix(sourceTypeTag string) string {
if idx := strings.IndexByte(sourceTypeTag, '/'); idx != -1 {
return sourceTypeTag[:idx]
}
return sourceTypeTag
}

// LogSourceTypeSet is a set of SourceTypes for efficient membership checking
type LogSourceTypeSet map[LogSourceType]struct{}

// Add adds a SourceType to the set
func (s LogSourceTypeSet) Add(lt LogSourceType) {
s[lt] = struct{}{}
}

// Contains checks if the set contains a SourceType
func (s LogSourceTypeSet) Contains(lt LogSourceType) bool {
_, exists := s[lt]
return exists
}

// LogFilterMode determines how the log filter should be applied
type LogFilterMode int

const (
// LogFilterModeInclude only includes logs matching the specified types (strict)
LogFilterModeInclude LogFilterMode = iota
// LogFilterModeExclude excludes logs matching the specified types (permissive)
LogFilterModeExclude
)

// LogFilter encapsulates source type filtering configuration
type LogFilter struct {
Types LogSourceTypeSet
Mode LogFilterMode
}

// NewLogFilter creates a new LogFilter with the given types and mode
func NewLogFilter(types LogSourceTypeSet, mode LogFilterMode) *LogFilter {
return &LogFilter{
Types: types,
Mode: mode,
}
}

// ShouldInclude determines if a log with the given sourceTypeTag should be forwarded
// Include mode omits missing/unknown source types, exclude mode forwards them
func (f *LogFilter) ShouldInclude(sourceTypeTag string) bool {
if f == nil {
return true
}

if sourceTypeTag == "" {
return f.Mode == LogFilterModeExclude
}

prefix := ExtractPrefix(sourceTypeTag)
sourceType := LogSourceType(prefix)
if !sourceType.IsValid() {
return f.Mode == LogFilterModeExclude
}

inSet := f.Types.Contains(sourceType)
if f.Mode == LogFilterModeInclude {
return inSet
}
return !inSet
}
1 change: 1 addition & 0 deletions src/pkg/egress/syslog/syslog_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Binding struct {
DrainData DrainData `json:"type,omitempty"`
OmitMetadata bool
InternalTls bool
LogFilter *LogFilter
}

type Drain struct {
Expand Down
24 changes: 24 additions & 0 deletions src/pkg/ingress/bindings/binding_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (d *DrainParamParser) FetchBindings() ([]syslog.Binding, error) {
b.OmitMetadata = getOmitMetadata(urlParsed, d.defaultDrainMetadata)
b.InternalTls = getInternalTLS(urlParsed)
b.DrainData = getBindingType(urlParsed)
b.LogFilter = d.getLogFilter(urlParsed)

processed = append(processed, b)
}
Expand Down Expand Up @@ -84,6 +85,29 @@ func getBindingType(u *url.URL) syslog.DrainData {
return drainData
}

func (d *DrainParamParser) getLogFilter(u *url.URL) *syslog.LogFilter {
includeSourceTypes := u.Query().Get("include-log-source-types")
excludeSourceTypes := u.Query().Get("exclude-log-source-types")

if excludeSourceTypes != "" {
return d.newLogFilter(excludeSourceTypes, syslog.LogFilterModeExclude)
} else if includeSourceTypes != "" {
return d.newLogFilter(includeSourceTypes, syslog.LogFilterModeInclude)
}
return nil
}

// newLogFilter parses a URL query parameter into a LogFilter.
// sourceTypeList is assumed to be a comma-separated list of valid source types.
func (d *DrainParamParser) newLogFilter(sourceTypeList string, mode syslog.LogFilterMode) *syslog.LogFilter {
if sourceTypeList == "" {
return nil
}

set, _ := syslog.ParseSourceTypeList(sourceTypeList)
return syslog.NewLogFilter(set, mode)
}

func getRemoveMetadataQuery(u *url.URL) string {
q := u.Query().Get("disable-metadata")
if q == "" {
Expand Down
Loading
Loading