四、TTL(过期时间)
1、介绍
- 当消息到达存活时间后,还没有被消费掉,会被自动清除
- RabbitMQ可以对消息设置存活时间,也可以对整个队列(Queue)设置过期时间,如果同时设置了队列过期时间和消息过期时间,则会以最短的时间将消息清除掉
2、完成代码
(1)针对队列统一设置过期时间
在创建队列的时候加入withArgument("x-message-ttl",10000),单位毫秒
//----------------测试TTL过期时间 public static final String exchange_ttl_name = "test_exchange_ttl"; //交换机名称 public static final String queue_ttl_name = "test_queue_ttl"; //队列名称 // 1.交换机 @Bean("ttlBootExchange") public Exchange ttlBootExchange(){ return ExchangeBuilder.topicExchange(exchange_ttl_name).durable(true).build(); } //2.queue队列 @Bean("ttlBootQueue") public Queue ttlBootQueue(){ //创建队列的过期时间: withArgument("x-message-ttl","10000") 单位毫秒 return QueueBuilder.durable(queue_ttl_name).withArgument("x-message-ttl",10000).build(); } //3. 队列和交换机绑定关系 Binding /** * 1.知道哪个队列 * 2.指定哪个交换机 * 3.routingKey * @return */ @Bean public Binding bingingTTLQueueExchange(@Qualifier("ttlBootQueue") Queue queue, @Qualifier("ttlBootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs(); }
(2)针对消息设置单独过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("20000"); return message; } }; rabbitTemplate.convertAndSend(RabbitMQConfig.exchange_name,"confirm","message TTL . hello world",messagePostProcessor); //rabbitmq为了节约性能,消费过期后,只有消息在顶端,才会判断其是否到期将其移除(也就是说读到该条消息的时候才会将其移除) for (int i = 0; i < 10; i++) { if(i==5){ rabbitTemplate.convertAndSend(RabbitMQConfig.exchange_name,"confirm","message TTL . hello world",messagePostProcessor); } rabbitTemplate.convertAndSend(RabbitMQConfig.exchange_name,"confirm","message TTL . hello world"); }
五、死信队列
1、介绍
死信队列,引文缩写DLX。Dead Letter Exchange(死信交换机),当消息成为Dead Message后,可以被重新发送到另外一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
- 队列消息长度到达限制:例如一个队列设置了最大能存储10个消息,当11个消息存不下的时候会成为死信;
- 消费者拒绝消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列消息存在过期时间,消息到达过期时间而未被消费的;
队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange 和 x-dead-letter-routing-key
业务场景:废弃资源再利用,比如超时订单被自动取消掉。
2、完成代码
1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
public static final String test_queue_dlx = "test_queue_dlx"; //队列名称 public static final String test_exchange_dlx = "test_exchange_dlx"; //交换机名称 @Bean("testExchangeDlx") public Exchange testExchangeDlx(){ return ExchangeBuilder.topicExchange(test_exchange_dlx).durable(true).build(); } @Bean("testQueueDlx") public Queue testQueueDlx(){ return QueueBuilder.durable(test_queue_dlx).build(); } @Bean public Binding bindingTestExchangeQueueDlx(@Qualifier("testQueueDlx") Queue queue, @Qualifier("testExchangeDlx") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs(); }
2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
public static final String queue_dlx = "queue_dlx"; //队列名称 public static final String exchange_dlx = "exchange_dlx"; //交换机名称 @Bean("exchangeDlx") public Exchange exchangeDlx(){ return ExchangeBuilder.topicExchange(exchange_dlx).durable(true).build(); } @Bean("queueDlx") public Queue queueDlx(){ return QueueBuilder.durable(queue_dlx).build(); } @Bean public Binding bindingExchangeQueueDlx(@Qualifier("queueDlx") Queue queue, @Qualifier("exchangeDlx") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs(); }
3.在正常的队列(test_queue_dlx)下设置参数:x-dead-letter-exchange 和 x-dead-letter-routing-key
@Bean("testQueueDlx") public Queue testQueueDlx(){ Map<String, Object> arguments = new HashMap<>(); //指定死信交换机名称 arguments.put("x-dead-letter-exchange","exchange_dlx"); //指定死信routing-key arguments.put("x-dead-letter-routing-key","dlx.haha"); //为了测试死信队列,设置个过期时间 arguments.put("x-message-ttl",20000); //为了测试死信队列,设置队列消息长度限制 arguments.put("x-max-length",10); return QueueBuilder.durable(test_queue_dlx).withArguments(arguments).build(); }
4.在producer工程下创建测试类,测试消息过期成为死信队列 和 测试消息条数达到限制成为死信队列,注意观察rabbitmq管理页面中的队列变化
@Test public void testDlx(){ //1.测试消息过期成为死信队列 //rabbitTemplate.convertAndSend(RabbitMQConfig.test_exchange_dlx,"test.dlx.hahaha","我是一条消息,我会成为死信队列吗?"); for (int i = 0; i < 11; i++) { //2.测试消息条数达到限制成为死信队列 rabbitTemplate.convertAndSend(RabbitMQConfig.test_exchange_dlx,"test.dlx.hahaha","我是一条消息,我会成为死信队列吗?"); } }
5.在consumer工程下创建监听类,测试客户端拒收消息到死信队列,需要注意的是消息必须设置不重回到队列里去才能到死信队列,然后观察rabbitmq管理页面中的队列变化
package com.ailuti.rabbitmq.config; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class DlxListener { /** * 测试 拒绝消息到死信队列 * @param message * @param channel * @throws Exception */ @RabbitListener(queues = "test_queue_dlx") //生产者创建的队列名称 public void ListenerQueue(Message message, Channel channel)throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { int i = 5 / 0; //模拟抛出异常 //1.接受转换消息 System.out.println(new String(message.getBody())); //2.处理业务逻辑 System.out.println("业务逻辑处理...."); //3.手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { System.out.println("出现异常,拒绝介绍消息"); //3.拒绝签收,requeue必须=false:不重回到队列里去才能到死信队列 channel.basicNack(deliveryTag,true,false); } } }
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果正常队列绑定了死信交换机则消息回到死信队列里,如果没有绑定消息则会被丢弃
- 消息成为死信的三种情况:
- 消息超过设置的TTL过期时间
- 消息超过队列设定的长度限制
- 客户端拒收消息且不重回到队列里取
文章评论