hera源码剖析:项目启动之分布式锁

Posted scx_white

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hera源码剖析:项目启动之分布式锁相关的知识,希望对你有一定的参考价值。

文章目录

前言

本文章主要是为了让使用者能够更加了解 hera 的原理,并且能够在之基础上进行改进所进行。hera 是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera

获取当前机器ip

hera 中,有一些静态代码块,这里只说一个很重要的部分,WorkContext 类中有这样一部分代码


    static 
        host = NetUtils.getLocalAddress();
        HeraLog.info("-----------------------------当前机器的IP为:-----------------------------", host);
        //省略部分代码
      

该代码会获取当前机器的 ip 信息,由于多网卡的原因可能获取的ip不是很准确,此时你可以通过在 hera-admin/src/main/resources/config/hera.properties 文件,修改server.ip=127.0.0.1 配置为当前机器 ip 即可。具体的代码不再深入。

分布式锁

hera 是使用 spring boot 开发的,启动项目执行 AdminBootstrap.main 方法,由于 DistributeLock#init 方法使用了 @PostConstruct 注册,首先会进入该方法

  @PostConstruct
    public void init() 
        workClient.workSchedule.scheduleAtFixedRate(this::checkLock, 10, 60, TimeUnit.SECONDS);
    

在该方法中会向 ScheduledThreadPoolExecutor 线程池提交一个每隔 60 秒执行的任务,具体任务内容在 DistributeLock#checkLock方法

  public void checkLock() 
        try 
            String ON_LINE = "online";
            //1.从hera_lock表查询最新记录
            HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);
            //2.如果当前没有master,直接以当前机器ip插入hera_lock新记录
            if (heraLock == null) 
                Date date = new Date();
                heraLock = HeraLock.builder()
                        .id(1)
                        .host(WorkContext.host)
                        .serverUpdate(date)
                        .subgroup(ON_LINE)
                        .gmtCreate(date)
                        .gmtModified(date)
                        .build();
                Integer lock = heraLockService.insert(heraLock);
                if (lock == null || lock <= 0) 
                    return;
                
            
			//3.master判断
            if (isMaster = WorkContext.host.equals(heraLock.getHost().trim())) 
                heraLock.setServerUpdate(new Date());
                heraLockService.update(heraLock);
                HeraLog.info("hold lock and update time");
                heraSchedule.startup();
             else 
                long currentTime = System.currentTimeMillis();
                long lockTime = heraLock.getServerUpdate().getTime();
                long interval = currentTime - lockTime;
                long timeout = 1000 * 60 * 5L;
                if (interval > timeout && isPreemptionHost()) 
                    Date date = new Date();
                    Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
                    if (lock != null && lock > 0) 
                        ErrorLog.error("master 发生切换, 抢占成功", WorkContext.host);
                        heraSchedule.startup();
                        heraLock.setHost(WorkContext.host);
                        //TODO  接入master切换通知
                     else 
                        HeraLog.info("master抢占失败,由其它worker抢占成功");
                    
                 else 
                    //非主节点,调度器不执行 主要是为了避免手动修改hera_lock表后出现多master问题
                    heraSchedule.shutdown();
                
            
            //4.初始化work服务
            workClient.init();
            //5.连接master
            workClient.connect(heraLock.getHost().trim());
         catch (Exception e) 
                    //出现异常时,对master节点做failFast操作。避免出现双master
            heraSchedule.shutdown();
            ErrorLog.error("检测锁异常", e);
        
    

在解释这些代码之前大家要知道 hera 系统有个 hera_lock 表,该表最多只会有一条记录,并且该记录保存着当前的 master ip。也就是说大家如果想切换 master,可以直接通过修改该条记录的 ip 来实现。

  • DistributeLock#checkLock每次被调用时第一步会从 hera_lock 表查询出最新的 master 信息,至于在这里使用了 online 进行过滤没有实际意义。如果当前没有 master,即 hera_lock 等于 null (一般在第一次部署启动 hera 时会有该情况),为了方便调试此时会自动把当前机器设置为 master,当然如果插入当前 hera_lock 记录失败(被其它 work 插入了),会直接返回等待下次调用。
  • 在第 3 部分,首先会判断当前机器的 ip 信息与 hera_lock 表的 ip 是否匹配,如果匹配则表示当前机器为 master ,然后更新数据库的 server_update 时间,所以一定要保证你的所有机器时钟一致哦,最后调用heraSchedule.startup() 方法来启动 master 服务。
  • 如果发现当前机器的 ip 信息与 hera_lock 表的 ip 不匹配,则表示当前机器是 work,此时会通过 long interval = currentTime - lockTime; 来计算 master上次的心跳时间间隔,如果发现 master 已经超出5分钟未发送新的心跳,则通过 isPreemptionHost 方法判断当前机器是否允许抢占 master,如果允许则通过Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost()); 方法来进行抢占。抢占 sql
