分布式集群架构场景优化解决方案:分布式调度问题
Posted 丿涛哥哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式集群架构场景优化解决方案:分布式调度问题相关的知识,希望对你有一定的参考价值。
分布式集群架构场景优化解决方案:分布式调度问题
分布式调度问题
调度—>定时任务,分布式调度—>在分布式集群环境下定时任务这件事
Elastic-job(当当网开源的分布式调度框架)
1、 定时任务的场景
定时任务形式:每隔一定时间/特定某一时刻执行
例如:
- 订单审核、出库
- 订单超时自动取消、⽀付退款
- 礼券同步、生成、发放作业
- 物流信息推送、抓取作业、退换货处理作业
- 数据积压监控、日志监控、服务可用性探测作业
- 定时备份数据
- ⾦融系统每天的定时结算
- 数据归档、清理作业
- 报表、离线数据分析作业
2、 什么是分布式调度
什么是分布式任务调度?有两层含义
1)运行在分布式集群环境下的调度任务(同一个定时任务程序部署多份,只应该有一个定时任务在执行)
2)分布式调度—>定时任务的分布式—>定时任务的拆分(即为把一个大的作业任务拆分为多个小的作业任务,同时执行)
3、 定时任务与消息队列的区别
1、共同点
-
异步处理:
比如注册、下单事件
-
应用解耦:
不管定时任务作业还是MQ都可以作为两个应用之间的齿轮实现应用解耦,这个齿轮可以中转数据,当然单体服务不需要考虑这些,服务拆分的时候往往都会考虑
-
流量削峰:
双⼗一的时候,任务作业和MQ都可以用来扛流量,后端系统根据服务能力定时处理订单或者从MQ抓取订单抓取到一个订单到来事件的话触发处理,对于前端用户来说看到的结果是已经下单成功了,下单是不受任何影响的
2、本质不同
定时任务作业是时间驱动,而MQ是事件驱动;
时间驱动是不可代替的,比如⾦融系统每日的利息结算,不是说利息来一条(利息到来事件)就算一下,而往往是通过定时任务批量计算;
所以,定时任务作业更倾向于批处理,MQ倾向于逐条处理;
4、 定时任务的实现方式
定时任务的实现方式有多种。早期没有定时任务框架的时候,我们会使用JDK中的Timer机制和多线程机制(Runnable+线程休眠)来实现定时或者间隔一段时间执行某一段程序;后来有了定时任务框架,比如Quartz任务调度框架,使用时间表达式(包括:秒、分、时、日、周、年)配置某一个任务什么时间去执行:
任务调度框架Quartz回顾示意(本省并不适用于分布式场景)
1、引入jar
<!--任务调度框架quartz-->
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
2、定时任务作业主调度程序
package quartz;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
public class QuartzMain
//1.创建任务调度器(好比公交调度站)
public static Scheduler createScheduler() throws SchedulerException
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
return scheduler;
//2.创建一个任务(好比某一个公交车的出行)
public static JobDetail createJob()
JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class); //TODO 自定义任务类
jobBuilder.withIdentity("jobName","myJob");
JobDetail jobDetail = jobBuilder.build();
return jobDetail;
/**
* 创建作业任务时间触发器(类似于公交⻋出⻋时间表)
* cron表达式由七个位置组成,空格分隔
* 1、Seconds(秒) 0~59
* 2、Minutes(分) 0~59
* 3、Hours(⼩时) 0~23
* 4、Day of Month(天)1~31,注意有的⽉份不⾜31天
* 5、Month(⽉) 0~11,或者JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC
* 6、Day of Week(周) 1~7,1=SUN或者 SUN,MON,TUE,WEB,THU,FRI,SAT
* 7、Year(年)1970~2099 可选项
* 示例:
* 0 0 11 * * ? 每天的11点触发执⾏⼀次
* 0 30 10 1 * ? 每⽉1号上午10点半触发执⾏⼀次
*/
public static Trigger createTrigger()
//创建时间触发器
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName","myTrigger")
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("*/2 * * * * ?")).build();
return cronTrigger;
/**
* main方法中开启定时任务
* @param args
*/
public static void main(String[] args) throws SchedulerException
//1.创建任务调度器(好比公交调度站)
Scheduler scheduler = QuartzMain.createScheduler();
//2.创建一个任务(好比某一个公交车的出行)
JobDetail job = QuartzMain.createJob();
//3.创建任务的时间触发器(好比这个公交车的出行时间表)
Trigger trigger = QuartzMain.createTrigger();
//4.使用任务调度器根据时间触发器执行任务
scheduler.scheduleJob(job,trigger);
scheduler.start();
3、定义一个job,需实现Job接⼝
package quartz;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class DemoJob implements Job
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException
System.out.println("定时任务执行逻辑");
以上,是回顾一下任务调度框架Quartz的大致用法,那么在分布式架构环境中使用Quartz已经不能更好的满足我们需求,我们可以使用专业的分布式调度框架,这里我们推荐使用Elastic-job。
5、 分布式调度框架Elastic-Job
5.1、 Elastic-Job介绍
Elastic-Job是当当网开源的一个分布式调度解决方案,基于Quartz二次开发的,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。我们要学习的是 Elastic-Job-Lite,它定位为轻量级无中心化解决方案,使用Jar包的形式提供分布式任务的协调服务,而Elastic-Job-Cloud子项目需要结合Mesos以及Docker在云环境下使用。
Elastic-Job的github地址:https://github.com/elasticjob
主要功能介绍
-
分布式调度协调
在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行
-
丰富的调度策略
基于成熟的定时任务作业框架Quartz cron表达式执行定时任务
-
弹性扩容缩容
当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行。
-
失效转移
某实例在任务执行失败后,会被转移到其他实例执行
-
⽀持并行调度
⽀持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。
-
作业分片一致性
当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。
5.2、 Elastic-Job-Lite应用
jar包(API) + 安装zk软件
Elastic-Job依赖于Zookeeper进行分布式协调,所以需要安装Zookeeper软件(3.4.6版本以上),Zookeeper的本质功能: 存储+通知。
1、安装Zookeeper(此处单例配置)
1)我们使用3.4.10版本,在linux平台解压下载的zookeeper-3.4.10.tar.gz
2)进入conf目录,cp zoo_sample.cfg zoo.cfg
3) 进入bin目录,启动zk服务
启动 ./zkServer.sh start
停⽌ ./zkServer.sh stop
查看状态 ./zkServer.sh status
Zookeeper的树形节点结构图
2、引入Jar包
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
3、定时任务实例
需求:每隔两秒钟执行一次定时任务(resume表中未归档的数据归档到resume_bak表中, 每次归档1条记录)
1)resume_bak和resume表结构完全一样
2)resume表中数据归档之后不删除,只将state置为"已归档"
数据表结构:
-- ----------------------------
-- Table structure for resume
-- ----------------------------
DROP TABLE IF EXISTS `resume`;
CREATE TABLE `resume` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`sex` varchar(255) DEFAULT NULL,
`phone` varchar(255) DEFAULT NULL,
`address` varchar(255) DEFAULT NULL,
`education` varchar(255) DEFAULT NULL,
`state` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
程序开发:
JdbcUtil工具类
package elasticjob;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JdbcUtil
//url
private static String url = "jdbc:mysql://localhost:3306/job?characterEncoding=utf8&useSSL=false";
//user
private static String user = "root";
//password
private static String password = "123456";
//驱动程序类
private static String driver = "com.mysql.jdbc.Driver";
static
try
Class.forName(driver);
catch (ClassNotFoundException e)
// TODO Auto-generated catch block
e.printStackTrace();
public static Connection getConnection()
try
return DriverManager.getConnection(url, user, password);
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
return null;
public static void close(ResultSet rs, PreparedStatement ps, Connection con)
if (rs != null)
try
rs.close();
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
finally
if (ps != null)
try
ps.close();
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
finally
if (con != null)
try
con.close();
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
/***
* DML操作(增删改)
* 1.获取连接数据库对象
* 2.预处理
* 3.执行更新操作
* @param sql
* @param obj
*/
//调用者只需传入一个sql语句,和一个Object数组。该数组存储的是SQL语句中的占位符
public static void executeUpdate(String sql,Object...obj)
Connection con = getConnection();//调用getConnection()方法连接数据库
PreparedStatement ps = null;
try
ps = con.prepareStatement(sql);//预处理
for (int i = 0; i < obj.length; i++) //预处理声明占位符
ps.setObject(i + 1, obj[i]);
ps.executeUpdate();//执行更新操作
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
finally
close(null, ps, con);//调用close()方法关闭资源
/***
* DQL查询
* Result获取数据集
*
* @param sql
* @param obj
* @return
*/
public static List<Map<String,Object>> executeQuery(String sql, Object...obj)
Connection con = getConnection();
ResultSet rs = null;
PreparedStatement ps = null;
try
ps = con.prepareStatement(sql);
for (int i = 0; i < obj.length; i++)
ps.setObject(i + 1, obj[i]);
rs = ps.executeQuery();
//new 一个空的list集合用来存放查询结果
List<Map<String, Object>> list = new ArrayList<>();
//获取结果集的列数
int count = rs.getMetaData().getColumnCount();
//对结果集遍历每一条数据是一个Map集合,列是k,值是v
while (rs.next())
//一个空的map集合,用来存放每一行数据
Map<String, Object> map = new HashMap<String, Object>();
for (int i = 0; i < count; i++)
Object ob = rs.getObject(i + 1);//获取值
String key = rs.getMetaData().getColumnName(i + 1);//获取k即列名
map.put(key, ob);
list.add(map);
return list;
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
finally
close(rs, ps, con);
return null;
定时任务类
package elasticjob;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.util.List;
import java.util.Map;
/**
* ElasticJobLite定时任务业务逻辑处理类
*/
public class ArchivieJob implements SimpleJob
/**
* 需求:(resume表中未归档的数据归档到resume_bak表中,每次归档1条记录)
* execute方法中写业务逻辑(execute方法每次定时任务执行都会执行一次)
* @param shardingContext
*/
@Override
public void execute(ShardingContext shardingContext)
//1.从resume表查询出一条记录(未归档)
String selectSql = "select * from resume where state = '未归档' limit 1";
List<Map<String, Object>> list = JdbcUtil.executeQuery(selectSql);
if (list == null || list.size() == 0)
System.out.println("数据已经处理完毕!");
return;
//2."未归档"更改为"已归档"
Map<String, Object> stringObjectMap = list.get(0);
long id = (long) stringObjectMap.get("id");
String name = (String) stringObjectMap.get("name");
String education = (String) stringObjectMap.get("education");
System.out.println("====>>>id:" + id + " name:" + name + " education:" + education);
String updateSql = "update resume set state = '已归档' where id = ?";
JdbcUtil.executeUpdate(updateSql,id);
//3.归档这条记录,把这条记录插入到resume_bak表
String insertSql = "insert into resume_bak select * from resume where id = ?";
JdbcUtil.executeUpdate(insertSql,id);
主类
package elasticjob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
public class ElasticJobMain
public static void main(String[] args)
//配置分布式协调服务(注册中心)Zookeeper
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181","data-archive-job");
CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
coordinatorRegistryCenter.init();
//配置任务(时间事件、定时任务业务逻辑、调度器)
JobCoreConfiguration jobCoreConfiguration = 分布式集群架构场景优化解决方案:Session共享问题