diff --git a/go.mod b/go.mod index 271d002ed..c9f51363e 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ toolchain go1.22.10 require ( cloud.google.com/go/pubsub v1.45.3 github.com/DATA-DOG/go-sqlmock v1.5.2 - github.com/XSAM/otelsql v0.34.0 - github.com/alicebob/miniredis/v2 v2.33.0 + github.com/XSAM/otelsql v0.36.0 + github.com/alicebob/miniredis/v2 v2.34.0 github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/go-redis/redismock/v9 v9.2.0 github.com/go-sql-driver/mysql v1.8.1 @@ -25,11 +25,11 @@ require ( github.com/redis/go-redis/v9 v9.7.0 github.com/segmentio/kafka-go v0.4.47 github.com/stretchr/testify v1.10.0 - go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0 + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/otel v1.33.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 - go.opentelemetry.io/otel/exporters/prometheus v0.52.0 + go.opentelemetry.io/otel/exporters/prometheus v0.55.0 go.opentelemetry.io/otel/exporters/zipkin v1.33.0 go.opentelemetry.io/otel/metric v1.33.0 go.opentelemetry.io/otel/sdk v1.33.0 @@ -40,10 +40,10 @@ require ( golang.org/x/sync v0.10.0 golang.org/x/term v0.27.0 golang.org/x/text v0.21.0 - google.golang.org/api v0.212.0 + google.golang.org/api v0.214.0 google.golang.org/grpc v1.68.1 - google.golang.org/protobuf v1.36.0 - modernc.org/sqlite v1.34.2 + google.golang.org/protobuf v1.36.1 + modernc.org/sqlite v1.34.4 ) require ( @@ -53,7 +53,7 @@ require ( cloud.google.com/go/compute/metadata v0.6.0 // indirect cloud.google.com/go/iam v1.2.2 // indirect filippo.io/edwards25519 v1.1.0 // indirect - github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -78,7 +78,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.59.1 // indirect + github.com/prometheus/common v0.61.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect @@ -91,7 +91,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect go.opentelemetry.io/proto/otlp v1.4.0 // indirect golang.org/x/crypto v0.31.0 // indirect - golang.org/x/net v0.32.0 // indirect + golang.org/x/net v0.33.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/time v0.8.0 // indirect google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect diff --git a/go.sum b/go.sum index e3525dd67..0cde7040f 100644 --- a/go.sum +++ b/go.sum @@ -20,12 +20,12 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= -github.com/XSAM/otelsql v0.34.0 h1:YdCRKy17Xn0MH717LEwqpVL/a+4nexmSCBrgoycYY6E= -github.com/XSAM/otelsql v0.34.0/go.mod h1:xaE+ybu+kJOYvtDyThbe0VoKWngvKHmNlrM1rOn8f94= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= -github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= +github.com/XSAM/otelsql v0.36.0 h1:SvrlOd/Hp0ttvI9Hu0FUWtISTTDNhQYwxe8WB4J5zxo= +github.com/XSAM/otelsql v0.36.0/go.mod h1:fo4M8MU+fCn/jDfu+JwTQ0n6myv4cZ+FU5VxrllIlxY= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0= +github.com/alicebob/miniredis/v2 v2.34.0/go.mod h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -167,8 +167,8 @@ github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/j github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJoX0= -github.com/prometheus/common v0.59.1/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0= +github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ= +github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk= @@ -218,8 +218,8 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= -go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0 h1:4BZHA+B1wXEQoGNHxW8mURaLhcdGwvRnmhGbm+odRbc= -go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0/go.mod h1:3qi2EEwMgB4xnKgPLqsDP3j9qxnHDZeHsnAxfjQqTko= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0 h1:xwH3QJv6zL4u+gkPUu59NeT1Gyw9nScWT8FQpKLUJJI= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0/go.mod h1:uosvgpqTcTXtcPQORTbEkZNDQTCDOgTz1fe6aLSyqrQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= @@ -228,8 +228,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 h1:Vh5HayB/0HHfOQA7Ctx go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0/go.mod h1:cpgtDBaqD/6ok/UG0jT15/uKjAY8mRA53diogHBg3UI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 h1:5pojmb1U1AogINhN3SurB+zm/nIcusopeBNp42f45QM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0/go.mod h1:57gTHJSE5S1tqg+EKsLPlTWhpHMsWlVmer+LA926XiA= -go.opentelemetry.io/otel/exporters/prometheus v0.52.0 h1:kmU3H0b9ufFSi8IQCcxack+sWUblKkFbqWYs6YiACGQ= -go.opentelemetry.io/otel/exporters/prometheus v0.52.0/go.mod h1:+wsAp2+JhuGXX7YRkjlkx6hyWY3ogFPfNA4x3nyiAh0= +go.opentelemetry.io/otel/exporters/prometheus v0.55.0 h1:sSPw658Lk2NWAv74lkD3B/RSDb+xRFx46GjkrL3VUZo= +go.opentelemetry.io/otel/exporters/prometheus v0.55.0/go.mod h1:nC00vyCmQixoeaxF6KNyP42II/RHa9UdruK02qBmHvI= go.opentelemetry.io/otel/exporters/zipkin v1.33.0 h1:aFexjEJIw5kVz6vQwnsqCG/nTV/UpsZh7MtQwGmH1eI= go.opentelemetry.io/otel/exporters/zipkin v1.33.0/go.mod h1:aYsOzr/SZwZXJM6DJmSP/ST2P7MYxuc0R9RewkFVp9s= go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= @@ -282,8 +282,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= -golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -347,8 +347,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/api v0.212.0 h1:BcRj3MJfHF3FYD29rk7u9kuu1SyfGqfHcA0hSwKqkHg= -google.golang.org/api v0.212.0/go.mod h1:gICpLlpp12/E8mycRMzgy3SQ9cFh2XnVJ6vJi/kQbvI= +google.golang.org/api v0.214.0 h1:h2Gkq07OYi6kusGOaT/9rnNljuXmqPnaig7WGPmKbwA= +google.golang.org/api v0.214.0/go.mod h1:bYPpLG8AyeMWwDU6NXoB00xC0DFkikVvd5MfwoxjLqE= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -378,8 +378,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= -google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= @@ -416,8 +416,8 @@ modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= -modernc.org/sqlite v1.34.2 h1:J9n76TPsfYYkFkZ9Uy1QphILYifiVEwwOT7yP5b++2Y= -modernc.org/sqlite v1.34.2/go.mod h1:dnR723UrTtjKpoHCAMN0Q/gZ9MT4r+iRvIBb9umWFkU= +modernc.org/sqlite v1.34.4 h1:sjdARozcL5KJBvYQvLlZEmctRgW9xqIZc2ncN7PU0P8= +modernc.org/sqlite v1.34.4/go.mod h1:3QQFCG2SEMtc2nv+Wq4cQCH7Hjcg+p/RMlS1XK+zwbk= modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= diff --git a/pkg/gofr/datasource/pubsub/mqtt/default_client.go b/pkg/gofr/datasource/pubsub/mqtt/default_client.go new file mode 100644 index 000000000..8afd772aa --- /dev/null +++ b/pkg/gofr/datasource/pubsub/mqtt/default_client.go @@ -0,0 +1,81 @@ +package mqtt + +import ( + "fmt" + "sync" + + mqtt "github.com/eclipse/paho.mqtt.golang" + + "github.com/google/uuid" +) + +func getDefaultClient(config *Config, logger Logger, metrics Metrics) *MQTT { + var ( + host = publicBroker + port = 1883 + clientID = getClientID(config.ClientID) + ) + + if config.Username == "gofr-mqtt-test" { + host = "test.mosquitto.org" + } + + opts := mqtt.NewClientOptions() + opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port)) + opts.SetClientID(clientID) + opts.SetAutoReconnect(true) + opts.SetKeepAlive(config.KeepAlive) + client := mqtt.NewClient(opts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + logger.Errorf("could not connect to MQTT at '%v:%v', error: %v", config.Hostname, config.Port, token.Error()) + + return &MQTT{Client: client, config: config, logger: logger, mu: new(sync.RWMutex), metrics: metrics} + } + + config.Hostname = host + config.Port = port + config.ClientID = clientID + + msg := make(map[string]subscription) + + logger.Infof("connected to MQTT at '%v:%v' with clientID '%v'", config.Hostname, config.Port, clientID) + + return &MQTT{Client: client, config: config, logger: logger, subscriptions: msg, mu: new(sync.RWMutex), metrics: metrics} +} + +func getMQTTClientOptions(config *Config) *mqtt.ClientOptions { + options := mqtt.NewClientOptions() + options.AddBroker(fmt.Sprintf("%s://%s:%d", config.Protocol, config.Hostname, config.Port)) + + clientID := getClientID(config.ClientID) + options.SetClientID(clientID) + + if config.Username != "" { + options.SetUsername(config.Username) + } + + if config.Password != "" { + options.SetPassword(config.Password) + } + + options.SetOrderMatters(config.Order) + options.SetResumeSubs(config.RetrieveRetained) + options.SetAutoReconnect(true) + options.SetKeepAlive(config.KeepAlive) + + return options +} + +func getClientID(clientID string) string { + if clientID != "" { + clientID = "-" + clientID + } + + id, err := uuid.NewRandom() + if err != nil { + return "gofr-mqtt-default-client-id" + clientID + } + + return id.String() + clientID +} diff --git a/pkg/gofr/datasource/pubsub/mqtt/mqtt.go b/pkg/gofr/datasource/pubsub/mqtt/mqtt.go index d091d39b0..68436c949 100644 --- a/pkg/gofr/datasource/pubsub/mqtt/mqtt.go +++ b/pkg/gofr/datasource/pubsub/mqtt/mqtt.go @@ -3,14 +3,12 @@ package mqtt import ( "context" "errors" - "fmt" "math" "strconv" "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/google/uuid" "go.opentelemetry.io/otel" "gofr.dev/pkg/gofr/datasource" @@ -88,73 +86,6 @@ func New(config *Config, logger Logger, metrics Metrics) *MQTT { return &MQTT{Client: client, config: config, logger: logger, subscriptions: subs, mu: mu, metrics: metrics} } -func getDefaultClient(config *Config, logger Logger, metrics Metrics) *MQTT { - var ( - host = publicBroker - port = 1883 - clientID = getClientID(config.ClientID) - ) - - opts := mqtt.NewClientOptions() - opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port)) - opts.SetClientID(clientID) - opts.SetAutoReconnect(true) - opts.SetKeepAlive(config.KeepAlive) - client := mqtt.NewClient(opts) - - if token := client.Connect(); token.Wait() && token.Error() != nil { - logger.Errorf("could not connect to MQTT at '%v:%v', error: %v", config.Hostname, config.Port, token.Error()) - - return &MQTT{Client: client, config: config, logger: logger, mu: new(sync.RWMutex), metrics: metrics} - } - - config.Hostname = host - config.Port = port - config.ClientID = clientID - - msg := make(map[string]subscription) - - logger.Infof("connected to MQTT at '%v:%v' with clientID '%v'", config.Hostname, config.Port, clientID) - - return &MQTT{Client: client, config: config, logger: logger, subscriptions: msg, mu: new(sync.RWMutex), metrics: metrics} -} - -func getMQTTClientOptions(config *Config) *mqtt.ClientOptions { - options := mqtt.NewClientOptions() - options.AddBroker(fmt.Sprintf("%s://%s:%d", config.Protocol, config.Hostname, config.Port)) - - clientID := getClientID(config.ClientID) - options.SetClientID(clientID) - - if config.Username != "" { - options.SetUsername(config.Username) - } - - if config.Password != "" { - options.SetPassword(config.Password) - } - - options.SetOrderMatters(config.Order) - options.SetResumeSubs(config.RetrieveRetained) - options.SetAutoReconnect(true) - options.SetKeepAlive(config.KeepAlive) - - return options -} - -func getClientID(clientID string) string { - if clientID != "" { - clientID = "-" + clientID - } - - id, err := uuid.NewRandom() - if err != nil { - return "gofr-mqtt-default-client-id" + clientID - } - - return id.String() + clientID -} - func (m *MQTT) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) { m.mu.Lock() // get the message channel for the given topic diff --git a/pkg/gofr/datasource/pubsub/mqtt/mqtt_test.go b/pkg/gofr/datasource/pubsub/mqtt/mqtt_test.go index 98460c125..bdb8f3be6 100644 --- a/pkg/gofr/datasource/pubsub/mqtt/mqtt_test.go +++ b/pkg/gofr/datasource/pubsub/mqtt/mqtt_test.go @@ -70,7 +70,7 @@ func TestMQTT_EmptyConfigs(t *testing.T) { out := testutil.StdoutOutputForFunc(func() { mockLogger := logging.NewMockLogger(logging.DEBUG) - client = New(&Config{}, mockLogger, nil) + client = New(&Config{Username: "gofr-mqtt-test"}, mockLogger, nil) }) assert.NotNil(t, client)