学习RocketMQ,先写一个Demo演示一下看看效果。
一、服务端部署
  因为只是简单的为了演示效果,服务端仅部署单Master模式 —— 一个Name Server节点,一个Broker节点。主要有以下步骤。

* 下载RocketMQ源码、编译(也可以网上下载编译好的文件),这里使用最新的4.4.0版本,下载好之后放在Linux上通过一下命令解压缩、编译。
unzip rocketmq-all-4.4.0-source-release.zip cd rocketmq-all-4.4.0/ mvn
-Prelease-all -DskipTests clean install –U
 

* 编译之后到distribution/target/apache-rocketmq目录,后续所有操作都是在该路径下。 cd
distribution/target/apache-rocketmq
 

* 启动Name Server,查看日志确认启动成功。 nohup sh bin/mqnamesrv & tail -f
~/logs/rocketmqlogs/namesrv.log
 

* 启动Broker,查看日志确认启动成功。 nohup sh bin/mqbroker -n localhost:9876 & tail -f
~/logs/rocketmqlogs/broker.log
 

  Name
Server和Broker都成功启动,服务器就部署完成了。更详细的参考官方文档手册,里面还包含在服务器上运行Producer、Customer示例,这里主要是在项目中使用。

  官网手册戳这里:Quick Start <http://rocketmq.apache.org/docs/quick-start/>
二、客户端搭建:Spring Boot项目中使用
  客户端分为消息生产者和消息消费者,这里通过日志打印输出查看效果,为了看起来更清晰,我新建了两个模块分别作为消息生产者和消息消费者。

* 添加依赖,在两个模块的pom文件中添加以下配置。 <dependency>
<groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId>
<version>4.4.0</version> </dependency>
* 配置生产者模块。
* application.yml文件中增加用来初始化producer的相关配置,这里只配了一部分,更详细的配置参数可以查看官方文档。 #
RocketMQ生产者 rocketmq: producer: #
Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER
producerGroup: ${spring.application.name} # namesrv地址 namesrvAddr:
192.168.101.213:9876 # 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB
maxMessageSize:4096 # 发送消息超时时间,单位毫秒。默认10000 sendMsgTimeout: 5000 #
如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2 retryTimesWhenSendFailed:2 #
消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096 compressMsgBodyOverHowmuch:4096
# 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 createTopicKey:
XIAO_LIU
 

