基于Curator(zookeeper)实现leader选举

Posted 、楽.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Curator(zookeeper)实现leader选举相关的知识,希望对你有一定的参考价值。

在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来,如下图所示,这个就是所谓的leader选举,而zookeeper作为leader选举的功能,在很多中间件中都有使用,比如kafka基于zookeeper实现leader选举,Hadoop、Spark等。

Curator实现leader选举

除了作为集群节点的leader选举之外,leader选举还可以用在其他的场景,比如在分布式调度任务系统中,从可靠性角度出发,集群也是必不可少的。但往往,为了保证任务不会重复分配,分配任务的节点只能有一个,这种情况就需要从集群中选出一个Leader(老大)去任务池里取任务,如下图所示。

本文就会介绍Curator基于Zookeeper封装的Leader选举工具类LeaderLatch与LeaderSelector的使用及原理分析,Curator有两种选举recipe(Leader Latch和Leader Election),两种实现机制上有一定的差异,后续会逐步说明。

1. LeaderLatch使用实战

首先我们实现定时调度任务。

Quartz中最重要的三个对象:Job、Trigger、Scheduler。

  • Job,表示任务

  • Trigger,配置调度参数

  • Scheduler,代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger

我们首先引入相关依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
    <version>2.5.3</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

接下来我们通过继承SchedulerFactoryBean从而可以进行一个定时任务的触发。引入LeaderLatch以及定义相关namespace。

public class ZkSchedulerFactoryBean extends SchedulerFactoryBean 

    private LeaderLatch leaderLatch;

    private final String LEADER_PATH = "/leader"; //namespace


编写该类的构造方法对LeaderLatch进行相关初始化:

