Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev 补充RocketMq的样例 #216

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions demo-mq-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,25 @@
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.xkcoding.mq.rocketmq.controller;

import com.xkcoding.mq.rocketmq.message.MessageProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;

@RestController
@RequestMapping("sendMessage")
public class SendMsgController {
@Autowired
MessageProducer messageProducer;

/**
* 官方发送消息示例
*/
@RequestMapping("official")
public void sendMsgOfficial() throws Exception {
//测试发送消息,实例化生产者producer
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("test_message");
// 设置NameServer的地址
producer.setNamesrvAddr("192.168.1.18:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 50; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

/**
* 发送普通消息
*/
@RequestMapping("plain")
public SendResult sendPlainMsg(){
return messageProducer.sendPlainMsg("发送普通消息");
}

/**
* 发送延迟消息,
* messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
@RequestMapping(value = "delay",method = RequestMethod.POST)
public SendResult sendDelayMsg(@RequestParam(name = "delayLevel") int delayLevel){
return messageProducer.sendDelayMsg("发送延迟消息",delayLevel);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xkcoding.mq.rocketmq.message;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup="consumer-test")
public class MessageConsumer implements RocketMQListener<String> {

@Override

public void onMessage(String message) {
System.out.printf("consumer name is %s data is %s%n", this.getClass().getSimpleName(), message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.xkcoding.mq.rocketmq.message;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.List;

@Service
@Slf4j
public class MessageProducer {
@Resource
RocketMQTemplate rocketMQTemplate;

/**
* 发送普通消息
* @param msg 消息内容
*/
public SendResult sendPlainMsg(String msg){
return rocketMQTemplate.syncSend("TopicTest", MessageBuilder.withPayload(msg).build());
}

/**
* 发送延迟消息
* @param msg 消息内容
* @param delayLevel 延迟级别,在broker配置文件里定义
*/
public SendResult sendDelayMsg(String msg,int delayLevel){
return rocketMQTemplate.syncSend("TopicTest",MessageBuilder.withPayload(msg).build(),2000,delayLevel);
}
public void sendAsyncMsg(String msg){
rocketMQTemplate.asyncSend("TopicTest", MessageBuilder.withPayload(msg).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功"+sendResult.getMsgId());
}

@Override
public void onException(Throwable throwable) {
log.error("消息发送失败",throwable);
}
});
}

/**
* 发送顺序消息
* @param msg 消息体
* @param index hashKey
* @return 发送结果
*/
public SendResult sendOrderMsg(String msg,int index){
rocketMQTemplate.setMessageQueueSelector((list, message, o) -> {
//分组规则
return null;
});
return rocketMQTemplate.syncSendOrderly("TopicTest",MessageBuilder.withPayload(msg).build(),String.valueOf(index));
}
}
9 changes: 9 additions & 0 deletions demo-mq-rocketmq/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
server:
port: 8080
servlet:
context-path: /demo
rocketmq:
name-server: 192.168.1.18:9876 # Rocketmq name-server地址
producer:
group: service-producer-group #生产者