安装需要

jdk 1.8
centos 7
rocketmq 4.2.0

因为我是在阿里云ECS上安装的,所以centos就不用说了,jdk的安装,如果没有安装可以看这篇文章:CentOS配置JAVA_HOME
<https://blog.csdn.net/zhwyj1019/article/details/80086813>,下面就开始正式的安装过程了。

安装步骤

1. 下载rocketmq 4.2.0

通过wget命令下载,首先安装wget
yum install wget
然后下载rocketmq,官网下载地址 <http://www-us.apache.org/dist/rocketmq/>
,可以选择需要的版本,我下载的是rocketmq4.2.0
wget http://www-us.apache.org/dist/rocketmq/4.2.0/rocketmq-all-4.2.0
-bin-release.zip
解压
yum install zip unzip //安装unzip命令 unzip rocketmq-all-4.2.0-bin-release.zip -d
rocketmq-4.2.0 //解压
解压好后:


将rocketmq-4.2.0这个目录copy到/usr/local路径下(个人习惯,可以忽略)
cp -r rocketmq-4.2.0 /usr/local
这样准备工作就已经完成了,无需安装就可以直接使用了,Apache的很多东西都是这样。

2. 启动rocketmq

参考:官网 Quick Start <http://rocketmq.apache.org/docs/quick-start/>

进入rocketmq目录
cd /usr/local/rocketmq-4.2.0
* 启动Name Server > nohup sh bin/mqnamesrv & // 启动 > tail -f
~/logs/rocketmqlogs/namesrv.log // 查看namaserver日志 The Name Server boot success
...
注意:启动过程中可能报错显示内存不足,报错信息如下:
# There is insufficient memory for the Java Runtime Environment to continue. #
Native memory allocation (mmap) failedto map 33554432 bytes for committing
reserved memory.
修改bin目录下的runserver.sh文件:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m
-XX:MaxPermSize=320m"
同理修改runbroker.sh文件:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m
同理修改tools.sh文件:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m
-XX:MaxPermSize=128m"
启动成功:


* 启动Broker > nohup sh bin/mqbroker -n localhost:9876 & // 启动 > tail -f
~/logs/rocketmqlogs/broker.log // 查看broker日志 The broker[%s, 172.30.30.233:10911]
boot success...
注意:官网的这个启动命令特别坑,启动时broker会通过私有ip启动,会导致客户端无法远程连接,所以启动之前我们需要修改一下配置文件,修改如下:
vim /usr/local/rocketmq-4.2.0/conf/broker.conf


然后通过以下命令启动:
nohup sh bin/mqbroker -n xxxx:9876 autoCreateTopicEnable=true -c /usr/local
/rocketmq-4.2.0/conf/broker.conf & // 启动broker,xxxx为你的公有ip,或者是localhost也可以 tail
-f ~/logs/rocketmqlogs/broker.log // 查看broker日志
启动成功:


3. 客户端远程连接代码

官网示例地址 <http://rocketmq.apache.org/docs/simple-example/>

执行以下代码之前需要在阿里云安全组中开放以下端口:


需要的maven依赖
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>
rocketmq-client</artifactId> <version>4.2.0</version> </dependency> <dependency>
<groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.29
</version> </dependency> </dependencies>
Producer 代码
public class Producer { public static void main(String[] args) throws
MQClientException, InterruptedException {// 声明并初始化一个producer // 需要一个producer
group名字作为构造方法的参数,这里为producer1 DefaultMQProducer producer = new
DefaultMQProducer("producer1"); producer.setVipChannelEnabled(false); //
设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔 // NameServer的地址必须有 //
producer.setClientIP("119.23.211.22"); // producer.setInstanceName("Producer");
producer.setNamesrvAddr("119.23.211.22:9876"); // 调用start()方法启动一个producer实例
producer.start();// 发送1条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
try { // 封装消息 Message msg = new Message("TopicTest",// topic "TagA",// tag (
"Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //
调用producer的send()方法发送消息 // 这里调用的是同步的方式,所以会有返回结果 SendResult sendResult =
producer.send(msg);// 打印返回结果 System.out.println(sendResult); } catch
(RemotingException e) { e.printStackTrace(); }catch (MQBrokerException e) {
e.printStackTrace(); }catch (UnsupportedEncodingException e) {
e.printStackTrace(); }//发送完消息之后,调用shutdown()方法关闭producer System.out.println(
"send success"); producer.shutdown(); } }
Consumer 代码
public class Consumer { public static void main(String[] args) throws
InterruptedException, MQClientException {//声明并初始化一个consumer //需要一个consumer
group名字作为构造方法的参数,这里为consumer1 DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer1"); //consumer.setVipChannelEnabled(false);
//同样也要设置NameServer地址 consumer.setNamesrvAddr("119.23.211.22:9876");
//这里设置的是一个consumer的消费策略 //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置consumer所订阅的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTest", "*");
//设置一个Listener,主要进行消息的逻辑处理 consumer.registerMessageListener(new
MessageListenerConcurrently() { @Overridepublic ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: "
+ msgs); System.out.println(
"----------------------------------------------------------------------------------"
);//返回消费状态 //CONSUME_SUCCESS 消费成功 //RECONSUME_LATER 消费失败,需要稍后重新消费 return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });//调用start()方法启动consumer
consumer.start(); System.out.println("Consumer Started."); } }
测试结果:先启动Consumer,接受到Producer发送的消息会将详细打印出来
Consumer Started. ConsumeMessageThread_1 Receive New Messages: [MessageExt
[queueId=3, storeSize=176, queueOffset=1352, sysFlag=0, bornTimestamp=
1525920532015, bornHost=/115.236.50.15:59835, storeTimestamp=1525920532023,
storeHost=/119.23.211.22:10911, msgId=7717D31600002A9F00000000000EDF4C,
commitLogOffset=974668, bodyCRC=1774740973, reconsumeTimes=0,
preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,
properties={MIN_OFFSET=0, MAX_OFFSET=1353, CONSUME_START_TIME=1525920532107,
UNIQ_KEY=0A15211928C018B4AAC230AB4A2E0000, WAIT=true, TAGS=TagA}, body=14]]]
----------------------------------------------------------------------------------
SendResult [sendStatus=SEND_OK, msgId=0A152119388C18B4AAC230AE98E10000,
offsetMsgId=7717D31600002A9F00000000000EE15C, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1353] send
success
4. 遇到的错误:

* broker启动ip错误


这里如果你之间安装过docker环境,broker启动时,如果没有设置brokerIP1的IP地址,也有可能会通过docker的虚拟ip启动,导致远程无法连接。
本文参考文章地址如下:

参考地址
<https://yestermorrow.github.io/2018/05/08/RocketMQ-%E5%AE%89%E8%A3%85%E9%83%A8%E7%BD%B2%E6%95%99%E7%A8%8B/#more>

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信