flink on k8s native 再次实践
Posted Jeseva
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink on k8s native 再次实践相关的知识,希望对你有一定的参考价值。
基于flink 1.13.2版本做的实践
本次主要实践flink on k8s native 的两种方式, 分别是sesion 和 application方式
第一步: k8s环境准备
1, 创建一个namespace
kubectl create namespace flink-session-cluster-test-1213
2, 新建一个serviceaccount, 用来提交flink的任务
kubectl create serviceaccount flink -n flink-session-cluster-test-1213
3, 做好绑定
kubectl create clusterrolebinding flink-role-binding-flink-session-cluster-test-1213_flink \\
--clusterrole=edit --serviceaccount=flink-session-cluster-test-1213:flink
第二步: 镜像准备
使用hdfs作为flink的checkpoint存储,所以需要在flink的lib目录中放入hadoop的jar包
创建Dockerfile文件,并添加如下内容:
vi Dockerfile
FROM flink:1.13.2-scala_2.11-java8
COPY ./flink-shaded-hadoop-2-uber-2.7.5-10.0.jar $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
构建image
docker build -t native_realtime:1.0.3 .
后续的session与application均使用该镜像镜像实践
为了解决hosts映射以及用户自定义jar包等问题, 需要使用yaml模板
vi flink-template.yaml
apiVersion: v1
kind: Pod
metadata:
name: flink-pod-template
spec:
initContainers:
- name: artifacts-fetcher
image: native_realtime:1.0.3
# 添加自定义运行的jar包以及各种配置文件
command: ["/bin/sh","-c"]
args: ["wget http://xxxxxx:8082/flinkhistory/1.13.2/tt.sql -O /opt/flink/usrHome/taa.sql ; wget http://xxxx:8082/flinkhistory/1.13.2/realtime-dw-service-1.0.1-SNAPSHOT.jar -O /opt/flink/usrHome/realtime-dw-service-1.0.1.jar"]
volumeMounts:
- mountPath: /opt/flink/usrHome
name: flink-usr-home
hostAliases:
- ip: 10.1.1.103
hostnames:
- "cdh103"
- ip: 10.1.1.104
hostnames:
- "cdh104"
- ip: 10.1.1.105
hostnames:
- "cdh105"
- ip: 10.1.1.106
hostnames:
- "cdh106"
containers:
# Do not change the main container name
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/usrHome
name: flink-usr-home
volumes:
- name: flink-usr-home
hostPath:
path: /tmp
type: Directory
使用run application模式提交任务
/data/flink-1.13.0/bin/flink run-application \\
--target kubernetes-application \\
-Dresourcemanager.taskmanager-timeout=345600 \\
-Dkubernetes.namespace=flink-session-cluster-test-1213 \\
-Dkubernetes.service-account=flink \\
-Dkubernetes.cluster-id=flink-stream-reatime-dw11 \\
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \\
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \\
-Dkubernetes.container.image=native_realtime:1.0.3 \\
-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \\
-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \\
-Dexecution.checkpointing.interval=2s \\
-Dexecution.checkpointing.mode=EXACTLY_ONCE \\
-Dstate.backend=filesystem \\
-Dkubernetes.rest-service.exposed.type=NodePort \\
-Drestart-strategy=failure-rate \\
-Drestart-strategy.failure-rate.delay=1s \\
-Drestart-strategy.failure-rate.failure-rate-interval=5s \\
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \\
-Dtaskmanager.memory.process.size=1096m \\
-Dkubernetes.taskmanager.cpu=1 \\
-Dtaskmanager.numberOfTaskSlots=1 \\
-Dkubernetes.pod-template-file=./flink-template.yaml \\
-c com.xxx.bigdata.rt.dw.service.runtime.RealtimeWarehouseMain \\
local:///opt/flink/usrHome/realtime-dw-service-1.0.1.jar \\
-cfc state.checkpoint.interval=60000 -cfp 1 -cfm no -cfn kafka_es -cfs /opt/flink/usrHome/taa.sql
使用session模式提交任务
-- 创建session
/data/flink-1.13.0/bin/kubernetes-session.sh \\
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \\
-Dtaskmanager.memory.process.size=1096m \\
-Dkubernetes.taskmanager.cpu=1 \\
-Dtaskmanager.numberOfTaskSlots=4 \\
-Dkubernetes.container.image=native_realtime:1.0.3 \\
-Dkubernetes.service.exposed.type=NodePort \\
-Dkubernetes.jobmanager.service-account=flink \\
-Dkubernetes.service-account=flink \\
-Dkubernetes.namespace=flink-session-cluster-test-1213\\
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \\
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \\
-Dkubernetes.container.image=native_realtime:1.0.3 \\
-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \\
-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \\
-Dexecution.checkpointing.interval=2s \\
-Dexecution.checkpointing.mode=EXACTLY_ONCE \\
-Dstate.backend=filesystem \\
-Dkubernetes.rest-service.exposed.type=NodePort \\
-Drestart-strategy=failure-rate \\
-Drestart-strategy.failure-rate.delay=1s \\
-Drestart-strategy.failure-rate.failure-rate-interval=5s \\
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \\
-Dkubernetes.pod-template-file=./flink-template.yaml \\
提交任务
/data/flink-1.13.0/bin/flink run -d -e kubernetes-session \\
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \\
-Dkubernetes.namespace=flink-session-cluster-test-1213 \\
-Dkubernetes.taskmanager.service-account=flink \\
-Dexecution.attached=true \\
-c com.xxx.bigdata.rt.dw.service.runtime.RealtimeWarehouseMain \\
./realtime-dw-service-1.0.1-SNAPSHOT.jar \\
-cfc state.checkpoint.interval=60000 -cfp 1 -cfm no -cfn kafka_es -cfs ./tt.sql
问题及解决:
1, flink任务的hosts问题?
可以通过flink 提供的yaml模板, 将hosts配置放在yaml中, 然后在命令使用-Dkubernetes.pod-template-file指定
2, 关于使用session 模式出现启动taskManager时, 获取configmap权限不够的问题?
可以使用 -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.service-account=flink -Dkubernetes.taskmanager.service-account=flink 来解决
3, 如果使用application模式, 解决自定义jar包不想打入镜像的问题?
可以在yaml模板中, initContainers使用wget方式引入
session模式与application模式的相互比较
session模式, 先启动jobmanager, 再之后根据提交的任务, 来启动taskManager, 导致多个任务日志耦合在一起, 但是自定义的jar包不需要再构建镜像, 相对提交比较简单
application模式, 需要将自定义的jar包构建在镜像中, 或者使用yaml模板的initContainers的方式. 日志可以分开.
这两种模式均可以实现高可用, 不管是application还是session模式, 均可以自动以checkpoint自动重启
本次未实践日志的收集等相关
以上是关于flink on k8s native 再次实践的主要内容,如果未能解决你的问题,请参考以下文章
flink on k8s模式通过web UI界面查看任务运行情况
flink on k8s模式通过web UI界面查看任务运行情况