二、Pub/Sub订阅模式
1.模式说明
在订阅模式中,引用了一个新的角色Exchange,而且过程有变化
- P: produrce(生产者),也就是要发送消息的程序,但是不发送到队列中,而是发送给X(交换机)
- C1、C2:consomer(消费者),会一直等待接收消息
- Queue:图中红色部分(消息队列),用来接收消息并缓存消息
- X:Exchange(交换机),一方面接收生产者的消息,另一方面,知道如何处理消息,把消息分给哪个队列、或递交所有队列、或是将消息丢弃。到底如何操作,取决于Exchange类型。Exchange常见的类型有以下三种:
- Fanout:广播,将消息传递给所有绑定的队列
- Direct:定向,把消息传递给指定的routing key的队列
- Topic:通配符,把消息交给符合的routing pattan(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或是没有符合的路由规则的队列,那么消息会丢失。
2.代码实现
在生产者producer工程下创建一个PubSub类,完成以下代码
package com.ailuti.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PubSub { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建交换机 //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) /** * 1.exchange:交换机名称 * 2.type:交换机类型 * DIRECT("direct"), //定向 * FANOUT("fanout"), //扇形(广播),发送消息到每一个绑定的队列 * TOPIC("topic"), // 通配符方式 * HEADERS("headers"); //参数匹配 * 3.durable:是否持久化 * 4.autoDelete :是否自动删除 * 5.internal: 内部使用。一般都是false * 6.arguments: 参数 */ String exchangeName = "fanout_test"; //广播 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "fanout_test_1"; String queueuName2 = "fanout_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); channel.queueDeclare(queueuName2,true,false,false,null); //7.绑定队列和交换机 //queueBind(String queue, String exchange, String routingKey) //queue:队列名称 //exchange:交换机名称 //routingKey: 路由键,绑定规则,如果交换机类型为fanout,则为"",即广播给所有 channel.queueBind(queueuName1,exchangeName,""); channel.queueBind(queueuName2,exchangeName,""); //8.发送消息 String body = "日志信息:张三调用了findAll方法...日志级别:info..."; channel.basicPublish(exchangeName,"",null,body.getBytes()); //9.释放资源 channel.close(); connection.close(); } }
在消费者consumer工程下创建PubSub1和PubSub2类,分别监听fanout_test_1和fanout_test_2两个队列
package com.ailuti.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PubSub1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "fanout_test_1"; String queueuName2 = "fanout_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); //5.接收消息 Consumer consumer = new DefaultConsumer(channel){ /** * 回调方法,当收到消息后,会执行该方法 * @param consumerTag 消息标识 * @param envelope 获取对应的信息,交换机信息...路由key信息... * @param properties 配置信息 * @param body 接收到的真实数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+ new String(body)); System.out.println("将日志信息打印到控制台...."); } }; //basicConsume(String queue, boolean autoAck, Consumer callback) //参数: //queue:队列名称 //autoAck :是否自动确认 //callback : 回调对象 channel.basicConsume(queueuName1,true,consumer); //最后不要关闭资源,关闭了没办法接收生产者消息 } }
package com.ailuti.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PubSub2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "fanout_test_1"; String queueuName2 = "fanout_test_2"; channel.queueDeclare(queueuName2,true,false,false,null); //5.接收消息 Consumer consumer = new DefaultConsumer(channel){ /** * 回调方法,当收到消息后,会执行该方法 * @param consumerTag 消息标识 * @param envelope 获取对应的信息,交换机信息...路由key信息... * @param properties 配置信息 * @param body 接收到的真实数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+ new String(body)); System.out.println("将日志信息保存到数据库...."); } }; //basicConsume(String queue, boolean autoAck, Consumer callback) //参数: //queue:队列名称 //autoAck :是否自动确认 //callback : 回调对象 channel.basicConsume(queueuName2,true,consumer); //最后不要关闭资源,关闭了没办法接收生产者消息 } }
然后执行生产者工程下的PubSub会发现rabbitmq web管理下出现了两个队列,再执行消费者PubSub1和消费者PubSub2 ,可以发现两个消费者都接收到了数据,并执行了不同操作。
PubSub模式和WorkQueues模式区别是:WorkQueues有很多消费者监听一个队列,但是只能有一个消费者收到,PubSub则是很多消费者都监听自己的队列,每一个消费者都可以接收到这个消息。
文章评论