Redis消息队列

Posted 起床oO

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis消息队列相关的知识,希望对你有一定的参考价值。

1.消息队列有两种模式:发布者订阅者模式,生产者消费者模式

发布者订阅者模式:发布者发送消息到队列,每个订阅者都能收到一样的信息。

生产者消费者模式:生产者将消息放入队列,多个消费者共同监听,谁先抢到资源,谁就从队列中取走消息去处理,注意,每个消息最多只能被一个消费者接收。

2.Redis消息队列使用场景

可以使用消息队列来实现短信的服务化,任何需要发送短信息的模块,都可以直接调用短信服务来完成短信的发送。比如用户系统登录注册短信,订单系统的下单成功的短信等。

3.Spring MVC中实现Redis消息队列(GitHub地址:Redis_MQ)

3.1 引入Redis相应的依赖包,jedis-2.9.0.jar    commons-pool2-2.5.0.jar    spring-data-redis-1.6.0.RELEASE.jar (安利个下jar包的网站:http://www.mvnjar.com/search.html

3.2 新建redis.properties文件

host=10.0.20.251
port=6379

3.3配置application-redis.xml文件(applicationContext-redis.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:redis="http://www.springframework.org/schema/redis"
    xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
                            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd     
                            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                            http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop.xsd
                            http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                            http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
    <description>spring-data-redis配置</description>
    <context:component-scan base-package="com.fpc.Entity"></context:component-scan>
    <!-- 引入配置文件 -->  
    <bean id="propertyConfigurer"  
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
        <property name="location" value="classpath:redis.properties" />  
    </bean> 

    <bean id="redisConnectionFactory"
        class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="${host}"></property>
        <property name="port" value="${port}"></property>
        <property name="usePool" value="true"></property>
    </bean>

    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
        <property name="connectionFactory" ref="redisConnectionFactory"></property>
    </bean>
    <!-- 序列化:一般我们想Redis发送一个消息定义的Java对象,这个对象需要序列化。这里使用JdkSerializationRedisSerializer -->
    <bean id="jdkSerializer"
        class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />

    <!-- 监听器 -->
    <bean id="smsMessageListener"
        class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="smsMessageDelegateListener" />
        <property name="serializer" ref="jdkSerializer" />
    </bean>

    <!-- 发送者 -->
    <bean id="sendMessage" class="com.fpc.Entity.sendMessage">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>

    <!-- redis:listener-container:定义消息监听,method:监听消息执行的方法,serializer:序列化,topic:监听主题(可以理解为队列的名称) -->
    <redis:listener-container>
        <redis:listener ref="smsMessageListener" method="handleMessage"
            serializer="jdkSerializer" topic="sms_queue_web_online" />
    </redis:listener-container>

    <!-- jedis -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxIdle" value="300" /> <!-- 最大能够保持idel状态的对象数  -->
        <property name="maxTotal" value="60000" /> <!-- 最大分配的对象数 -->
        <property name="testOnBorrow" value="true" /> <!-- 当调用borrow Object方法时,是否进行有效性检查 -->
    </bean>

    <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
        <constructor-arg index="0" ref="jedisPoolConfig" />
        <constructor-arg index="1" value="${host}" type="java.lang.String"/>
        <constructor-arg index="2" value="${port}" type="int" />
    </bean>

</beans>

3.4新建一个消息实体(Message)

package com.fpc.Entity;

import java.io.Serializable;
import java.util.Date;

public class Message implements Serializable {
    private Integer messageId;
    private String mobileNumber;//电话号码
    private Byte type;//消息类型,1:登录验证码,2:订单信息
    
    private Date createDate;//消息创建的时间
    
    //消息处理时间
    private Date processTime;
    
    //消息状态:1:未发送,2:发送成功,3:发送失败
    private Byte status;
    
    private String content;//消息主体

    public Integer getMessageId() {
        return messageId;
    }

    public void setMessageId(Integer messageId) {
        this.messageId = messageId;
    }

    public String getMobileNumber() {
        return mobileNumber;
    }

    public void setMobileNumber(String mobileNumber) {
        this.mobileNumber = mobileNumber;
    }

    public Byte getType() {
        return type;
    }

    public void setType(Byte type) {
        this.type = type;
    }

    public Date getCreateDate() {
        return createDate;
    }

    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }

    public Date getProcessTime() {
        return processTime;
    }

    public void setProcessTime(Date processTime) {
        this.processTime = processTime;
    }

    public Byte getStatus() {
        return status;
    }

    public void setStatus(Byte status) {
        this.status = status;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

3.5定义一个发送消息的对象(sendMessage

package com.fpc.Entity;

import java.io.Serializable;

import org.springframework.data.redis.core.RedisTemplate;

//定义一个消息发送对象
public class sendMessage {
    private RedisTemplate<String, Object> redisTemplate;

    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }

    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    
    public void sendMessages( String channel,Serializable message ){
        redisTemplate.convertAndSend(channel, message);
    }
}

3.6定义一个监听器(smsMessageDelegateListener

package com.fpc.Entity;

import java.io.Serializable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.springframework.stereotype.Component;

import redis.clients.jedis.Jedis;

//监听消息
@Component("smsMessageDelegateListener")
public class smsMessageDelegateListener {
    //监听Redis消息
    public void handleMessage( Serializable message ){
        Message mess = (Message) message;
        
        //发送短信
        //手机号: mess.getMobileNumber()
        //短信内容:mess.getContent();
        //send,发送状态sendStatus
        //如果发送不成功则直接return,离开该方法,或者继续重试
        //如果发送成功则需要,异步改写短信的状态;
        Jedis jedis = new Jedis("10.0.20.251");
        jedis.set("message", mess.getContent());
        Executor executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            
            @Override
            public void run() {
                // TODO 自动生成的方法存根
                //读写短信数据表,将短信的发送状态改为已发送
            }
        });
        
    }
}

注:该监听器中注释已经写得很明白了,你可以在此处对数据进行持久化即写库操作,由于本demo中没有写库(建一张message表,然后写dao,service,serviceImpl等,就跟正常操作别的表是一样的,所以此处做数据持久化了)

然后在该监听器可以开启一个异步线程,异步改写短信的状态。

3.7.在controller中写一个get_message方法(get_message

@RequestMapping("/get_message")
    public ModelAndView get_message( @RequestParam("mobileNumber") String moblieNumber ){
        //取得电话号码,构造消息对象,然后通过短信服务器生成验证码发送到该手机上
        Message message = new Message();
        message.setMobileNumber(moblieNumber);
        message.setType((byte)1); //验证消息
        message.setContent("23456"); //消息的内容
        message.setStatus((byte)1);//未发送状态
        message.setCreateDate(new Date());
        //1.可以把待发送的消息存库,也可以不存库,现在先不存库
        
        //2.异步发送短信到redis队列
        //2.1先构造一个消息发送对象
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-redis.xml");
        sendMessage sendm = (sendMessage)context.getBean("sendMessage");
        sendm.sendMessages("sms_queue_web_online", message);
        System.out.println(message.getContent());
        ModelAndView modelAndView = new ModelAndView();
        modelAndView.addObject("code",message.getContent());
        modelAndView.setViewName("user");
        return modelAndView;
}

注:此处没有真正调用发短信的方法,而是硬编码为“23456”

3.8编写jsp页面,user.jsp

<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
    pageEncoding="UTF-8"%>
<%@ taglib prefix="shiro" uri="http://shiro.apache.org/tags" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>Insert title here</title>
</head>
<body>
    <shiro:user>
        welcome[<shiro:principal/>]login,<a href="Shiro/logout">logout</a>
    </shiro:user>
    <form action="Shiro/get_message" method="POST">
        phonenumber: <input type="text" name="mobileNumber"/>
        <br>
        code: <input type="text" name="password" value="${code}"/>
        <br>
        <input type="submit" value="get code">
    </form>
</body>
</html>

运行效果:

然后我们去Redis服务器端看下,数据有没有被保存在Redis中,然后借此判断监听器有没有执行:

可以看到Redis服务器中已经有值存在了,说明消息队列监听器是确实在监听的,你可以根据实际的业务改变监听器中的代码和controller中的代码。

以上是关于Redis消息队列的主要内容,如果未能解决你的问题,请参考以下文章

消息队列为啥用redis实现

Redis的基本使用(二) 消息队列

redis怎么做消息队列

Redis(五)-特性-消息队列

Redis进阶学习04---秒杀优化和消息队列

spring-data-redis 怎么监听消息队列有消息来了