二、Consumer Ack
1、介绍
ack 指Acknowledge,确认 。表示消费端收到消息后的确认方式。
消费者有三种确认方式:
- 自动确认:acknowledge="none"
- 手动确认:acknowledge="manual"
- 根据异常情况确认:acknowledge="auto"(这种方式使用起来比较麻烦,不做讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应的message从RebbitMQ的消息缓存中移除。但在实际业务处理中,很有可能消息接收到,业务处理异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck()进行手动签收,如果出现异常,则用channel.basicNack()方法,让其重新方式消息。
2、完成代码
设置手动签收: 在yml 添加listener: direct: acknowledge-mode: manual
spring: rabbitmq: host: 10.147.17.158 #IP主机名 port: 5672 #端口 username: guest #用户名 password: guest #密码 virtual-host: / listener: type: direct direct: acknowledge-mode: manual
添加消费者Consumer监听类 RabbitMQListener,完成以下代码
package com.ailuti.rabbitmq.config; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class RabbitMQListener { /** * Consumer Ack 机制(默认就是自动签收,这里不做介绍) * 1.设置手动签收 在yml 添加listener: direct: acknowledge-mode: manual * 2.添加Channel参数 * 3.如果消息成功处理,则调用Channel 的 basicAck()签收 * 4.如果消息处理失败,则调用Channel 的basicNack() 拒收,让消息重新发送 */ @RabbitListener(queues = "test_queue_confirm") //生产者创建的队列名称 public void ListenerQueue(Message message, Channel channel)throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //模拟出错 int i= 7/0; //1.接受转换消息 System.out.println(new String(message.getBody())); //2.处理业务逻辑 System.out.println("业务逻辑处理...."); //3.手动签收 channel.basicAck(deliveryTag,true); } catch (IOException e) { //e.printStackTrace(); //3.拒绝签收,第三个参数:requeue:重回队列,如果为true则消息重新回到queue队列,broker会重新将该消息发送给消费端 channel.basicNack(deliveryTag,true,true); } } }
三、消费端限流
1.介绍
如果业务访问突然暴增,例如每秒有5000个请求,如果全部发送到A系统上,A系统可能因为业务量巨大会崩溃宕机,但是如果将请求发送到MQ去,然后MQ每秒再给A系统1000个请求,这样就能保证业务的正常运行,这就是对消费者的一个限流。
还有一种情况,假如A系统在做升级维护停掉了,所有访问都在MQ累加到了很多,然后A系统恢复之后会有大量请求发送到A系统,这时候也需要进行这么一个限流,所以限流是非常重要的。
2、完成代码
确保ack是手动确认模式,在yml配置中加入 listener: direct: prefetch: 1 #限流机制,数字写几代表每次消费端从mq拉取几个消息,直到消费完才会进行下一次拉取
spring: rabbitmq: host: 10.40.94.18 #IP主机名 port: 5672 #端口 username: guest #用户名 password: guest #密码 virtual-host: / listener: type: direct direct: prefetch: 2 #限流机制,数字写几代表每次消费端从mq拉取几个消息,直到消费完才会进行下一次拉取 acknowledge-mode: manual # Ack 机制 ,设置自动签收或者手动签收
添加消费者Consumer监听类QosListener,完成以下代码
package com.ailuti.rabbitmq.config; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * * Consumer 限流机制 * 1. 确保ack是手动确认模式 * 2.在yml配置中加入 listener: direct: prefetch: 1 #限流机制,数字写几代表每次消费端从mq拉取几个消息,直到消费完才会进行下一次拉取 */ @Component public class QosListener { /** * @param message * @param channel * @throws Exception */ @RabbitListener(queues = "test_queue_confirm") //生产者创建的队列名称 public void ListenerQueue(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { Thread.sleep(2000); //1.接受转换消息 System.out.println(new String(message.getBody())); //2.处理业务逻辑 System.out.println("业务逻辑处理...."); //3.手动签收 channel.basicAck(deliveryTag,true); } catch ( InterruptedException e) { //e.printStackTrace(); //3.拒绝签收,第三个参数:requeue:重回队列,如果为true则消息重新回到queue队列,broker会重新将该消息发送给消费端 channel.basicNack(deliveryTag,true,true); } } }
文章评论