Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/netflow] Add the netflow receiver - PR 1 #34164

Open
wants to merge 41 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e4b5115
netflowreceiver - factory and config
dlopes7 Jul 14, 2024
be502c2
netflowreceiver - add minimum files
dlopes7 Jul 18, 2024
3ae8ad1
netflowreceiver - run make commands
dlopes7 Jul 18, 2024
9aa77b6
netflowreceiver - adjust config and tests
dlopes7 Jul 18, 2024
b619e40
netflowreceiver - run make generate and config_test
dlopes7 Jul 18, 2024
5f03ff9
netflowreceiver - add config tests
dlopes7 Jul 18, 2024
040136f
netflowreceiver - add tests, mod tidy
dlopes7 Jul 18, 2024
c4e9bd2
netflowreceiver - readme
dlopes7 Jul 19, 2024
8de9251
netflowreceiver - add readme table
dlopes7 Jul 19, 2024
cf4720b
netflowreceiver - make multimod-verify
dlopes7 Jul 19, 2024
c5ee961
netflowreceiver - make addlicense
dlopes7 Jul 19, 2024
b9f74cf
Merge branch 'main' into netflow-receiver
dlopes7 Jul 19, 2024
0996c1c
Update receiver/netflowreceiver/README.md
dlopes7 Aug 9, 2024
97e1bec
Update receiver/netflowreceiver/testdata/config.yaml
dlopes7 Aug 9, 2024
77270e6
Update receiver/netflowreceiver/factory.go
dlopes7 Aug 9, 2024
8c7bd24
Update receiver/netflowreceiver/README.md
dlopes7 Aug 9, 2024
d446489
Merge branch 'open-telemetry:main' into netflow-receiver
dlopes7 Aug 9, 2024
7561af8
netflow - pr suggestions
dlopes7 Aug 10, 2024
871f2ff
netflow - update deps
dlopes7 Aug 20, 2024
f8d4c2c
netflow - add to CODEOWNERS
dlopes7 Aug 20, 2024
5cea6ec
netflow - adjust linting and go.mod
dlopes7 Aug 20, 2024
d6dae6e
Merge branch 'main' into netflow-receiver
dlopes7 Aug 20, 2024
3b42bc4
netflowreceiver - gofmt and gci
dlopes7 Aug 22, 2024
306b267
Merge branch 'netflow-receiver' of github.com:dlopes7/opentelemetry-c…
dlopes7 Aug 22, 2024
3b1019e
netflowreceiver - go.mod version update
dlopes7 Aug 22, 2024
7e13976
Merge branch 'main' into netflow-receiver
dlopes7 Aug 22, 2024
062d07a
netflow - add to versions.yaml
dlopes7 Sep 5, 2024
f62e092
netflow - run make checks
dlopes7 Sep 5, 2024
ca5128c
netflow - update go.mod
dlopes7 Sep 5, 2024
8c65324
Update receiver/netflowreceiver/factory.go
dlopes7 Oct 14, 2024
56b1024
#34164 - add dlopes7 to githubgen allowlist
dlopes7 Nov 8, 2024
f3e45ed
netflow - update readme with new format
dlopes7 Nov 8, 2024
9680294
Merge remote-tracking branch 'upstream/main' into netflow-receiver
dlopes7 Nov 11, 2024
cd49448
netflow - update to 0.113.0
dlopes7 Nov 11, 2024
e1c03bf
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
dlopes7 Nov 11, 2024
20eee81
netflow - generate
dlopes7 Nov 11, 2024
f56b4ae
Merge branch 'main' into netflow-receiver
dlopes7 Nov 12, 2024
4428150
Merge branch 'main' into netflow-receiver
dlopes7 Nov 20, 2024
0ff2e6c
Merge branch 'main' into netflow-receiver
dlopes7 Nov 22, 2024
e6b99ac
Fix linting errors
evan-bradley Nov 23, 2024
58b4baa
Update to latest Collector version
evan-bradley Nov 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/opencensus
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/opencensus
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/opencensus
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/opencensus
Expand Down
1 change: 1 addition & 0 deletions receiver/netflowreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
96 changes: 96 additions & 0 deletions receiver/netflowreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Netflow receiver
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fnetflow%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fnetflow) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fnetflow%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fnetflow) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@evan-bradley](https://www.github.com/evan-bradley), [@dlopes7](https://www.github.com/dlopes7) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

The netflow receiver can listen for [netflow](https://en.wikipedia.org/wiki/NetFlow), [sflow](https://en.wikipedia.org/wiki/SFlow), and [ipfix](https://en.wikipedia.org/wiki/IP_Flow_Information_Export) data and convert it to OpenTelemetry logs. The receiver is based on the [goflow2](https://github.com/netsampler/goflow2) project.

This gives Opentelemetry users the capability of monitoring network traffic, and answer questions like:
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

* Which protocols are passing through the network?
* Which servers and clients are producing the highest amount of traffic?
* What ports are involved in these network calls?
* How many bytes and packets are being sent and received?


## Getting started

By default the receiver will listen for ipfix and netflow on port `2055` and sflow on port `6343`. The receiver can be configured to listen on different ports and protocols.
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

Example configuration:

```yaml
receivers:
netflow:
listeners:
- scheme: netflow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I configure multiple netflow listeners, or is it one listener per scheme?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but now that you mention it I think it might make more sense to allow a single listener per netflow config

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider supporting the auto-detect feature of goflow2 vs making users explicitly set what flow type they are using.

port: 2055
sockets: 16
workers: 32

processors:
batch:
send_batch_size: 2000
timeout: 30s

exporters:
debug:
verbosity: detailed

service:
pipelines:
logs:
receivers: [netflow]
processors: [batch]
exporters: [debug]
telemetry:
logs:
level: debug
```

We recommend using the batch processor to reduce the number of log requests being sent to the exporter. The batch processor will batch log records together and send them in a single request to the exporter.

You would then configure your network devices to send netflow, sflow, or ipfix data to the collector on the specified ports.
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

## Configuration
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

| Field | Description | Examples |
|-------|-------------|--------|
| scheme | The type of flow data that the listener will receive | `sflow`, `netflow`, `flow` |
| hostname | The hostname or IP address that the listener will bind to | `localhost` |
| port | The port that the listener will bind to | `2055` |
| sockets | The number of sockets that the listener will use | 1 |
| workers | The number of workers that the listener will use to decode incoming flow messages | 2 |
| queue_size | The size of the queue that the listener will use | 1000 |

## Data format

The netflow data is standardized for the different schemas and is converted to OpenTelemetry logs. The output will look like this:

```json
{
"type": "SFLOW_5",
"time_received_ns": 1681583295157626000,
"sequence_num": 2999,
"sampling_rate": 100,
"sampler_address": "192.168.0.1",
"time_flow_start_ns": 1681583295157626000,
"time_flow_end_ns": 1681583295157626000,
"bytes": 1500,
"packets": 1,
"src_addr": "fd01::1",
"dst_addr": "fd01::2",
"etype": "IPv6",
"proto": "TCP",
"src_port": 443,
"dst_port": 50001
}
```
73 changes: 73 additions & 0 deletions receiver/netflowreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import "fmt"

// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
Listeners []ListenerConfig `mapstructure:"listeners"`
}

type ListenerConfig struct {

// The scheme defines the type of flow data that the listener will receive
// The scheme must be one of sflow, netflow, or flow
Scheme string `mapstructure:"scheme"`

// The hostname or IP address that the listener will bind to
Hostname string `mapstructure:"hostname"`

// The port that the listener will bind to
Port int `mapstructure:"port"`

// The number of sockets that the listener will use
Sockets int `mapstructure:"sockets"`
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

// The number of workers that the listener will use to decode incoming flow messages
// By default it will be two times the number of sockets
// Ideally set this to the number of CPU cores
Workers int `mapstructure:"workers"`

// The size of the queue that the listener will use
// This is a buffer that will hold flow messages before they are processed by a worker
QueueSize int `mapstructure:"queue_size"`
}

// Validate checks if the receiver configuration is valid
func (cfg *Config) Validate() error {
validSchemes := [3]string{"sflow", "netflow", "flow"}

for _, listener := range cfg.Listeners {

validScheme := false
for _, scheme := range validSchemes {
if listener.Scheme == scheme {
validScheme = true
break
}
}
if !validScheme {
return fmt.Errorf("scheme must be one of sflow, netflow, or flow")
}

if listener.Sockets <= 0 {
return fmt.Errorf("sockets must be greater than 0")
}

if listener.Workers <= 0 {
return fmt.Errorf("workers must be greater than 0")
}

if listener.QueueSize <= 0 {
listener.QueueSize = defaultQueueSize
}

if listener.Port <= 0 {
return fmt.Errorf("port must be greater than 0")
}
}

return nil
}
118 changes: 118 additions & 0 deletions receiver/netflowreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewIDWithName(metadata.Type, "defaults"),
expected: createDefaultConfig(),
},
{
id: component.NewIDWithName(metadata.Type, "two_listeners"),
expected: &Config{
Listeners: []ListenerConfig{
{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 4,
QueueSize: 0,
},
{
Scheme: "sflow",
Port: 6443,
Sockets: 1,
Workers: 2,
QueueSize: 1000,
},
},
},
},
{
id: component.NewIDWithName(metadata.Type, "one_listener"),
expected: &Config{
Listeners: []ListenerConfig{
{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 0,
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
}
}

func TestInvalidConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
err string
}{
{
id: component.NewIDWithName(metadata.Type, "invalid_schema"),
err: "scheme must be one of sflow, netflow, or flow",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_port"),
err: "port must be greater than 0",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

err = component.ValidateConfig(cfg)
assert.ErrorContains(t, err, tt.err)
})
}

}
6 changes: 6 additions & 0 deletions receiver/netflowreceiver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"
64 changes: 64 additions & 0 deletions receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

var (
typeStr = component.MustNewType("netflow")
)
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the queue size here is # of UDP Packets, a UDP packet can be up to 9KB which would be 9GB of memory usage with a full queue.

Perhaps the default queue should be smaller?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a good default? Should we make this 1_000 so it is only 9MB?

)

// NewFactory creates a factory for netflow receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
typeStr,
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelAlpha))
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
}

func createDefaultConfig() component.Config {
return &Config{
Listeners: []ListenerConfig{
{
Scheme: "sflow",
Port: 6343,
Sockets: defaultSockets,
Workers: defaultWorkers,
QueueSize: defaultQueueSize,
},
{
Scheme: "netflow",
Port: 2055,
Sockets: defaultSockets,
Workers: defaultWorkers,
QueueSize: defaultQueueSize,
},
},
}
}

func createLogsReceiver(_ context.Context, params receiver.CreateSettings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
logger := params.Logger
conf := cfg.(*Config)

nr := &netflowReceiver{
logger: logger,
logConsumer: consumer,
config: conf,
}

return nr, nil
}
Loading
Loading