基于Curator(zookeeper)实现leader选举
Posted 、楽.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Curator(zookeeper)实现leader选举相关的知识,希望对你有一定的参考价值。
在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来,如下图所示,这个就是所谓的leader选举,而zookeeper作为leader选举的功能,在很多中间件中都有使用,比如kafka基于zookeeper实现leader选举,Hadoop、Spark等。
Curator实现leader选举
除了作为集群节点的leader选举之外,leader选举还可以用在其他的场景,比如在分布式调度任务系统中,从可靠性角度出发,集群也是必不可少的。但往往,为了保证任务不会重复分配,分配任务的节点只能有一个,这种情况就需要从集群中选出一个Leader(老大)去任务池里取任务,如下图所示。
本文就会介绍Curator基于Zookeeper封装的Leader选举工具类LeaderLatch与LeaderSelector的使用及原理分析,Curator有两种选举recipe(Leader Latch和Leader Election),两种实现机制上有一定的差异,后续会逐步说明。
1. LeaderLatch使用实战
首先我们实现定时调度任务。
Quartz中最重要的三个对象:Job、Trigger、Scheduler。
-
Job,表示任务
-
Trigger,配置调度参数
-
Scheduler,代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger
我们首先引入相关依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
接下来我们通过继承SchedulerFactoryBean
从而可以进行一个定时任务的触发。引入LeaderLatch
以及定义相关namespace。
public class ZkSchedulerFactoryBean extends SchedulerFactoryBean
private LeaderLatch leaderLatch;
private final String LEADER_PATH = "/leader"; //namespace
编写该类的构造方法对LeaderLatch
进行相关初始化:
首先需要关闭自动开启定时任务,然后初始化LeaderLatch
的时候传入客户端以及相关路径,同时添加相关监听,以便leader挂掉之后可以监听新leader。
public ZkSchedulerFactoryBean() throws Exception
this.setAutoStartup(false); //应用启动的时候不自动开启定时任务
leaderLatch = new LeaderLatch(getClient(), LEADER_PATH);
leaderLatch.addListener(new DemoLeaderLatchListener(this)); //当leader发生变化的时候,需要触发监听
leaderLatch.start();
private CuratorFramework getClient()
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(15000)
.connectionTimeoutMs(20000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
curatorFramework.start();
return curatorFramework;
我们需要创建一个新的监听:通过构造器传入SchedulerFactoryBean
以便控制定时任务启动和停止。如果抢占成功则开启定时任务,如果抢占失败则停止定时任务。
public class DemoLeaderLatchListener implements LeaderLatchListener
//控制定时任务启动和停止的方法
private SchedulerFactoryBean schedulerFactoryBean;
public DemoLeaderLatchListener(SchedulerFactoryBean schedulerFactoryBean)
this.schedulerFactoryBean = schedulerFactoryBean;
@Override
public void isLeader()
System.out.println(Thread.currentThread().getName()+"成为了leader");
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.start();
@Override
public void notLeader()
System.out.println(Thread.currentThread().getName()+"抢占leader失败,不执行任务");
schedulerFactoryBean.setAutoStartup(false);
schedulerFactoryBean.stop();
我们还需要在ZkSchedulerFactoryBean
类中重写startScheduler
及destroy
方法。
@Override
protected void startScheduler(Scheduler scheduler, int startupDelay) throws SchedulerException
if (this.isAutoStartup())
super.startScheduler(scheduler, startupDelay);
/**
* 释放资源
* @throws SchedulerException
*/
@Override
public void destroy() throws SchedulerException
CloseableUtils.closeQuietly(leaderLatch);
super.destroy();
上面完成之后,我们就可以去定义具体的定时任务了。
创建一个类继承QuartzJobBean
即可,然后在executeInternal
方法中定义我们需要执行的任务。
public class QuartzJob extends QuartzJobBean
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException
System.out.println("开始执行定时任务");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("当前执行的系统时间:" + sdf.format(new Date()));
具体的定时任务创建完成后,我们就可以去定义我们的触发器了。
首先创建类,添加Configuration
将该类交给Spring管理,然后需要声明我们之前定义的ZkSchedulerFactoryBean
类,将其交给Spring容器管理,这样才能对我们的定时任务进行一个触发。然后声明触发器以及定时任务。(方法中的参数都会通过依赖注入的方式传入)
@Configuration
public class QuartzConfiguration
//触发
@Bean
public ZkSchedulerFactoryBean schedulerFactoryBean(JobDetail jobDetail, Trigger trigger) throws Exception
ZkSchedulerFactoryBean zkSchedulerFactoryBean = new ZkSchedulerFactoryBean();
zkSchedulerFactoryBean.setJobDetails(jobDetail);
zkSchedulerFactoryBean.setTriggers(trigger);
return zkSchedulerFactoryBean;
//定时任务
@Bean
public JobDetail jobDetail()
return JobBuilder.newJob(QuartzJob.class).storeDurably().build();
@Bean
public Trigger trigger(JobDetail jobDetail)
//定义一个简单执行器,一秒执行一次,重复执行。
SimpleScheduleBuilder simpleScheduleBuilder =
SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever();
return TriggerBuilder.newTrigger().forJob(jobDetail).withSchedule(simpleScheduleBuilder).build();
至此我们的代码全部编写完成,接下来就是测试了。首先我们需要开启两个springboot项目,注意需要自行修改下端口号。
这个时候我们打开我们的zk服务器,然后启动两个项目即可。
我们可以发现leader2抢占了leader开始执行定时任务,leader1还在继续等待。
这时候查看我们ZK上面的节点信息:可以发现两个临时节点。
我们手动将leader2停止,查看leader1的效果。(有可能不是实时的,会有一些延迟。)
以上便是我们实现高可用的一种简单方法。
2. LeaderSelector实战
LeaderSelector和Leader Latch最的差别在于,leader可以释放领导权以后,还可以继续参与竞争。
我们通过以下一个简单案例来了解一下。
public class SelectorClientExample extends LeaderSelectorListenerAdapter implements Closeable
private final String name;
private final LeaderSelector leaderSelector;
public SelectorClientExample(String path, String name)
leaderSelector = new LeaderSelector(getClient(), path, this);
leaderSelector.autoRequeue();
this.name = name;
@Override
public void close() throws IOException
leaderSelector.close();
public void start()
leaderSelector.start();
@Override
public void takeLeadership(CuratorFramework client) throws Exception
System.out.println(name + " 成为Leader");
Thread.sleep(1000);
private CuratorFramework getClient()
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(15000)
.connectionTimeoutMs(20000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
curatorFramework.start();
return curatorFramework;
public static void main(String[] args) throws IOException
String path = "/leader";
for (int i = 0; i < 10; i++)
SelectorClientExample selectorClientExample =
new SelectorClientExample(path, "Client:" + i);
selectorClientExample.start();
System.in.read();
这时候我们进行测试,查看节点信息可以发现各个节点在重复尝试竞争leader。
项目地址
以上是关于基于Curator(zookeeper)实现leader选举的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理