From 7df195493ffd25123ba93c5011319e2f45eb694d Mon Sep 17 00:00:00 2001 From: Anna Carrigan Date: Fri, 5 Jan 2024 12:49:16 -0700 Subject: [PATCH] feat(inputs.opcua_listener): Add options for failure to connect (#14533) Adds Error (default, same behavior as before), Ignore, and Retry as options for when the plugin cannot connect to the OPC UA server when Telegraf starts --- docs/developers/SAMPLE_CONFIG.md | 2 +- plugins/inputs/opcua_listener/README.md | 6 ++ .../inputs/opcua_listener/opcua_listener.go | 23 ++++- .../opcua_listener/opcua_listener_test.go | 93 +++++++++++++++++++ plugins/inputs/opcua_listener/sample.conf | 6 ++ .../inputs/opcua_listener/subscribe_client.go | 13 +++ 6 files changed, 139 insertions(+), 4 deletions(-) diff --git a/docs/developers/SAMPLE_CONFIG.md b/docs/developers/SAMPLE_CONFIG.md index 75ff181b6679d..18d9cc355b57a 100644 --- a/docs/developers/SAMPLE_CONFIG.md +++ b/docs/developers/SAMPLE_CONFIG.md @@ -28,7 +28,7 @@ Documentation is double commented, full sentences, and ends with a period. # exchange_type = "topic" ``` -Try to give every parameter a default value whenever possible. If an +Try to give every parameter a default value whenever possible. If a parameter does not have a default or must frequently be changed then have it uncommented. diff --git a/plugins/inputs/opcua_listener/README.md b/plugins/inputs/opcua_listener/README.md index c8c266bf891d3..311655d10ab06 100644 --- a/plugins/inputs/opcua_listener/README.md +++ b/plugins/inputs/opcua_listener/README.md @@ -48,6 +48,12 @@ to use them. ## Maximum time allowed to establish a connect to the endpoint. # connect_timeout = "10s" # + ## Behavior when we fail to connect to the endpoint on initialization. Valid options are: + ## "error": throw an error and exits Telegraf + ## "ignore": ignore this plugin if errors are encountered + # "retry": retry connecting at each interval + # connect_fail_behavior = "error" + # ## Maximum time allowed for a request over the established connection. # request_timeout = "5s" # diff --git a/plugins/inputs/opcua_listener/opcua_listener.go b/plugins/inputs/opcua_listener/opcua_listener.go index 3e5af78c252e9..5d029e17aebb6 100644 --- a/plugins/inputs/opcua_listener/opcua_listener.go +++ b/plugins/inputs/opcua_listener/opcua_listener.go @@ -4,11 +4,13 @@ package opcua_listener import ( "context" _ "embed" + "fmt" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/opcua" + opcuaclient "github.com/influxdata/telegraf/plugins/common/opcua" "github.com/influxdata/telegraf/plugins/common/opcua/input" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -27,15 +29,26 @@ func (*OpcUaListener) SampleConfig() string { } func (o *OpcUaListener) Init() (err error) { + switch o.ConnectFailBehavior { + case "": + o.ConnectFailBehavior = "error" + case "error", "ignore", "retry": + // Do nothing as these are valid + default: + return fmt.Errorf("unknown setting %q for 'connect_fail_behavior'", o.ConnectFailBehavior) + } o.client, err = o.SubscribeClientConfig.CreateSubscribeClient(o.Log) return err } -func (o *OpcUaListener) Gather(_ telegraf.Accumulator) error { - return nil +func (o *OpcUaListener) Gather(acc telegraf.Accumulator) error { + if o.client.State() == opcuaclient.Connected || o.SubscribeClientConfig.ConnectFailBehavior == "ignore" { + return nil + } + return o.connect(acc) } -func (o *OpcUaListener) Start(acc telegraf.Accumulator) error { +func (o *OpcUaListener) connect(acc telegraf.Accumulator) error { ctx := context.Background() ch, err := o.client.StartStreamValues(ctx) if err != nil { @@ -56,6 +69,10 @@ func (o *OpcUaListener) Start(acc telegraf.Accumulator) error { return nil } +func (o *OpcUaListener) Start(acc telegraf.Accumulator) error { + return o.connect(acc) +} + func (o *OpcUaListener) Stop() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) select { diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go index 8846653f7fed5..0c18f2f89b55f 100644 --- a/plugins/inputs/opcua_listener/opcua_listener_test.go +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -37,6 +37,96 @@ func MapOPCTag(tags OPCTags) (out input.NodeSettings) { return out } +func TestInitPluginWithBadConnectFailBehaviorValue(t *testing.T) { + plugin := OpcUaListener{ + SubscribeClientConfig: SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://notarealserver:4840", + SecurityPolicy: "None", + SecurityMode: "None", + ConnectTimeout: config.Duration(5 * time.Second), + RequestTimeout: config.Duration(10 * time.Second), + }, + MetricName: "opcua", + Timestamp: input.TimestampSourceTelegraf, + RootNodes: make([]input.NodeSettings, 0), + }, + ConnectFailBehavior: "notanoption", + SubscriptionInterval: config.Duration(100 * time.Millisecond), + }, + Log: testutil.Logger{}, + } + err := plugin.Init() + require.ErrorContains(t, err, "unknown setting \"notanoption\" for 'connect_fail_behavior'") +} + +func TestStartPlugin(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + acc := &testutil.Accumulator{} + + plugin := OpcUaListener{ + SubscribeClientConfig: SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://notarealserver:4840", + SecurityPolicy: "None", + SecurityMode: "None", + ConnectTimeout: config.Duration(5 * time.Second), + RequestTimeout: config.Duration(10 * time.Second), + }, + MetricName: "opcua", + Timestamp: input.TimestampSourceTelegraf, + RootNodes: make([]input.NodeSettings, 0), + }, + SubscriptionInterval: config.Duration(100 * time.Millisecond), + }, + Log: testutil.Logger{}, + } + testopctags := []OPCTags{ + {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, + } + for _, tags := range testopctags { + plugin.SubscribeClientConfig.RootNodes = append(plugin.SubscribeClientConfig.RootNodes, MapOPCTag(tags)) + } + require.NoError(t, plugin.Init()) + err := plugin.Start(acc) + require.ErrorContains(t, err, "could not resolve address") + + plugin.SubscribeClientConfig.ConnectFailBehavior = "ignore" + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Start(acc)) + require.Equal(t, plugin.client.OpcUAClient.State(), opcua.Disconnected) + plugin.Stop() + + container := testutil.Container{ + Image: "open62541/open62541", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("TCP network layer listening on opc.tcp://"), + ), + } + plugin.SubscribeClientConfig.ConnectFailBehavior = "retry" + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Start(acc)) + require.Equal(t, plugin.client.OpcUAClient.State(), opcua.Disconnected) + + err = container.Start() + require.NoError(t, err, "failed to start container") + + defer container.Terminate() + newEndpoint := fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]) + plugin.client.Config.Endpoint = newEndpoint + plugin.client.OpcUAClient.Config.Endpoint = newEndpoint + err = plugin.Gather(acc) + require.NoError(t, err) + require.Equal(t, plugin.client.OpcUAClient.State(), opcua.Connected) +} + func TestSubscribeClientIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -104,6 +194,7 @@ func TestSubscribeClientIntegration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() res, err := o.StartStreamValues(ctx) + require.Equal(t, o.State(), opcua.Connected) require.NoError(t, err) for { @@ -299,6 +390,7 @@ endpoint = "opc.tcp://localhost:4840" connect_timeout = "10s" request_timeout = "5s" subscription_interval = "200ms" +connect_fail_behavior = "error" security_policy = "auto" security_mode = "auto" certificate = "/etc/telegraf/cert.pem" @@ -347,6 +439,7 @@ additional_valid_status_codes = ["0xC0"] require.Equal(t, config.Duration(10*time.Second), o.SubscribeClientConfig.ConnectTimeout) require.Equal(t, config.Duration(5*time.Second), o.SubscribeClientConfig.RequestTimeout) require.Equal(t, config.Duration(200*time.Millisecond), o.SubscribeClientConfig.SubscriptionInterval) + require.Equal(t, "error", o.SubscribeClientConfig.ConnectFailBehavior) require.Equal(t, "auto", o.SubscribeClientConfig.SecurityPolicy) require.Equal(t, "auto", o.SubscribeClientConfig.SecurityMode) require.Equal(t, "/etc/telegraf/cert.pem", o.SubscribeClientConfig.Certificate) diff --git a/plugins/inputs/opcua_listener/sample.conf b/plugins/inputs/opcua_listener/sample.conf index 3e333020cafc0..1d31f778f72f3 100644 --- a/plugins/inputs/opcua_listener/sample.conf +++ b/plugins/inputs/opcua_listener/sample.conf @@ -9,6 +9,12 @@ ## Maximum time allowed to establish a connect to the endpoint. # connect_timeout = "10s" # + ## Behavior when we fail to connect to the endpoint on initialization. Valid options are: + ## "error": throw an error and exits Telegraf + ## "ignore": ignore this plugin if errors are encountered + # "retry": retry connecting at each interval + # connect_fail_behavior = "error" + # ## Maximum time allowed for a request over the established connection. # request_timeout = "5s" # diff --git a/plugins/inputs/opcua_listener/subscribe_client.go b/plugins/inputs/opcua_listener/subscribe_client.go index 20d479caa6fd9..a6e914c20a011 100644 --- a/plugins/inputs/opcua_listener/subscribe_client.go +++ b/plugins/inputs/opcua_listener/subscribe_client.go @@ -11,12 +11,14 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + opcuaclient "github.com/influxdata/telegraf/plugins/common/opcua" "github.com/influxdata/telegraf/plugins/common/opcua/input" ) type SubscribeClientConfig struct { input.InputClientConfig SubscriptionInterval config.Duration `toml:"subscription_interval"` + ConnectFailBehavior string `toml:"connect_fail_behavior"` } type SubscribeClient struct { @@ -133,6 +135,9 @@ func (o *SubscribeClient) Connect() error { func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} { o.Log.Debugf("Stopping OPC subscription...") + if o.State() != opcuaclient.Connected { + return nil + } if o.sub != nil { if err := o.sub.Cancel(ctx); err != nil { o.Log.Warn("Cancelling OPC UA subscription failed with error ", err) @@ -150,6 +155,14 @@ func (o *SubscribeClient) CurrentValues() ([]telegraf.Metric, error) { func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) { err := o.Connect() if err != nil { + switch o.Config.ConnectFailBehavior { + case "retry": + o.Log.Warnf("Failed to connect to OPC UA server %s. Will attempt to connect again at the next interval: %s", o.Config.Endpoint, err) + return nil, nil + case "ignore": + o.Log.Errorf("Failed to connect to OPC UA server %s. Will not retry: %s", o.Config.Endpoint, err) + return nil, nil + } return nil, err }