【Flink on k8s】Flink on Kubernetes 部署模式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了【Flink on k8s】Flink on Kubernetes 部署模式相关的知识,希望对你有一定的参考价值。
参考技术A Flink 选择 Kubernetes 的主要原因是结合 Flink 和 Kubernetes 的 长稳性 。
① Flink 特性 :提供的实时服务是需要 长时间、稳定地运行 ,常应用于电信网络质量监控、实时风控、实时推荐等稳定性要求较高的场景;
② Kubernetes 优势 : 为应用提供了部署、管理能力,同时保证其稳定运行 。Kubernetes 具有很好的生态,可以 集成各种运维工具 ,例如 prometheus、主流日志采集工具等。Kubernetes 具有很好的 扩缩容机制 ,可以大大提高资源利用率。
预先构建 Flink 集群,且该集群长期处于运行状态,但不能自动扩缩容 。用户通过 client 提交作业到运行中的 JobManager,而 JobManager 将任务分配到运行中的 TaskManager。
Flink 集群是预先启动运行的。用户提交作业的时候,作业可以立即分配到 TaskManager,即 作业启动速度快 。
① 资源利用率低 ,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反正 TaskManager 资源不足。
② 作业隔离性差 ,多个作业的任务存在资源竞争,相互影响。如果一个作业异常导致 TaskManager 挂了,该 TaskManager 上的全部作业都会被重启。
参考: Flink on Standalone Kubernetes Reference
① 集群配置
集群配置通过 configmap 挂载到容器中
flink-configuration-configmap.yaml
② Deployment 文件
把 Flink 镜像 上传到 私有镜像仓 。编辑 jobmanager-service.yaml、jobmanager-deployment.yaml、taskmanager-deployment.yaml
jobmanager-deployment.yaml
taskmanager-deployment.yaml
jobmanager-service.yaml
③ 执行 yaml
通过 kubectl create -f 命令创建 Flink 集群
每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。
一个作业独占一个集群, 作业的隔离性好 。
资源利用率低 ,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反之 TaskManager 资源不足。同时,JobManager 不能复用。
类似 Session 模式,需要 预先构建 JobManager 。不同点是用户通过 Flink Client 向 JobManager 提交作业后, 根据作业需要的 Slot 数量,JobManager 直接向 Kubernetes 申请 TaskManager 资源 ,最后把作业提交到 TaskManager 上。
TaskManager 的资源是实时的、按需进行的创建,对 资源的利用率更高 。
作业真正运行起来的时间较长 ,因为需要等待 TaskManager 创建。
参考: Native Kubernetes - Session Mode
① 集群配置
集群配置通过 configmap 挂载到容器中,如上 2.1 所示。
新增如下配置:
flink-configuration-configmap.yaml
② 配置 jobmanager-deployment.yaml
如上 2.1 所示,需要把启动脚本修改为 ./bin/kubernetes-session.sh
jobmanager-deployment.yaml
③ 执行 yaml
通过 kubectl create -f 命令创建 Flink 集群
类似 Application 模式,每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。不同点是 Native 特性 ,即 Flink 直接与 Kubernetes 进行通信并 按需申请资源 ,无需用户指定 TaskManager 资源的数量。
① 一个作业独占一个集群,作业的隔离性好。
② 资源利用率相对较高 ,按需申请 JobManager 和 TaskManager。
① 一个作业独占一个集群, JobManager 不能复用 。
② 作业启动较慢 ,在作业提交后,才开始创建 JobManager 和 TaskManager。
Flink Client 使用技巧和心得(Flink on Zeppelin)
Flink 链接Kafka
先建立catalog
CREATE CATALOG flink_hive WITH (
'type' = 'hive',
'default-database' = 'imods',
'hive-conf-dir' = '/home/admin/flink/conf'
);
建立kafka table
use catalog flink_hive;
--创建kafka源表
CREATE TABLE IF NOT EXISTS kafka_table (
vin string,
age int,
...
)--with 写入链接信息以及各种设置
WITH (
'connector' = 'kafka',
'topic' = '自定义的topic',
'properties.group.id' = '自定义的id',
'properties.bootstrap.servers' = '自己知道的地址1:端口号1,自己知道的地址2:端口号2',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";', --设定用户名与密码
'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
--'scan.startup.mode' = 'latest-offset',--五种选项下面会注意说明
--'scan.startup.mode' = 'earliest-offset',
'scan.startup.mode' = 'group-offsets',
'json.fail-on-missing-field' = 'false',--是否允许失败策略
'json.ignore-parse-errors' = 'true',--是否开启忽略错误策略
'format' = 'json'--传入的格式
);
group-offsets
: start from committed offsets in ZK / Kafka brokers of a specific consumer group.从提交给ZK的offset开始消费(必须注明groupID 才可以)
earliest-offset
: start from the earliest offset possible. 从最初点开始消费
latest-offset
: start from the latest offset.程序运行时有新消息才消费新消息
timestamp
: start from user-supplied timestamp for each partition. 指定时间戳开始进行消费
specific-offsets
: start from user-supplied specific offsets for each partition.
指定位置进行消费
建立对应的hivetable
--创建HIVE目标表
set table.sql-dialect=hive;
create table if not exists hive_table --table字段类型顺序务必与Kafkatable一致,严格要求
(
vin string,
age int,
...
)
comment '我是Hive表'
partitioned by (dt string) --option
stored as parquet --option
TBLPROPERTIES (
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='30 min',
'sink.partition-commit.policy.kind'='metastore,success-file',--合并小文件选项
'auto-compaction'='true',
'compaction.file-size'='128MB',
'sink.shuffle-by-partition.enable'='true'
)
;
--执行insert语句动态分区插入
set pipeline.name=设定英文任务名; -- 设定英文任务名 不需要加引号
set table.sql-dialect=default;
insert into hive_table
select
vin string as vin,
age int as age,
...
from kafka_table;
--记录一个casewhen语句 用于时间戳的转换:case when CHARACTER_LENGTH(cast (eventTime as string)) = 13 then from_unixtime(cast (substr (cast (eventTime as string),0,10) as BIGINT),'yyyyMMdd') else '19700101' end as dt
以上是关于【Flink on k8s】Flink on Kubernetes 部署模式的主要内容,如果未能解决你的问题,请参考以下文章