Flink基于ZookeeperCurator的高可用原理1
Posted 我不需要这个昵称
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink基于ZookeeperCurator的高可用原理1相关的知识,希望对你有一定的参考价值。
本篇是基于Flink1.16对老版本(Flink1.12以下)高可用原理、以Standalone模式下的WebMonitorEndpoint为例的一篇更新
本篇聚焦于Zookeeper的高可用原理。
1. leader的选取
分布式任务调度系统往往是一个服务集群,但是为了防止任务重复执行,通常只有一个leader去任务池里取任务,leaderLatch和leaderSelector 就是Curator基于zookeeper封装的leader选举工具类。
LeaderLatch 原理是利用临时有序节点,最先创建的序号最小的节点成为leader节点
LeaderSelector利用Curator中InterProcessMutex分布式锁进行抢主,抢到锁的即为Leader
leaderLatch和Selector 看起来原理都是利用分布式锁原理,区别在于,selector中leader丢失leader之后会重新进入leader争夺,即在这个目录下再创建一个临时节点,等待。
Flink中既是利用LeaderLatch作为Dispacher、ResourceManager、WebMonitorEndpoint三大组件的选主工具类。
2. 依赖
pom.xml
<dependencies>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
<!--log4j2-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>$log4j.version</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>$log4j.version</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jul</artifactId>
<version>$log4j.version</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>$log4j.version</version>
</dependency>
</dependencies>
log4j2.xml (非必要)
<?xml version="1.0" encoding="UTF-8"?>
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出-->
<!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->
<configuration status="INFO" monitorInterval="30">
<!--先定义所有的appender-->
<appenders>
<!--这个输出控制台的配置-->
<console name="Console" target="SYSTEM_OUT">
<!--输出日志的格式-->
<PatternLayout pattern="[%dyyyy-MM-dd HH:mm:ss:SSS] [%p] - %l - %m%n"/>
</console>
</appenders>
<!--然后定义logger,只有定义了logger并引入的appender,appender才会生效-->
<loggers>
<root level="ERROR">
<appender-ref ref="Console"/>
</root>
</loggers>
</configuration>
3. 代码
public class CuratorFrameworkProperties
// 连接地址
public static final String CONNECT_ADDRESS = "localhost:2181";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 5000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 5000;
// 命名空间
public static final String NAMESPACE = "MyCuratorDemo";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 1);
public class LeaderLatchRunnable implements Runnable
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
@SneakyThrows
@Override
public void run()
// 使用不同的CuratorFramework实例,表示不同的分布式服务节点
CuratorFramework curator = getCuratorFramework();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
// 模拟随机加入的分布式服务节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 创建LeaderLatch实例(用于Leader选举)
// curator是CuratorFramework实例,用于与ZooKeeper交互
// "/services/leader"是latchPath,Leader节点会成功创建该节点(其他节点则会失败)
// 将线程名(Thread.currentThread().getName())作为分布式服务节点的id
// LeaderLatch.CloseMode.NOTIFY_LEADER表示close模式,即节点进行close操作后的模式
LeaderLatch latch = new LeaderLatch(
curator,
"/services/leader",
Thread.currentThread().getName(),
LeaderLatch.CloseMode.NOTIFY_LEADER);
// 给LeaderLatch实例添加监听器(LeaderLatchListenerImpl实例)
// EXECUTOR_SERVICE表示执行该LeaderLatchListenerImpl实例的Executor实例
latch.addListener(new LeaderLatchListenerImpl(latch), EXECUTOR_SERVICE);
System.out.println(latch.getId() + "准备好了!");
// 开始Leader选举
latch.start();
System.out.println(latch.getId() + "开始Leader选举!");
private CuratorFramework getCuratorFramework()
// 创建CuratorFramework实例
return CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
@RequiredArgsConstructor
public class LeaderLatchListenerImpl implements LeaderLatchListener
private final LeaderLatch LATCH;
@SneakyThrows
@Override
public void isLeader()
System.out.println("--------------------------------" + LATCH.getId() +
"被选举为Leader--------------------------------");
LATCH.getParticipants().forEach(System.out::println);
// 睡眠5秒就close(该节点会从Leader选举中移除),其他节点会重新进行Leader选举
Thread.sleep(5000);
LATCH.close();
@SneakyThrows
@Override
public void notLeader()
// 节点调用了close方法,只有在LeaderLatch.CloseMode.NOTIFY_LEADER模式下会调用该方法
// LeaderLatch.CloseMode.SILENT模式下不会调用该方法
System.out.println("--------------------------------" + LATCH.getId() +
"离开,重新进行Leader选举--------------------------------");
public class Application
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception
for (int i = 0; i < 7; i++)
EXECUTOR_SERVICE.execute(new LeaderLatchRunnable());
Thread.sleep(10000000);
控制台
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
--------------------------------pool-1-thread-1被选举为Leader--------------------------------
Participantid=‘pool-1-thread-1’, isLeader=true
pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-2准备好了!
pool-1-thread-2开始Leader选举!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
--------------------------------pool-1-thread-1离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-6被选举为Leader--------------------------------
Participantid=‘pool-1-thread-6’, isLeader=true
Participantid=‘pool-1-thread-5’, isLeader=false
Participantid=‘pool-1-thread-2’, isLeader=false
Participantid=‘pool-1-thread-3’, isLeader=false
Participantid=‘pool-1-thread-7’, isLeader=false
Participantid=‘pool-1-thread-4’, isLeader=false
…
ZkCli.sh
[zk: localhost:2181(CONNECTED) 97] ls /MyCuratorDemo/services/leader
[_c_691dbf34-67a2-4700-a314-24e4f1a914b9-latch-0000000004, _c_743eca28-ee10-4f38-ba98-8aa41cc10960-latch-0000000000, _c_7642e5f2-6c67-4113-a671-8bc4d14d9c8d-latch-0000000001, _c_774d824a-79b0-49e1-a13a-fcbba9104f5e-latch-0000000005, _c_b88258b4-4a3d-4fd6-a278-d16c76b39fd4-latch-0000000006, _c_cb797157-fbf3-4524-b9aa-5e9a2b9188f2-latch-0000000002, _c_cd3e7b12-d400-4ee1-9291-e726ea8c272a-latch-0000000003]
flink的安装模式
Flink 常见的部署模式:本地模式、Standalone 模式和 Flink On Yarn 模式,然后分别讲解三种模式的使用场景和部署中常见的问题,最后将讲解在生产环境中 Flink 集群的高可用配置。
Flink 常见的部署模式
环境准备
在绝大多数情况下,我们的 Flink 都是运行在 Unix 环境中的,推荐在 Mac OS 或者 Linux 环境下运行 Flink。如果是集群模式,那么可以在自己电脑上安装虚拟机,保证有一个 master 节点和两个 slave 节点。
同时,要注意在所有的机器上都应该安装 JDK 和 SSH。JDK 是我们运行 JVM 语言程序必须的,而 SSH 是为了在服务器之间进行跳转和执行命令所必须的。关于服务器之间通过 SSH 配置公钥登录,你可以直接搜索安装和配置方法,我们不做过度展开。
Flink 的安装包可以在这里下载。需要注意的是,如果你要和 Hadoop 进行集成,那么我们需要使用到对应的 Hadoop 依赖,下面将会详细讲解。
Local 模式
Local 模式是 Flink 提供的最简单部署模式,一般用来本地测试和演示使用。
我们在这里下载 Apache Flink 1.10.0 for Scala 2.11 版本进行演示,该版本对应 Scala 2.11 版本。
将压缩包下载到本地,并且直接进行解压,使用 Flink 默认的端口配置,直接运行脚本启动:
复制? [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
上图则为解压完成后的目录情况。
然后,我们可以直接运行脚本启动 Flink :
复制? [flink-1.10.0]# ./bin/start-cluster.sh
上图显示我们的 Flink 启动成功。
我们直接访问本地的 8081 端口,可以看到 Flink 的后台管理界面,验证 Flink 是否成功启动。
可以看到 Flink 已经成功启动。当然,我们也可以查看运行日志来确认 Flink 是不是成功启动了,在 log 目录下有程序的启动日志:
我们尝试提交一个测试任务:
复制./bin/flink run examples/batch/WordCount.jar
我们在控制台直接看到输出。同样,在 Flink 的后台管理界面 Completed Jobs 一栏可以看到刚才提交执行的程序:
Standalone 模式
Standalone 模式是集群模式的一种,但是这种模式一般并不运行在生产环境中,原因和 on yarn 模式相比:
Standalone 模式的部署相对简单,可以支持小规模,少量的任务运行;
Stabdalone 模式缺少系统层面对集群中 Job 的管理,容易遭成资源分配不均匀;
资源隔离相对简单,任务之间资源竞争严重。
我们在 3 台虚拟机之间搭建 standalone 集群:
在 master 节点,将 Apache Flink 1.10.0 for Scala 2.11 包进行解压:
复制? [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
重点来啦,我们需要修改 Flink 的配置文件,并且将修改好的解压目录完整的拷贝到两个从节点中去。在这里,我强烈建议主节点和从节点的目录要保持一致。
我们修改 conf 目录下的 flink-conf.yaml:
flink-conf.yaml 文件中有大量的配置参数,我们挑选其中必填的最基本参数进行修改:
复制jobmanager.rpc.address: master
jobmanager.heap.size: 1024m
jobmanager.rpc.port: 6123
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /tmp
它们分别代表:
如果你对其他的参数有兴趣的话,可以直接参考官网。
接下来我们修改 conf 目录下的 master 和 slave 文件。
vim master,将内容修改为:
复制master
vim slave,将内容修改为:
复制slave01
slave02
然后,将整个修改好的 Flink 解压目录使用 scp 远程拷贝命令发送到从节点:
复制scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/
在 master、slave01、slave02 上分别配置环境变量,vim /etc/profile,将内容修改为:
复制export FLINK_HOME=/SoftWare/flink-1.10.0
export PATH=$PATH:$FLINK_HOME/bin
到此为止,我们整个的基础配置已经完成,下面需要启动集群,登录 master 节点执行:
复制/SoftWare/flink-1.10.0/bin/start-cluster.sh
可以在浏览器访问:http://192.168.2.100:8081/ 检查集群是否启动成功。
集群搭建过程中,可能出现的问题:
端口被占用,我们需要手动杀掉占用端口的程序;
目录找不到或者文件找不到,我们在 flink-conf.yaml 中配置过 io.tmp.dirs ,这个目录需要手动创建。
On Yarn 模式和 HA 配置
上图是 Flink on Yarn 模式下,Flink 和 Yarn 的交互流程。Yarn 是 Hadoop 三驾马车之一,主要用来做资源管理。我们在 Flink on Yarn 模式中也是借助 Yarn 的资源管理优势,需要在三个节点中配置 YARN_CONF_DIR、HADOOP_CONF_DIR、HADOOP_CONF_PATH 中的任意一个环境变量即可。
本课时中集群的高可用 HA 配置是基于独立的 ZooKeeper 集群。当然,Flink 本身提供了内置 ZooKeeper 插件,可以直接修改 conf/zoo.cfg,并且使用 /bin/start-zookeeper-quorum.sh 直接启动。
环境准备:
ZooKeeper-3.x
Flink-1.10.0
Hadoop-2.6.5
我们使用 5 台虚拟机搭建 on yarn 的高可用集群:
如果你在使用 Flink 的最新版本 1.10.0 时,那么需要在本地安装 Hadoop 环境并进行下面的操作。
首先,添加环境变量:
复制vi /etc/profile
# 添加环境变量
export HADOOP_CONF_DIR=/Software/hadoop-2.6.5/etc/hadoop
# 环境变量生效
source /etc/profile
其次,下载对应的的依赖包,并将对应的 Hadoop 依赖复制到 flink 的 lib 目录下,对应的 hadoop 依赖可以在这里下载。
与 standalone 集群不同的是,我们需要修改 flink-conf.yaml 文件中的一些配置:
复制high-availability: zookeeper
high-availability.storageDir: hdfs://cluster/flinkha/
high-availability.zookeeper.quorum: slave01:2181,slave02:2181,slave03:2181
它们分别代表:
然后分别修改 master、slave、zoo.cfg 三个配置文件。
vim master,将内容修改为:
复制master01:8081
master02:8081
vim slave,将内容修改为:
复制slave01
slave02
slave03
vim zoo.cfg,将内容修改为:
复制server.1=slave01:2888:3888
server.2=slave02:2888:3888
server.3=slave03:2888:3888
然后,我们将整个修改好的 Flink 解压目录使用 scp 远程拷贝命令发送到从节点:
复制scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave03:/SoftWare/
分别启动 Hadoop 和 ZooKeeper,然后在主节点,使用命令启动集群:
复制/SoftWare/flink-1.10.0/bin/start-cluster.sh
我们同样直接访问 http://192.168.2.100:8081/ 端口,可以看到 Flink 的后台管理界面,验证 Flink 是否成功启动。
在 Flink on yarn 模式下,启动集群的方式有两种:
直接在 yarn 上运行任务
yarn session 模式
直接在 yarn 上运行任务相当于将 job 直接提交到 yarn 上,每个任务会根据用户的指定进行资源申请,任务之间互不影响。
复制./bin/flink run -yjm 1024m -ytm 4096m -ys 2 ./examples/batch/WordCount.jar
更多关于参数的含义,可以参考官网。
使用 yarn session 模式,我们需要先启动一个 yarn-session 会话,相当于启动了一个 yarn 任务,这个任务所占用的资源不会变化,并且一直运行。我们在使用 flink run 向这个 session 任务提交作业时,如果 session 的资源不足,那么任务会等待,直到其他资源释放。当这个 yarn-session 被杀死时,所有任务都会停止。
例如我们启动一个 yarn session 任务,该任务拥有 8G 内存、32 个槽位。
复制./bin/yarn-session.sh -tm 8192 -s 32
我们在 yarn 的界面上可以看到这个任务的 ID,然后向这个 session ID 提交 Flink 任务:
复制./bin/flink run -m yarn-cluster -yid application_xxxx ./examples/batch/WordCount.jar
其中,application_xxxx 即为上述的 yarn session 任务 ID。
总结
本课时我们讲解了 Flink 的三种部署模式和高可用配置,并且对这三种部署模式的适用场景进行了讲解。在生产上,我们最常用的方式当然是 Flink on Yarn,借助 Yarn 在资源管理上的绝对优势,确保集群和任务的稳定
以上是关于Flink基于ZookeeperCurator的高可用原理1的主要内容,如果未能解决你的问题,请参考以下文章