hera源码剖析:项目启动之分布式锁
Posted scx_white
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hera源码剖析:项目启动之分布式锁相关的知识,希望对你有一定的参考价值。
文章目录
前言
本文章主要是为了让使用者能够更加了解
hera
的原理,并且能够在之基础上进行改进所进行。hera
是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera
获取当前机器ip
在 hera
中,有一些静态代码块,这里只说一个很重要的部分,WorkContext
类中有这样一部分代码
static
host = NetUtils.getLocalAddress();
HeraLog.info("-----------------------------当前机器的IP为:-----------------------------", host);
//省略部分代码
该代码会获取当前机器的 ip
信息,由于多网卡的原因可能获取的ip不是很准确,此时你可以通过在 hera-admin/src/main/resources/config/hera.properties
文件,修改server.ip=127.0.0.1
配置为当前机器 ip
即可。具体的代码不再深入。
分布式锁
hera
是使用 spring boot
开发的,启动项目执行 AdminBootstrap.main
方法,由于 DistributeLock#init
方法使用了 @PostConstruct
注册,首先会进入该方法
@PostConstruct
public void init()
workClient.workSchedule.scheduleAtFixedRate(this::checkLock, 10, 60, TimeUnit.SECONDS);
在该方法中会向 ScheduledThreadPoolExecutor
线程池提交一个每隔 60
秒执行的任务,具体任务内容在 DistributeLock#checkLock
方法
public void checkLock()
try
String ON_LINE = "online";
//1.从hera_lock表查询最新记录
HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);
//2.如果当前没有master,直接以当前机器ip插入hera_lock新记录
if (heraLock == null)
Date date = new Date();
heraLock = HeraLock.builder()
.id(1)
.host(WorkContext.host)
.serverUpdate(date)
.subgroup(ON_LINE)
.gmtCreate(date)
.gmtModified(date)
.build();
Integer lock = heraLockService.insert(heraLock);
if (lock == null || lock <= 0)
return;
//3.master判断
if (isMaster = WorkContext.host.equals(heraLock.getHost().trim()))
heraLock.setServerUpdate(new Date());
heraLockService.update(heraLock);
HeraLog.info("hold lock and update time");
heraSchedule.startup();
else
long currentTime = System.currentTimeMillis();
long lockTime = heraLock.getServerUpdate().getTime();
long interval = currentTime - lockTime;
long timeout = 1000 * 60 * 5L;
if (interval > timeout && isPreemptionHost())
Date date = new Date();
Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
if (lock != null && lock > 0)
ErrorLog.error("master 发生切换, 抢占成功", WorkContext.host);
heraSchedule.startup();
heraLock.setHost(WorkContext.host);
//TODO 接入master切换通知
else
HeraLog.info("master抢占失败,由其它worker抢占成功");
else
//非主节点,调度器不执行 主要是为了避免手动修改hera_lock表后出现多master问题
heraSchedule.shutdown();
//4.初始化work服务
workClient.init();
//5.连接master
workClient.connect(heraLock.getHost().trim());
catch (Exception e)
//出现异常时,对master节点做failFast操作。避免出现双master
heraSchedule.shutdown();
ErrorLog.error("检测锁异常", e);
在解释这些代码之前大家要知道 hera
系统有个 hera_lock
表,该表最多只会有一条记录,并且该记录保存着当前的 master ip
。也就是说大家如果想切换 master
,可以直接通过修改该条记录的 ip
来实现。
DistributeLock#checkLock
每次被调用时第一步会从hera_lock
表查询出最新的master
信息,至于在这里使用了online
进行过滤没有实际意义。如果当前没有master
,即hera_lock
等于null
(一般在第一次部署启动hera
时会有该情况),为了方便调试此时会自动把当前机器设置为master
,当然如果插入当前hera_lock
记录失败(被其它work
插入了),会直接返回等待下次调用。- 在第
3
部分,首先会判断当前机器的ip
信息与hera_lock
表的ip
是否匹配,如果匹配则表示当前机器为master
,然后更新数据库的server_update
时间,所以一定要保证你的所有机器时钟一致哦,最后调用heraSchedule.startup()
方法来启动master
服务。 - 如果发现当前机器的
ip
信息与hera_lock
表的ip
不匹配,则表示当前机器是work
,此时会通过long interval = currentTime - lockTime;
来计算master
上次的心跳时间间隔,如果发现master
已经超出5分钟未发送新的心跳,则通过isPreemptionHost
方法判断当前机器是否允许抢占master
,如果允许则通过Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
方法来进行抢占。抢占sql
为
# 乐观锁方式进行抢占,保证同一时间只有一台机器能够抢占成功
update hera_lock set gmt_modified = #gmtModified,host = #host,server_update = #serverUpdate where host = #lastHost
如果发现 work
抢占 master
成功,则调用 heraSchedule.startup();
来启动master
服务。
private boolean isPreemptionHost()
List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnv.preemptionMasterGroup);
if (preemptionHostList.contains(WorkContext.host))
return true;
else
HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());
return false;
isPreemptionHost
方法主要是判断当前机器是否在允许抢占的机器组,该配置为:hera.preemptionMasterGroup
- 在第
4
部分会进行work
服务的初始化,此时需要注意的是:master
会启动master
服务和work
服务,work
只启动work
服务。也就是说,你可以在本地idea
启动hera
来进行调试,你也可以在生产环境只有一台机器进行任务调度。 - 在第
5
部分会进行work
服务连接master
服务的操作,即netty
通信打开
看到这里,想必你已经了解了 DistributeLock
类是一个定时进行分布式锁检测的类,它决定着当前机器是启动 master
服务还是 work
服务
知识点总结
- 可以通过直接修改
hera_lock
表的数据来切换master
- 可以只启动一台机器来进行
hera
的调度与开发 - 可以通过配置
hera.preemptionMasterGroup
参数来让某些机器允许抢占master
master服务
在分布式锁中,如果当前机器抢到了 master
,那么此时该机器会调用HeraSchedule#startup
启动 master
服务
public void startup()
if (!running.compareAndSet(false, true))
return;
HeraLog.info("begin to start master context");
masterContext.init();
为了保证 master
服务只被启动一次,使用了原子类 AtomicBoolean
继续往下看
public void init()
//主要处理work的请求信息
threadPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());
//主要管理master的一些延迟任务处理
masterSchedule = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("master-schedule", false));
masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);
masterSchedule.allowCoreThreadTimeOut(true);
//开启quartz服务
getQuartzSchedulerService().start();
dispatcher = new Dispatcher();
//初始化master端的netty消息handler
handler = new MasterHandler(this);
//初始化master server
masterServer = new MasterServer(handler);
masterServer.start(HeraGlobalEnv.getConnectPort());
master.init(this);
//master的定时任务管理者
choreService = new ChoreService(5, "chore-service");
//重跑任务初始化
rerunJobInit = new RerunJobInit(master);
choreService.scheduledChore(rerunJobInit);
//重跑任务启动
rerunJobLaunch = new RerunJobLaunch(master);
choreService.scheduledChore(rerunJobLaunch);
//信号丢失检测
lostJobCheck = new LostJobCheck(master, new DateTime().getMinuteOfHour());
choreService.scheduledChore(lostJobCheck);
//心跳检测
heartCheck = new WorkHeartCheck(master);
choreService.scheduledChore(heartCheck);
//版本生成
actionInit = new JobActionInit(master);
choreService.scheduledChore(actionInit);
//任务是否完成检测
finishCheck = new JobFinishCheck(master);
choreService.scheduledChore(finishCheck);
//队列扫描
queueScan = new JobQueueScan(master);
choreService.scheduledChoreOnce(queueScan);
HeraLog.info("end init master content success ");
这部分代码主要功能是
- 初始化
master
端的消息处理线程池threadPool
- 初始化
master
端的延迟任务线程池masterSchedule
- 开启
quartz
服务 - 初始化
master
端的netty
消息handler
- 初始化
master
的定时任务管理者,任务有:任务重跑初始化、重跑任务启动、信号丢失检测、心跳检测、版本生成、任务是否完成检测、队列扫描等
work服务
//保证只启动一次
if (!clientSwitch.compareAndSet(false, true))
return;
workContext.setWorkClient(this);
//在这里目前是空的,公司内部初始化了一些公共数据源配置
workContext.init();
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NiosocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS))
.addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
.addLast("decoder", new ProtobufDecoder(RpcSocketMessage.SocketMessage.getDefaultInstance()))
.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
.addLast("encoder", new ProtobufEncoder())
.addLast(new WorkHandler(workContext));
);
HeraLog.info("init work client success ");
workSchedule.schedule(new Runnable()
private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();
private int failCount = 0;
@Override
public void run()
try
if (workContext.getServerChannel() != null)
boolean send = workerHandlerHeartBeat.send(workContext);
if (!send)
failCount++;
ErrorLog.error("send heart beat failed ,failCount :" + failCount);
else
failCount = 0;
HeartLog.debug("send heart beat success:", workContext.getServerChannel().getRemoteAddress());
else
ErrorLog.error("server channel can not find on " + WorkContext.host);
catch (Exception e)
ErrorLog.error("heart beat error:", e);
finally
workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);
, HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);
//省略定时更新日志的代码
在 work
服务这边同样使用了原子类保证只启动一次,然后就是初始化 work
的 netty
服务,再往下就是定时发送心跳信息给 master
work连接master
DistributeLock#checkLock
方法做的最后一件事情就是 work
连接 master
,打开通信,调用的方法为:workClient.connect(heraLock.getHost().trim());
public synchronized void connect(String host) throws Exception
if (workContext.getServerChannel() != null)
if (workContext.getServerHost().equals(host))
return;
else
workContext.getServerChannel().close();
workContext.setServerChannel(null);
workContext.setServerHost(host);
CountDownLatch latch = new CountDownLatch(1);
ChannelFutureListener futureListener = (future) ->
try
if (future.isSuccess())
workContext.setServerChannel(FailFastCluster.wrap(future.channel()));
SocketLog.info(workContext.getServerChannel().toString());
catch (Exception e)
ErrorLog.error("连接master异常", e);
finally
latch.countDown();
;
ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, HeraGlobalEnv.getConnectPort()));
connectFuture.addListener(futureListener);
if (!latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS))
connectFuture.removeListener(futureListener);
connectFuture.cancel(true);
throw new ExecutionException(new TimeoutException("connect server consumption of 2 seconds"));
if (!connectFuture.isSuccess())
throw new RuntimeException("connect server failed " + host,
connectFuture.cause());
SocketLog.info("connect server success");
连接 master
时会首先判断当前是否已经连接了 master
,如果已经连接并且和当前 master ip
一致则直接返回,否则关闭 netty
通信,重置当前 work
的 master
信息。
然后通过 CountDownLatch
的 await(long timeout, TimeUnit unit)
方法,来进行work
与 master
的超时连接判断,通过ChannelFutureListener
在 master
与work
通信连接成功时设置 ServerChannel
,并且容错方式为快速失败FailFastCluster.wrap(future.channel())
以上是关于hera源码剖析:项目启动之分布式锁的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot整合SSM三大框架源码剖析之SpringBoot源码剖析