首先需要关闭自动开启定时任务,然后初始化LeaderLatch的时候传入客户端以及相关路径,同时添加相关监听,以便leader挂掉之后可以监听新leader。

    public ZkSchedulerFactoryBean() throws Exception 
        this.setAutoStartup(false); //应用启动的时候不自动开启定时任务

        leaderLatch = new LeaderLatch(getClient(), LEADER_PATH);
        leaderLatch.addListener(new DemoLeaderLatchListener(this)); //当leader发生变化的时候,需要触发监听
        leaderLatch.start();
    

    private CuratorFramework getClient() 
            CuratorFramework curatorFramework = CuratorFrameworkFactory
                    .builder()
                    .connectString("localhost:2181")
                    .sessionTimeoutMs(15000)
                    .connectionTimeoutMs(20000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                    .build();
            curatorFramework.start();
            return curatorFramework;
        

我们需要创建一个新的监听:通过构造器传入SchedulerFactoryBean以便控制定时任务启动和停止。如果抢占成功则开启定时任务,如果抢占失败则停止定时任务。

public class DemoLeaderLatchListener implements LeaderLatchListener 
    //控制定时任务启动和停止的方法
    private SchedulerFactoryBean schedulerFactoryBean;

    public DemoLeaderLatchListener(SchedulerFactoryBean schedulerFactoryBean) 
        this.schedulerFactoryBean = schedulerFactoryBean;
    

    @Override
    public void isLeader() 
        System.out.println(Thread.currentThread().getName()+"成为了leader");
        schedulerFactoryBean.setAutoStartup(true);
        schedulerFactoryBean.start();
    

    @Override
    public void notLeader() 
        System.out.println(Thread.currentThread().getName()+"抢占leader失败,不执行任务");
        schedulerFactoryBean.setAutoStartup(false);
        schedulerFactoryBean.stop();
    

我们还需要在ZkSchedulerFactoryBean类中重写startSchedulerdestroy方法。

@Override
protected void startScheduler(Scheduler scheduler, int startupDelay) throws SchedulerException 
    if (this.isAutoStartup()) 
        super.startScheduler(scheduler, startupDelay);
    


/**
 * 释放资源
 * @throws SchedulerException
 */
@Override
public void destroy() throws SchedulerException 
    CloseableUtils.closeQuietly(leaderLatch);
    super.destroy();

上面完成之后,我们就可以去定义具体的定时任务了。

创建一个类继承QuartzJobBean即可,然后在executeInternal方法中定义我们需要执行的任务。

public class QuartzJob extends QuartzJobBean 

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException 
        System.out.println("开始执行定时任务");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("当前执行的系统时间:" + sdf.format(new Date()));
    

具体的定时任务创建完成后,我们就可以去定义我们的触发器了。

首先创建类,添加Configuration将该类交给Spring管理,然后需要声明我们之前定义的ZkSchedulerFactoryBean类,将其交给Spring容器管理,这样才能对我们的定时任务进行一个触发。然后声明触发器以及定时任务。(方法中的参数都会通过依赖注入的方式传入)

@Configuration
public class QuartzConfiguration 

    //触发
    @Bean
    public ZkSchedulerFactoryBean schedulerFactoryBean(JobDetail jobDetail, Trigger trigger) throws Exception 
        ZkSchedulerFactoryBean zkSchedulerFactoryBean = new ZkSchedulerFactoryBean();
        zkSchedulerFactoryBean.setJobDetails(jobDetail);
        zkSchedulerFactoryBean.setTriggers(trigger);
        return zkSchedulerFactoryBean;
    

    //定时任务
    @Bean
    public JobDetail jobDetail() 
        return JobBuilder.newJob(QuartzJob.class).storeDurably().build();
    

    @Bean
    public Trigger trigger(JobDetail jobDetail) 
        //定义一个简单执行器,一秒执行一次,重复执行。
        SimpleScheduleBuilder simpleScheduleBuilder =
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever();
        return TriggerBuilder.newTrigger().forJob(jobDetail).withSchedule(simpleScheduleBuilder).build();
    


至此我们的代码全部编写完成,接下来就是测试了。首先我们需要开启两个springboot项目,注意需要自行修改下端口号。

这个时候我们打开我们的zk服务器,然后启动两个项目即可。

我们可以发现leader2抢占了leader开始执行定时任务,leader1还在继续等待。

这时候查看我们ZK上面的节点信息:可以发现两个临时节点。

我们手动将leader2停止,查看leader1的效果。(有可能不是实时的,会有一些延迟。)

以上便是我们实现高可用的一种简单方法。

2. LeaderSelector实战

LeaderSelector和Leader Latch最的差别在于,leader可以释放领导权以后,还可以继续参与竞争。

我们通过以下一个简单案例来了解一下。

public class SelectorClientExample extends LeaderSelectorListenerAdapter implements Closeable 

    private final String name;
    private final LeaderSelector leaderSelector;

    public SelectorClientExample(String path, String name) 
        leaderSelector = new LeaderSelector(getClient(), path, this);
        leaderSelector.autoRequeue();
        this.name = name;
    

    @Override
    public void close() throws IOException 
        leaderSelector.close();
    

    public void start() 
        leaderSelector.start();
    

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception 
        System.out.println(name + " 成为Leader");
        Thread.sleep(1000);
    

    private CuratorFramework getClient() 
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(15000)
                .connectionTimeoutMs(20000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        curatorFramework.start();
        return curatorFramework;
    

    public static void main(String[] args) throws IOException 
        String path = "/leader";
        for (int i = 0; i < 10; i++) 
            SelectorClientExample selectorClientExample =
                    new SelectorClientExample(path, "Client:" + i);
            selectorClientExample.start();
        
        System.in.read();
    

这时候我们进行测试,查看节点信息可以发现各个节点在重复尝试竞争leader。

项目地址

zk demo

以上是关于基于Curator(zookeeper)实现leader选举的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

Curator实现zookeeper分布式锁的基本原理

Zookeeper开源客户端Curator之基本功能讲解

Curator场景应用

SpringBoot基于Zookeeper和Curator生成唯一ID

Curator实现zookeeper的节点监听