定时发布(上)

Posted 赵四司机

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了定时发布(上)相关的知识,希望对你有一定的参考价值。

个人简介: 

> 📦个人主页:赵四司机
> 🏆学习方向:JAVA后端开发 
> ⏰往期文章:SpringBoot项目整合微信支付
> 🔔博主推荐网站:牛客网 刷题|面试|找工作神器
> 📣种一棵树最好的时间是十年前,其次是现在!
> 💖喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。

前言:

最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。

如果你想要一个可以系统学习的网站,那么我推荐的是牛客网,个人感觉用着还是不错的,页面很整洁,而且内容也很全面,语法练习,算法题练习,面试知识汇总等等都有,论坛也很活跃,传送门链接:牛客刷题神器

目录

一:前期准备

1.需求分析

2. 延迟任务概述

3.技术对比

DelayQueue

RabbitMQ实现延迟任务

Redis实现

4.实现思路

二:环境搭建

1.搭建模块

2.数据库准备 

 ​编辑 3.在docker中安装redis

4.项目中集成redis

三:代码编写

1.实体类导入

2.创建taskService

3.功能实现


一:前期准备

1.需求分析

        当创作者创作好文章之后可以选择立马发布,还能选择定时发布(见下图),这个相信大家在CSDN创作时候都知道,我们需要使用延迟任务来实现文章的定时发布。

2. 延迟任务概述

在介绍延迟任务之前,我们先了解一下定时任务,定时任务就是有固定的的执行频率,每隔一段时间就执行一次,定时任务与延迟任务区别如下:

  • 定时任务:有固定周期的,有明确的触发时间
  • 延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟。

  • 延迟任务使用场景:  

  1. 商品下单30分钟之内没有付款则将订单取消

  2. 接口对接出现网络问题,1分钟之后重试,如果再失败则2分钟之后重试,直至达到阈值       

3.技术对比

  • DelayQueue

    JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

    DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

    getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

    compareTo方法:用于排序,确定元素出队列的顺序。

        需要注意的是,使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,要保证数据不丢失,就需要持久化(磁盘)。

  • RabbitMQ实现延迟任务

          RabbitMQ实现延迟任务有两种形式,一种是传统的TTL+DLX(死信交换机)来实现,另一种是直接使用插件。TTL+DLX的实现原理是给每个队列设置过期时间,当消息过期之后变成Dead message,就将死信消息发送到另一个交换机,这个交换机叫做死信交换机。 

        不过更方便的还是直接使用RabbitMQ提供的延迟队列插件比较方便,实践过程可以翻看我前面关于微信支付的文章,里面就是使用了RabbitMQ提供的延迟插件来实现订单的管理。

  • Redis实现

          由于项目后面还需要用到Redis缓存热度靠前的文章,所以这里我选择了使用Redis来实现延迟队列,也正好可以了解其实现原理。

        我们都知道Redis中的ZSet数据类型可以实现对数据的排序,那么我们就可以利用这个特点来实现延迟队列,使用时间戳作为score来进行排序。

4.实现思路

  

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

2.为什么redis中使用两种数据类型,list和zset?

效率问题,算法的时间复杂度

3.在添加zset数据的时候,为什么不需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

二:环境搭建

1.搭建模块

①在service模块下创建一个tbug-headlines-schedule模块

  

②添加bootstrap.yml

server:
  port: 51701
spring:
  application:
    name: headlines-schedule
  cloud:
    nacos:
      discovery:
        server-addr: 49.234.52.192:8848
      config:
        server-addr: 49.234.52.192:8848
        file-extension: yml

③在nacos添加相关配置

spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/headlines_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false
    username: root
    password: 440983
# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.my.model.schedule.pojos

2.数据库准备 

taskinfo任务表 

taksinfo_log任务日志表

  3.在docker中安装redis

4.项目中集成redis

①在项目中导入redis依赖

<!--spring data redis & cache-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

②在nacos添加redis配置信息

spring:
  redis:
    host: 49.234.52.192
    password: 440983
    port: 6379

