xxl-job+rabbitmq 进行定时的微信消息推送

Posted BlogY

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了xxl-job+rabbitmq 进行定时的微信消息推送相关的知识,希望对你有一定的参考价值。

文章目录

下载xxl-job的源码

https://gitee.com/xuxueli0323/xxl-job

项目结构

|_doc:项目文档,包含数据库初始化 sql 和架构图等
|_xxl-job-admin:调度中心
|_xxl-job-core:公共依赖
|_xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
    |__xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
    |__xxl-job-executor-sample-frameless:无框架版本;

部署调度中心

  • 首先初始化数据库,在mysql中创建名为 xxl_job 的数据库,执行 ./doc/db/tables_xxl_job.sql 文件

  • 其次修改 xxl-job-admin 项目的配置文件 ./xxl-job-admin/src/main/resources/application.properties 将数据库配置改成自己本地即可。

  • 最后启动调度中心项目,访问项目地址:http://localhost:8080/xxl-job-admin/,默认账户:admin + 123456

  • 后续部署时可以直接将项目进行打包,然后部署到服务器上

添加XXL-JOB依赖和配置

        <!-- xxl-job-core -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.0</version>
        </dependency>
@Configuration
public class XxlJobConfig 
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

//调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。
//执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
    @Value("$xxl.job.admin.addresses")
    private String adminAddresses;
    
// 执行器通讯TOKEN [选填]:非空时启用;
    @Value("$xxl.job.accessToken")
    private String accessToken;
    
//执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
    @Value("$xxl.job.executor.appname")
    private String appname;

    @Value("$xxl.job.executor.address")
    private String address;
//执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;
//地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
    @Value("$xxl.job.executor.ip")
    private String ip;
//### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
    @Value("$xxl.job.executor.port")
    private int port;
//### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
    @Value("$xxl.job.executor.logpath")
    private String logPath;
// 执行器日志保存天数 [选填] :值大于3时生效,启用执行器Log文件定期清理功能,否则不生效;
    @Value("$xxl.job.executor.logretentiondays")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() 
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>$version</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


修改执行器项目配置文件

./xxl-job-executor-sample-springboot/src/main/resources/application.properties 中配置执行器的相关参数。

### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册""任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册""调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则,-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30

调度中心添加定时任务配置

添加执行器

添加定时任务

编写定时任务代码

@Slf4j
@Component
public class WechatRemindXxlJob 
	//可以使用 XxlJobHelper.log() 打印日志,在调度中心平台上可以直接看到日志详情。
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);

    @Autowired
    private AppointLogService appointLogService;


    // @XxlJob("")值为调度中心新增任务时的控制器名称
    @XxlJob("startRemindHandler")
    public void startRemind()
        try 
        	//XxlJobHelper.getJobParam():获取定时任务中的参数,多个时可用“,”分割
            bean.setUserId(XxlJobHelper.getJobParam());
            XxlJobHelper.log("XXL-JOB, Start AppointLogWechatRemindSix 6点开启");
            appointLogService.startRemind(bean);
         catch (Exception e) 
            e.printStackTrace();
            XxlJobHelper.log("定时活动消息提示失败"+e.getMessage());
        
    


真正执行的代码

    @Override
    public void startRemindSix(bean) throws ParseException 

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String time = sdf.format(new Date()) + " 00:00:00";
        List<AppointLog> todayLogs = AppointLogRepository.findByStartTime(sdf2.parse(time));

        //今天没有预约数据
        if (CollectionUtils.isEmpty(todayLogs))
            return;
        
        //将查询的预约数据发送到mq
        todayLogs.forEach(log -> rabbitTemplate.convertAndSend(MQConstant.RECORD_NOTICE,  JSONObject.toJSONString(log)));
    

