ssm+RabbitMQ 整合

Posted winddogg

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ssm+RabbitMQ 整合相关的知识,希望对你有一定的参考价值。

1. 配置说明

1.配置rabbitmq.properties

rmq.ip=192.168.5.109
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=test
rmq.manager.password=123456

2.使用注解配置RabbitMqConfiguration类,注入spring容器

@Component
public class RabbitMqConfiguration {
    
    private static Properties props;
    
    private static String ip;
    private static String port;
    private static String username;
    private static String password;

    private static final String CONF_NAME = "rabbitmq.properties";

    static {
        props = new Properties();
        try {
            String path = Tools.getRootPlugInPath() + CONF_NAME;
            System.out.println(path);
            props.load(new InputStreamReader(new FileInputStream(path), "UTF-8"));
            ip = props.getProperty("rmq.ip");
            port = props.getProperty("rmq.port");
            username = props.getProperty("rmq.manager.user");
            password = props.getProperty("rmq.manager.password");
        } catch (IOException e) {
            System.out.println("配置文件读取异常");
        }
    }
    
    @Bean
    public AmqpTemplate rabbitTemplate() {
        System.out.println("#############################-------RabbitMq Start--------################################");
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        RetryTemplate retryTemplate = new RetryTemplate();
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(500);
        backOffPolicy.setMultiplier(10.0);
        backOffPolicy.setMaxInterval(30000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        template.setRetryTemplate(retryTemplate);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        System.out.println("#############################-------RabbitMq End--------################################");
        return template;
    }
    
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(ip);
        connectionFactory.setPort(Integer.valueOf(port));
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        System.out.println(connectionFactory);
        return connectionFactory;
    }
}

3.工具类RabbitmqService,写了发送消息、接收消息的公用方法

@Service
public class RabbitmqService {
    
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Resource
    private AmqpTemplate amqpTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    public boolean sendMsg(String exchangeName,String queueName, Object object) {
        try {
            amqpTemplate.convertAndSend(exchangeName,queueName, object);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        return true;
    }

    public String reciveMsg(String queueName) {

        try {
            Message ms = amqpTemplate.receive(queueName);
            return ms.toString();
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        return "";
    }

    public int getCount(String exchangeName,String queueName){
        ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
        // 创建连接
        Connection connection = connectionFactory.createConnection();
        // 创建通道
        Channel channel = connection.createChannel(false);
        // 设置消息交换机
        try {
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
        AMQP.Queue.DeclareOk declareOk = null;
        try {
            declareOk = channel.queueDeclarePassive(queueName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //获取队列中的消息个数
        int queueCount = declareOk.getMessageCount();

        // 关闭通道和连接
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        connection.close();

        return queueCount;

    }
    
}

2. 发送消息调用(生产者)

@Resource
private RabbitmqService rabbitmqService;

rabbitmqService.sendMsg("writeSYLog-exchange","writeSYLog", message);

在代码中注入RabbitmqService ,调用sendMsg方法,其中

参数1 为交换机名称

参数2为 队列名称

参数3为要发送的消息

3. 接收消息调用(消费者)

① 第一种:同步接收消息

 @Resource
private RabbitmqService rabbitmqService;

在代码中注入RabbitmqService ,分别调用getCount

reciveMsg方法,获取队列中的消息数量以及接收消息

其中:

getCount()方法(默认使用了direct类型) 根据交换机名称和队列名称查询队列中所有的待消费消息

参数1 为交换机名称

参数2为 队列名称

reciveMsg()方法,根据队列名消费消息

参数 为队列名称

 

@Resource
private RabbitmqService rabbitmqService;

@RequestMapping("testrecive")
    public void testrecive() {
        int count = rabbitmqService.getCount("writeSYLog-exchange","writeSYLog");
        for (int i =0;i<count;i++){
            String resultMessage = rabbitmqService.reciveMsg("writeSYLog");
            System.out.println(resultMessage);
        }

    }

② 第二种:异步接收消息

使用监听器来异步接收消息,配置spring-rabbitmq.xml

<!-- 配置rabbitMQ服务基本信息 -->

<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}" username="${rmq.manager.user}" password="${rmq.manager.password}"/>
<rabbit:admin connection-factory="connectionFactory"/>

 

<!-- 以下配置可配置多个 -->
<!-- 配置rabbitMQ监听器类 -->
<bean id="queueListenter" class="com.censoft.cends.listener.DirectReceiver"></bean>
<!-- 配置监听器监听的队列名 -->
 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
   <rabbit:listener queue-names="writeSYLog" ref="queueListenter" />
</rabbit:listener-container>

 

   创建com.censoft.cends.listener.DirectReceiver类继承MessageListener类,重写onMessage方法,获取消息进行消费

public class DirectReceiver implements MessageListener {
    @Override
    public void onMessage(Message msg) {
        String str = "";
        try {
            str = new String(msg.getBody(), "UTF-8");
            logger.info("==获取消息==" + str);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

 



以上是关于ssm+RabbitMQ 整合的主要内容,如果未能解决你的问题,请参考以下文章

Java项目:网上图书商城系统(java+SSM+Jsp+MySQL+Redis+JWT+Shiro+RabbitMQ+Vue+EasyUI)

SpringBoot整合SSM(代码实现Demo)

RabbitMQ---延迟队列,整合springboot

史上最详细的IDEA优雅整合Maven+SSM框架(详细思路+附带源码)

SSM框架整合

SSM 项目整合详细解读