③拷贝CacheService类到tbug-headlines-common模块下,并添加自动配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\
  com.my.common.exception.ExceptionCatch,\\
  com.my.common.swagger.SwaggerConfiguration,\\
  com.my.common.swagger.Swagger2Configuration,\\
  com.my.common.tencentcloud.TextDetection,\\
  com.my.common.tencentcloud.ImageDetection,\\
  com.my.common.tess4j.Tess4jClient,\\
  com.my.common.redis.CacheService,\\

三:代码编写

1.实体类导入

①创建task类,用于接收添加任务的参数

package com.my.model.schedule.dtos;

import lombok.Data;

import java.io.Serializable;

@Data
public class Task implements Serializable 

    /**
     * 任务id
     */
    private Long taskId;
    /**
     * 类型
     */
    private Integer taskType;

    /**
     * 优先级
     */
    private Integer priority;

    /**
     * 执行id
     */
    private long executeTime;

    /**
     * task参数
     */
    private byte[] parameters;
    

②创建mapper

package com.my.schedule.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.my.model.schedule.pojos.Taskinfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.Date;
import java.util.List;

/**
 * <p>
 *  Mapper 接口
 * </p>
 *
 * @author itheima
 */
@Mapper
public interface TaskInfoMapper extends BaseMapper<Taskinfo> 

    List<Taskinfo> queryFutureTime(@Param("taskType")int type, @Param("priority")int priority, @Param("future")Date future);

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.my.schedule.mapper.TaskInfoMapper">

    <select id="queryFutureTime" resultType="com.my.model.schedule.pojos.Taskinfo">
        select *
        from taskinfo
        where task_type = #taskType
          and priority = #priority
          and execute_time <![CDATA[<]]> #future,javaType=java.util.Date
    </select>

</mapper>

package com.my.schedule.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.my.model.schedule.pojos.TaskinfoLogs;
import org.apache.ibatis.annotations.Mapper;

/**
 * <p>
 *  Mapper 接口
 * </p>
 *
 * @author itheima
 */
@Mapper
public interface TaskInfoLogsMapper extends BaseMapper<TaskinfoLogs> 


2.创建taskService

package com.my.schedule.service;

import com.my.model.schedule.dtos.Task;

/**
 * 对外访问接口
 */
public interface TaskService 
    /**
     * 添加任务接口
     * @param task  任务对象
     * @return  任务ID
     */
    long addTask(Task task);

    /**
     * 取消任务
     * @param taskId        任务id
     * @return              取消结果
     */
    boolean cancelTask(long taskId);

    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    Task poll(int type,int priority);


3.功能实现

 ScheduleConstants常量类

package com.my.model.common.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum TaskTypeEnum 

    NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
    REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
    private final int taskType; //对应具体业务
    private final int priority; //业务不同级别
    private final String desc; //描述信息

TaskServiceImpl

package com.my.schedule.service.serviceImpl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.my.common.constans.ScheduleConstants;
import com.my.common.redis.CacheService;
import com.my.model.schedule.dtos.Task;
import com.my.model.schedule.pojos.Taskinfo;
import com.my.model.schedule.pojos.TaskinfoLogs;
import com.my.schedule.mapper.TaskInfoLogsMapper;
import com.my.schedule.mapper.TaskInfoMapper;
import com.my.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Set;

