Skip to content

Commit a9742c5

Browse files
committed
修改 pubsub.ChatMessage 的获取方式.因为使用频率多,所以用上 pool
1 parent d02cf0b commit a9742c5

File tree

5 files changed

+131
-99
lines changed

5 files changed

+131
-99
lines changed

handler/chat.go

+36-59
Original file line numberDiff line numberDiff line change
@@ -238,25 +238,8 @@ func sendChatMessageToFriend(ctx *gin.Context, req *SendChatMessageRequest, curr
238238
}
239239
JSON(ctx, rsp)
240240

241-
var psData = &pubsub.ChatMessage{
242-
ActionID: rsp.ActionID,
243-
ReceiverID: rsp.ReceiverID,
244-
SessionType: rsp.SessionType,
245-
Type: rsp.Type,
246-
SenderID: rsp.SenderID,
247-
MessageID: rsp.MessageID,
248-
CreatedAt: rsp.CreatedAt,
249-
Body: pubsub.ChatMessageBody{
250-
Text: rsp.Body.Text,
251-
Src: rsp.Body.Src,
252-
Format: rsp.Body.Format,
253-
Size: rsp.Body.Size,
254-
Longitude: rsp.Body.Longitude,
255-
Latitude: rsp.Body.Latitude,
256-
Scale: rsp.Body.Scale,
257-
LocationLabel: rsp.Body.LocationLabel,
258-
},
259-
}
241+
psData := fillChatMessageForPublish(&rsp)
242+
260243
err = pubsub.PublishChatMessage(ctx, psData)
261244
if err != nil {
262245
log.ErrorFromGinContext(ctx).Err(err).
@@ -374,26 +357,7 @@ func sendChatMessageToGroup(ctx *gin.Context, req *SendChatMessageRequest, curre
374357
return
375358
}
376359

377-
var psData = &pubsub.ChatMessage{
378-
ActionID: rsp.ActionID,
379-
ReceiverID: rsp.ReceiverID,
380-
SessionType: rsp.SessionType,
381-
Type: rsp.Type,
382-
SenderID: rsp.SenderID,
383-
MessageID: rsp.MessageID,
384-
CreatedAt: rsp.CreatedAt,
385-
Body: pubsub.ChatMessageBody{
386-
Text: rsp.Body.Text,
387-
Src: rsp.Body.Src,
388-
Format: rsp.Body.Format,
389-
Size: rsp.Body.Size,
390-
Longitude: rsp.Body.Longitude,
391-
Latitude: rsp.Body.Latitude,
392-
Scale: rsp.Body.Scale,
393-
LocationLabel: rsp.Body.LocationLabel,
394-
},
395-
PublishTargets: memberStrIds,
396-
}
360+
psData := fillChatMessageForPublish(&rsp, memberStrIds)
397361
err = pubsub.PublishChatMessage(ctx, psData)
398362
if err != nil {
399363
log.ErrorFromGinContext(ctx).Err(err).
@@ -458,25 +422,7 @@ func sendChatMessageToWorld(ctx *gin.Context, req *SendChatMessageRequest, curre
458422
}
459423
JSON(ctx, rsp)
460424

461-
var psData = &pubsub.ChatMessage{
462-
ActionID: rsp.ActionID,
463-
ReceiverID: rsp.ReceiverID,
464-
SessionType: rsp.SessionType,
465-
Type: rsp.Type,
466-
SenderID: rsp.SenderID,
467-
MessageID: rsp.MessageID,
468-
CreatedAt: rsp.CreatedAt,
469-
Body: pubsub.ChatMessageBody{
470-
Text: rsp.Body.Text,
471-
Src: rsp.Body.Src,
472-
Format: rsp.Body.Format,
473-
Size: rsp.Body.Size,
474-
Longitude: rsp.Body.Longitude,
475-
Latitude: rsp.Body.Latitude,
476-
Scale: rsp.Body.Scale,
477-
LocationLabel: rsp.Body.LocationLabel,
478-
},
479-
}
425+
psData := fillChatMessageForPublish(&rsp)
480426
err = pubsub.PublishChatMessage(ctx, psData)
481427
if err != nil {
482428
log.ErrorFromGinContext(ctx).Err(err).
@@ -492,6 +438,37 @@ func sendChatMessageToWorld(ctx *gin.Context, req *SendChatMessageRequest, curre
492438

493439
}
494440

441+
// fillChatMessageForPublish 填充推送用的聊天消息
442+
func fillChatMessageForPublish(rsp *SendChatMessageResponse, targetIDs ...[]int64) *pubsub.ChatMessage {
443+
var targets = make([]int64, 0)
444+
if len(targetIDs) > 0 {
445+
targets = targetIDs[0]
446+
}
447+
448+
msg := pubsub.NewChatMessage()
449+
msg.ActionID = rsp.ActionID
450+
msg.ReceiverID = rsp.ReceiverID
451+
msg.SessionType = rsp.SessionType
452+
msg.Type = rsp.Type
453+
msg.SenderID = rsp.SenderID
454+
msg.MessageID = rsp.MessageID
455+
msg.CreatedAt = rsp.CreatedAt
456+
msg.PublishTargets = targets
457+
458+
msgBody := pubsub.NewChatMessageBody()
459+
msgBody.Text = rsp.Body.Text
460+
msgBody.Src = rsp.Body.Src
461+
msgBody.Format = rsp.Body.Format
462+
msgBody.Size = rsp.Body.Size
463+
msgBody.Longitude = rsp.Body.Longitude
464+
msgBody.Latitude = rsp.Body.Latitude
465+
msgBody.Scale = rsp.Body.Scale
466+
msgBody.LocationLabel = rsp.Body.LocationLabel
467+
468+
msg.Body = msgBody
469+
return msg
470+
}
471+
495472
// RollbackChatMessageHandler 回滚聊天消息处理方法
496473
func RollbackChatMessageHandler(ctx *gin.Context) {
497474

@@ -508,7 +485,7 @@ func DeleteChatMessageHandler(ctx *gin.Context) {
508485

509486
// SubscribeChatMessageHandler 接收聊天消息
510487
func SubscribeChatMessageHandler(ctx context.Context, payload *pubsub.Payload) {
511-
chatMsg := new(pubsub.ChatMessage)
488+
chatMsg := pubsub.NewChatMessage()
512489
err := payload.UnmarshalData(chatMsg)
513490
if err != nil {
514491
log.Error().Err(err).

handler/chat_test.go

+25-20
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,20 @@ func getToken() (string, error) {
107107
func BenchmarkChatSendMessageUseApi(b *testing.B) {
108108
//r := gin.New()
109109
jsonData := `{
110-
"receiver_id":"2",
110+
"action_id":"随机字符串",
111+
"receiver_id":2,
111112
"session_type":1,
112-
"message_type":1,
113+
"type":1,
113114
"body":{"text":"test"}
114115
}`
115116

116-
token, err := getToken()
117-
if err != nil {
118-
log.Println(err)
119-
return
120-
}
117+
//token, err := getToken()
118+
//if err != nil {
119+
// log.Println(err)
120+
// return
121+
//}
122+
123+
token := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJleHAiOjE3MjU2MzQ0ODl9.WTqdIHkA8D8lfE_nJVir9Z64Cy1gZ-V11extOlvUjSI"
121124

122125
file, err := os.Open("./benchmark.log")
123126
if err != nil && os.IsNotExist(err) {
@@ -145,7 +148,7 @@ func BenchmarkChatSendMessageUseApi(b *testing.B) {
145148
//w := httptest.NewRecorder()
146149
///req := httptest.NewRequest(http.MethodPost, "http://127.0.0.1:8080/api/chat/send_message", bytes.NewBufferString(jsonData))
147150
b.StopTimer()
148-
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:8080/api/chat/send_message", bytes.NewBufferString(jsonData))
151+
req, err := http.NewRequest(http.MethodPost, "http://192.168.31.100:8080/api/v1/chat/message/send", bytes.NewBufferString(jsonData))
149152
req.Header.Add("Authorization", token)
150153
req.Header.Add("Content-Type", "application/json")
151154

@@ -256,17 +259,18 @@ func BenchmarkChatSendMessageParallel(b *testing.B) {
256259
r := gin.New()
257260
jsonData := `{
258261
"action_id":"随机字符串",
259-
"receiver_id":"2",
262+
"receiver_id":2,
260263
"session_type":1,
261264
"type":1,
262265
"body":{"text":"test"}
263266
}`
264267

265-
token, err := getToken()
266-
if err != nil {
267-
log.Println(err)
268-
return
269-
}
268+
//token, err := getToken()
269+
//if err != nil {
270+
// log.Println(err)
271+
// return
272+
//}
273+
token := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJleHAiOjE3MjU2MzQ0ODl9.WTqdIHkA8D8lfE_nJVir9Z64Cy1gZ-V11extOlvUjSI"
270274

271275
//file, err := os.Open("./benchmark.log")
272276
//if err != nil && os.IsNotExist(err) {
@@ -291,7 +295,7 @@ func BenchmarkChatSendMessageParallel(b *testing.B) {
291295

292296
w := httptest.NewRecorder()
293297
b.StopTimer()
294-
req := httptest.NewRequest(http.MethodPost, "http://127.0.0.1:8080/api/chat/send_message", bytes.NewBufferString(jsonData))
298+
req := httptest.NewRequest(http.MethodPost, "http://192.168.31.100:8080/api/chat/send_message", bytes.NewBufferString(jsonData))
295299
req.Header.Add("Authorization", token)
296300
req.Header.Add("Content-Type", "application/json")
297301

@@ -339,11 +343,12 @@ func TestWebsocketHandler(t *testing.T) {
339343
}
340344

341345
func TestBenchmarkWebsocketApi(t *testing.T) {
342-
token, err := getToken()
343-
if err != nil {
344-
t.Fatal(err)
345-
}
346+
//token, err := getToken()
347+
//if err != nil {
348+
// t.Fatal(err)
349+
//}
346350

351+
token := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJleHAiOjE3MjU2MzQ0ODl9.WTqdIHkA8D8lfE_nJVir9Z64Cy1gZ-V11extOlvUjSI"
347352
wg := &sync.WaitGroup{}
348353

349354
for i := 0; i < 9999; i++ {
@@ -355,7 +360,7 @@ func TestBenchmarkWebsocketApi(t *testing.T) {
355360
}()
356361
header := http.Header{}
357362
header.Add("Authorization", token)
358-
conn, rsp, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:8080/api/ws", header)
363+
conn, rsp, err := websocket.DefaultDialer.Dial("ws://192.168.31.100:8080/api/ws", header)
359364
if err != nil {
360365
log.Println(err)
361366
return

handler/friend.go

+25-18
Original file line numberDiff line numberDiff line change
@@ -658,24 +658,8 @@ func sayHelloFn(ctx *gin.Context, userID, targetID int64, now *time.Time) error
658658
}
659659

660660
// 进行多服务器订阅推送
661-
var psData = &pubsub.ChatMessage{
662-
ReceiverID: msg.ReceiverID,
663-
SessionType: msg.SessionType,
664-
Type: msg.Type,
665-
SenderID: msg.SenderID,
666-
MessageID: msg.MessageID,
667-
CreatedAt: msg.CreatedAt,
668-
Body: pubsub.ChatMessageBody{
669-
Text: msg.Body.Text,
670-
Src: msg.Body.Src,
671-
Format: msg.Body.Format,
672-
Size: msg.Body.Size,
673-
Longitude: msg.Body.Longitude,
674-
Latitude: msg.Body.Latitude,
675-
Scale: msg.Body.Scale,
676-
LocationLabel: msg.Body.LocationLabel,
677-
},
678-
}
661+
psData := fillSayHelloChatMessageForPublish(msg)
662+
679663
err = pubsub.PublishChatMessage(ctx, psData)
680664
if err != nil {
681665
log.ErrorFromGinContext(ctx).Err(err).
@@ -691,6 +675,29 @@ func sayHelloFn(ctx *gin.Context, userID, targetID int64, now *time.Time) error
691675
return nil
692676
}
693677

678+
func fillSayHelloChatMessageForPublish(rsp *database.ChatMessage) *pubsub.ChatMessage {
679+
msg := pubsub.NewChatMessage()
680+
msg.ReceiverID = rsp.ReceiverID
681+
msg.SessionType = rsp.SessionType
682+
msg.Type = rsp.Type
683+
msg.SenderID = rsp.SenderID
684+
msg.MessageID = rsp.MessageID
685+
msg.CreatedAt = rsp.CreatedAt
686+
687+
msgBody := pubsub.NewChatMessageBody()
688+
msgBody.Text = rsp.Body.Text
689+
msgBody.Src = rsp.Body.Src
690+
msgBody.Format = rsp.Body.Format
691+
msgBody.Size = rsp.Body.Size
692+
msgBody.Longitude = rsp.Body.Longitude
693+
msgBody.Latitude = rsp.Body.Latitude
694+
msgBody.Scale = rsp.Body.Scale
695+
msgBody.LocationLabel = rsp.Body.LocationLabel
696+
697+
msg.Body = msgBody
698+
return msg
699+
}
700+
694701
// publishInvite 推送邀请通知
695702
func publishInvite(ctx *gin.Context, invite *database.UserRelationInvite) error {
696703
defer func() {

pubsub/chat.go

+44-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pubsub
22

33
import (
44
"context"
5+
"sync"
56
)
67

78
/**
@@ -10,6 +11,32 @@ import (
1011
@describe :
1112
*/
1213

14+
var chatMessagePool = &sync.Pool{
15+
New: func() any {
16+
return new(ChatMessage)
17+
},
18+
}
19+
20+
var chatMessageBodyPool = &sync.Pool{
21+
New: func() any {
22+
return new(*ChatMessageBody)
23+
},
24+
}
25+
26+
func NewChatMessage() *ChatMessage {
27+
msg := chatMessagePool.Get().(*ChatMessage)
28+
msg.ActionID = ""
29+
msg.ReceiverID = 0
30+
msg.SessionType = 0
31+
msg.Type = 0
32+
msg.SenderID = 0
33+
msg.MessageID = 0
34+
msg.CreatedAt = 0
35+
msg.Body = nil
36+
msg.PublishTargets = nil
37+
return msg
38+
}
39+
1340
// ChatMessage 订阅传输用的聊天消息结构
1441
type ChatMessage struct {
1542
// ActionID 行为ID
@@ -34,7 +61,7 @@ type ChatMessage struct {
3461
CreatedAt int64 `json:"created_at"`
3562

3663
// Body 消息体;
37-
Body ChatMessageBody `json:"body"`
64+
Body *ChatMessageBody `json:"body"`
3865

3966
// PublishTargets 群成员ID列表
4067
PublishTargets []int64 `json:"publish_targets,omitempty"`
@@ -67,7 +94,22 @@ type ChatMessageBody struct {
6794
LocationLabel string `bson:"location_label,omitempty" json:"location_label,omitempty"`
6895
}
6996

97+
func NewChatMessageBody() *ChatMessageBody {
98+
body := chatMessageBodyPool.Get().(*ChatMessageBody)
99+
body.Text = ""
100+
body.Src = ""
101+
body.Format = ""
102+
body.Size = ""
103+
body.Longitude = ""
104+
body.Latitude = ""
105+
body.Scale = 0
106+
body.LocationLabel = ""
107+
return body
108+
}
109+
70110
// PublishChatMessage 发布聊天消息到其他服务器上
71111
func PublishChatMessage(ctx context.Context, data *ChatMessage) error {
72-
return PublishWithPayload(ctx, ChannelChatMessage, PayloadTypeChatMessage, data)
112+
err := PublishWithPayload(ctx, ChannelChatMessage, PayloadTypeChatMessage, data)
113+
chatMessagePool.Put(data)
114+
return err
73115
}

websocket/payload.go

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package websocket
55
@time : 2023/8/29 18:23
66
@describe :
77
*/
8+
89
// Payload 用于websocket的推送数据结构
910
type Payload struct {
1011
// Type 类型

0 commit comments

Comments
 (0)