rabbitmq-topic-支持通配符的订阅模式
Posted gavin苗
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq-topic-支持通配符的订阅模式相关的知识,希望对你有一定的参考价值。
生产者:
package com.gavin.mq.topic; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 路由订阅模型routing-topic模式: * 相比direct模式,topic模式下,routingKey可以使用通配符 * # 匹配零个或多个词 * * 匹配有且仅有一个词 * audit.# 匹配audit.irs.properties 和audit.irs * audit.* 匹配audit.irs */ public class WorkProvider { @Test public void testSendMessage() throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机 参数1:交换机名称 channel.exchangeDeclare("logs_topic","topic"); //routingKey为info channel.basicPublish("logs_topic","log.info", MessageProperties.PERSISTENT_BASIC,("hello topic").getBytes()); RabbitMQUtils.close(channel,connection); } }
消费者1:
package com.gavin.mq.topic; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkConsumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_topic","topic"); String tempQueue = channel.queueDeclare().getQueue(); channel.queueBind(tempQueue,"logs_topic","log.*"); channel.basicConsume(tempQueue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
消费者2:
package com.gavin.mq.topic; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkConsumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_topic","topic"); String tempQueue = channel.queueDeclare().getQueue(); channel.queueBind(tempQueue,"logs_topic","log.#"); channel.basicConsume(tempQueue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumer2:"+new String(body)); } }); } }
以上是关于rabbitmq-topic-支持通配符的订阅模式的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式
RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式
如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?
RabbitMQ02_简单模式Publish/Subscribe发布与订阅模式Routing路由模式Topics通配符模式Work模式-轮询公平
RabbitMQ02_简单模式Publish/Subscribe发布与订阅模式Routing路由模式Topics通配符模式Work模式-轮询公平