一、前言


延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。

实现延迟队列的方式有两种:

* 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
* 使用rabbitmq-delayed-message-exchange插件实现延迟功能;
注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ
3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。

由于使用死信交换器相对曲折,本文重点介绍第二种方式,使用rabbitmq-delayed-message-exchange插件完成延迟队列的功能。

二、安装延迟插件

1.1 下载插件

打开官网下载:http://www.rabbitmq.com/community-plugins.html

选择相应的对应的版本“3.7.x”点击下载。

注意: 下载的是.zip的安装包,下载完之后需要手动解压。

1.2 安装插件

拷贝插件到Docker:

docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez
rabbit:/plugins

RabbitMQ在Docker的安装,请参照本系列的上一篇文章:http://www.apigo.cn/2018/09/11/springboot13/

1.3 启动插件

进入docker内部:

docker exec -it rabbit /bin/bash

开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查询安装的所有插件:

rabbitmq-plugins list

安装正常,效果如下图:



重启RabbitMQ,使插件生效

docker restart rabbit

三、代码实现

3.1 配置队列
import com.example.rabbitmq.mq.DirectConfig; import
org.springframework.amqp.core.*; import
org.springframework.context.annotation.Bean; import
org.springframework.context.annotation.Configuration; import java.util.HashMap;
import java.util.Map; @Configuration public class DelayedConfig { final static
String QUEUE_NAME = "delayed.goods.order"; final static String EXCHANGE_NAME =
"delayedec"; @Bean public Queue queue() { return new
Queue(DelayedConfig.QUEUE_NAME); } // 配置默认的交换机 @Bean CustomExchange
customExchange() { Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); //参数二为类型:必须是x-delayed-message return new
CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false,
args); } // 绑定队列到交换器 @Bean Binding binding(Queue queue, CustomExchange
exchange) { return
BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
} }
3.2 发送消息
import org.springframework.amqp.AmqpException; import
org.springframework.amqp.core.AmqpTemplate; import
org.springframework.amqp.core.Message; import
org.springframework.amqp.core.MessagePostProcessor; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.stereotype.Component; import java.text.SimpleDateFormat;
import java.util.Date; @Component public class DelayedSender { @Autowired
private AmqpTemplate rabbitTemplate; public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("发送时间:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,
DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() { @Override public
Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 3000); return message; }
}); } }
3.3 消费消息
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import
org.springframework.amqp.rabbit.annotation.RabbitListener; import
org.springframework.stereotype.Component; import java.text.SimpleDateFormat;
import java.util.Date; @Component @RabbitListener(queues =
"delayed.goods.order") public class DelayedReceiver { @RabbitHandler public
void process(String msg) { SimpleDateFormat sdf = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("接收时间:" +
sdf.format(new Date())); System.out.println("消息内容:" + msg); } }
3.4 测试队列
import com.example.rabbitmq.RabbitmqApplication; import
com.example.rabbitmq.mq.delayed.DelayedSender; import org.junit.Test; import
org.junit.runner.RunWith; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.boot.test.context.SpringBootTest; import
org.springframework.test.context.junit4.SpringRunner; import
java.text.SimpleDateFormat; import java.util.Date; @RunWith(SpringRunner.class)
@SpringBootTest public class DelayedTest { @Autowired private DelayedSender
sender; @Test public void Test() throws InterruptedException { SimpleDateFormat
sf = new SimpleDateFormat("yyyy-MM-dd"); sender.send("Hi Admin.");
Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试 } }
执行结果如下:
发送时间:2018-09-11 20:47:51 接收时间:2018-09-11 20:47:54 消息内容:Hi Admin.

完整代码访问我的GitHub:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq

四、总结


到此为止我们已经使用“rabbitmq-delayed-message-exchange”插件实现了延迟功能,但是需要注意的一点是,如果使用命令“rabbitmq-plugins
disable rabbitmq_delayed_message_exchange”禁用了延迟插件,那么所有未发送的延迟消息都将丢失。