# 乐观锁方式进行抢占,保证同一时间只有一台机器能够抢占成功
update hera_lock set gmt_modified = #gmtModified,host = #host,server_update = #serverUpdate where host = #lastHost

如果发现 work 抢占 master 成功,则调用 heraSchedule.startup();来启动master 服务。

  private boolean isPreemptionHost() 
        List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnv.preemptionMasterGroup);
        if (preemptionHostList.contains(WorkContext.host)) 
            return true;
         else 
            HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());
            return false;
        
    

isPreemptionHost 方法主要是判断当前机器是否在允许抢占的机器组,该配置为:hera.preemptionMasterGroup

  • 在第 4 部分会进行 work 服务的初始化,此时需要注意的是:master 会启动 master 服务和 work 服务,work 只启动 work 服务。也就是说,你可以在本地 idea 启动 hera 来进行调试,你也可以在生产环境只有一台机器进行任务调度。
  • 在第 5 部分会进行 work 服务连接 master 服务的操作,即 netty 通信打开

看到这里,想必你已经了解了 DistributeLock 类是一个定时进行分布式锁检测的类,它决定着当前机器是启动 master 服务还是 work 服务

知识点总结

  • 可以通过直接修改 hera_lock 表的数据来切换 master
  • 可以只启动一台机器来进行 hera 的调度与开发
  • 可以通过配置hera.preemptionMasterGroup参数来让某些机器允许抢占master

master服务

在分布式锁中,如果当前机器抢到了 master,那么此时该机器会调用HeraSchedule#startup 启动 master 服务

    public void startup() 
        if (!running.compareAndSet(false, true)) 
            return;
        
        HeraLog.info("begin to start master context");
        masterContext.init();
    

为了保证 master 服务只被启动一次,使用了原子类 AtomicBoolean
继续往下看

 public void init() 
        //主要处理work的请求信息
        threadPool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());
        //主要管理master的一些延迟任务处理
        masterSchedule = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("master-schedule", false));
        masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);
        masterSchedule.allowCoreThreadTimeOut(true);
        //开启quartz服务
        getQuartzSchedulerService().start();
        dispatcher = new Dispatcher();
        //初始化master端的netty消息handler
        handler = new MasterHandler(this);
        //初始化master server
        masterServer = new MasterServer(handler);
        masterServer.start(HeraGlobalEnv.getConnectPort());
        master.init(this);

		//master的定时任务管理者
        choreService = new ChoreService(5, "chore-service");
        //重跑任务初始化
        rerunJobInit = new RerunJobInit(master);
        choreService.scheduledChore(rerunJobInit);
        //重跑任务启动
        rerunJobLaunch = new RerunJobLaunch(master);
        choreService.scheduledChore(rerunJobLaunch);
        //信号丢失检测
        lostJobCheck = new LostJobCheck(master, new DateTime().getMinuteOfHour());
        choreService.scheduledChore(lostJobCheck);
        //心跳检测
        heartCheck = new WorkHeartCheck(master);
        choreService.scheduledChore(heartCheck);
        //版本生成
        actionInit = new JobActionInit(master);
        choreService.scheduledChore(actionInit);
        //任务是否完成检测
        finishCheck = new JobFinishCheck(master);
        choreService.scheduledChore(finishCheck);
        //队列扫描
        queueScan = new JobQueueScan(master);
        choreService.scheduledChoreOnce(queueScan);
        HeraLog.info("end init master content success ");
    

