-
Notifications
You must be signed in to change notification settings - Fork 480
/
Copy pathaliyun_rocketmq.go
86 lines (79 loc) · 1.99 KB
/
aliyun_rocketmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package rocketmq
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"strconv"
"time"
)
// Connection settings for using RocketMQ hosted on Alibaba Cloud (Aliyun).
var (
// endpoint is the hard-coded name server URL of the Aliyun RocketMQ instance.
endpoint = "http://MQ_INST_1923451467929791_CN8n7179.mq-internet-access.mq-internet.aliyuncs.com:80"
// accessKey, secretKey and instanceId are empty in this file; presumably they
// must be filled in with real account values before use — TODO confirm.
// instanceId is passed as the namespace option by ConsumerFromAli and
// PublishFromAli.
accessKey = ""
secretKey = ""
instanceId = ""
// addr is the name server address built from endpoint. The error returned by
// primitive.NewNamesrvAddr is discarded here.
addr, _ = primitive.NewNamesrvAddr(endpoint)
// cre carries accessKey and secretKey as the client credentials.
cre = primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}
)
// ConsumerFromAli creates a clustering-mode, non-ordered push consumer using
// the package-level addr, cre and instanceId settings, subscribes it to topic
// under the consumer group group, prints every delivered message, keeps the
// consumer running for one minute, and then shuts it down.
//
// It panics if the consumer cannot be created, subscribed, started, or shut
// down.
func ConsumerFromAli() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(group),
		consumer.WithNameServer(addr),
		consumer.WithCredentials(cre),
		consumer.WithNamespace(instanceId),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithConsumerOrder(false), // whether to consume messages in order
	)
	if err != nil {
		// Surface the construction error instead of failing later on the
		// first use of an unusable consumer.
		panic(err)
	}

	err = c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			fmt.Printf("subscribe callback: %v \n", msg)
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		panic(err)
	}

	// Note: start after subscribe.
	if err = c.Start(); err != nil {
		panic(err)
	}

	// Keep the consumer alive for a minute so the callback can receive messages.
	time.Sleep(time.Minute)

	if err = c.Shutdown(); err != nil {
		panic(err)
	}
}
// PublishFromAli creates a producer using the package-level addr, cre and
// instanceId settings, starts it, synchronously sends ten messages to topic,
// printing the result or error of each send, and then shuts the producer down.
//
// A failed send is only reported on stdout; the loop continues with the next
// message. The function panics if the producer cannot be created, started, or
// shut down.
func PublishFromAli() {
	p, err := rocketmq.NewProducer(
		producer.WithNameServer(addr),
		producer.WithCredentials(cre),
		producer.WithNamespace(instanceId),
		producer.WithGroupName(group),
		producer.WithRetry(2),
	)
	if err != nil {
		// Surface the construction error instead of failing later on the
		// first use of an unusable producer.
		panic(err)
	}

	if err = p.Start(); err != nil {
		panic(err)
	}

	for i := 0; i < 10; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello RocketMQ From Ali" + strconv.Itoa(i)),
		}
		// Use a separate name so the outer err is not shadowed.
		res, sendErr := p.SendSync(context.Background(), msg)
		if sendErr != nil {
			fmt.Printf("send message error: %s\n", sendErr)
			continue
		}
		fmt.Printf("send message success: result=%s\n", res.String())
	}

	if err = p.Shutdown(); err != nil {
		panic(err)
	}
}