Skip to content

Commit

Permalink
feat(inputs.opcua_listener): Add options for failure to connect (infl…
Browse files Browse the repository at this point in the history
…uxdata#14533)

Adds Error (default, same behavior as before), Ignore, and Retry as
options for when the plugin cannot connect to the OPC UA server when
Telegraf starts
  • Loading branch information
Anna Carrigan committed Jan 15, 2024
1 parent f899e71 commit 7df1954
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/developers/SAMPLE_CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Documentation is double commented, full sentences, and ends with a period.
# exchange_type = "topic"
```

Try to give every parameter a default value whenever possible. If an
Try to give every parameter a default value whenever possible. If a
parameter does not have a default or must frequently be changed then have it
uncommented.

Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/opcua_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ to use them.
## Maximum time allowed to establish a connect to the endpoint.
# connect_timeout = "10s"
#
## Behavior when we fail to connect to the endpoint on initialization. Valid options are:
## "error": throw an error and exits Telegraf
## "ignore": ignore this plugin if errors are encountered
# "retry": retry connecting at each interval
# connect_fail_behavior = "error"
#
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#
Expand Down
23 changes: 20 additions & 3 deletions plugins/inputs/opcua_listener/opcua_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ package opcua_listener
import (
"context"
_ "embed"
"fmt"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
opcuaclient "github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand All @@ -27,15 +29,26 @@ func (*OpcUaListener) SampleConfig() string {
}

func (o *OpcUaListener) Init() (err error) {
switch o.ConnectFailBehavior {
case "":
o.ConnectFailBehavior = "error"
case "error", "ignore", "retry":
// Do nothing as these are valid
default:
return fmt.Errorf("unknown setting %q for 'connect_fail_behavior'", o.ConnectFailBehavior)
}
o.client, err = o.SubscribeClientConfig.CreateSubscribeClient(o.Log)
return err
}

func (o *OpcUaListener) Gather(_ telegraf.Accumulator) error {
return nil
func (o *OpcUaListener) Gather(acc telegraf.Accumulator) error {
if o.client.State() == opcuaclient.Connected || o.SubscribeClientConfig.ConnectFailBehavior == "ignore" {
return nil
}
return o.connect(acc)
}

func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
func (o *OpcUaListener) connect(acc telegraf.Accumulator) error {
ctx := context.Background()
ch, err := o.client.StartStreamValues(ctx)
if err != nil {
Expand All @@ -56,6 +69,10 @@ func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
return nil
}

func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
return o.connect(acc)
}

func (o *OpcUaListener) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
select {
Expand Down
93 changes: 93 additions & 0 deletions plugins/inputs/opcua_listener/opcua_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,96 @@ func MapOPCTag(tags OPCTags) (out input.NodeSettings) {
return out
}

func TestInitPluginWithBadConnectFailBehaviorValue(t *testing.T) {
plugin := OpcUaListener{
SubscribeClientConfig: SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://notarealserver:4840",
SecurityPolicy: "None",
SecurityMode: "None",
ConnectTimeout: config.Duration(5 * time.Second),
RequestTimeout: config.Duration(10 * time.Second),
},
MetricName: "opcua",
Timestamp: input.TimestampSourceTelegraf,
RootNodes: make([]input.NodeSettings, 0),
},
ConnectFailBehavior: "notanoption",
SubscriptionInterval: config.Duration(100 * time.Millisecond),
},
Log: testutil.Logger{},
}
err := plugin.Init()
require.ErrorContains(t, err, "unknown setting \"notanoption\" for 'connect_fail_behavior'")
}

func TestStartPlugin(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

acc := &testutil.Accumulator{}

plugin := OpcUaListener{
SubscribeClientConfig: SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://notarealserver:4840",
SecurityPolicy: "None",
SecurityMode: "None",
ConnectTimeout: config.Duration(5 * time.Second),
RequestTimeout: config.Duration(10 * time.Second),
},
MetricName: "opcua",
Timestamp: input.TimestampSourceTelegraf,
RootNodes: make([]input.NodeSettings, 0),
},
SubscriptionInterval: config.Duration(100 * time.Millisecond),
},
Log: testutil.Logger{},
}
testopctags := []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
}
for _, tags := range testopctags {
plugin.SubscribeClientConfig.RootNodes = append(plugin.SubscribeClientConfig.RootNodes, MapOPCTag(tags))
}
require.NoError(t, plugin.Init())
err := plugin.Start(acc)
require.ErrorContains(t, err, "could not resolve address")

plugin.SubscribeClientConfig.ConnectFailBehavior = "ignore"
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(acc))
require.Equal(t, plugin.client.OpcUAClient.State(), opcua.Disconnected)
plugin.Stop()

container := testutil.Container{
Image: "open62541/open62541",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("TCP network layer listening on opc.tcp://"),
),
}
plugin.SubscribeClientConfig.ConnectFailBehavior = "retry"
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(acc))
require.Equal(t, plugin.client.OpcUAClient.State(), opcua.Disconnected)

err = container.Start()
require.NoError(t, err, "failed to start container")

defer container.Terminate()
newEndpoint := fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort])
plugin.client.Config.Endpoint = newEndpoint
plugin.client.OpcUAClient.Config.Endpoint = newEndpoint
err = plugin.Gather(acc)
require.NoError(t, err)
require.Equal(t, plugin.client.OpcUAClient.State(), opcua.Connected)
}

func TestSubscribeClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
Expand Down Expand Up @@ -104,6 +194,7 @@ func TestSubscribeClientIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
res, err := o.StartStreamValues(ctx)
require.Equal(t, o.State(), opcua.Connected)
require.NoError(t, err)

for {
Expand Down Expand Up @@ -299,6 +390,7 @@ endpoint = "opc.tcp://localhost:4840"
connect_timeout = "10s"
request_timeout = "5s"
subscription_interval = "200ms"
connect_fail_behavior = "error"
security_policy = "auto"
security_mode = "auto"
certificate = "/etc/telegraf/cert.pem"
Expand Down Expand Up @@ -347,6 +439,7 @@ additional_valid_status_codes = ["0xC0"]
require.Equal(t, config.Duration(10*time.Second), o.SubscribeClientConfig.ConnectTimeout)
require.Equal(t, config.Duration(5*time.Second), o.SubscribeClientConfig.RequestTimeout)
require.Equal(t, config.Duration(200*time.Millisecond), o.SubscribeClientConfig.SubscriptionInterval)
require.Equal(t, "error", o.SubscribeClientConfig.ConnectFailBehavior)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityPolicy)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityMode)
require.Equal(t, "/etc/telegraf/cert.pem", o.SubscribeClientConfig.Certificate)
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/opcua_listener/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
## Maximum time allowed to establish a connect to the endpoint.
# connect_timeout = "10s"
#
## Behavior when we fail to connect to the endpoint on initialization. Valid options are:
## "error": throw an error and exits Telegraf
## "ignore": ignore this plugin if errors are encountered
# "retry": retry connecting at each interval
# connect_fail_behavior = "error"
#
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#
Expand Down
13 changes: 13 additions & 0 deletions plugins/inputs/opcua_listener/subscribe_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
opcuaclient "github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
)

type SubscribeClientConfig struct {
input.InputClientConfig
SubscriptionInterval config.Duration `toml:"subscription_interval"`
ConnectFailBehavior string `toml:"connect_fail_behavior"`
}

type SubscribeClient struct {
Expand Down Expand Up @@ -133,6 +135,9 @@ func (o *SubscribeClient) Connect() error {

func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} {
o.Log.Debugf("Stopping OPC subscription...")
if o.State() != opcuaclient.Connected {
return nil
}
if o.sub != nil {
if err := o.sub.Cancel(ctx); err != nil {
o.Log.Warn("Cancelling OPC UA subscription failed with error ", err)
Expand All @@ -150,6 +155,14 @@ func (o *SubscribeClient) CurrentValues() ([]telegraf.Metric, error) {
func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) {
err := o.Connect()
if err != nil {
switch o.Config.ConnectFailBehavior {
case "retry":
o.Log.Warnf("Failed to connect to OPC UA server %s. Will attempt to connect again at the next interval: %s", o.Config.Endpoint, err)
return nil, nil
case "ignore":
o.Log.Errorf("Failed to connect to OPC UA server %s. Will not retry: %s", o.Config.Endpoint, err)
return nil, nil
}
return nil, err
}

Expand Down

0 comments on commit 7df1954

Please sign in to comment.