From 5e4ee5358897e474260f3f4723c594c1b378d75a Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Wed, 31 Jan 2024 17:50:46 +0000 Subject: [PATCH] feat(discovery): Add a remote_relabel component This allows to receive discovery relabeling rules from a remote server using a websocket connection. We plan this for an experimental way to control profile collection with Grafana Cloud Profiles. --- .../sources/reference/compatibility/_index.md | 2 + .../discovery/discovery.remote_relabel.md | 126 +++ go.mod | 15 +- go.sum | 20 +- internal/component/all/all.go | 1 + internal/component/common/relabel/relabel.go | 10 +- .../remote_relabel/remote_relabel.go | 818 ++++++++++++++++++ .../remote_relabel/remote_relabel_test.go | 394 +++++++++ 8 files changed, 1366 insertions(+), 20 deletions(-) create mode 100644 docs/sources/reference/components/discovery/discovery.remote_relabel.md create mode 100644 internal/component/discovery/remote_relabel/remote_relabel.go create mode 100644 internal/component/discovery/remote_relabel/remote_relabel_test.go diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index 0da83a17a2..75f454fe0e 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -71,6 +71,7 @@ The following components, grouped by namespace, _export_ Targets. - [discovery.process](../components/discovery/discovery.process) - [discovery.puppetdb](../components/discovery/discovery.puppetdb) - [discovery.relabel](../components/discovery/discovery.relabel) +- [discovery.remote_relabel](../components/discovery/discovery.remote_relabel) - [discovery.scaleway](../components/discovery/discovery.scaleway) - [discovery.serverset](../components/discovery/discovery.serverset) - [discovery.triton](../components/discovery/discovery.triton) @@ -123,6 +124,7 @@ The following components, grouped by namespace, _consume_ Targets. {{< collapse title="discovery" >}} - [discovery.process](../components/discovery/discovery.process) - [discovery.relabel](../components/discovery/discovery.relabel) +- [discovery.remote_relabel](../components/discovery/discovery.remote_relabel) {{< /collapse >}} {{< collapse title="local" >}} diff --git a/docs/sources/reference/components/discovery/discovery.remote_relabel.md b/docs/sources/reference/components/discovery/discovery.remote_relabel.md new file mode 100644 index 0000000000..320bc4fd6d --- /dev/null +++ b/docs/sources/reference/components/discovery/discovery.remote_relabel.md @@ -0,0 +1,126 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/discovery/discovery.remote_relabel/ +description: Learn about discovery.remote_relabel +title: discovery.remote_relabel +--- + +Experimental + +# discovery.remote_relabel + +{{< docs/shared lookup="stability/experimental.md" source="alloy" version="" >}} + +`discovery.remote_relabel` accepts relabeling rules from a remote control server using Pyroscope's [settings API]. +Additionally it is also possible for the control server to request the targets that are received by this component in order to show the effects of the rules at the central control server. + +Multiple `discovery.remote_relabel` components can be specified by giving them different labels. 
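+
+In a typical pipeline the relabeled targets are consumed by a profile scraper.
+The following example is only a sketch: it assumes a `discovery.kubernetes` component named `pods` and the standard `pyroscope.scrape` and `pyroscope.write` components are configured elsewhere, and the URL is a placeholder.
+
+```alloy
+discovery.remote_relabel "example" {
+  targets = discovery.kubernetes.pods.targets
+
+  websocket {
+    url = "wss://example.grafana.net/settings.v1.SettingsService/GetCollectionRules?scope=alloy"
+  }
+}
+
+pyroscope.scrape "example" {
+  targets    = discovery.remote_relabel.example.output
+  forward_to = [pyroscope.write.example.receiver]
+}
+```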
+
+## Usage
+
+```alloy
+discovery.remote_relabel "LABEL" {
+  targets = TARGET_LIST
+  websocket {
+    url = "ws://localhost:4040/settings.v1.SettingsService/GetCollectionRules?scope=alloy"
+  }
+}
+```
+
+## Arguments
+
+The following arguments are supported:
+
+| Name      | Type                | Description         | Default | Required |
+| --------- | ------------------- | ------------------- | ------- | -------- |
+| `targets` | `list(map(string))` | Targets to relabel. |         | yes      |
+
+## Blocks
+
+The following blocks are supported inside the definition of `discovery.remote_relabel`:
+
+| Hierarchy              | Block          | Description                                               | Required |
+| ---------------------- | -------------- | --------------------------------------------------------- | -------- |
+| websocket              | [websocket][]  | The websocket control server to connect to.               | yes      |
+| websocket > basic_auth | [basic_auth][] | Configure basic_auth for authenticating to the websocket. | no       |
+
+[websocket]:#websocket-block
+[basic_auth]:#basic_auth-block
+
+### websocket block
+
+The `websocket` block describes a single command and control server. Only one `websocket` block can be provided.
+
+The following arguments are supported:
+
+| Name                  | Type       | Description                                                             | Default   | Required |
+| --------------------- | ---------- | ------------------------------------------------------------------------ | --------- | -------- |
+| `url`                 | `string`   | Full URL of the websocket server. Must start with `ws://` or `wss://`.  |           | yes      |
+| `keep_alive`          | `duration` | How often to send keep-alive pings. Set to `0` to disable.              | `"295s"`  | no       |
+| `min_backoff_period`  | `duration` | Initial backoff time between retries.                                   | `"500ms"` | no       |
+| `max_backoff_period`  | `duration` | Maximum backoff time between retries.                                   | `"5m"`    | no       |
+| `max_backoff_retries` | `int`      | Maximum number of retries. Set to `0` to retry indefinitely.            | `10`      | no       |
+
+### basic_auth block
+
+{{< docs/shared lookup="reference/components/basic-auth-block.md" source="alloy" version="" >}}
+
+## Exported fields
+
+The following fields are exported and can be referenced by other components:
+
+| Name     | Type                | Description                                   |
+| -------- | ------------------- | --------------------------------------------- |
+| `output` | `list(map(string))` | The set of targets after applying relabeling. |
+| `rules`  | `RelabelRules`      | The currently configured relabeling rules.    |
+
+## Component health
+
+`discovery.remote_relabel` is only reported as unhealthy when the websocket is not connected.
+
+## Debug information
+
+| Name                        | Type        | Description                                                      |
+| --------------------------- | ----------- | ---------------------------------------------------------------- |
+| `websocket_status`          | `string`    | Status of the websocket connection.                              |
+| `websocket_connected_since` | `time.Time` | Time at which the current websocket connection was established.  |
+| `websocket_last_error`      | `string`    | The last error encountered when connecting to the websocket. 
| + +## Example + +```alloy +discovery.remote_relabel "control_profiles" { + targets = [ + { "__meta_foo" = "foo", "__address__" = "localhost", "instance" = "one", "app" = "backend" }, + { "__meta_bar" = "bar", "__address__" = "localhost", "instance" = "two", "app" = "database" }, + { "__meta_baz" = "baz", "__address__" = "localhost", "instance" = "three", "app" = "frontend" }, + ] + websocket { + url = "wss://profiles-prod-001.grafana.net/settings.v1.SettingsService/GetCollectionRules?scope=alloy" + basic_auth { + username = env("PROFILECLI_USERNAME") + password = env("PROFILECLI_PASSWORD") + } + } +} +``` + +[settings API]: https://github.com/grafana/pyroscope/blob/main/api/settings/v1/setting.proto + + + +## Compatible components + +`discovery.remote_relabel` can accept arguments from the following components: + +- Components that export [Targets](../../../compatibility/#targets-exporters) + +`discovery.remote_relabel` has exports that can be consumed by the following components: + +- Components that consume [Targets](../../../compatibility/#targets-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/go.mod b/go.mod index 95f4f1413c..c76e0624c4 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.5 require ( cloud.google.com/go/pubsub v1.38.0 - connectrpc.com/connect v1.14.0 + connectrpc.com/connect v1.16.2 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 github.com/Azure/go-autorest/autorest v0.11.29 @@ -60,7 +60,7 @@ require ( github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb github.com/grafana/go-gelf/v2 v2.0.1 - github.com/grafana/jfr-parser/pprof v0.0.0-20240126072739-986e71dc0361 + github.com/grafana/jfr-parser/pprof v0.0.0-20240815065100-4296f63fd6d7 github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a github.com/grafana/loki/pkg/push v0.0.0-20240514112848-a1b1eeb09583 // k201 branch @@ -253,6 +253,8 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +require github.com/gorilla/websocket v1.5.0 + require ( cloud.google.com/go v0.114.0 // indirect cloud.google.com/go/auth v0.5.1 // indirect @@ -445,11 +447,10 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.4 // indirect github.com/gophercloud/gophercloud v1.12.0 // indirect - github.com/gorilla/websocket v1.5.0 // indirect github.com/gosnmp/gosnmp v1.37.0 // indirect github.com/grafana/go-offsets-tracker v0.1.7 // indirect github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 // indirect - github.com/grafana/jfr-parser v0.8.0 // indirect + github.com/grafana/jfr-parser v0.8.1-0.20240815065100-4296f63fd6d7 // indirect github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548 github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect @@ -601,6 +602,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect + github.com/planetscale/vtprotobuf v0.6.0 // indirect github.com/power-devops/perfstat 
v0.0.0-20220216144756-c35f1ee13d7c // indirect github.com/prometheus-community/go-runit v0.1.0 // indirect github.com/prometheus-community/prom-label-proxy v0.6.0 // indirect @@ -615,7 +617,7 @@ require ( github.com/safchain/ethtool v0.3.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect - github.com/samber/lo v1.38.1 // indirect + github.com/samber/lo v1.38.1 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect @@ -809,3 +811,6 @@ exclude ( ) replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.12.0 + +// TODO: Remove after https://github.com/grafana/pyroscope/pull/3416 merges and gets tagged +replace github.com/grafana/pyroscope/api => github.com/simonswine/pyroscope/api v0.0.0-20240726141528-fef95be2e3c2 diff --git a/go.sum b/go.sum index c031b8b03c..fead838fa4 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= -connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA= -connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s= +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -1029,10 +1029,10 @@ github.com/grafana/go-offsets-tracker v0.1.7/go.mod h1:qcQdu7zlUKIFNUdBJlLyNHuJG github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 h1:aLBiDMjTtXx2800iCIp+8kdjIlvGX0MF/zICQMQO2qU= github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= -github.com/grafana/jfr-parser v0.8.0 h1:/uo2wZNXrxw7tKLFwP2omJ3EQGMkD9wzhPsRogVofc0= -github.com/grafana/jfr-parser v0.8.0/go.mod h1:M5u1ux34Qo47ZBWksbMYVk40s7dvU3WMVYpxweEu4R0= -github.com/grafana/jfr-parser/pprof v0.0.0-20240126072739-986e71dc0361 h1:TtNajaiSRfM2Mz8N7ouFQDFlviXbIEk9Hts0yoZnhGM= -github.com/grafana/jfr-parser/pprof v0.0.0-20240126072739-986e71dc0361/go.mod h1:P5406BrWxjahTzVF6aCSumNI1KPlZJc0zO0v+zKZ4gc= +github.com/grafana/jfr-parser v0.8.1-0.20240815065100-4296f63fd6d7 h1:uHdCwU81kXJ7b9dMZL1ayesmT+npV0L1vriGYdbdIfs= +github.com/grafana/jfr-parser v0.8.1-0.20240815065100-4296f63fd6d7/go.mod h1:M5u1ux34Qo47ZBWksbMYVk40s7dvU3WMVYpxweEu4R0= +github.com/grafana/jfr-parser/pprof v0.0.0-20240815065100-4296f63fd6d7 h1:S+iep7nivDiFfgBD1AB8RcbW4PA7Fn1HXagHOT4MelQ= +github.com/grafana/jfr-parser/pprof v0.0.0-20240815065100-4296f63fd6d7/go.mod h1:P5406BrWxjahTzVF6aCSumNI1KPlZJc0zO0v+zKZ4gc= github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d h1:YwbJJ/PrVWVdnR+j/EAVuazdeP+Za5qbiH1Vlr+wFXs= github.com/grafana/jsonparser 
v0.0.0-20240209175146-098958973a2d/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY= github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a h1:jqM4NNdx8LSquKo8bPx+XWn91S2b+sgNvEcFfSJQtHY= @@ -1059,16 +1059,12 @@ github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6 h1:kih3d3M3dx github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6/go.mod h1:yv4MwOn3yHMQ6MZGHPg/U7Fcyqf+rxqiZfSur6myVtc= github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY= github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE= -github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo= -github.com/grafana/pyroscope/api v0.4.0/go.mod h1:MFnZNeUM4RDsDOnbgKW3GWoLSBpLzMMT9nkvhHHo81o= github.com/grafana/pyroscope/ebpf v0.4.7 h1:OwE7QrgEl4lH9BEzMW9NG4AySysBfCaH5fXYJiuroC8= github.com/grafana/pyroscope/ebpf v0.4.7/go.mod h1:0iOWpGm2M6KXiP2nGa4wf02knSSjEtu11vpUOdQT5AY= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3 h1:UPkAxuhlAcRmJT3/qd34OMTl+ZU7BLLfOO2+NXBlJpY= github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3/go.mod h1:iZiiwNT4HbtGRVqCQu7uJPEZCuEE5sfSSttcnePkDl4= -github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15 h1:J4PmreN24XmbqMIKReAS/P1t7ND6NCAZApcbjBhedrY= -github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4= github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548 h1:XwoNrPHEl07N7EIMt/WXlzGj0Q4CDEfa+6nrdnQGOG4= github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4= github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU= @@ -1890,6 +1886,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA= +github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= @@ -2063,6 +2061,8 @@ github.com/sijms/go-ora/v2 v2.7.6/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4 github.com/simonpasquier/klog-gokit v0.3.0/go.mod h1:+SUlDQNrhVtGt2FieaqNftzzk8P72zpWlACateWxA9k= github.com/simonpasquier/klog-gokit/v3 v3.4.0 h1:2eD2INbzUHuGNynPP86BCB8H6Lwfp6wlkOcuyTr3VWM= github.com/simonpasquier/klog-gokit/v3 v3.4.0/go.mod h1:RREVB5Cc6yYHsweRfhUyM1ZP+Odb8ehxLfY8jaiqvjg= +github.com/simonswine/pyroscope/api v0.0.0-20240726141528-fef95be2e3c2 h1:7IYCsidGSiYp9mz1WIf5cavJEzZ77s8VqOOm2sEkfXw= +github.com/simonswine/pyroscope/api 
v0.0.0-20240726141528-fef95be2e3c2/go.mod h1:pvSSWHwrk8hPNE53ZkM6jz4k2Auq+0oQPCQ2yFb1B7A= github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 0791697a1b..da79fb012d 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -29,6 +29,7 @@ import ( _ "github.com/grafana/alloy/internal/component/discovery/process" // Import discovery.process _ "github.com/grafana/alloy/internal/component/discovery/puppetdb" // Import discovery.puppetdb _ "github.com/grafana/alloy/internal/component/discovery/relabel" // Import discovery.relabel + _ "github.com/grafana/alloy/internal/component/discovery/remote_relabel" // Import discovery.remote_relabel _ "github.com/grafana/alloy/internal/component/discovery/scaleway" // Import discovery.scaleway _ "github.com/grafana/alloy/internal/component/discovery/serverset" // Import discovery.serverset _ "github.com/grafana/alloy/internal/component/discovery/triton" // Import discovery.triton diff --git a/internal/component/common/relabel/relabel.go b/internal/component/common/relabel/relabel.go index 4bd3fb2970..52d25b53e8 100644 --- a/internal/component/common/relabel/relabel.go +++ b/internal/component/common/relabel/relabel.go @@ -68,13 +68,13 @@ type Regexp struct { *regexp.Regexp } -func newRegexp(s string) (Regexp, error) { +func NewRegexp(s string) (Regexp, error) { re, err := regexp.Compile("^(?:" + s + ")$") return Regexp{re}, err } -func mustNewRegexp(s string) Regexp { - re, err := newRegexp(s) +func MustNewRegexp(s string) Regexp { + re, err := NewRegexp(s) if err != nil { panic(err) } @@ -122,7 +122,7 @@ type Config struct { var DefaultRelabelConfig = Config{ Action: Replace, Separator: ";", - Regex: mustNewRegexp("(.*)"), + Regex: MustNewRegexp("(.*)"), Replacement: "$1", } @@ -131,7 +131,7 @@ func (c *Config) SetToDefault() { *c = Config{ Action: Replace, Separator: ";", - Regex: mustNewRegexp("(.*)"), + Regex: MustNewRegexp("(.*)"), Replacement: "$1", } } diff --git a/internal/component/discovery/remote_relabel/remote_relabel.go b/internal/component/discovery/remote_relabel/remote_relabel.go new file mode 100644 index 0000000000..8eae8b885b --- /dev/null +++ b/internal/component/discovery/remote_relabel/remote_relabel.go @@ -0,0 +1,818 @@ +package remote_relabel + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "slices" + "sort" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/go-kit/log" + "github.com/gorilla/websocket" + ws "github.com/gorilla/websocket" + "github.com/grafana/dskit/backoff" + settingsv1 "github.com/grafana/pyroscope/api/gen/proto/go/settings/v1" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/config" + alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +const ( + topicRules = "rules" + topicInstances = "instances" +) + +func init() { + 
component.Register(component.Registration{ + Name: "discovery.remote_relabel", + Stability: featuregate.StabilityExperimental, + Args: Arguments{}, + Exports: Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +// Arguments holds values which are used to configure the discovery.remote_relabel component. +type Arguments struct { + // Targets contains the input 'targets' passed by a service discovery component. + Targets []discovery.Target `alloy:"targets,attr"` + + // The websocket endpoint options, must be set + Websocket *WebsocketOptions `alloy:"websocket,block"` +} + +type WebsocketOptions struct { + URL string `alloy:"url,attr"` + BasicAuth *config.BasicAuth `alloy:"basic_auth,block,optional"` + KeepAlive time.Duration `alloy:"keep_alive,attr,optional"` // 0 means disabled + MinBackoff time.Duration `alloy:"min_backoff_period,attr,optional"` // start backoff at this level + MaxBackoff time.Duration `alloy:"max_backoff_period,attr,optional"` // increase exponentially to this level + MaxBackoffRetries int `alloy:"max_backoff_retries,attr,optional"` // give up after this many; zero means infinite retries +} + +// Validate implements syntax.Validator. +func (a *Arguments) Validate() error { + return a.Websocket.Validate("websocket.") +} + +func (w *WebsocketOptions) Validate(prefix string) error { + var merr error + + if w.MinBackoff < 100*time.Millisecond { + merr = errors.Join(merr, fmt.Errorf("%smin_backoff_period must be at least 100ms", prefix)) + } + + if w.MaxBackoff < 5*time.Second { + merr = errors.Join(merr, fmt.Errorf("%smax_backoff_period must be at least 5s", prefix)) + } + + if w.MinBackoff > w.MaxBackoff { + merr = errors.Join(merr, fmt.Errorf("%smin_backoff_period must be smaller or equal than %smax_backoff_period", prefix, prefix)) + } + + if w.KeepAlive < 5*time.Second && w.KeepAlive != 0 { + merr = errors.Join(merr, fmt.Errorf("%skeep_alive must be disabled or at least 5s", prefix)) + } + + if w.MaxBackoffRetries < 0 { + merr = errors.Join(merr, fmt.Errorf("%smax_backoff_retries must be bigger or equals to 0", prefix)) + } + + if u, err := url.Parse(w.URL); err != nil { + } else if u.Scheme != "ws" && u.Scheme != "wss" { + return fmt.Errorf(`%surl has invalid scheme "%s": expect "ws" or "wss"`, prefix, u.Scheme) + } + return merr +} + +// SetToDefault implements syntax.Defaulter. +func (a *Arguments) SetToDefault() { + a.Websocket = &WebsocketOptions{ + KeepAlive: 295 * time.Second, // Just under 5 minutes + MinBackoff: 500 * time.Millisecond, + MaxBackoff: 10 * time.Minute, + } +} + +// Exports holds values which are exported by the discovery.remote_relabel component. 
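+// Output contains the targets remaining after the rules received from the control server have been applied,
+// and Rules re-exports those rules so other components can reuse them.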
+type Exports struct { + Output []discovery.Target `alloy:"output,attr"` + Rules []*alloy_relabel.Config `alloy:"rules,attr"` +} + +type debugInfo struct { + WebsocketStatus string `alloy:"websocket_status,attr,optional"` + WebsocketConnectedSince time.Time `alloy:"websocket_connected_since,attr,optional"` + WebsocketLastError string `alloy:"websocket_last_error,attr,optional"` +} + +type channelWrapper[T any] struct { + done chan struct{} + item T +} + +func (c *channelWrapper[T]) Set(item T) { + c.item = item + close(c.done) +} + +func (c *channelWrapper[T]) Wait(ch chan *channelWrapper[T]) T { + ch <- c + <-c.done + return c.item +} + +func newChannelWrapper[T any]() *channelWrapper[T] { + return &channelWrapper[T]{ + done: make(chan struct{}), + } +} + +// Component implements the discovery.remote_relabel component. +type Component struct { + opts component.Options + logger log.Logger + instance string + + args Arguments + argsCh chan Arguments + + rcs []*alloy_relabel.Config + rcsCh chan []*alloy_relabel.Config + + debugInfoCh chan *channelWrapper[debugInfo] // this is debug info for the UI + targetsCh chan *channelWrapper[[]*settingsv1.CollectionTarget] // over this channel the websocket requests the active targets + + websocket *webSocket + websocketStatus webSocketStatus + websocketNextTry time.Time + + backoff *backoff.Backoff + backoffConfig backoff.Config + + lblBuilder labels.Builder +} + +type webSocket struct { + logger log.Logger + comp *Component + + c *ws.Conn + url string + reqHeaders http.Header + keepAlive time.Duration + + lck sync.Mutex + connected bool + lastErr error + sendTargets bool + targetsCh chan *channelWrapper[[]*settingsv1.CollectionTarget] // over this channel the websocket requests the active targets + + hash uint64 + hasher xxhash.Digest + + wg sync.WaitGroup + cleanStopCh chan struct{} // This channel gets closed when a clean stop is requested + readLoopDone chan struct{} // This channel gets closed when the read loop is done + out chan []byte +} + +type webSocketState uint8 + +const ( + webSocketDisconnected webSocketState = iota + webSocketConnecting + webSocketConnected +) + +func (w webSocketState) String() string { + switch w { + case webSocketDisconnected: + return "disconnected" + case webSocketConnecting: + return "connecting" + case webSocketConnected: + return "connected" + default: + return "unknown" + } +} + +type webSocketStatus struct { + connectedSince time.Time + state webSocketState + lastError error +} + +func (w *webSocket) setState(connected bool, err error) { + w.lck.Lock() + defer w.lck.Unlock() + + w.connected = connected + if err != nil { + w.lastErr = err + } +} +func (w *webSocket) publishTargets() { + w.lck.Lock() + defer w.lck.Unlock() + + if !w.sendTargets { + return + } + + t := newChannelWrapper[[]*settingsv1.CollectionTarget]().Wait(w.targetsCh) + + if len(t) <= 0 { + return + } + + var p settingsv1.CollectionPayloadData + p.Instances = append(p.Instances, &settingsv1.CollectionInstance{ + Hostname: w.comp.instance, + Targets: t, + LastUpdated: time.Now().UnixMilli(), + }) + + data, err := json.Marshal(&settingsv1.CollectionMessage{ + PayloadData: &p, + }) + if err != nil { + panic(err) + } + + w.hasher.Reset() + _, _ = w.hasher.Write(data) + if hash := w.hasher.Sum64(); w.hash == hash { + return + } else { + w.hash = hash + } + + level.Debug(w.logger).Log("msg", "publish targets to the control server", "targets", len(t)) + w.out <- data +} + +func headersEqual(map1, map2 http.Header) bool { + if len(map1) != len(map2) { 
+ return false + } + + for key, value := range map1 { + if val, ok := map2[key]; !ok || !slices.Equal(val, value) { + return false + } + } + return true +} + +func (w *webSocket) needsReplace(url string, headers http.Header, keepAlive time.Duration) bool { + if url != w.url { + return true + } + if keepAlive != w.keepAlive { + return true + } + return !headersEqual(headers, w.reqHeaders) +} + +func (w *webSocket) isConnected() (bool, error) { + w.lck.Lock() + defer w.lck.Unlock() + return w.connected, w.lastErr +} + +func (w *webSocket) readLoop() error { + var msg settingsv1.CollectionMessage + defer w.wg.Done() + defer close(w.readLoopDone) + for { + msg.Reset() + err := w.c.ReadJSON(&msg) + if err != nil { + if ws.IsUnexpectedCloseError(err, ws.CloseNormalClosure) { + return err + } + return nil + } + + if msg.PayloadData != nil { + rcs, err := collectionRulesToAlloyRelabelConfigs(msg.PayloadData.Rules) + if err != nil { + level.Error(w.logger).Log("msg", "error converting rules to relabel configs", "err", err) + continue + } + w.comp.rcsCh <- rcs + } else if msg.PayloadSubscribe != nil { + sendTargets := false + for _, t := range msg.PayloadSubscribe.Topics { + if t == topicInstances { + level.Debug(w.logger).Log("msg", "enable sending targets to control server") + sendTargets = true + break + } + } + w.lck.Lock() + w.sendTargets = sendTargets + w.lck.Unlock() + w.publishTargets() + } else { + level.Error(w.logger).Log("msg", "unknown message type", "msg", msg.Id) + continue + } + } +} + +func (w *webSocket) writeLoop() error { + var keepAlive <-chan time.Time + if w.keepAlive > 0 { + tick := time.NewTicker(30 * time.Second) + defer tick.Stop() + keepAlive = tick.C + } else { + keepAlive = make(chan time.Time) + } + + for { + select { + case <-w.readLoopDone: + return nil + case <-keepAlive: + err := w.c.WriteMessage(websocket.PingMessage, []byte(fmt.Sprintf("keepalive=%d", time.Now().Unix()))) + if err != nil { + return fmt.Errorf("failed to send keep alive: %w", err) + } + case t := <-w.out: + wr, err := w.c.NextWriter(ws.TextMessage) + if err != nil { + level.Error(w.logger).Log("msg", "error creating writer", "err", err) + continue + } + _, err = wr.Write(t) + err2 := wr.Close() + if err != nil { + level.Error(w.logger).Log("msg", "error writing message", "err", err) + return err + } + if err2 != nil { + level.Error(w.logger).Log("msg", "error closing writer", "err", err2) + continue + } + + case <-w.cleanStopCh: + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := w.c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + return err + } + select { + case <-w.readLoopDone: + case <-time.After(time.Second): + } + return nil + } + } +} + +func (w *webSocket) Close() error { + close(w.cleanStopCh) + w.wg.Wait() + + return w.c.Close() +} + +var _ component.Component = (*Component)(nil) + +func defaultInstance() string { + // TODO: This should come from Alloy + hostname := os.Getenv("HOSTNAME") + if hostname != "" { + return hostname + } + + hostname, err := os.Hostname() + if err != nil { + return "unknown" + } + return hostname +} + +// New creates a new discovery.remote_relabel component. 
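+// The initial Arguments are queued via Update so the targets are exported as soon as Run starts.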
+func New(o component.Options, args Arguments) (*Component, error) { + c := &Component{ + instance: defaultInstance(), + opts: o, + logger: log.With(o.Logger, "component", "discovery.remote_relabel"), + + argsCh: make(chan Arguments, 1), + rcsCh: make(chan []*alloy_relabel.Config), + debugInfoCh: make(chan *channelWrapper[debugInfo]), + targetsCh: make(chan *channelWrapper[[]*settingsv1.CollectionTarget]), + } + + // Call to Update() to set the output once at the start + if err := c.Update(args); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Component) createWebSocket(urlString string, reqHeaders http.Header, keepAlive time.Duration) (*webSocket, error) { + w := &webSocket{ + comp: c, + logger: c.logger, + url: urlString, + reqHeaders: reqHeaders, + keepAlive: keepAlive, + targetsCh: c.targetsCh, + + connected: true, + cleanStopCh: make(chan struct{}), + readLoopDone: make(chan struct{}), + out: make(chan []byte, 16), + } + + u, err := url.Parse(urlString) + if err != nil { + return nil, err + } + if u.Scheme != "ws" && u.Scheme != "wss" { + return nil, fmt.Errorf("invalid websocket URL scheme: %s, must be ws or wss", u.Scheme) + } + w.logger = log.With(w.logger, "server_url", u.String()) + level.Debug(w.logger).Log("msg", "connecting to control server websocket") + + var resp *http.Response + w.c, resp, err = ws.DefaultDialer.Dial(u.String(), reqHeaders) + if err != nil { + if resp != nil { + message := resp.Status + rBody, rErr := io.ReadAll(io.LimitReader(resp.Body, 4096)) + if rErr == nil { + message = string(rBody) + } + return nil, fmt.Errorf("error dialing websocket status_code=%d message=%s: %w", resp.StatusCode, message, err) + } + return nil, err + } + + pingHandler := w.c.PingHandler() + w.c.SetPingHandler(func(appData string) error { + level.Debug(w.logger).Log("msg", "ping", "data", appData) + return pingHandler(appData) + }) + w.c.SetPongHandler(func(appData string) error { + level.Debug(w.logger).Log("msg", "pong", "data", appData) + return nil + }) + + // subscribe to the rules and instances topics + var subscribe settingsv1.CollectionMessage + subscribe.PayloadSubscribe = &settingsv1.CollectionPayloadSubscribe{ + Topics: []string{topicRules}, + } + msg, err := json.Marshal(&subscribe) + if err != nil { + return nil, err + } + w.out <- msg + + w.wg.Add(1) + go func() { + defer w.wg.Done() + if err := w.writeLoop(); err != nil { + level.Error(w.logger).Log("msg", "error in the write loop", "err", err) + } + w.setState(false, err) + }() + + w.wg.Add(1) + go func() { + err := w.readLoop() + if err != nil { + level.Error(w.logger).Log("msg", "error in the read loop", "err", err) + } + w.setState(false, err) + }() + + return w, nil +} + +// Run implements component.Component. 
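+// Run owns all component state: it applies argument and rule updates, serves target and debug info requests
+// coming from the websocket goroutines, and periodically checks the websocket connection, reconnecting with
+// backoff when it is down.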
+func (c *Component) Run(ctx context.Context) error { + t := time.NewTicker(200 * time.Millisecond) + defer t.Stop() + defer func() { + // stop the websocket + if c.websocket != nil { + if err := c.websocket.Close(); err != nil { + level.Error(c.logger).Log("msg", "error closing websocket", "err", err) + } + } + }() + // close all channels + defer func() { + close(c.argsCh) + close(c.rcsCh) + close(c.debugInfoCh) + close(c.targetsCh) + }() + + // TODO: Implement websocket keep alive + + for { + select { + case <-ctx.Done(): + return nil + case newArgs := <-c.argsCh: + c.args = newArgs + c.updateOutputs() + c.updateWebSocket() + case newRCS := <-c.rcsCh: + c.rcs = newRCS + c.updateOutputs() + case <-t.C: + c.updateWebSocket() + case t := <-c.targetsCh: + t.Set(mapToTargets(c.args.Targets)) + case di := <-c.debugInfoCh: + errText := "" + if err := c.websocketStatus.lastError; err != nil { + errText = err.Error() + } + di.Set(debugInfo{ + WebsocketStatus: c.websocketStatus.state.String(), + WebsocketConnectedSince: c.websocketStatus.connectedSince, + WebsocketLastError: errText, + }) + } + } +} + +// this is the main business logic which will reevaluate the exports when either rules or targets have changed. +func (c *Component) updateOutputs() { + c.opts.OnStateChange(Exports{ + Output: c.filterTargets(), + Rules: c.rcs, + }) +} + +// this is the side logic, to keep the websocket connected. +func (c *Component) updateWebSocket() { + // ensure we have a backoff configured with the same values as the arguments + backoffChanged := false + if c.backoffConfig.MaxBackoff != c.args.Websocket.MaxBackoff { + c.backoffConfig.MaxBackoff = c.args.Websocket.MaxBackoff + backoffChanged = true + } + if c.backoffConfig.MinBackoff != c.args.Websocket.MinBackoff { + c.backoffConfig.MinBackoff = c.args.Websocket.MinBackoff + backoffChanged = true + } + if c.backoffConfig.MaxRetries != c.args.Websocket.MaxBackoffRetries { + c.backoffConfig.MaxRetries = c.args.Websocket.MaxBackoffRetries + backoffChanged = true + } + // setup backoff if empty or changed + if c.backoff == nil || backoffChanged { + c.backoff = backoff.New(context.Background(), c.backoffConfig) + c.websocketNextTry = time.Time{} + } + + // web socket is disconnected, create a new one + if c.websocket == nil { + c.newWebSocket() + return + } + + // check if websocket arguments suggest it needs. 
+ if c.websocket.needsReplace(c.args.Websocket.URL, c.newHeader(), c.args.Websocket.KeepAlive) { + c.replaceWebSocket() + return + } + + // check if websocket is still connected + if connected, err := c.websocket.isConnected(); !connected { + c.websocketStatus.lastError = err + c.replaceWebSocket() + } + + // for all we know, the websocket is still connected +} + +func (c *Component) newHeader() http.Header { + req := &http.Request{Header: make(http.Header)} + if c.args.Websocket.BasicAuth != nil { + req.SetBasicAuth(c.args.Websocket.BasicAuth.Username, string(c.args.Websocket.BasicAuth.Password)) + } + return req.Header +} + +func (c *Component) newWebSocket() { + c.websocketStatus.state = webSocketConnecting + + if !c.websocketNextTry.IsZero() && c.websocketNextTry.After(time.Now()) { + // we are in backoff, don't try to connect + return + } + + req := &http.Request{Header: make(http.Header)} + if c.args.Websocket.BasicAuth != nil { + req.SetBasicAuth(c.args.Websocket.BasicAuth.Username, string(c.args.Websocket.BasicAuth.Password)) + } + w, err := c.createWebSocket(c.args.Websocket.URL, c.newHeader(), c.args.Websocket.KeepAlive) + if err != nil { + level.Error(c.logger).Log("msg", "error creating websocket", "err", err) + c.websocketNextTry = time.Now().Add(c.backoff.NextDelay()) + c.websocketStatus.lastError = err + return + } + c.websocket = w + c.websocketStatus.state = webSocketConnected + c.websocketStatus.connectedSince = time.Now() + c.backoff.Reset() + c.websocketNextTry = time.Time{} +} + +func (c *Component) replaceWebSocket() { + if err := c.websocket.Close(); err != nil { + level.Error(c.logger).Log("msg", "error closing websocket", "err", err) + } + c.websocketStatus.state = webSocketDisconnected + c.websocket = nil + c.newWebSocket() +} + +func (c *Component) filterTargets() []discovery.Target { + rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.rcs) + targets := make([]discovery.Target, 0, len(c.args.Targets)) + for _, t := range c.args.Targets { + c.lblBuilder.Reset(nil) + addTargetToLblBuilder(&c.lblBuilder, t) + if keep := relabel.ProcessBuilder(&c.lblBuilder, rcs...); keep { + targets = append(targets, lblBuilderToMap(&c.lblBuilder)) + } + } + return targets +} + +// Update implements component.Component. +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + c.argsCh <- newArgs + return nil +} + +// DebugInfo returns debug information for this component. +func (c *Component) DebugInfo() interface{} { + return newChannelWrapper[debugInfo]().Wait(c.debugInfoCh) +} + +// CurrentHealth returns the health of the component. 
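+// The component reports healthy while the websocket is connected, and unhealthy otherwise,
+// with the last websocket error as the health message.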
+func (c *Component) CurrentHealth() component.Health { + debugInfo := newChannelWrapper[debugInfo]().Wait(c.debugInfoCh) + + h := component.Health{ + Health: component.HealthTypeUnknown, + UpdateTime: time.Now(), + } + + if debugInfo.WebsocketStatus == webSocketConnected.String() { + h.Health = component.HealthTypeHealthy + } else { + h.Health = component.HealthTypeUnhealthy + h.Message = debugInfo.WebsocketLastError + } + return h +} + +func collectionRulesToAlloyRelabelConfigs(rules []*settingsv1.CollectionRule) (alloy_relabel.Rules, error) { + res := make(alloy_relabel.Rules, 0, len(rules)) + resX := make([]alloy_relabel.Config, len(rules)) + for idx := range rules { + if err := collectionRulesToAlloyRelabelConfig(rules[idx], &resX[idx]); err != nil { + return nil, err + } + res = append(res, &resX[idx]) + } + return res, nil +} + +func lblBuilderToMap(lb *labels.Builder) discovery.Target { + lbls := lb.Labels() + res := make(map[string]string, len(lbls)) + for _, l := range lbls { + res[l.Name] = l.Value + } + return res +} + +func addTargetToLblBuilder(lb *labels.Builder, t discovery.Target) { + for k, v := range t { + lb.Set(k, v) + } +} + +func collectionRulesToAlloyRelabelConfig(rule *settingsv1.CollectionRule, config *alloy_relabel.Config) error { + // set default values + config.SetToDefault() + + switch rule.Action { + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_REPLACE: + config.Action = alloy_relabel.Replace + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_KEEP: + config.Action = alloy_relabel.Keep + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_DROP: + config.Action = alloy_relabel.Drop + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_KEEP_EQUAL: + config.Action = alloy_relabel.KeepEqual + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_DROP_EQUAL: + config.Action = alloy_relabel.DropEqual + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_HASHMOD: + config.Action = alloy_relabel.HashMod + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_LABELMAP: + config.Action = alloy_relabel.LabelMap + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_LABELDROP: + config.Action = alloy_relabel.LabelDrop + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_LABELKEEP: + config.Action = alloy_relabel.LabelKeep + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_LOWERCASE: + config.Action = alloy_relabel.Lowercase + case settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_UPPERCASE: + config.Action = alloy_relabel.Uppercase + default: + return fmt.Errorf("unknown action %s", rule.Action) + } + + if rule.Modulus != nil { + config.Modulus = *rule.Modulus + } + + if rule.Regex != nil { + var err error + config.Regex, err = alloy_relabel.NewRegexp(*rule.Regex) + if err != nil { + return err + } + } + + if rule.Replacement != nil { + config.Replacement = *rule.Replacement + } + + if rule.Separator != nil { + config.Separator = *rule.Separator + } + + if rule.TargetLabel != nil { + config.TargetLabel = *rule.TargetLabel + } + + if len(rule.SourceLabels) > 0 { + config.SourceLabels = rule.SourceLabels + } + + return nil +} + +func mapToTargets(m []discovery.Target) []*settingsv1.CollectionTarget { + result := make([]*settingsv1.CollectionTarget, len(m)) + var labelNames []string + for idx := range m { + labelNames = labelNames[:0] + for k := range m[idx] { + labelNames = append(labelNames, k) + } + sort.Strings(labelNames) + result[idx] = &settingsv1.CollectionTarget{ + Labels: 
make([]*typesv1.LabelPair, len(labelNames)), + } + for j, k := range labelNames { + result[idx].Labels[j] = &typesv1.LabelPair{ + Name: k, + Value: m[idx][k], + } + } + } + return result +} diff --git a/internal/component/discovery/remote_relabel/remote_relabel_test.go b/internal/component/discovery/remote_relabel/remote_relabel_test.go new file mode 100644 index 0000000000..10a448b77a --- /dev/null +++ b/internal/component/discovery/remote_relabel/remote_relabel_test.go @@ -0,0 +1,394 @@ +package remote_relabel + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + + "github.com/gorilla/websocket" + settingsv1 "github.com/grafana/pyroscope/api/gen/proto/go/settings/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/alloy/syntax" +) + +type mockServer struct { + t testing.TB + wg sync.WaitGroup + wsCh chan *mockConn +} + +func (ws *mockServer) handleWebSocket(w http.ResponseWriter, r *http.Request) { + // Upgrade connection to WebSocket + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + require.NoError(ws.t, err) + return + } + + c := &mockConn{ + Conn: conn, + t: ws.t, + read: make(chan []byte), + write: make(chan []byte), + } + + ws.wsCh <- c + + ws.wg.Add(1) + go func() { + defer ws.wg.Done() + for { + _, msg, err := c.ReadMessage() + if err != nil { + c.closeOnce.Do(func() { + close(c.write) + }) + close(c.read) + if !websocket.IsCloseError(err, websocket.CloseNormalClosure) && c.t != nil { + require.NoError(c.t, err, "error reading message") + } + break + } + c.read <- msg + } + }() + + ws.wg.Add(1) + go func() { + defer ws.wg.Done() + defer c.Close() + for b := range c.write { + err := c.WriteMessage(websocket.TextMessage, b) + if err != nil { + if c.t != nil { + require.NoError(c.t, err, "error writing message") + } + break + } + } + }() +} + +func (ws *mockServer) close() { + ws.wg.Wait() +} + +type mockConn struct { + *websocket.Conn + t testing.TB + closeOnce sync.Once + read chan []byte + write chan []byte +} + +var ( + upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } +) + +func newMockServer(t testing.TB) *mockServer { + return &mockServer{ + t: t, + wsCh: make(chan *mockConn, 16), + } +} + +func Test_Instances_Update(t *testing.T) { + // ensure that all goroutines are cleaned up + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + // create mock websocket server + wsCh, wsURL, cleanup := newWebsocketTest(t) + defer cleanup() + + argument := Arguments{ + Targets: []discovery.Target{ + map[string]string{"job": "foo"}, + map[string]string{"job": "bar"}, + }, + Websocket: &WebsocketOptions{URL: wsURL}, + } + + c, err := New(component.Options{ + ID: "1", + Logger: util.TestAlloyLogger(t), + OnStateChange: func(e component.Exports) {}, + }, argument) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan struct{}) + go func() { + require.NoError(t, c.Run(ctx)) + close(done) + }() + + // get websocket connection + ws := <-wsCh + + // subscribe to instance updates + ws.write <- []byte(`{"payload_subscribe":{"topics":["instances"]}}`) + + // expect instance update + var msg settingsv1.CollectionMessage + for read := range ws.read { + err := json.Unmarshal(read, &msg) + 
require.NoError(t, err) + + if msg.PayloadData != nil { + break + } + } + + // assert instance update + require.NotNil(t, msg.PayloadData) + require.Len(t, msg.PayloadData.Instances, 1) + assert.Equal(t, defaultInstance(), msg.PayloadData.Instances[0].Hostname) + targets, err := json.Marshal(msg.PayloadData.Instances[0].Targets) + require.NoError(t, err) + assert.JSONEq(t, `[{"labels":[{"name":"job","value":"foo"}]},{"labels":[{"name":"job","value":"bar"}]}]`, string(targets)) + + cancel() + <-done + t.Log("component stopped") +} + +func newWebsocketTest(t testing.TB) (chan *mockConn, string, func()) { + ws := newMockServer(t) + mux := http.NewServeMux() + mux.HandleFunc("/ws", ws.handleWebSocket) + srv := httptest.NewServer(mux) + return ws.wsCh, strings.Replace(srv.URL, "http", "ws", 1) + "/ws", func() { + srv.Close() + ws.close() + } +} + +func Test_Rule_Update(t *testing.T) { + // ensure that all goroutines are cleaned up + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + // create mock websocket server + wsCh, wsURL, cleanup := newWebsocketTest(t) + defer cleanup() + + argument := Arguments{ + Targets: []discovery.Target{ + map[string]string{"job": "foo"}, + map[string]string{"job": "bar"}, + }, + Websocket: &WebsocketOptions{URL: wsURL}, + } + + exportsCh := make(chan Exports, 1) + c, err := New(component.Options{ + ID: "1", + Logger: util.TestAlloyLogger(t), + OnStateChange: func(e component.Exports) { + export, ok := e.(Exports) + require.True(t, ok, "expect correct type") + exportsCh <- export + }, + }, argument) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan struct{}) + go func() { + require.NoError(t, c.Run(ctx)) + close(done) + }() + + ws := <-wsCh + + msg := <-ws.read + require.JSONEq(t, `{"payload_subscribe":{"topics":["rules"]}}`, string(msg)) + + // Send a initial rule + ws.write <- []byte(fmt.Sprintf(`{"payload_data":{"rules":[{"action": %d}]}}`, settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_DROP)) + + // retrieve updates + for export := range exportsCh { + if len(export.Rules) > 0 { + // export as alloy syntax + v, err := syntax.Marshal(export) + require.NoError(t, err) + assert.Equal(t, `output = [] +rules = [{ + source_labels = [], + separator = ";", + regex = "(.*)", + modulus = 0, + target_label = "", + replacement = "$1", + action = "drop", +}]`, string(v)) + t.Log("initial rule received") + break + } + } + + // Send a new rule + ws.write <- []byte(fmt.Sprintf(`{"payload_data":{"rules":[{"action": %d, "source_labels":["job"], "regex": "foo"}]}}`, settingsv1.CollectionRuleAction_COLLECTION_RULE_ACTION_KEEP)) + + // retrieve updates + for export := range exportsCh { + if len(export.Rules) > 0 { + // export as alloy syntax + v, err := syntax.Marshal(export) + require.NoError(t, err) + assert.Equal(t, `output = [{ + job = "foo", +}] +rules = [{ + source_labels = ["job"], + separator = ";", + regex = "foo", + modulus = 0, + target_label = "", + replacement = "$1", + action = "keep", +}]`, string(v)) + t.Log("second rule received") + break + } + } + + cancel() + <-done + t.Log("component stopped") +} + +func Test_Reconnect_Websocket(t *testing.T) { + // ensure that all goroutines are cleaned up + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + // create mock websocket server + wsCh, wsURL, cleanup := newWebsocketTest(t) + defer cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + argument := Arguments{ + Websocket: &WebsocketOptions{URL: 
wsURL}, + } + + c, err := New(component.Options{ + ID: "1", + Logger: util.TestAlloyLogger(t), + OnStateChange: func(e component.Exports) {}, + }, argument) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + require.NoError(t, c.Run(ctx)) + close(done) + }() + + ws := <-wsCh + + // wait for websocket connection + msg := <-ws.read + require.JSONEq(t, `{"payload_subscribe":{"topics":["rules"]}}`, string(msg)) + + // unexpectedly close websocket connection + ws.t = nil // this prevents error reporting + require.NoError(t, ws.Conn.UnderlyingConn().Close()) + + // wait for new websocket + ws = <-wsCh + + // wait for second websocket initiation + msg = <-ws.read + require.JSONEq(t, `{"payload_subscribe":{"topics":["rules"]}}`, string(msg)) + + // get debug info + di := c.DebugInfo() + diType, ok := di.(debugInfo) + require.True(t, ok) + require.Equal(t, "websocket: close 1006 (abnormal closure): unexpected EOF", diType.WebsocketLastError) + + cancel() + <-done +} + +func syntaxParse(obj interface{}, lines ...string) error { + lines = append(lines, "") // ensure we end with a newline + return syntax.Unmarshal([]byte(strings.Join(lines, "\n")), obj) +} + +func TestValidate(t *testing.T) { + t.Run("valid", func(t *testing.T) { + var args Arguments + err := syntaxParse(&args, + `websocket {`, + ` url = "wss://blabala.com"`, + `}`, + `targets = [{job = "foo"}]`, + ) + require.Nil(t, err) + assert.Equal(t, "wss://blabala.com", args.Websocket.URL) + assert.Equal(t, []discovery.Target{{"job": "foo"}}, args.Targets) + }) + + t.Run("missing websockets url", func(t *testing.T) { + var args Arguments + err := syntaxParse(&args, + `targets = [{job = "foo"}]`, + `websocket {`, + `}`, + ) + require.ErrorContains(t, err, `missing required attribute "url"`) + }) + + t.Run("url with wrong scheme", func(t *testing.T) { + var args Arguments + err := syntaxParse(&args, + `targets = [{job = "foo"}]`, + `websocket {`, + ` url = "http://blabala.com"`, + `}`, + ) + require.ErrorContains(t, err, `websocket.url has invalid scheme "http": expect "ws" or "wss"`) + }) + + t.Run("missing targets required", func(t *testing.T) { + var args Arguments + err := syntaxParse(&args, + `websocket {`, + ` url = "wss://blabala.com"`, + `}`, + ) + require.ErrorContains(t, err, `missing required attribute "targets"`) + }) + + t.Run("backoff too small", func(t *testing.T) { + var args Arguments + err := syntaxParse(&args, + `websocket {`, + ` url = "wss://blabala.com"`, + ` min_backoff_period = "1ms"`, + ` max_backoff_period = "100ms"`, + `}`, + `targets = [{job = "foo"}]`, + "", + ) + require.ErrorContains(t, err, `websocket.min_backoff_period must be at least 100ms`) + require.ErrorContains(t, err, `websocket.max_backoff_period must be at least 5s`) + }) +}