From 46c5f10280a080495f81167cc51f18984d797367 Mon Sep 17 00:00:00 2001 From: shoy Date: Tue, 7 Mar 2023 16:10:58 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E8=AE=A4?= =?UTF-8?q?=E8=AF=81=E4=BE=A7=E4=BB=A5=E5=8F=8A=E7=AE=A1=E7=90=86=E4=BE=A7?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=8F=91=E5=B8=83=E8=AE=A2=E9=98=85=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- authentication/authentication_client.go | 66 ++++++++++- authentication/authentication_client_test.go | 56 ++++++++- authentication/authentication_options.go | 4 + constant/base_constant.go | 2 + dto/EventReqDto.go | 19 +++ go.mod | 1 + go.sum | 2 + management/management_client.go | 56 +++++++++ management/management_client_options.go | 10 +- management/management_client_test.go | 54 ++++++++- management/management_http_client.go | 7 +- util/signature_utils.go | 59 ++++++++++ util/websocket_utils.go | 115 +++++++++++++++++++ 13 files changed, 437 insertions(+), 14 deletions(-) create mode 100644 dto/EventReqDto.go create mode 100644 util/signature_utils.go create mode 100644 util/websocket_utils.go diff --git a/authentication/authentication_client.go b/authentication/authentication_client.go index 92bd80d..869c84f 100644 --- a/authentication/authentication_client.go +++ b/authentication/authentication_client.go @@ -5,10 +5,11 @@ import ( "encoding/json" "errors" "fmt" + "strings" + "github.com/Authing/authing-golang-sdk/v3/constant" "github.com/Authing/authing-golang-sdk/v3/dto" "github.com/Authing/authing-golang-sdk/v3/util" - "strings" keyfunc "github.com/MicahParks/compatibility-keyfunc" "github.com/dgrijalva/jwt-go" @@ -25,8 +26,9 @@ var commonHeaders = map[string]string{ } type AuthenticationClient struct { - options *AuthenticationClientOptions - jwks *keyfunc.JWKS + options *AuthenticationClientOptions + jwks *keyfunc.JWKS + eventHub *util.WebSocketEventHub } func NewAuthenticationClient(options *AuthenticationClientOptions) (*AuthenticationClient, error) { @@ -39,6 +41,9 @@ func NewAuthenticationClient(options *AuthenticationClientOptions) (*Authenticat if options.AppHost == "" { return nil, errors.New("AppHost 不能为空") } + if options.WssHost == "" { + options.WssHost = constant.WebSocketHost + } if options.RedirectUri == "" { return nil, errors.New("RedirectUri 不能为空") } @@ -56,7 +61,8 @@ func NewAuthenticationClient(options *AuthenticationClientOptions) (*Authenticat } client := &AuthenticationClient{ - options: options, + options: options, + eventHub: util.NewWebSocketEvent(), } return client, nil @@ -2204,3 +2210,55 @@ func (client *AuthenticationClient) getUserAuthResourceStruct(reqDto *dto.GetUse } return &response } + +/* + * @summary 事件发布 + * @description 根据事件编码发布一个自定义事件 + * @param eventCode 事件编码 + * @param body 事件消息 + * @returns IsSuccessRespDto + */ +func (client *AuthenticationClient) PubEvent(eventCode string, data interface{}) *dto.IsSuccessRespDto { + var reqDto = dto.NewEventReqDto(eventCode, data) + b, err := client.SendHttpRequest("/api/v3/pub-userEvent", fasthttp.MethodPost, reqDto) + var response dto.IsSuccessRespDto + if err != nil { + fmt.Println(err) + return nil + } + err = json.Unmarshal(b, &response) + if err != nil { + fmt.Println(err) + return nil + } + return &response +} + +/* + * @summary 事件订阅 + * @description 根据事件编码订阅一个自定义事件 + * @param eventCode 事件编码 + * @param onSuccess 成功的消息 + * @param onError 异常处理 + */ +func (client *AuthenticationClient) SubEvent(eventCode string, onSuccess func(msg []byte), onError func(err error)) { + var options = client.options + token := options.AccessToken + // fmt.Println(token) + if !client.eventHub.CreateAuthentication(eventCode, options.WssHost, token) { + return + } + client.eventHub.AddReceiver(eventCode, onSuccess, onError) + // recv message exec corresponding callback function + go client.eventHub.StartReceive(eventCode) +} + +/* + * @summary 事件订阅 + * @description 根据事件编码订阅一个自定义事件 + * @param eventCode 事件编码 + * @param receiver 消息处理器 + */ +func (client *AuthenticationClient) SubEventByReceiver(eventCode string, receiver util.EventReceiver) { + client.SubEvent(eventCode, receiver.OnSuccess, receiver.OnError) +} diff --git a/authentication/authentication_client_test.go b/authentication/authentication_client_test.go index 6223342..10d39b9 100644 --- a/authentication/authentication_client_test.go +++ b/authentication/authentication_client_test.go @@ -2,6 +2,7 @@ package authentication import ( "fmt" + "github.com/Authing/authing-golang-sdk/v3/constant" "github.com/Authing/authing-golang-sdk/v3/dto" @@ -14,11 +15,12 @@ import ( var authenticationClient *AuthenticationClient var options = AuthenticationClientOptions{ - AppId: "", - AppSecret: "", - AppHost: "", - RedirectUri: "http://localhost:8989", - InsecureSkipVerify: true, + AppId: "", + AppSecret: "", + AppHost: "", + WssHost: "ws://localhost:88", + RedirectUri: "http://localhost:8989", + AccessToken: "", } const idToken = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI2M2IzZWE3Mzk0MDc0YWE1ZTE5YmIzMGMiLCJhdWQiOiI2M2IzZWE1YmM5NjY0YzlkMDhkMTkwYzgiLCJpYXQiOjE2NzI3MzY2NjcsImV4cCI6MTY3Mzk0NjI2NywiaXNzIjoiaHR0cHM6Ly9nby11c2VyLXBlcm1pc3Npb24tYXV0aC5hdXRoaW5nLmNuL29pZGMiLCJub25jZSI6InJtZ0pab3RxNGkiLCJuYW1lIjpudWxsLCJnaXZlbl9uYW1lIjpudWxsLCJtaWRkbGVfbmFtZSI6bnVsbCwiZmFtaWx5X25hbWUiOm51bGwsIm5pY2tuYW1lIjpudWxsLCJwcmVmZXJyZWRfdXNlcm5hbWUiOm51bGwsInByb2ZpbGUiOm51bGwsInBpY3R1cmUiOiJodHRwczovL2ZpbGVzLmF1dGhpbmcuY28vYXV0aGluZy1jb25zb2xlL2RlZmF1bHQtdXNlci1hdmF0YXIucG5nIiwid2Vic2l0ZSI6bnVsbCwiYmlydGhkYXRlIjpudWxsLCJnZW5kZXIiOiJVIiwiem9uZWluZm8iOm51bGwsImxvY2FsZSI6bnVsbCwidXBkYXRlZF9hdCI6IjIwMjMtMDEtMDNUMDg6NTQ6NDUuMzMwWiIsImVtYWlsIjoidGVzdEBleGFtcGxlLmNvbSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwicGhvbmVfbnVtYmVyIjpudWxsLCJwaG9uZV9udW1iZXJfdmVyaWZpZWQiOmZhbHNlfQ.3awp567aJ3wBXR0mh0l1oBTugTNsDqYpJVIaDTeHXbI\n" @@ -262,3 +264,47 @@ func TestAuthenticationClient_GetUserAuthResourceStruct(t *testing.T) { fmt.Println(response) } + +type UserEvent struct { + Id string `json:"id"` + Name string `json:"name"` +} + +func TestClient_PubEvent(t *testing.T) { + var data = &UserEvent{ + Id: "232323232", + Name: "golang-authentication-test", + } + var resp = authenticationClient.PubEvent("custom_xrgu.user_create", data) + fmt.Println(resp) +} + +type Receiver1 struct{} + +func (receiver *Receiver1) OnSuccess(msg []byte) { + fmt.Println(string(msg)) +} +func (receiver *Receiver1) OnError(err error) { + fmt.Println(err) +} + +func TestClient_SubEvent(t *testing.T) { + ErrChan := make(chan error, 1) + receiver := &Receiver1{} + authenticationClient.SubEventByReceiver("custom_xrgu.user_create", receiver) + + authenticationClient.SubEvent("custom_xrgu.user_create", func(msg []byte) { + fmt.Println(string(msg) + "222") + }, func(err error) { + fmt.Println(err) + ErrChan <- err + }) + + authenticationClient.SubEvent("custom_xrgu.user_create", func(msg []byte) { + fmt.Println(string(msg) + "333") + }, func(err error) { + fmt.Println(err) + ErrChan <- err + }) + <-ErrChan +} diff --git a/authentication/authentication_options.go b/authentication/authentication_options.go index 8470d63..a256403 100644 --- a/authentication/authentication_options.go +++ b/authentication/authentication_options.go @@ -77,6 +77,10 @@ type AuthenticationClientOptions struct { 是否跳过 HTTPS 证书检测,默认为 false;如果是私有化部署的场景且证书不被信任,可以设置为 true */ InsecureSkipVerify bool + /** + * 订阅事件 WebSocket 地址 + */ + WssHost string } type AuthUrlResult struct { diff --git a/constant/base_constant.go b/constant/base_constant.go index 860f065..011c730 100644 --- a/constant/base_constant.go +++ b/constant/base_constant.go @@ -13,4 +13,6 @@ const ( StringEmpty = "" None = "none" + + WebSocketHost = "wss://events.authing.com" ) diff --git a/dto/EventReqDto.go b/dto/EventReqDto.go new file mode 100644 index 0000000..62152f9 --- /dev/null +++ b/dto/EventReqDto.go @@ -0,0 +1,19 @@ +package dto + +import "encoding/json" + +type EventReqDto struct { + EventType string `json:"eventType"` + EventData string `json:"eventData"` +} + +func NewEventReqDto(eventCode string, eventData interface{}) *EventReqDto { + data, err := json.Marshal(eventData) + if err != nil { + panic(err) + } + return &EventReqDto{ + EventType: eventCode, + EventData: string(data), + } +} diff --git a/go.mod b/go.mod index 7a91b15..64741f5 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.8 require ( github.com/MicahParks/compatibility-keyfunc v0.14.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/gorilla/websocket v1.5.0 // indirect github.com/klauspost/compress v1.15.2 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible github.com/valyala/fasthttp v1.36.0 diff --git a/go.sum b/go.sum index 5d86bde..94ec094 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8= github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.2 h1:3WH+AG7s2+T8o3nrM/8u2rdqUEcQhmga7smjrT41nAw= github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= diff --git a/management/management_client.go b/management/management_client.go index a35dd9c..351ba49 100644 --- a/management/management_client.go +++ b/management/management_client.go @@ -3,7 +3,9 @@ package management import ( "encoding/json" "fmt" + "github.com/Authing/authing-golang-sdk/v3/dto" + "github.com/Authing/authing-golang-sdk/v3/util" "github.com/valyala/fasthttp" ) @@ -6547,3 +6549,57 @@ func (client *ManagementClient) UpdateAccessKey(reqDto *dto.UpdateAccessKeyDto) } return &response } + +/* + * @summary 事件发布 + * @description 根据事件编码发布一个自定义事件 + * @param eventCode 事件编码 + * @param body 事件消息 + * @returns IsSuccessRespDto + */ +func (client *ManagementClient) PubEvent(eventCode string, data interface{}) *dto.IsSuccessRespDto { + var reqDto = dto.NewEventReqDto(eventCode, data) + b, err := client.SendHttpRequest("/api/v3/pub-event", fasthttp.MethodPost, reqDto) + var response dto.IsSuccessRespDto + if err != nil { + fmt.Println(err) + return nil + } + err = json.Unmarshal(b, &response) + if err != nil { + fmt.Println(err) + return nil + } + return &response +} + +/* + * @summary 事件订阅 + * @description 根据事件编码订阅一个自定义事件 + * @param eventCode 事件编码 + * @param onSuccess 成功的消息 + * @param onError 失败处理 + */ +func (client *ManagementClient) SubEvent(eventCode string, onSuccess func(msg []byte), onError func(err error)) { + var options = client.options + defMap := make(map[string]string, 0) + stringToSign := util.ComposeStringToSign("websocket", "", defMap, defMap) + token := util.GetAuthorization(client.options.AccessKeyId, client.options.AccessKeySecret, stringToSign) + // fmt.Println(token) + if !client.eventHub.CreateManagement(eventCode, options.WssHost, token) { + return + } + client.eventHub.AddReceiver(eventCode, onSuccess, onError) + // recv message exec corresponding callback function + go client.eventHub.StartReceive(eventCode) +} + +/* + * @summary 事件订阅 + * @description 根据事件编码订阅一个自定义事件 + * @param eventCode 事件编码 + * @param receiver 消息处理器 + */ +func (client *ManagementClient) SubEventByReceiver(eventCode string, receiver util.EventReceiver) { + client.SubEvent(eventCode, receiver.OnSuccess, receiver.OnError) +} diff --git a/management/management_client_options.go b/management/management_client_options.go index 8fcb80e..50c53e6 100644 --- a/management/management_client_options.go +++ b/management/management_client_options.go @@ -1,14 +1,17 @@ package management import ( - "github.com/Authing/authing-golang-sdk/v3/constant" "net/http" + + "github.com/Authing/authing-golang-sdk/v3/constant" + "github.com/Authing/authing-golang-sdk/v3/util" ) type ManagementClient struct { HttpClient *http.Client options *ManagementClientOptions userPoolId string + eventHub *util.WebSocketEventHub } type ManagementClientOptions struct { @@ -22,12 +25,16 @@ type ManagementClientOptions struct { 是否跳过 HTTPS 证书检测,默认为 false;如果是私有化部署的场景且证书不被信任,可以设置为 true */ InsecureSkipVerify bool + WssHost string } func NewManagementClient(options *ManagementClientOptions) (*ManagementClient, error) { if options.Host == "" { options.Host = constant.ApiServiceUrl } + if options.WssHost == "" { + options.WssHost = constant.WebSocketHost + } c := &ManagementClient{ options: options, } @@ -42,5 +49,6 @@ func NewManagementClient(options *ManagementClientOptions) (*ManagementClient, e ) c.HttpClient = oauth2.NewManagementClient(context.Background(), src)*/ } + c.eventHub = util.NewWebSocketEvent() return c, nil } diff --git a/management/management_client_test.go b/management/management_client_test.go index 1157263..1f91744 100644 --- a/management/management_client_test.go +++ b/management/management_client_test.go @@ -2,8 +2,9 @@ package management import ( "fmt" - "github.com/Authing/authing-golang-sdk/v3/dto" "testing" + + "github.com/Authing/authing-golang-sdk/v3/dto" ) var client *ManagementClient @@ -16,6 +17,7 @@ func init() { AccessKeyId: "63a517e42e4a0aa457cd0b2d", AccessKeySecret: "1b4ee0b200838618d30d4f385c8c3836", Host: "http://127.0.0.1:3000", + WssHost: "ws://127.0.0.1:88", } var err error client, err = NewManagementClient(&options) @@ -1621,3 +1623,53 @@ func TestClient_CheckUserSameLevelPermission(t *testing.T) { response := client.CheckUserSameLevelPermission(&request) fmt.Println(response) } + +type UserEvent struct { + Id string `json:"id"` + Name string `json:"name"` +} + +func TestClient_PubEvent(t *testing.T) { + var data = &UserEvent{ + Id: "563434342323", + Name: "golang-test", + } + var resp = client.PubEvent("custom_xrgu.user_create", data) + fmt.Println(resp) +} + +type Receiver1 struct{} + +func (receiver *Receiver1) OnSuccess(msg []byte) { + fmt.Println(string(msg)) +} +func (receiver *Receiver1) OnError(err error) { + fmt.Println(err) +} + +func TestClient_SubEvent(t *testing.T) { + ErrChan := make(chan error, 1) + + client.SubEventByReceiver("custom_xrgu.user_create", &Receiver1{}) + client.SubEvent("custom_xrgu.user_create", func(msg []byte) { + fmt.Println(string(msg)) + }, func(err error) { + fmt.Println(err) + ErrChan <- err + }) + + client.SubEvent("custom_xrgu.user_create", func(msg []byte) { + fmt.Println(string(msg) + "222") + }, func(err error) { + fmt.Println(err) + ErrChan <- err + }) + + client.SubEvent("custom_xrgu.user_create", func(msg []byte) { + fmt.Println(string(msg) + "333") + }, func(err error) { + fmt.Println(err) + ErrChan <- err + }) + <-ErrChan +} diff --git a/management/management_http_client.go b/management/management_http_client.go index cf940e2..bfcf9bb 100644 --- a/management/management_http_client.go +++ b/management/management_http_client.go @@ -4,14 +4,15 @@ import ( "crypto/tls" "encoding/json" "fmt" + url2 "net/url" + "sync" + "time" + "github.com/Authing/authing-golang-sdk/v3/constant" "github.com/Authing/authing-golang-sdk/v3/dto" "github.com/Authing/authing-golang-sdk/v3/util/cache" "github.com/dgrijalva/jwt-go" "github.com/valyala/fasthttp" - url2 "net/url" - "sync" - "time" ) type JwtClaims struct { diff --git a/util/signature_utils.go b/util/signature_utils.go new file mode 100644 index 0000000..ffea563 --- /dev/null +++ b/util/signature_utils.go @@ -0,0 +1,59 @@ +package util + +import ( + "crypto/hmac" + "crypto/sha1" + "encoding/base64" + "sort" + "strings" +) + +func sortForeach(data map[string]string, action func(string, string)) { + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + action(k, data[k]) + } +} + +func signature(secret, stringToSign string) string { + key := []byte(secret) + mac := hmac.New(sha1.New, key) + mac.Write([]byte(stringToSign)) + return base64.StdEncoding.EncodeToString(mac.Sum(nil)) +} + +func ComposeStringToSign(method, uri string, headers, queries map[string]string) string { + stringToSign := method + "\n" + headerLen := len(headers) + if headerLen > 0 { + sortForeach(headers, func(s1, s2 string) { + stringToSign += s1 + ":" + s2 + "\n" + }) + } + uriParts := strings.Split(uri, "?") + stringToSign += uriParts[0] + if len(uriParts) > 1 { + queries[uriParts[1]] = "" + } + if len(queries) > 0 { + stringToSign += "?" + sortForeach(queries, func(s1, s2 string) { + stringToSign += s1 + if s2 != "" { + stringToSign += "=" + s2 + } + stringToSign += "&" + }) + stringToSign = stringToSign[:len(stringToSign)-1] + } + return stringToSign +} + +func GetAuthorization(accessKeyId, accessKeySecret, stringToSign string) string { + signature := signature(accessKeySecret, stringToSign) + return "authing " + accessKeyId + ":" + signature +} diff --git a/util/websocket_utils.go b/util/websocket_utils.go new file mode 100644 index 0000000..5e87dac --- /dev/null +++ b/util/websocket_utils.go @@ -0,0 +1,115 @@ +package util + +import ( + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type WebSocketEventHub struct { + Connections map[string]*websocket.Conn + receiveMap map[string]bool + funcMap sync.Map +} + +type EventReceiver interface { + OnSuccess(msg []byte) + OnError(err error) +} + +type EventReceives struct { + Successes []func(msg []byte) + Errors []func(err error) +} + +// NewWebSocketEvent +func NewWebSocketEvent() *WebSocketEventHub { + return &WebSocketEventHub{ + Connections: make(map[string]*websocket.Conn), + receiveMap: make(map[string]bool), + } +} + +func NewEventReceives() *EventReceives { + return &EventReceives{ + Successes: make([]func(msg []byte), 0), + Errors: make([]func(err error), 0), + } +} + +func (eventHub *WebSocketEventHub) CreateManagement(eventCode string, websocketHost string, token string) bool { + if eventHub.Connections[eventCode] == nil { + var socketUri = fmt.Sprintf("%s/events/v1/management/sub?code=%s", websocketHost, eventCode) + // fmt.Println(socketUri) + conn, _, err := websocket.DefaultDialer.Dial( + socketUri, + http.Header{ + "Authorization": []string{token}, + }, + ) + if err != nil { + log.Printf("dail: %s", err.Error()) + return false + } + eventHub.Connections[eventCode] = conn + } + return true +} + +func (eventHub *WebSocketEventHub) CreateAuthentication(eventCode string, websocketHost string, token string) bool { + if eventHub.Connections[eventCode] == nil { + var socketUri = fmt.Sprintf("%s/events/v1/authentication/sub?code=%s&token=%s", websocketHost, eventCode, token) + // fmt.Println(socketUri) + conn, _, err := websocket.DefaultDialer.Dial( + socketUri, + http.Header{}, + ) + if err != nil { + log.Printf("dail: %s", err.Error()) + return false + } + eventHub.Connections[eventCode] = conn + } + return true +} + +func (eventHub *WebSocketEventHub) AddReceiver(eventCode string, onSuccess func(msg []byte), onError func(err error)) { + receivers := NewEventReceives() + if funcMap, ok := eventHub.funcMap.Load(eventCode); ok { + receivers.Successes = append(funcMap.(*EventReceives).Successes, onSuccess) + receivers.Errors = append(funcMap.(*EventReceives).Errors, onError) + } else { + receivers.Successes = []func(msg []byte){onSuccess} + receivers.Errors = []func(err error){onError} + } + eventHub.funcMap.Store(eventCode, receivers) +} + +func (eventHub *WebSocketEventHub) StartReceive(eventCode string) { + started, ok := eventHub.receiveMap[eventCode] + if ok && started { + return + } + eventHub.receiveMap[eventCode] = true + for { + _, message, err := eventHub.Connections[eventCode].ReadMessage() + if funcMap, ok := eventHub.funcMap.Load(eventCode); ok { + funcs := funcMap.(*EventReceives) + if err != nil { + for _, onError := range funcs.Errors { + onError(err) + } + } else { + for _, onSuccess := range funcs.Successes { + onSuccess(message) + } + } + } + + time.Sleep(time.Microsecond * 500) + } +} From baefb1a57614f7170a2f0654494f693228f524be Mon Sep 17 00:00:00 2001 From: shoy Date: Thu, 9 Mar 2023 16:57:27 +0800 Subject: [PATCH 2/2] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=20websocket?= =?UTF-8?q?=20=E8=BF=9E=E6=8E=A5=E7=AE=A1=E7=90=86=E4=BB=A5=E5=8F=8A?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E5=A4=84=E7=90=86=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- util/websocket_utils.go | 94 +++++++++++++++++++++++++++-------------- 1 file changed, 63 insertions(+), 31 deletions(-) diff --git a/util/websocket_utils.go b/util/websocket_utils.go index 5e87dac..d8d78da 100644 --- a/util/websocket_utils.go +++ b/util/websocket_utils.go @@ -10,9 +10,13 @@ import ( "github.com/gorilla/websocket" ) +const ( + pongWait = 15 * time.Second +) + type WebSocketEventHub struct { Connections map[string]*websocket.Conn - receiveMap map[string]bool + listenTag sync.Map funcMap sync.Map } @@ -30,7 +34,6 @@ type EventReceives struct { func NewWebSocketEvent() *WebSocketEventHub { return &WebSocketEventHub{ Connections: make(map[string]*websocket.Conn), - receiveMap: make(map[string]bool), } } @@ -41,18 +44,28 @@ func NewEventReceives() *EventReceives { } } +func createConnection(uri string, headers http.Header) *websocket.Conn { + conn, _, err := websocket.DefaultDialer.Dial(uri, headers) + if err != nil { + log.Fatal("dial:", err) + return nil + } + // conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(appData string) error { + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + return conn +} + func (eventHub *WebSocketEventHub) CreateManagement(eventCode string, websocketHost string, token string) bool { if eventHub.Connections[eventCode] == nil { var socketUri = fmt.Sprintf("%s/events/v1/management/sub?code=%s", websocketHost, eventCode) // fmt.Println(socketUri) - conn, _, err := websocket.DefaultDialer.Dial( - socketUri, - http.Header{ - "Authorization": []string{token}, - }, - ) - if err != nil { - log.Printf("dail: %s", err.Error()) + conn := createConnection(socketUri, http.Header{ + "Authorization": []string{token}, + }) + if conn == nil { return false } eventHub.Connections[eventCode] = conn @@ -64,12 +77,8 @@ func (eventHub *WebSocketEventHub) CreateAuthentication(eventCode string, websoc if eventHub.Connections[eventCode] == nil { var socketUri = fmt.Sprintf("%s/events/v1/authentication/sub?code=%s&token=%s", websocketHost, eventCode, token) // fmt.Println(socketUri) - conn, _, err := websocket.DefaultDialer.Dial( - socketUri, - http.Header{}, - ) - if err != nil { - log.Printf("dail: %s", err.Error()) + conn := createConnection(socketUri, http.Header{}) + if conn == nil { return false } eventHub.Connections[eventCode] = conn @@ -89,27 +98,50 @@ func (eventHub *WebSocketEventHub) AddReceiver(eventCode string, onSuccess func( eventHub.funcMap.Store(eventCode, receivers) } +func connPong(conn *websocket.Conn) { + ticker := time.NewTicker(pongWait) + defer ticker.Stop() + for range ticker.C { + err := conn.WriteMessage(websocket.PongMessage, nil) + if err != nil { + log.Fatal(err) + } + } +} + func (eventHub *WebSocketEventHub) StartReceive(eventCode string) { - started, ok := eventHub.receiveMap[eventCode] - if ok && started { + started, loaded := eventHub.listenTag.LoadOrStore(eventCode, true) + if loaded && started.(bool) { return } - eventHub.receiveMap[eventCode] = true + log.Println("start connection receive") + + conn := eventHub.Connections[eventCode] + defer conn.Close() + go connPong(conn) + ticker := time.NewTicker(pongWait) + defer ticker.Stop() + begin_time := time.Now() + count := 0 for { - _, message, err := eventHub.Connections[eventCode].ReadMessage() - if funcMap, ok := eventHub.funcMap.Load(eventCode); ok { - funcs := funcMap.(*EventReceives) - if err != nil { - for _, onError := range funcs.Errors { - onError(err) - } - } else { - for _, onSuccess := range funcs.Successes { - onSuccess(message) + select { + case <-ticker.C: + log.Printf("received %v messages, has been running for %v seconds", count, time.Since(begin_time).Seconds()) + default: + _, message, err := conn.ReadMessage() + if funcMap, ok := eventHub.funcMap.Load(eventCode); ok { + funcs := funcMap.(*EventReceives) + if err != nil { + for _, onError := range funcs.Errors { + onError(err) + } + } else { + for _, onSuccess := range funcs.Successes { + onSuccess(message) + } } } + count += 1 } - - time.Sleep(time.Microsecond * 500) } }