@Slf4j
@Service
@Transactional
public class TaskServiceImpl implements TaskService 
    /**
     * 添加延迟任务
     * @param task  任务对象
     * @return
     */
    @Override
    public long addTask(Task task) 
        //1.将任务添加到数据库中
        boolean success = addTaskToDb(task);

        if(success) 
            //2.将任务添加到redis
            addTaskToCache(task);
        
        return task.getTaskId();
    

    @Autowired
    private TaskInfoMapper taskInfoMapper;
    @Autowired
    private TaskInfoLogsMapper taskInfoLogsMapper;
    /**
     * 将任务添加到数据库中
     * @param task
     * @return
     */
    private boolean addTaskToDb(Task task) 
        boolean flag = false;

        try 
            //1.保存任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task,taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskInfoMapper.insert(taskinfo);

            //2.设置任务id
            task.setTaskId(taskinfo.getTaskId());

            //3.保存任务日志数据
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo,taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskInfoLogsMapper.insert(taskinfoLogs);

            flag = true;
         catch (Exception e) 
            e.printStackTrace();
        

        return flag;
    

    @Autowired
    private CacheService cacheService;
    /**
     * 将任务添加到redis
     * @param task
     */
    private void addTaskToCache(Task task) 
        //1.构造key
        String key = task.getTaskType()+"_"+task.getPriority();

        //2.获取5分钟之后的时间 毫秒级
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE,5);
        long nextScheduleTime = calendar.getTimeInMillis();

        //3.如果任务执行时间小于等于当前时间,存入list
        if(task.getExecuteTime() <= System.currentTimeMillis()) 
            cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
         else if(task.getExecuteTime() <= nextScheduleTime) 
            //4.如果任务执行时间在5分钟之内,存入zSet
            cacheService.zAdd(ScheduleConstants.FUTURE + key,JSON.toJSONString(task),task.getExecuteTime());
        
    

    /**
     * 删除任务
     * @param taskId      任务id
     * @return
     */
    @Override
    public boolean cancelTask(long taskId) 
        boolean flag = false;
        //1.删除任务并更新日志
        Task task = UpdateDb(taskId,ScheduleConstants.EXECUTED);

        //2.从Redis中删除任务
        if(task != null) 
            removeFromCache(task);
            log.info("删除Redis中的任务成功:",taskId);
            flag = true;
        

        return flag;
    

    /**
     * 从redis中删除任务
     * @param task
     */
    private void removeFromCache(Task task) 
        String key = task.getTaskType() + "_" + task.getPriority();

        if(task.getExecuteTime() <= System.currentTimeMillis()) 
            log.info("删除正要执行的任务...");
            cacheService.lRemove(ScheduleConstants.TOPIC + key,0,JSON.toJSONString(task));
         else 
            log.info("删除将要执行的任务...");
            cacheService.zRemove(ScheduleConstants.FUTURE + key,JSON.toJSONString(task));
        
    

    /**
     * 删除任务,更新日志
     * @param taskId
     * @param status
     * @return
     */
    private Task UpdateDb(long taskId, int status) 
        Task task = null;
        try 
            //1.删除任务
            log.info("删除数据库中的任务...");
            taskInfoMapper.deleteById(taskId);

            //2.更新日志
            log.info("更新任务日志...");
            TaskinfoLogs taskinfoLogs = taskInfoLogsMapper.selectById(taskId);
            taskinfoLogs.setStatus(status);
            taskInfoLogsMapper.updateById(taskinfoLogs);

            //3.设置返回值
            task = new Task();
            BeanUtils.copyProperties(taskinfoLogs,task);
            task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
         catch (BeansException e) 
            throw new RuntimeException(e);
        

        return task;
    

    /**
     * 消费任务
     * @param type  任务类型
     * @param priority 任务优先级
     * @return  Task
     */
    @Override
    public Task poll(int type, int priority) 
        Task task = null;
        try 
            String key = type + "_" + priority;

            String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
            if(StringUtils.isNotBlank(task_json)) 
                task = JSON.parseObject(task_json,Task.class);
                //更新数据库
                UpdateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
            
         catch (Exception e) 
            e.printStackTrace();
            log.error("poll task exception");
        
        return task;
    

主要包括添加任务、取消任务、消费任务三个功能。

下篇预告:实现数据定时刷新

友情链接: 牛客网  刷题|面试|找工作神器

以上是关于定时发布(上)的主要内容,如果未能解决你的问题,请参考以下文章

[奇思异想]使用RabbitMQ实现定时任务

定时发布(上)

如何删除 Azure 服务总线主题上的死信消息

AWS - 在 SNS 订阅或 Lambda 函数上设置死信队列有啥区别?

老司机大型车祸现场

查看Azure服务总线死信的申请