@@ -3,6 +3,7 @@ package prober
3
3
import (
4
4
"context"
5
5
"emqx-exporter/config"
6
+ "fmt"
6
7
"sync"
7
8
"time"
8
9
@@ -49,6 +50,7 @@ func init() {
49
50
}
50
51
51
52
func initMQTTProbe (probe config.Probe , logger log.Logger ) (* MQTTProbe , error ) {
53
+ var isReady = make (chan struct {})
52
54
var msgChan = make (chan mqtt.Message )
53
55
54
56
opt := mqtt .NewClientOptions ().AddBroker (probe .Scheme + "://" + probe .Target )
@@ -60,7 +62,8 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
60
62
opt .SetTLSConfig (probe .TLSClientConfig .ToTLSConfig ())
61
63
}
62
64
opt .SetOnConnectHandler (func (c mqtt.Client ) {
63
- level .Info (logger ).Log ("msg" , "Connected to MQTT broker" , "target" , probe .Target )
65
+ optReader := c .OptionsReader ()
66
+ level .Info (logger ).Log ("msg" , "Connected to MQTT broker" , "target" , probe .Target , "client_id" , optReader .ClientID ())
64
67
token := c .Subscribe (probe .Topic , probe .QoS , func (c mqtt.Client , m mqtt.Message ) {
65
68
msgChan <- m
66
69
})
@@ -69,6 +72,7 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
69
72
level .Error (logger ).Log ("msg" , "Failed to subscribe to MQTT topic" , "target" , probe .Target , "topic" , probe .Topic , "qos" , probe .QoS , "err" , token .Error ())
70
73
return
71
74
}
75
+ isReady <- struct {}{}
72
76
level .Info (logger ).Log ("msg" , "Subscribed to MQTT topic" , "target" , probe .Target , "topic" , probe .Topic , "qos" , probe .QoS )
73
77
})
74
78
opt .SetConnectionLostHandler (func (c mqtt.Client , err error ) {
@@ -80,6 +84,12 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
80
84
return nil , token .Error ()
81
85
}
82
86
87
+ select {
88
+ case <- isReady :
89
+ case <- time .After (60 * time .Second ):
90
+ return nil , fmt .Errorf ("MQTT probe connect timeout" )
91
+ }
92
+
83
93
return & MQTTProbe {
84
94
Client : c ,
85
95
MsgChan : msgChan ,
@@ -102,6 +112,7 @@ func ProbeMQTT(probe config.Probe, logger log.Logger) bool {
102
112
return false
103
113
}
104
114
115
+ level .Info (logger ).Log ("msg" , "Publishing MQTT message" , "target" , probe .Target , "topic" , probe .Topic , "qos" , probe .QoS )
105
116
if token := mqttProbe .Client .Publish (probe .Topic , probe .QoS , false , "hello world" ); token .Wait () && token .Error () != nil {
106
117
return false
107
118
}
0 commit comments