三、Routing路由模式
1.模式说明
- 队列与交换机绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方向在Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列 RoutingKey与消息的RoutingKey一致时,才会接收到消息
2.代码实现
在生产者producer工程下创建Routing类,完成以下代码,分别执行发送info消息和error消息
package com.ailuti.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Routing { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建交换机 //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) /** * 1.exchange:交换机名称 * 2.type:交换机类型 * DIRECT("direct"), //定向 * FANOUT("fanout"), //扇形(广播),发送消息到每一个绑定的队列 * TOPIC("topic"), // 通配符方式 * HEADERS("headers"); //参数匹配 * 3.durable:是否持久化 * 4.autoDelete :是否自动删除 * 5.internal: 内部使用。一般都是false * 6.arguments: 参数 */ String exchangeName = "direct_test"; //广播 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); //6.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "direct_test_1"; String queueuName2 = "direct_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); channel.queueDeclare(queueuName2,true,false,false,null); //7.绑定队列和交换机 //queueBind(String queue, String exchange, String routingKey) //queue:队列名称 //exchange:交换机名称 //routingKey: 路由键,绑定规则,如果交换机类型为fanout,则为"",即广播给所有 //队列1 的绑定error channel.queueBind(queueuName1,exchangeName,"error"); //队列1 的绑定info/error/warning channel.queueBind(queueuName2,exchangeName,"info"); channel.queueBind(queueuName2,exchangeName,"error"); channel.queueBind(queueuName2,exchangeName,"warning"); //8.发送消息 //String body = "日志信息:张三调用了select方法...日志级别:info..."; String body = "日志信息:张三调用了delete方法且报错了...日志级别:error..."; //channel.basicPublish(exchangeName,"info",null,body.getBytes()); channel.basicPublish(exchangeName,"error",null,body.getBytes()); //9.释放资源 channel.close(); connection.close(); } }
在消费者consumer工程下创建Routing1和Routing2类,然后执行
package com.ailuti.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Routing1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "direct_test_1"; String queueuName2 = "direct_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); //5.接收消息 Consumer consumer = new DefaultConsumer(channel){ /** * 回调方法,当收到消息后,会执行该方法 * @param consumerTag 消息标识 * @param envelope 获取对应的信息,交换机信息...路由key信息... * @param properties 配置信息 * @param body 接收到的真实数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+ new String(body)); System.out.println("将日志信息保存到数据库...."); } }; //basicConsume(String queue, boolean autoAck, Consumer callback) //参数: //queue:队列名称 //autoAck :是否自动确认 //callback : 回调对象 channel.basicConsume(queueuName1,true,consumer); //最后不要关闭资源,关闭了没办法接收生产者消息 } }
package com.ailuti.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Routing2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "direct_test_1"; String queueuName2 = "direct_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); //5.接收消息 Consumer consumer = new DefaultConsumer(channel){ /** * 回调方法,当收到消息后,会执行该方法 * @param consumerTag 消息标识 * @param envelope 获取对应的信息,交换机信息...路由key信息... * @param properties 配置信息 * @param body 接收到的真实数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+ new String(body)); System.out.println("将日志信息输出到控制台...."); } }; //basicConsume(String queue, boolean autoAck, Consumer callback) //参数: //queue:队列名称 //autoAck :是否自动确认 //callback : 回调对象 channel.basicConsume(queueuName2,true,consumer); //最后不要关闭资源,关闭了没办法接收生产者消息 } }
执行后发现info信息只被队列routing2接收到了。但是error信息却被routing1和routing2同时接收到了
Routing路由方式要求队列在绑定交换机的时候要绑定RoutingKey,消息转发的符合的Routingkey的队列。
文章评论