一、work queues 工作队列模式
1.模式说明
- work queues:与简单模式相比,多了一个或多个消费者,多个消费者共同消费一个队列里的消息,他们之间是竞争的关系,并不能同时获取消息,是轮询获取消息。
- 应用场景:对于任务过重或任务较多情况下使用工作队列可以提高任务的处理速度。例如:短信服务费部署多个,只需要有一个节点成功发送即可。
2.代码实现
在生产者producer工程下创建一个WorkQueues类,完成以下代码
package com.ailuti.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueues { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.40.94.18"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 channel.queueDeclare("work_queues",true,false,false,null); //6.发送多条消息 //参数: //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) //exchange: 交换机名称,简单模式下使用“” //routingKey:路由器名称,和队列名称写一样会自动绑定 //props:配置信息 //body: 发送的消息数据 for (int i = 0; i < 10; i++) { String body= "消息" + (i+1) + ":工作队列模式"; channel.basicPublish("","work_queues",null,body.getBytes()); } //7.释放资源 channel.close(); connection.close(); } }
在消费者consumer工程下创建WorkQueues1、WorkQueues2类,完成以下代码
package com.ailuti.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueues { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.40.94.18"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("yinlt"); //用户名,默认guest factory.setPassword("admin@123"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 channel.queueDeclare("work_queues",true,false,false,null); //5.接收消息 Consumer consumer = new DefaultConsumer(channel){ /** * 回调方法,当收到消息后,会执行该方法 * @param consumerTag 消息标识 * @param envelope 获取对应的信息,交换机信息...路由key信息... * @param properties 配置信息 * @param body 接收到的真实数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+ new String(body)); } }; //basicConsume(String queue, boolean autoAck, Consumer callback) //参数: //queue:队列名称 //autoAck :是否自动确认 //callback : 回调对象 channel.basicConsume("work_queues",true,consumer); //最后不要关闭资源,关闭了没办法接收生产者消息 } }
然后先执行消费者WorkQueues1、WorkQueues2类,再执行生产者WorkQueues类,会发现消息被WorkQueues1、WorkQueues2分别均匀的分担了,这就是工作队列模式。
文章评论