六、延迟队列
1、介绍
延迟队列,即消息到达队列时不会立即消费,只有到达指定的时间后才会被消费。
应用场景:
1.下单后,30分钟以后判断用户是否未支付,如未支付则订单自动取消,回滚库存
2.新用户注册成功7天后,发送消息问候
不过RabbitMQ中没有提供延迟队列,但是可以使用:TTL+死信队列 组合实现延迟队列效果。
2、代码实现
在producer创建死信队列并设置SLL过期
//----------------延迟队列----------------- public static final String queue_delay = "queue_delay"; //队列名称 public static final String exchange_delay = "test_exchange_dlx"; //交换机名称 @Bean("exchangeDelay") public Exchange exchangeDelay(){ return ExchangeBuilder.topicExchange(exchange_delay).durable(true).build(); } @Bean("queueDelay") public Queue queueDelay(){ Map<String, Object> arguments = new HashMap<>(); //指定死信交换机名称 arguments.put("x-dead-letter-exchange",exchange_delay_die); //指定死信routing-key arguments.put("x-dead-letter-routing-key","delay.haha"); //设置订单超时时间 arguments.put("x-message-ttl",(1000*60*30)); return QueueBuilder.durable(queue_delay).withArguments(arguments).build(); } @Bean public Binding bindingExchangeQueueDelay(@Qualifier("queueDelay") Queue queue, @Qualifier("exchangeDelay") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs(); } //2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx) public static final String queue_delay_die = "queue_delay_die"; //队列名称 public static final String exchange_delay_die = "exchange_delay_die"; //交换机名称 @Bean("exchangeDelayDie") public Exchange exchangeDelayDie(){ return ExchangeBuilder.topicExchange(exchange_delay_die).durable(true).build(); } @Bean("queueDelayDie") public Queue queueDelayDie(){ return QueueBuilder.durable(queue_delay_die).build(); } @Bean public Binding bindingExchangeQueueDelayDie(@Qualifier("queueDelayDie") Queue queue, @Qualifier("exchangeDelayDie") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("delay.#").noargs(); }
测试延迟队列发送信息
@Test public void testDelay(){ //1.测试消息过期成为死信队列 rabbitTemplate.convertAndSend(RabbitMQConfig.exchange_delay,"order.2022030205045645","用户下了一个华为手机订单,库存-1"); }
在consumer消费端创建监听死信队列,观察超过SSL时间后消息到死信队列并被接收
package com.ailuti.rabbitmq.config; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DlxListener { /** * 测试 拒绝消息到死信队列 * @param message * @param channel * @throws Exception */ @RabbitListener(queues = "test_queue_dlx") //生产者创建的队列名称 public void ListenerQueue(Message message, Channel channel)throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { int i = 5 / 0; //模拟抛出异常 //1.接受转换消息 System.out.println(new String(message.getBody())); //2.处理业务逻辑 System.out.println("业务逻辑处理...."); //3.手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { System.out.println("出现异常,拒绝介绍消息"); //3.拒绝签收,requeue必须=false:不重回到队列里去才能到死信队列 channel.basicNack(deliveryTag,true,false); } } }
30分钟后由于消息没有被确认而到了死信队列,再进行相关的业务处理
七、日志与监控
1、日志
RabbitMQ的默认日志存放路径:/var/log/rabbitmq/rabbit@xxx(主机名).log
日志将来可能出问题的时候可能需要进行观察分析
2、监控
监控指的就是RabbitMQ的web管理页面,这里就不多做介绍了
3、命令
rabbitmq常见命令
查看队列 # rabbitmqctl list_queues 查看exchanges # rabbitmactl list_exchanges 查看用户 # rabbitmactl list_users 查看连接 # rabbitmgctl list_connections 查看环境变量 # rabbitmgctl environment 查看未被确认的队列 # rabbitmgctl list_queues name messages_unacknowledged 查看单个队列的内存使用 # rabbitmgctl list_queues name memory 查看准备就绪的队列 # rabbitmqctl list_queues name messages_ready 查看消费者信息 # rabbitmgctl list_consumers
文章评论