这部分代码主要功能是

  • 初始化master 端的消息处理线程池threadPool
  • 初始化 master 端的延迟任务线程池 masterSchedule
  • 开启 quartz 服务
  • 初始化 master 端的 netty 消息 handler
  • 初始化 master 的定时任务管理者,任务有:任务重跑初始化、重跑任务启动、信号丢失检测、心跳检测、版本生成、任务是否完成检测、队列扫描等

work服务

		//保证只启动一次
        if (!clientSwitch.compareAndSet(false, true)) 
            return;
        
        workContext.setWorkClient(this);
        //在这里目前是空的,公司内部初始化了一些公共数据源配置
        workContext.init();
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NiosocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception 
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS))
                                .addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
                                .addLast("decoder", new ProtobufDecoder(RpcSocketMessage.SocketMessage.getDefaultInstance()))
                                .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
                                .addLast("encoder", new ProtobufEncoder())
                                .addLast(new WorkHandler(workContext));
                    
                );
        HeraLog.info("init work client success ");
		workSchedule.schedule(new Runnable() 

            private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();
            private int failCount = 0;

            @Override
            public void run() 
                try 
                    if (workContext.getServerChannel() != null) 
                        boolean send = workerHandlerHeartBeat.send(workContext);
                        if (!send) 
                            failCount++;
                            ErrorLog.error("send heart beat failed ,failCount :" + failCount);
                         else 
                            failCount = 0;
                            HeartLog.debug("send heart beat success:", workContext.getServerChannel().getRemoteAddress());
                        
                     else 
                        ErrorLog.error("server channel can not find on " + WorkContext.host);
                    
                 catch (Exception e) 
                    ErrorLog.error("heart beat error:", e);
                 finally 
                    workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);
                
            
        , HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);
		//省略定时更新日志的代码

work 服务这边同样使用了原子类保证只启动一次,然后就是初始化 worknetty 服务,再往下就是定时发送心跳信息给 master

work连接master

DistributeLock#checkLock 方法做的最后一件事情就是 work 连接 master,打开通信,调用的方法为:workClient.connect(heraLock.getHost().trim());

 public synchronized void connect(String host) throws Exception 

        if (workContext.getServerChannel() != null) 
            if (workContext.getServerHost().equals(host)) 
                return;
             else 
                workContext.getServerChannel().close();
                workContext.setServerChannel(null);
            
        
        workContext.setServerHost(host);
        CountDownLatch latch = new CountDownLatch(1);
        ChannelFutureListener futureListener = (future) -> 
            try 
                if (future.isSuccess()) 
                    workContext.setServerChannel(FailFastCluster.wrap(future.channel()));
                    SocketLog.info(workContext.getServerChannel().toString());
                
             catch (Exception e) 
                ErrorLog.error("连接master异常", e);
             finally 
                latch.countDown();
            
        ;
        ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, HeraGlobalEnv.getConnectPort()));
        connectFuture.addListener(futureListener);
        if (!latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS)) 
            connectFuture.removeListener(futureListener);
            connectFuture.cancel(true);
            throw new ExecutionException(new TimeoutException("connect server consumption of 2 seconds"));
        
        if (!connectFuture.isSuccess()) 
            throw new RuntimeException("connect server failed " + host,
                    connectFuture.cause());
        
        SocketLog.info("connect server success");
    

连接 master 时会首先判断当前是否已经连接了 master,如果已经连接并且和当前 master ip 一致则直接返回,否则关闭 netty 通信,重置当前 workmaster 信息。

然后通过 CountDownLatchawait(long timeout, TimeUnit unit)方法,来进行workmaster 的超时连接判断,通过ChannelFutureListenermasterwork 通信连接成功时设置 ServerChannel,并且容错方式为快速失败FailFastCluster.wrap(future.channel())

以上是关于hera源码剖析:项目启动之分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合SSM三大框架源码剖析之SpringBoot源码剖析

赫拉(hera)分布式任务调度系统之操作文档

剖析Tomcat核心思想和源码

汇总汇总-Spring&Cloud&Alibaba&源码剖析&分布式锁/事务-从入门到进阶到源码-学完保证吊打面试官

剖析 Vue.js 内部运行机制

一文解密Kafka,Kafka源码设计与实现原理剖析,真正的通俗易懂