教你如何基于Redis来实现高性能延时消息队列!
Posted Javatutouhouduan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了教你如何基于Redis来实现高性能延时消息队列!相关的知识,希望对你有一定的参考价值。
最近在倒腾自建博客后端系统,需要用到延时任务的功能,但手头只有一套mysql和Redis,如果搞一套MQ成本有点大,于是想着用redis实现延时消息队列。有些场景用数据库的定时扫表也能简单实现延时消息的功能,不过对于我这边的实际场景(比如计数系统)其实是把数据存到redis中,如果用数据库实现延时消息会对数据库有比较大的压力。
系统设计
这里参考了有赞的延迟队列设计
数据结构设计
事件消息体
type EventEntity struct
EventId int64
Topic string
Body string
EffectTime time.Time
复制代码
- EVENT_POOL: 使用redis的hash,里面存储了任务事件的完整信息,key=prefix+namespace+topic,field=EventId, val=EventEntity;
- EVENT_BUCKET: 使用redis的zset,里面存储了任务事件的有序集合,key=prefix+namespace+topic,score=EffectTime, member=EventId;
- EVENT_QUEUE: 使用redis的list, list中存储了到期待消费任务的EventId。
延迟队列的执行流程
1、当有新增延时任务过来时,会在EVENT_POOL对应的topic中添加一条记录,同时也会把任务添加到EVENT_BUCKET中,按生效时间排序;
2、搬运线程会定时扫描EVENT_BUCKET中已经到期的任务,将这些任务push到EVENT_QUEUE对应topic的队列当中,之后将这些任务从EVENT_BUCKET中删除;
3、EVENT_QUEUE每个topic会有一个监听线程,当发现当前topic队列中有待消费的任务,则会将任务pop出来,并从EVENT_POOL中查询任务详情,交给consumer消费。
代码实现
核心代码
发布延时任务
func (q *DelayQueue) PublishEvent(ctx context.Context, event *EventEntity) error
pipeline := q.redisClient.WithContext(ctx).Pipeline()
defer pipeline.Close()
// 向EVENT_POOL中添加任务
pipeline.HSet(q.genPoolKey(event.Topic), strconv.FormatInt(event.EventId, 10), util.ToJsonString(event))
// 将任务id添加到EVENT_BUCKET中,按生效时间排序
pipeline.ZAdd(q.genBucketKey(event.Topic), redis.Z
Member: strconv.FormatInt(event.EventId, 10),
Score: float64(event.EffectTime.Unix()),
)
_, err := pipeline.Exec()
if err != nil
logs.CtxWarn(ctx, "pipeline.Exec", logs.String("err", err.Error()))
return err
return nil
复制代码
搬运线程扫描到期任务
func (q *DelayQueue) carryEventToQueue(topic string) error
ctx := context.Background()
// 扫描zset中到期的任务
members, err := q.redisClient.WithContext(ctx).ZRangeByScoreWithScores(q.genBucketKey(topic), redis.ZRangeByMin: "0", Max: util.ToString(time.Now().Unix())).Result()
if err != nil && err != redis.Nil
logs.CtxWarn(ctx, "[carryEventToQueue] ZRangeByScoreWithScores", logs.String("err", err.Error()))
return err
if len(members) == 0
return nil
errMap := make(map[string]error)
// 将任务添加到对应topic的待消费队列里
for _, m := range members
eventId := m.Member.(string)
err = q.redisClient.WithContext(ctx).LPush(q.genQueueKey(topic), eventId).Err()
if err != nil
logs.CtxWarn(ctx, "[carryEventToQueue] LPush", logs.String("err", err.Error()))
errMap[eventId] = err
// 从Bucket中删除已进入待消费队列的事件
var doneMembers []interface
for _, m := range members
eventId := m.Member.(string)
if _, ok := errMap[eventId]; !ok
doneMembers = append(doneMembers, eventId)
if len(doneMembers) == 0
return nil
err = q.redisClient.WithContext(ctx).ZRem(q.genBucketKey(topic), doneMembers...).Err()
if err != nil
logs.CtxWarn(ctx, "[carryEventToQueue] ZRem", logs.String("err", err.Error()))
return nil
复制代码
监听线程消费任务
这里使用了List的BLPop命令,当有数据时会立即返回,没有数据则会一直阻塞直到有数据过来;这样可以避免定时扫描列表浪费资源。
func (q *DelayQueue) runConsumer(topic string, subscriberList []IEventSubscriber) error
for
ctx := context.Background()
kvPair, err := q.redisClient.WithContext(ctx).BLPop(60*time.Second, q.genQueueKey(topic)).Result()
if err != nil
logs.CtxWarn(ctx, "[InitOnce] BLPop", logs.String("err", err.Error()))
continue
if len(kvPair) < 2
continue
eventId := kvPair[1]
data, err := q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
if err != nil && err != redis.Nil
logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
if q.persistFn != nil
_ = q.persistFn(&EventEntity
EventId: util.String2Int64(eventId),
Topic: topic,
)
continue
event := &EventEntity
_ = jsoniter.UnmarshalFromString(data, event)
for _, subscriber := range subscriberList
util.Retry(3, 0, func() (success bool)
err = subscriber.Handle(ctx, event)
if err != nil
logs.CtxWarn(ctx, "[InitOnce] subscriber.Handle", logs.String("err", err.Error()))
return false
return true
)
err = q.redisClient.WithContext(ctx).HDel(q.genPoolKey(topic), eventId).Err()
if err != nil
logs.CtxWarn(ctx, "[InitOnce] HDel", logs.String("err", err.Error()))
复制代码
其他
1、优雅关闭
DelayQueue对象中使用wg、isRunning、stop三个变量来实现优雅关闭,具体可参考源码。
type DelayQueue struct
namespace string
redisClient *redis.Client
once sync.Once
wg sync.WaitGroup
isRunning int32
stop chan struct
persistFn PersistFn
复制代码
// gracefully shudown
func (q *DelayQueue) ShutDown()
if !atomic.CompareAndSwapInt32(&q.isRunning, 1, 0)
return
close(q.stop)
q.wg.Wait()
复制代码
2、消费失败后持久化任务
可为DelayQueue对象设置持久化方法persistFn,用来在监听线程消费任务失败时将任务id持久化以便人工处理。
...
q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
if err != nil && err != redis.Nil
logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
if q.persistFn != nil
_ = q.persistFn(&EventEntity
EventId: util.String2Int64(eventId),
Topic: topic,
)
continue
...
复制代码
源码地址
redis_delay_queue: github.com/hudingyu/re…
手把手教你用redis实现一个简单的mq消息队列(java)
众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ.
但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。
为什么是 redis
- redis 通常作为缓存服务引入,因此大部分系统都会有 redis
- redis 本身的资源消耗是极小的,符合我们的轻量要求
- redis 速度很快,几乎不会出现速度瓶颈
- redis 有持久化方案,调整配置项可以在数据安全和速度间进行取舍(参考这篇)[https://segmentfault.com/a/1190000002906345]
如何实现
利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2*32-1 条数据,对于大部分应用是完全够用的。
简单来说就是:
- 每个 topic 对应一条队列
- 从队列一段写入数据,从另一端读取数据
- 消费失败,重新将消息放入队列
注意:代码仅供个人尝鲜使用,请勿用于真实生产环境
代码仅可在 springboot 环境中使用
首先定义注解和接口类
注解代码如下:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqConsumer {
/**
* 队列主题
*/
String topic() default "default_es_topic";
}
被该注解修饰的类,将会接收 topic 下的消息。
接口代码如下:
public interface RedisConsumer {
/**
* 功能描述: 消费方法,消费者类必须继承此方法
*
* @param message 数据载体
* @author 123
* @date 2020/3/28 22:41
*/
void deal(String message);
}
本接口用于定于接受消息的处理方法。
扫描注解修饰类
本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。
- 扫描部分代码如下:
/**
* MqConfiguration.java
*/
@Override
public void run(ApplicationArguments args) {
Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
map.values().forEach(item -> {
if (!(item instanceof RedisConsumer)) {
log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName());
return;
}
MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
MqConsumer annotation = annotations[0];
String topic = annotation.topic();
if (topicMap.containsKey(topic)) {
log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic);
} else {
topicMap.put(topic, (RedisConsumer) item);
}
});
log.info("redis订阅信息汇总完毕!!!!!!");
//由一个线程始终循环获取es队列数据
threadPoolExecutor.execute(loop());
}
run 方法在 spring 扫描完毕后调用,通过实现ApplicationRunner
接口实现,通过 spring 的方法来获取所有被MqConsumer
接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。
- 执行线程部分代码如下:
private Runnable loop() {
return () -> {
while (true) {
AtomicInteger count = new AtomicInteger(0);
topicMap.forEach((k, v) -> {
try {
String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);
if (message == null) {
count.getAndIncrement();
} else {
pushTask(v, message, k);
}
} catch (RedisConnectionFailureException connException) {
log.error("redis无法连接,10s后重试", connException);
sleep(10);
} catch (Exception e) {
log.error("redis消息队列异常", e);
}
});
if (count.get() == topicMap.keySet().size()) {
//当所有的队列都为空时休眠1s
sleep(1);
}
}
};
}
private void pushTask(RedisConsumer item, String value, String key) {
threadPoolExecutor.execute(() -> {
try {
item.deal(value);
} catch (Exception e) {
log.error("执行消费任务出错", e);
//非广播消息进行数据回补
mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
}
});
}
loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。
完整代码见本文结尾
测试
运行项目后调用,MainController
中的接口即可测试。
完整代码:github
本文原创发布于:手把手教你用 redis 实现一个简单的 mq 消息队列
以上是关于教你如何基于Redis来实现高性能延时消息队列!的主要内容,如果未能解决你的问题,请参考以下文章