springboot集成rabbitmq:fanouttopic
Posted 做一道光
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot集成rabbitmq:fanouttopic相关的知识,希望对你有一定的参考价值。
编写Fanout模式的消息接收
其他模块和上文保持一致https://blog.csdn.net/weixin_59334478/article/details/127740411?spm=1001.2014.3001.5501
ReceiveServiceImpl实现类
package com.it.rabbitmq.impl;
import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService
//注入amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void receiveMessage()
/**
* 发送消息
* 参数1为交换机名称
* 参数2位RoutingKey
* 参数3为具体发送的消息数据
*/
String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
System.out.println(message);
/**
* @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
* 这个方法不需要手动调用,spring会自动监听
* 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
* @param message 参数就是接收到的具体消息数据
*/
@RabbitListener(queues = "bootDirectQueue")
public void directReceive(String message)
System.out.println("监听器接收的消息:"+message);
@RabbitListener(bindings =
@QueueBinding(value = @Queue(),
exchange = @Exchange(name="fanoutExchange",type = "fanout")
))
public void fanoutReceive1(String message)
System.out.println("fanoutReceive1监听器接收的消息:"+message);
@RabbitListener(bindings =
@QueueBinding(value = @Queue(),
exchange = @Exchange(name="fanoutExchange",type = "fanout")
))
public void fanoutReceive2(String message)
System.out.println("fanoutReceive2监听器接收的消息:"+message);
编写Fanout模式的消息接收
1.SendService接口
package com.it.rabbitmq;
public interface SendService
void sendMessage(String message);
void sendFanoutMessage(String message);
SendServiceImpl实现类
package com.it.rabbitmq.impl;
import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendService")
public class SendServiceImpl implements SendService
//注入amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String message)
/**
* 发送消息
* 参数1为交换机名称
* 参数2位RoutingKey
* 参数3为具体发送的消息数据
*/
amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
@Override
public void sendFanoutMessage(String message)
amqpTemplate.convertAndSend("fanoutExchange","",message);
2.RabbitMQConfig类
package com.it.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig
//配置一个direct类型的交换机
@Bean
public DirectExchange directExchange()
return new DirectExchange("bootDirectExchange");
//配置一个队列
@Bean
public Queue directQueue()
return new Queue("bootDirectQueue");
/**
* 配置一个交换机和队列的绑定
*
* @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
* @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
* @return
*/
@Bean
public Binding directBinding(Queue directQueue, DirectExchange directExchange)
//完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
//配置一个fanout类型的交换机
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("fanoutExchange");
3.运行主函数
package com.it;
import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class RabbitmqSpringbootApplication
public static void main(String[] args)
ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
SendService sendService = (SendService) ac.getBean("sendService");
// sendService.sendMessage("boot的测试数据");
sendService.sendFanoutMessage("boot的fanout测试数据!");
编写Topic模式消息接收
ReceiveServiceImpl实现类
package com.it.rabbitmq.impl;
import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService
//注入amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void receiveMessage()
/**
* 发送消息
* 参数1为交换机名称
* 参数2位RoutingKey
* 参数3为具体发送的消息数据
*/
String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
System.out.println(message);
/**
* @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
* 这个方法不需要手动调用,spring会自动监听
* 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
* @param message 参数就是接收到的具体消息数据
*/
@RabbitListener(queues = "bootDirectQueue")
public void directReceive(String message)
System.out.println("监听器接收的消息:"+message);
@RabbitListener(bindings =
@QueueBinding(value = @Queue(),
exchange = @Exchange(name="fanoutExchange",type = "fanout")
))
public void fanoutReceive1(String message)
System.out.println("fanoutReceive1监听器接收的消息:"+message);
@RabbitListener(bindings =
@QueueBinding(value = @Queue(),
exchange = @Exchange(name="fanoutExchange",type = "fanout")
))
public void fanoutReceive2(String message)
System.out.println("fanoutReceive2监听器接收的消息:"+message);
@RabbitListener(bindings =
@QueueBinding(value = @Queue("topic1"),
key = "aa",
exchange = @Exchange(name="topicExchange",type = "topic")
))
public void topicReceive1(String message)
System.out.println("topic1消费者---aa---"+message);
@RabbitListener(bindings =
@QueueBinding(value = @Queue("topic2"),
key = "aa.*",
exchange = @Exchange(name="topicExchange",type = "topic")
))
public void topicReceive2(String message)
System.out.println("topic2消费者---aa---"+message);
@RabbitListener(bindings =
@QueueBinding(value = @Queue("topic3"),
key = "aa.#",
exchange = @Exchange(name="topicExchange",type = "topic")
))
public void topicReceive3(String message)
System.out.println("topic3消费者---aa---"+message);
编写Topic模式消息发送
1.SendService接口
package com.it.rabbitmq;
public interface SendService
void sendMessage(String message);
void sendFanoutMessage(String message);
void sendTopicMessage(String message);
SendServiceImpl类
package com.it.rabbitmq.impl;
import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendService")
public class SendServiceImpl implements SendService
//注入amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String message)
/**
* 发送消息
* 参数1为交换机名称
* 参数2位RoutingKey
* 参数3为具体发送的消息数据
*/
amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
@Override
public void sendFanoutMessage(String message)
amqpTemplate.convertAndSend("fanoutExchange","",message);
@Override
public void sendTopicMessage(String message)
amqpTemplate.convertAndSend("topicExchange","",message);
2.RabbitMQConfig类,提前声明一个topic的交换机
package com.it.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig
//配置一个direct类型的交换机
@Bean
public DirectExchange directExchange()
return new DirectExchange("bootDirectExchange");
//配置一个队列
@Bean
public Queue directQueue()
return new Queue("bootDirectQueue");
/**
* 配置一个交换机和队列的绑定
*
* @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
* @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
* @return
*/
@Bean
public Binding directBinding(Queue directQueue, DirectExchange directExchange)
//完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
//配置一个fanout类型的交换机
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("fanoutExchange");
//配置一个topic类型的交换机
@Bean
public TopicExchange topicExchange()
return new TopicExchange("topicExchange");
3.运行主函数
package com.it;
import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class RabbitmqSpringbootApplication
public static void main(String[] args)
ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
SendService sendService = (SendService) ac.getBean("sendService");
// sendService.sendMessage("boot的测试数据");
// sendService.sendFanoutMessage("boot的fanout测试数据!");
sendService.sendTopicMessage("boot的topic测试数据,key:aa");
功能测试:
查看接收类
修改发送信息的routingkey
修改发送信息的routingkey
以上是关于springboot集成rabbitmq:fanouttopic的主要内容,如果未能解决你的问题,请参考以下文章