基于队列和基于消息的TTL

TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。
队列消息过期时间-Per-Queue Message TTL:
通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。

死信交换机DLX

队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false;
(2)消息的过期时间到期了; (3)队列长度限制超过了。
当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key.
@Bean public Queue lindQueue() { return QueueBuilder.durable(LIND_QUEUE)
.withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机
.withArgument("x-message-ttl", makeCallExpire)
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
.build(); }
实现的过程
graph TD publisher-->正常queue 正常queue-->TTL TTL-->dead.queue
dead.queue-->subscriber
完整的代码
@Component public class AmqpConfig { /** *
主要测试一个死信队列,功能主要实现延时消费,原理是先把消息发到正常队列, *
正常队列有超时时间,当达到时间后自动发到死信队列,然后由消费者去消费死信队列里的消息. */ public static final String
LIND_EXCHANGE = "lind.exchange"; public static final String LIND_DL_EXCHANGE =
"lind.dl.exchange"; public static final String LIND_QUEUE = "lind.queue";
public static final String LIND_DEAD_QUEUE = "lind.queue.dead"; public static
final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange"; /** * 单位为微秒. */
@Value("${tq.makecall.expire:60000}") private long makeCallExpire; /** *
创建普通交换机. */ @Bean public TopicExchange lindExchange() { return (TopicExchange)
ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true) .build(); } /** *
创建死信交换机. */ @Bean public TopicExchange lindExchangeDl() { return
(TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
.build(); } /** * 创建普通队列. */ @Bean public Queue lindQueue() { return
QueueBuilder.durable(LIND_QUEUE) .withArgument("x-dead-letter-exchange",
LIND_DL_EXCHANGE)//设置死信交换机 .withArgument("x-message-ttl", makeCallExpire)
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
.build(); } /** * 创建死信队列. */ @Bean public Queue lindDelayQueue() { return
QueueBuilder.durable(LIND_DEAD_QUEUE).build(); } /** * 绑定死信队列. */ @Bean public
Binding bindDeadBuilders() { return BindingBuilder.bind(lindDelayQueue())
.to(lindExchangeDl()) .with(LIND_DEAD_QUEUE); } /** * 绑定普通队列. * * @return */
@Bean public Binding bindBuilders() { return BindingBuilder.bind(lindQueue())
.to(lindExchange()) .with(LIND_QUEUE); } /** * 广播交换机. * * @return */ @Bean
public FanoutExchange fanoutExchange() { return new
FanoutExchange(LIND_FANOUT_EXCHANGE); } } //----------------- @Component public
class Publisher { @Autowired private RabbitTemplate rabbitTemplate; public void
publish(String message) { try { rabbitTemplate
.convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE,
message); } catch (Exception e) { e.printStackTrace(); } } }
//----------------- @Component @Slf4j public class Subscriber {
@RabbitListener(queues = AmqpConfig.LIND_QUEUE) public void customerSign(String
data) { try { log.info("从队列拿到数据 :{}", data); } catch (Exception ex) {
e.printStackTrace(); } } }

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信