面试官竟让我用Redis实现一个消息队列!
Posted 数据和云
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了面试官竟让我用Redis实现一个消息队列!相关的知识,希望对你有一定的参考价值。
>>>千人线上直播活动报名倒计时(今晚20:00):
众所周知,redis是一个高性能的分布式key-value存储系统,在NoSQL数据库市场上,redis自己就占据了将近半壁江山,足以见到其强大之处。同时,由于redis的单线程特性,我们可以将其用作为一个消息队列。本篇文章就来讲讲如何将redis整合到spring boot中,并用作消息队列的……
一、什么是消息队列
“消息队列”是在消息的传输过程中保存消息的容器。——《百度百科》
消息我们可以理解为在计算机中或在整个计算机网络中传递的数据。
队列是我们在学习数据结构的时候学习的基本数据结构之一,它具有先进先出的特性。
所以,消息队列就是一个保存消息的容器,它具有先进先出的特性。
为什么会出现消息队列?
异步:常见的B/S架构下,客户端向服务器发送请求,但是服务器处理这个消息需要花费的时间很长的时间,如果客户端一直等待服务器处理完消息,会造成客户端的系统资源浪费;而使用消息队列后,服务器直接将消息推送到消息队列中,由专门的处理消息程序处理消息,这样客户端就不必花费大量时间等待服务器的响应了;
-
解耦:传统的软件开发模式,模块之间的调用是直接调用,这样的系统很不利于系统的扩展,同时,模块之间的相互调用,数据之间的共享问题也很大,每个模块都要时时刻刻考虑其他模块会不会挂了;使用消息队列以后,模块之间不直接调用,而是通过数据,且当某个模块挂了以后,数据仍旧会保存在消息队列中。最典型的就是生产者-消费者模式,本案例使用的就是该模式; 削峰填谷:某一时刻,系统的并发请求暴增,远远超过了系统的最大处理能力后,如果不做任何处理,系统会崩溃;使用消息队列以后,服务器把请求推送到消息队列中,由专门的处理消息程序以合理的速度消费消息,降低服务器的压力。
由上图可以看到,消息队列充当了一个中间人的角色,我们可以通过操作这个消息队列来保证我们的系统稳定。
二、环境准备
Java环境:jdk1.8
spring boot版本:2.2.1.RELEASE
redis-server版本:3.2.100
三、相关依赖
这里只展示与redis相关的依赖,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
第一个依赖是对redis NoSQL的支持
-
第二个依赖是spring integration与redis的结合,这里添加这个代码主要是为了实现分布式锁
四、配置文件
这里只展示与redis相关的配置
# redis所在的的地址
spring.redis.host=localhost
# redis数据库索引,从0开始,可以从redis的可视化客户端查看
spring.redis.database=1
# redis的端口,默认为6379
spring.redis.port=6379
# redis的密码
spring.redis.password=
# 连接redis的超时时间(ms),默认是2000
spring.redis.timeout=5000
# 连接池最大连接数
spring.redis.jedis.pool.max-active=16
# 连接池最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接池最大空闲连接
spring.redis.jedis.pool.max-idle=16
# 连接池最大阻塞等待时间(负数表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接redis的客户端名
spring.redis.client-name=mall
五、代码配置
配置RedisTemplate
/**
* 配置RedisTemplate,解决乱码问题
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
LOGGER.debug("redis序列化配置开始");
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// string序列化方式
RedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
// 设置默认序列化方式
template.setDefaultSerializer(serializer);
template.setKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
LOGGER.debug("redis序列化配置结束");
return template;
}
RedisTemplate几种序列化方式的简要介绍
六、redis队列监听器(消费者)
上面说了,与redis队列监听器相关的类为一个名为MessageListener的接口,下面是该接口的源码
public interface MessageListener {
void onMessage(Message message, @Nullable byte[] pattern);
}
message:redis消息类,该类中仅有两个方法
-
byte[] getBody()以二进制形式获取消息体 -
byte[] getChannel()以二进制形式获取消息通道 pattern:二进制形式的消息通道,和message.getChannel()返回值相同
介绍完接口,我们来实现一个简单的redis队列监听器
@Component
public class RedisListener implement MessageListener{
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
LOGGER.debug("从消息通道={}监听到消息",new String(pattern));
LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样
// 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer
// 所以这里的实现方式为GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
}
}
代码很简单,就是输出参数中包含的关键信息。需要注意的是,RedisSerializer的实现要与上面配置的序列化方式一致。
队列监听器实现完以后,我们还需要将这个监听器添加到redis队列监听器容器中,代码如下:
@Bean
public public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(redisListener, new PatternTopic("demo-channel"));
return container;
}
这几行代码大概意思就是新建一个Redis消息监听器容器,然后将监听器和管道名想绑定,最后返回这个容器。
这里要注意的是,这个管道名和下面将要说的推送消息时的管道名要一致,不然监听器监听不到消息。
七、redis队列推送服务(生产者)
代码如下:
@Service
public class Publisher{
@Autowrite
private RedisTemplate redis;
public void publish(Object msg){
redis.convertAndSend("demo-channel",msg);
}
}
这里还是要注意上面所说的,生产者和消费者的通道名要相同。
至此,消息队列的生产者和消费者已经全部编写完成。
八、遇到的问题及解决办法
1、spring boot使用log4j2日志框架问题
在我添加了spring-boot-starter-log4j2依赖并在spring-boot-starter-web中排除了spring-boot-starter-logging后,运行项目,还是会提示下面的错误:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
这个错误就是maven中有多个日志框架导致的。后来通过依赖分析,发现在spring-boot-starter-data-redis中,也依赖了spring-boot-starter-logging,解决办法也很简单,下面贴出详细代码
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
2、redis队列监听器线程安全问题
单一消费者(一个通道只有一个消费者)的解决办法
多个消费者(一个通道有多个消费者)的解决办法
那么这种问题如何解决呢?分布式锁。
下面来简要科普一下什么是分布式锁:
分布式锁是指在分布式环境下,同一时间只有一个客户端能够从某个共享环境中(例如redis)获取到锁,只有获取到锁的客户端才能执行程序。
然后分布式锁一般要满足:排他性(即同一时间只有一个客户端能够获取到锁)、避免死锁(即超时后自动释放)、高可用(即获取或释放锁的机制必须高可用且性能佳)
首先讲一下如何使用,导入了依赖以后,首先配置一个Bean
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {
return new RedisLockRegistry(factory, "demo-lock",60);
}
使用锁的方法,下面是对监听器的修改
@Component
public class RedisListener implement MessageListener{
@Autowrite
private RedisLockRegistry redisLockRegistry;
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
Lock lock=redisLockRegistry.obtain("lock");
try{
lock.lock(); //上锁
LOGGER.debug("从消息通道={}监听到消息",new String(pattern));
LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样
// 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer
// 所以这里的实现方式为GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解锁
}
}
}
以上就是本文的全部内容。
近期精选:
1.
2.
点击图片了解更多 ↓
云和恩墨大讲堂 | 一个分享交流的地方
请备注:云和恩墨大讲堂
点个“在看” 以上是关于面试官竟让我用Redis实现一个消息队列!的主要内容,如果未能解决你的问题,请参考以下文章 一个简单需求竟让我改了十几处代码,必须控诉到底什么是重复代码! 面试了一位 46 岁的程序员,思绪万千,最后结局竟让我大惊失色!
你的喜欢会被看到