mq的接听者监听到进行消费并发送微信推送

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MQConstant.RECORD_NOTICE, durable = "true"),
            exchange = @Exchange(value = MQConstant.RECORD_NOTICE))
    )
    public void saveOrUpdate(String json) 
        AppointLog bean = JSON.parseObject(json, AppointLog.class);

        SimpleDateFormat sdf = new SimpleDateFormat("MM-dd HH:mm");
        //每条要发送的数据内容
        String typeName = SuperviseType.getTypeName(bean.getType());
        String time = sdf.format(bean.getStartTime()) + "~" + sdf.format(bean.getEndTime());
        String address = bean.getPlaceName();
        WechatSend(bean.getUserId(), time, address, typeName, bean.getId());
    

微信消息推送

要想实现消息订阅及发送总体有几个步骤,一、选择消息模板,二、提醒用户订阅模板,三、给用户发送订阅消息。

首先,在微信公众平台开通消息推送功能,并添加消息模板。可以从模板库选择模板也可以创建一个模板,模板添加之后,模板ID我们接下来要用的。
发送模板消息需要用到accesstoken、formId和openID。formID就是消息模板ID,openID我们最好在获取用户信息或用户登录时储存到全局变量里。

参考链接: 参考文章

创建模板


调用小程序推送接口

    /**
     * 微信小程序通知发送
     *
     * @param userId
     * @param time
     * @param address
     * @param typeName
     */
    private void WechatSend(String userId, String time, String address, String typeName, String recordId) 


//构建要发送的消息内容  一个TemplateField代表一个列
        List<TemplateField> list = new ArrayList<>();

        TemplateField templateField1 = new TemplateField();
        templateField1.setName("thing1");
        templateField1.setValue("xxxx");
        list.add(templateField1);


        TemplateField templateField2 = new TemplateField();
        templateField2.setName("character_string2");
        templateField2.setValue(time);
        list.add(templateField2);

        TemplateField templateField3 = new TemplateField();
        templateField3.setName("thing3");
        templateField3.setValue(address);
        list.add(templateField3);

        TemplateField templateField4 = new TemplateField();
        templateField4.setName("thing4");
        templateField4.setValue("请前往签到或取消");
        list.add(templateField4);

        TemplateField templateField5 = new TemplateField();
        templateField5.setName("thing5");
        templateField5.setValue(typeName);
        list.add(templateField5);

        WxMsgDTO wxMsgDTO = new WxMsgDTO();
        wxMsgDTO.setUserId(userId);
        wxMsgDTO.setTemplateId(sendTemplateId);
        wxMsgDTO.setLink(pages+recordId);
        wxMsgDTO.setFields(list);
        iWechatService.Send(wxMsgDTO);
    
    @Override
    public Result Send(WxMsg bean) 
        Messages messages = wechatAccountUtil.getWechat(bean.getTargetScid()).msg();

        WxUserinfo wxUserinfo = wxUserinfoRepository.findFirstByUidAndTypeAndDeletedAtIsNull(bean.getUserId(),2);
        Long messageLong = messages.subscribeSendTemplate(wxUserinfo.getOpenid(), bean.getTemplateId(), bean.getLink(), bean.getFields(), null);
        return success(messageLong);
    
    public Result subscribeSend(WxMsg bean) 
        WxUserinfo wxUserinfo = wxUserinfoRepository.findFirstByUidAndTypeAndDeletedAtIsNull(bean.getUserId(),2);
        Long messageLong = messages.subscribeSendTemplate(wxUserinfo.getOpenid(), bean.getTemplateId(), bean.getLink(), bean.getFields(), null);
        return success(messageLong);
    

以上是关于xxl-job+rabbitmq 进行定时的微信消息推送的主要内容,如果未能解决你的问题,请参考以下文章

【基建—xxl-job定时任务平台】

分布式定时调度-xxl-job

Xxl-job 一文读懂

Python-定时爬取指定城市天气-发送给关心的微信好友

wechat_pusher - 基于Golang开发的微信消息定时推送框架

springboot整合xxl-job分布式定时任务图文完整版