【Flink on k8s】Flink on Kubernetes 部署模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了【Flink on k8s】Flink on Kubernetes 部署模式相关的知识,希望对你有一定的参考价值。

参考技术A

Flink 选择 Kubernetes 的主要原因是结合 Flink 和 Kubernetes 的 长稳性
Flink 特性 :提供的实时服务是需要 长时间、稳定地运行 ,常应用于电信网络质量监控、实时风控、实时推荐等稳定性要求较高的场景;
Kubernetes 优势 为应用提供了部署、管理能力,同时保证其稳定运行 。Kubernetes 具有很好的生态,可以 集成各种运维工具 ,例如 prometheus、主流日志采集工具等。Kubernetes 具有很好的 扩缩容机制 ,可以大大提高资源利用率。

预先构建 Flink 集群,且该集群长期处于运行状态,但不能自动扩缩容 。用户通过 client 提交作业到运行中的 JobManager,而 JobManager 将任务分配到运行中的 TaskManager。

Flink 集群是预先启动运行的。用户提交作业的时候,作业可以立即分配到 TaskManager,即 作业启动速度快

资源利用率低 ,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反正 TaskManager 资源不足。
作业隔离性差 ,多个作业的任务存在资源竞争,相互影响。如果一个作业异常导致 TaskManager 挂了,该 TaskManager 上的全部作业都会被重启。

参考: Flink on Standalone Kubernetes Reference
集群配置
集群配置通过 configmap 挂载到容器中
flink-configuration-configmap.yaml

Deployment 文件
Flink 镜像 上传到 私有镜像仓 。编辑 jobmanager-service.yaml、jobmanager-deployment.yaml、taskmanager-deployment.yaml

jobmanager-deployment.yaml

taskmanager-deployment.yaml

jobmanager-service.yaml

执行 yaml
通过 kubectl create -f 命令创建 Flink 集群

每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。

一个作业独占一个集群, 作业的隔离性好

资源利用率低 ,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反之 TaskManager 资源不足。同时,JobManager 不能复用。

类似 Session 模式,需要 预先构建 JobManager 。不同点是用户通过 Flink Client 向 JobManager 提交作业后, 根据作业需要的 Slot 数量,JobManager 直接向 Kubernetes 申请 TaskManager 资源 ,最后把作业提交到 TaskManager 上。

TaskManager 的资源是实时的、按需进行的创建,对 资源的利用率更高

作业真正运行起来的时间较长 ,因为需要等待 TaskManager 创建。

参考: Native Kubernetes - Session Mode
集群配置
集群配置通过 configmap 挂载到容器中,如上 2.1 所示。
新增如下配置:
flink-configuration-configmap.yaml

② 配置 jobmanager-deployment.yaml
如上 2.1 所示,需要把启动脚本修改为 ./bin/kubernetes-session.sh

jobmanager-deployment.yaml

执行 yaml
通过 kubectl create -f 命令创建 Flink 集群

类似 Application 模式,每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。不同点是 Native 特性 ,即 Flink 直接与 Kubernetes 进行通信并 按需申请资源 ,无需用户指定 TaskManager 资源的数量。

① 一个作业独占一个集群,作业的隔离性好。
资源利用率相对较高 ,按需申请 JobManager 和 TaskManager。

① 一个作业独占一个集群, JobManager 不能复用
作业启动较慢 ,在作业提交后,才开始创建 JobManager 和 TaskManager。

Flink Client 使用技巧和心得(Flink on Zeppelin)

Flink 链接Kafka
先建立catalog

CREATE CATALOG flink_hive WITH (
    'type' = 'hive',
    'default-database' = 'imods',
    'hive-conf-dir' = '/home/admin/flink/conf'
);

建立kafka table

use catalog flink_hive;
--创建kafka源表
CREATE TABLE IF NOT EXISTS kafka_table (
vin  string,
age int,
...
)--with 写入链接信息以及各种设置
   WITH (
     'connector' = 'kafka',
     'topic' = '自定义的topic',
     'properties.group.id' = '自定义的id',
     'properties.bootstrap.servers' = '自己知道的地址1:端口号1,自己知道的地址2:端口号2',
     'properties.security.protocol'='SASL_PLAINTEXT',
     'properties.sasl.mechanism'='PLAIN',
     'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";', --设定用户名与密码
     'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
     'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
     --'scan.startup.mode' = 'latest-offset',--五种选项下面会注意说明
      --'scan.startup.mode' = 'earliest-offset',
      'scan.startup.mode' = 'group-offsets',
     'json.fail-on-missing-field' = 'false',--是否允许失败策略
     'json.ignore-parse-errors' = 'true',--是否开启忽略错误策略
     'format' = 'json'--传入的格式
);

group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.从提交给ZK的offset开始消费(必须注明groupID 才可以)
earliest-offset: start from the earliest offset possible. 从最初点开始消费
latest-offset: start from the latest offset.程序运行时有新消息才消费新消息
timestamp: start from user-supplied timestamp for each partition. 指定时间戳开始进行消费
specific-offsets: start from user-supplied specific offsets for each partition.
指定位置进行消费

建立对应的hivetable

--创建HIVE目标表
set table.sql-dialect=hive;
create table if not exists hive_table   --table字段类型顺序务必与Kafkatable一致,严格要求
 (
vin  string,
age int,
...
)
comment '我是Hive表'
 partitioned by (dt string)  --option
 stored as parquet           --option
TBLPROPERTIES (
  'sink.rolling-policy.file-size'='128MB',
  'sink.rolling-policy.rollover-interval'='30 min',  
  'sink.partition-commit.policy.kind'='metastore,success-file',--合并小文件选项
  'auto-compaction'='true',
  'compaction.file-size'='128MB',
  'sink.shuffle-by-partition.enable'='true'
)
;
--执行insert语句动态分区插入
set pipeline.name=设定英文任务名;  -- 设定英文任务名 不需要加引号
set table.sql-dialect=default;
insert into  hive_table
select 
vin  string as vin,
age int as age,
...
from  kafka_table;

--记录一个casewhen语句 用于时间戳的转换:case when CHARACTER_LENGTH(cast (eventTime as string)) = 13 then from_unixtime(cast (substr (cast (eventTime as string),0,10) as BIGINT),'yyyyMMdd') else '19700101' end  as dt

以上是关于【Flink on k8s】Flink on Kubernetes 部署模式的主要内容,如果未能解决你的问题,请参考以下文章

flink on k8s部署方案调研

flink on k8s部署方案调研

flink on k8s部署方案实践--详细步骤

flink on k8s部署方案实践--详细步骤

flink on k8s部署方案实践--详细步骤

flink on k8s native 再次实践