Skip to content

Commit

Permalink
Merge branch 'development' into dependabot/go_modules/go.opentelemetr…
Browse files Browse the repository at this point in the history
…y.io/otel/sdk/metric-1.33.0
  • Loading branch information
Umang01-hash authored Dec 24, 2024
2 parents f6e273c + 70abf1a commit abcc26e
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 100 deletions.
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ toolchain go1.22.10
require (
cloud.google.com/go/pubsub v1.45.3
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/XSAM/otelsql v0.34.0
github.com/alicebob/miniredis/v2 v2.33.0
github.com/XSAM/otelsql v0.36.0
github.com/alicebob/miniredis/v2 v2.34.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/go-redis/redismock/v9 v9.2.0
github.com/go-sql-driver/mysql v1.8.1
Expand All @@ -25,11 +25,11 @@ require (
github.com/redis/go-redis/v9 v9.7.0
github.com/segmentio/kafka-go v0.4.47
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0
go.opentelemetry.io/otel v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.opentelemetry.io/otel/exporters/prometheus v0.52.0
go.opentelemetry.io/otel/exporters/prometheus v0.55.0
go.opentelemetry.io/otel/exporters/zipkin v1.33.0
go.opentelemetry.io/otel/metric v1.33.0
go.opentelemetry.io/otel/sdk v1.33.0
Expand All @@ -40,10 +40,10 @@ require (
golang.org/x/sync v0.10.0
golang.org/x/term v0.27.0
golang.org/x/text v0.21.0
google.golang.org/api v0.212.0
google.golang.org/api v0.214.0
google.golang.org/grpc v1.68.1
google.golang.org/protobuf v1.36.0
modernc.org/sqlite v1.34.2
google.golang.org/protobuf v1.36.1
modernc.org/sqlite v1.34.4
)

require (
Expand All @@ -53,7 +53,7 @@ require (
cloud.google.com/go/compute/metadata v0.6.0 // indirect
cloud.google.com/go/iam v1.2.2 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -78,7 +78,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.59.1 // indirect
github.com/prometheus/common v0.61.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand All @@ -91,7 +91,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect
Expand Down
40 changes: 20 additions & 20 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/XSAM/otelsql v0.34.0 h1:YdCRKy17Xn0MH717LEwqpVL/a+4nexmSCBrgoycYY6E=
github.com/XSAM/otelsql v0.34.0/go.mod h1:xaE+ybu+kJOYvtDyThbe0VoKWngvKHmNlrM1rOn8f94=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/XSAM/otelsql v0.36.0 h1:SvrlOd/Hp0ttvI9Hu0FUWtISTTDNhQYwxe8WB4J5zxo=
github.com/XSAM/otelsql v0.36.0/go.mod h1:fo4M8MU+fCn/jDfu+JwTQ0n6myv4cZ+FU5VxrllIlxY=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0=
github.com/alicebob/miniredis/v2 v2.34.0/go.mod h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -167,8 +167,8 @@ github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/j
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJoX0=
github.com/prometheus/common v0.59.1/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0=
github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ=
github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk=
Expand Down Expand Up @@ -218,8 +218,8 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI=
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0 h1:4BZHA+B1wXEQoGNHxW8mURaLhcdGwvRnmhGbm+odRbc=
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0/go.mod h1:3qi2EEwMgB4xnKgPLqsDP3j9qxnHDZeHsnAxfjQqTko=
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0 h1:xwH3QJv6zL4u+gkPUu59NeT1Gyw9nScWT8FQpKLUJJI=
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.58.0/go.mod h1:uosvgpqTcTXtcPQORTbEkZNDQTCDOgTz1fe6aLSyqrQ=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q=
go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw=
Expand All @@ -228,8 +228,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 h1:Vh5HayB/0HHfOQA7Ctx
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0/go.mod h1:cpgtDBaqD/6ok/UG0jT15/uKjAY8mRA53diogHBg3UI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 h1:5pojmb1U1AogINhN3SurB+zm/nIcusopeBNp42f45QM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0/go.mod h1:57gTHJSE5S1tqg+EKsLPlTWhpHMsWlVmer+LA926XiA=
go.opentelemetry.io/otel/exporters/prometheus v0.52.0 h1:kmU3H0b9ufFSi8IQCcxack+sWUblKkFbqWYs6YiACGQ=
go.opentelemetry.io/otel/exporters/prometheus v0.52.0/go.mod h1:+wsAp2+JhuGXX7YRkjlkx6hyWY3ogFPfNA4x3nyiAh0=
go.opentelemetry.io/otel/exporters/prometheus v0.55.0 h1:sSPw658Lk2NWAv74lkD3B/RSDb+xRFx46GjkrL3VUZo=
go.opentelemetry.io/otel/exporters/prometheus v0.55.0/go.mod h1:nC00vyCmQixoeaxF6KNyP42II/RHa9UdruK02qBmHvI=
go.opentelemetry.io/otel/exporters/zipkin v1.33.0 h1:aFexjEJIw5kVz6vQwnsqCG/nTV/UpsZh7MtQwGmH1eI=
go.opentelemetry.io/otel/exporters/zipkin v1.33.0/go.mod h1:aYsOzr/SZwZXJM6DJmSP/ST2P7MYxuc0R9RewkFVp9s=
go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ=
Expand Down Expand Up @@ -282,8 +282,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE=
golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
Expand Down Expand Up @@ -347,8 +347,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.212.0 h1:BcRj3MJfHF3FYD29rk7u9kuu1SyfGqfHcA0hSwKqkHg=
google.golang.org/api v0.212.0/go.mod h1:gICpLlpp12/E8mycRMzgy3SQ9cFh2XnVJ6vJi/kQbvI=
google.golang.org/api v0.214.0 h1:h2Gkq07OYi6kusGOaT/9rnNljuXmqPnaig7WGPmKbwA=
google.golang.org/api v0.214.0/go.mod h1:bYPpLG8AyeMWwDU6NXoB00xC0DFkikVvd5MfwoxjLqE=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
Expand Down Expand Up @@ -378,8 +378,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
Expand Down Expand Up @@ -416,8 +416,8 @@ modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
modernc.org/sqlite v1.34.2 h1:J9n76TPsfYYkFkZ9Uy1QphILYifiVEwwOT7yP5b++2Y=
modernc.org/sqlite v1.34.2/go.mod h1:dnR723UrTtjKpoHCAMN0Q/gZ9MT4r+iRvIBb9umWFkU=
modernc.org/sqlite v1.34.4 h1:sjdARozcL5KJBvYQvLlZEmctRgW9xqIZc2ncN7PU0P8=
modernc.org/sqlite v1.34.4/go.mod h1:3QQFCG2SEMtc2nv+Wq4cQCH7Hjcg+p/RMlS1XK+zwbk=
modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
Expand Down
81 changes: 81 additions & 0 deletions pkg/gofr/datasource/pubsub/mqtt/default_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package mqtt

import (
"fmt"
"sync"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/google/uuid"
)

func getDefaultClient(config *Config, logger Logger, metrics Metrics) *MQTT {
var (
host = publicBroker
port = 1883
clientID = getClientID(config.ClientID)
)

if config.Username == "gofr-mqtt-test" {
host = "test.mosquitto.org"
}

opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
opts.SetClientID(clientID)
opts.SetAutoReconnect(true)
opts.SetKeepAlive(config.KeepAlive)
client := mqtt.NewClient(opts)

if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Errorf("could not connect to MQTT at '%v:%v', error: %v", config.Hostname, config.Port, token.Error())

return &MQTT{Client: client, config: config, logger: logger, mu: new(sync.RWMutex), metrics: metrics}
}

config.Hostname = host
config.Port = port
config.ClientID = clientID

msg := make(map[string]subscription)

logger.Infof("connected to MQTT at '%v:%v' with clientID '%v'", config.Hostname, config.Port, clientID)

return &MQTT{Client: client, config: config, logger: logger, subscriptions: msg, mu: new(sync.RWMutex), metrics: metrics}
}

func getMQTTClientOptions(config *Config) *mqtt.ClientOptions {
options := mqtt.NewClientOptions()
options.AddBroker(fmt.Sprintf("%s://%s:%d", config.Protocol, config.Hostname, config.Port))

clientID := getClientID(config.ClientID)
options.SetClientID(clientID)

if config.Username != "" {
options.SetUsername(config.Username)
}

if config.Password != "" {
options.SetPassword(config.Password)
}

options.SetOrderMatters(config.Order)
options.SetResumeSubs(config.RetrieveRetained)
options.SetAutoReconnect(true)
options.SetKeepAlive(config.KeepAlive)

return options
}

func getClientID(clientID string) string {
if clientID != "" {
clientID = "-" + clientID
}

id, err := uuid.NewRandom()
if err != nil {
return "gofr-mqtt-default-client-id" + clientID
}

return id.String() + clientID
}
69 changes: 0 additions & 69 deletions pkg/gofr/datasource/pubsub/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package mqtt
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"sync"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
"go.opentelemetry.io/otel"

"gofr.dev/pkg/gofr/datasource"
Expand Down Expand Up @@ -88,73 +86,6 @@ func New(config *Config, logger Logger, metrics Metrics) *MQTT {
return &MQTT{Client: client, config: config, logger: logger, subscriptions: subs, mu: mu, metrics: metrics}
}

func getDefaultClient(config *Config, logger Logger, metrics Metrics) *MQTT {
var (
host = publicBroker
port = 1883
clientID = getClientID(config.ClientID)
)

opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
opts.SetClientID(clientID)
opts.SetAutoReconnect(true)
opts.SetKeepAlive(config.KeepAlive)
client := mqtt.NewClient(opts)

if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Errorf("could not connect to MQTT at '%v:%v', error: %v", config.Hostname, config.Port, token.Error())

return &MQTT{Client: client, config: config, logger: logger, mu: new(sync.RWMutex), metrics: metrics}
}

config.Hostname = host
config.Port = port
config.ClientID = clientID

msg := make(map[string]subscription)

logger.Infof("connected to MQTT at '%v:%v' with clientID '%v'", config.Hostname, config.Port, clientID)

return &MQTT{Client: client, config: config, logger: logger, subscriptions: msg, mu: new(sync.RWMutex), metrics: metrics}
}

func getMQTTClientOptions(config *Config) *mqtt.ClientOptions {
options := mqtt.NewClientOptions()
options.AddBroker(fmt.Sprintf("%s://%s:%d", config.Protocol, config.Hostname, config.Port))

clientID := getClientID(config.ClientID)
options.SetClientID(clientID)

if config.Username != "" {
options.SetUsername(config.Username)
}

if config.Password != "" {
options.SetPassword(config.Password)
}

options.SetOrderMatters(config.Order)
options.SetResumeSubs(config.RetrieveRetained)
options.SetAutoReconnect(true)
options.SetKeepAlive(config.KeepAlive)

return options
}

func getClientID(clientID string) string {
if clientID != "" {
clientID = "-" + clientID
}

id, err := uuid.NewRandom()
if err != nil {
return "gofr-mqtt-default-client-id" + clientID
}

return id.String() + clientID
}

func (m *MQTT) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {
m.mu.Lock()
// get the message channel for the given topic
Expand Down
2 changes: 1 addition & 1 deletion pkg/gofr/datasource/pubsub/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestMQTT_EmptyConfigs(t *testing.T) {

out := testutil.StdoutOutputForFunc(func() {
mockLogger := logging.NewMockLogger(logging.DEBUG)
client = New(&Config{}, mockLogger, nil)
client = New(&Config{Username: "gofr-mqtt-test"}, mockLogger, nil)
})

assert.NotNil(t, client)
Expand Down

0 comments on commit abcc26e

Please sign in to comment.