*
新增producer配置类,系统启动时读取yml文件的配置信息初始化producer。集群模式下,如果在同一个jvm中,要往多个的MQ集群发送消息,则需要创建多个的producer并设置不同的instanceName,默认不需要设置该参数。
@Configurationpublic class ProducerConfiguration { private static final Logger
LOGGER = LoggerFactory.getLogger(ProducerConfiguration.class); /** *
Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER*/ @Value(
"${rocketmq.producer.producerGroup}") private String producerGroup; /** *
namesrv地址*/ @Value("${rocketmq.producer.namesrvAddr}") private String
namesrvAddr;/** * 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB */ @Value(
"${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize; /** *
发送消息超时时间,单位毫秒。默认10000*/ @Value("${rocketmq.producer.sendMsgTimeout}") private
Integer sendMsgTimeout;/** * 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2 */ @Value(
"${rocketmq.producer.retryTimesWhenSendFailed}") private Integer
retryTimesWhenSendFailed;/** * 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096 */
@Value("${rocketmq.producer.compressMsgBodyOverHowmuch}") private Integer
compressMsgBodyOverHowmuch;/** *
在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。*/ @Value(
"${rocketmq.producer.createTopicKey}") private String createTopicKey; @Bean
public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer =
new DefaultMQProducer(this.producerGroup); producer.setNamesrvAddr(this
.namesrvAddr); producer.setCreateTopicKey(this.createTopicKey); if (this
.maxMessageSize !=null) { producer.setMaxMessageSize(this.maxMessageSize); } if
(this.sendMsgTimeout != null) { producer.setSendMsgTimeout(this
.sendMsgTimeout); }if (this.retryTimesWhenSendFailed != null) {
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } if (this
.compressMsgBodyOverHowmuch !=null) { producer.setCompressMsgBodyOverHowmuch(
this.compressMsgBodyOverHowmuch); } if (Strings.isNotBlank(this
.createTopicKey)) { producer.setCreateTopicKey(this.createTopicKey); } try {
producer.start(); LOGGER.info("Producer Started : producerGroup:[{}],
namesrvAddr:[{}]" , this.producerGroup, this.namesrvAddr); } catch
(MQClientException e) { LOGGER.error("Producer Start Failed : {}",
e.getMessage(), e); }return producer; } }
 

* 使用producer实例向MQ发送消息。 @RunWith(SpringRunner.class) @SpringBootTest public
class ProducerServiceApplicationTests { private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerServiceApplicationTests.class); @Autowired
private DefaultMQProducer defaultMQProducer; @Test public void send() throws
MQClientException, RemotingException, MQBrokerException, InterruptedException,
UnsupportedEncodingException {for (int i = 0; i < 100; i++) { User user = new
User(); user.setUsername("用户" + i); user.setPassword("密码" + i); user.setSex(i %
2); user.setBirthday(new Date()); Message message = new Message("user-topic",
"user-tag", JSON.toJSONString(user).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = defaultMQProducer.send(message);
LOGGER.info(sendResult.toString()); } } }
 

* 配置消费者模块。
* application.yml文件中增加用来初始化consumer的相关配置,同样参数这里只配了一部分,更详细的配置参数可以查看官方文档。 #
RocketMQ消费者 rocketmq: consumer: #
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组。默认DEFAULT_CONSUMER
consumerGroup: ${spring.application.name} # namesrv地址 namesrvAddr:
192.168.101.213:9876 # 消费线程池最大线程数。默认10 consumeThreadMin: 10 # 消费线程池最大线程数。默认20
consumeThreadMax:20 # 批量消费,一次消费多少条消息。默认1 consumeMessageBatchMaxSize: 1 #
批量拉消息,一次最多拉多少条。默认32 pullBatchSize:32 # 订阅的主题 topics: user-topic
 

* 新增consumer配置。 @Configuration public class ConsumerConfiguration { private
static final Logger LOGGER = LoggerFactory.getLogger(ConsumerConfiguration.class
); @Value("${rocketmq.consumer.consumerGroup}") private String consumerGroup;
@Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value(
"${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value(
"${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value(
"${rocketmq.consumer.consumeMessageBatchMaxSize}") private int
consumeMessageBatchMaxSize; @Value("${rocketmq.consumer.pullBatchSize}") private
int pullBatchSize; @Value("${rocketmq.consumer.topics}") private String topics;
private final ConsumeMsgListener consumeMsgListener; @Autowired public
ConsumerConfiguration(final ConsumeMsgListener consumeMsgListener) { this
.consumeMsgListener = consumeMsgListener; } @Bean public DefaultMQPushConsumer
getRocketMQConsumer() { DefaultMQPushConsumer consumer= new
DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
consumer.setPullBatchSize(pullBatchSize);
consumer.registerMessageListener(consumeMsgListener); try { /** *
设置消费者订阅的主题和tag。subExpression参数为*表示订阅该主题下所有tag, *
如果需要订阅该主题下的指定tag,subExpression设置为对应tag名称,多个tag以||分割,例如"tag1 || tag2 || tag3"*/
consumer.subscribe(topics,"*"); consumer.start(); LOGGER.info("Consumer Started
: consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics,
namesrvAddr); }catch (Exception e) { LOGGER.error("Consumer Start Failed :
consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics,
namesrvAddr, e); e.printStackTrace(); }return consumer; } }
 

*
新增消息监听器,监听到新消息后,执行对应的业务逻辑。
@Component public class ConsumeMsgListener implements
MessageListenerConcurrently {private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumeMsgListener.class); @Override public
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {if (CollectionUtils.isEmpty(msgs)) {
LOGGER.info("Msgs is Empty."); return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }for (MessageExt msg : msgs) { try {
if ("user-topic".equals(msg.getTopic())) { LOGGER.info("{} Receive New
Messages: {}", Thread.currentThread().getName(),new String(msg.getBody())); //
do something } } catch (Exception e) { if (msg.getReconsumeTimes() == 3) { //
超过3次不再重试 LOGGER.error("Msg Consume Failed."); return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }else { // 重试 return
ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
 
三、效果
* 运行生产者测试代码。系统启动时初始化Producer,然后执行测试代码,往MQ中发送消息。效果如下:

 

* 启动消费者服务。系统启动时先初始化Customer。此时1.已经往MQ中发送了一些消息,监听器监听到MQ中有消息,随机马上消费消息。

 
四、总结
  Demo很简单,但是里面还有很多东西需要慢慢研究。

  代码可以戳这里:spring-cloud-learn <https://github.com/fubiaoLiu/spring-cloud-learn>

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信