四、Topics通配符模式
1.模式说明
*和#分别代表通配符,*代表一个单词,#代表零个或多个单词
2.代码实现
在生产者producer工程下创建Topics类,完成以下代码
package com.ailuti.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Topics { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建交换机 //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) /** * 1.exchange:交换机名称 * 2.type:交换机类型 * DIRECT("direct"), //定向 * FANOUT("fanout"), //扇形(广播),发送消息到每一个绑定的队列 * TOPIC("topic"), // 通配符方式 * HEADERS("headers"); //参数匹配 * 3.durable:是否持久化 * 4.autoDelete :是否自动删除 * 5.internal: 内部使用。一般都是false * 6.arguments: 参数 */ String exchangeName = "topic_test"; //广播 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); //6.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "topic_test_1"; String queueuName2 = "topic_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); channel.queueDeclare(queueuName2,true,false,false,null); //7.绑定队列和交换机 //queueBind(String queue, String exchange, String routingKey) //queue:队列名称 //exchange:交换机名称 //routingKey: 路由键,绑定规则,如果交换机类型为fanout,则为"",即广播给所有 //假如routingkey 系统的名称.日志的级别 //需求:所有error级别的日志 信息存储到数据库,所有order系统日志存储到数据库 channel.queueBind(queueuName1,exchangeName,"#.error"); channel.queueBind(queueuName1,exchangeName,"order.*"); channel.queueBind(queueuName2,exchangeName,"*.*"); //8.发送消息 String body = "日志信息:张三调用了findAll...日志级别:info..."; channel.basicPublish(exchangeName,"goods.error",null,body.getBytes()); //9.释放资源 channel.close(); connection.close(); } }
在消费者consumer工程下,创建Topic1和Topic2类,分别完成以下代码
package com.ailuti.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Topic1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "topic_test_1"; String queueuName2 = "topic_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); //5.接收消息 Consumer consumer = new DefaultConsumer(channel){ /** * 回调方法,当收到消息后,会执行该方法 * @param consumerTag 消息标识 * @param envelope 获取对应的信息,交换机信息...路由key信息... * @param properties 配置信息 * @param body 接收到的真实数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+ new String(body)); System.out.println("将日志信息保存到数据库...."); } }; //basicConsume(String queue, boolean autoAck, Consumer callback) //参数: //queue:队列名称 //autoAck :是否自动确认 //callback : 回调对象 channel.basicConsume(queueuName1,true,consumer); //最后不要关闭资源,关闭了没办法接收生产者消息 } }
package com.ailuti.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Topic2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("10.147.17.158"); //IP地址,默认值:localhost factory.setPort(5672); //端口,默认5672 factory.setVirtualHost("itcast"); //虚拟机,默认/根目录 factory.setUsername("xxx"); //用户名,默认guest factory.setPassword("xxx"); //密码,默认guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列 //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //参数: // queue:队列名称 // durable:是否持久化,当mq重启后还在 // exclusive:是否独占(只能有一个消费者监听这队列),当Connection连接关闭时,是否删除队列 // autoDelete:是否自动删除。当没有consumer(消费者)时,自动删除 // arguments:参数信息 //如果没有hello_world的队列则会创建,如果有则不会 String queueuName1 = "topic_test_1"; String queueuName2 = "topic_test_2"; channel.queueDeclare(queueuName1,true,false,false,null); //5.接收消息 Consumer consumer = new DefaultConsumer(channel){ /** * 回调方法,当收到消息后,会执行该方法 * @param consumerTag 消息标识 * @param envelope 获取对应的信息,交换机信息...路由key信息... * @param properties 配置信息 * @param body 接收到的真实数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+ new String(body)); System.out.println("将日志信息打印到控制台...."); } }; //basicConsume(String queue, boolean autoAck, Consumer callback) //参数: //queue:队列名称 //autoAck :是否自动确认 //callback : 回调对象 channel.basicConsume(queueuName2,true,consumer); //最后不要关闭资源,关闭了没办法接收生产者消息 } }
然后执行消费者工程下Topic1 和Topic2类,接着测试生产者发送不同类型的消息,可以观察到通配符让我们的控制更加灵活
五、总结
简单模式 HelloWorld:一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
工作队列模式 Work Queue:一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
发布订阅模式 PubSub(Publish/Subscribe):需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当消息发送到交换机后,交换机会将消息发送到所有绑定的队列
路由模式 RoutingKey:需要设置类型为direct的交换机,交换机和队列进行绑定,并指定RoutingKey,当消息发送到交换机后,交换机会根据RoutingKey将消息发送到对应的队列
最后附上代码:点我下载完整代码
文章评论