flink写入hdfs
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink写入hdfs相关的知识,希望对你有一定的参考价值。
参考技术A /*** 该策略在以下三种情况下滚动处于 In-progress 状态的部分文件(part file):
*
* 它至少包含 10 分钟的数据
* 最近 5 分钟没有收到新的记录
* 文件大小达到 1GB (写入最后一条记录后)
* 部分文件(part file)可以处于以下三种状态之一:
*
* In-progress :当前文件正在写入中
* Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
* Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
*/
DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
.builder()
.withMaxPartSize(1024*1024*1024)// 设置每个文件的最大大小 ,默认是128M。这里设置为1G
.withRolloverInterval(TimeUnit.MINUTES.toMillis(5))// 至少包含10分钟的数据
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))// 60s空闲,就滚动写入新的文件 近 5 分钟没有收到新的记录
.build();
/**
* 输出配置
*/
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("kefu_client_")//前缀
.withPartSuffix(".log")//后缀
.build();
StreamingFileSink sink = StreamingFileSink
.forRowFormat(new Path(properties.getProperty("etl_hadoop_url")
+ properties.getProperty("etl_storm_kefu_basepath")),new SimpleStringEncoder("UTF-8"))
.withRollingPolicy(rollingPolicy)
.withBucketAssigner(new BasePathBucketAssigner())
.withBucketCheckInterval(1000L)// 桶检查间隔,这里设置为1s
.withOutputFileConfig(config)
.build();
如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos 密钥表(用于 Kafka 和 Hadoop HDFS)?
【中文标题】如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos 密钥表(用于 Kafka 和 Hadoop HDFS)?【英文标题】:How to use two Kerberos keytabs (for Kafka and Hadoop HDFS) from a Flink job on a Flink standalone cluster? 【发布时间】:2018-10-12 05:26:01 【问题描述】:问题
在服务器上运行的 Flink 独立集群上,我正在使用 Scala 开发 Flink 流式作业。该作业使用来自 1 个以上 Kafka 主题的数据,(进行一些格式化)并将结果写入 HDFS。
Kafka 主题之一和 HDFS,它们都需要单独的 Kerberos 身份验证(因为它们属于完全不同的集群)。
我的问题是:
-
是否可以(如果是,如何?)在服务器上运行的 Flink 集群上的 Flink 作业中使用两个 Kerberos 密钥表(一个用于 Kafka,另一个用于 HDFS)? (因此 Flink 作业可以同时从 Kafka 主题消费并写入 HDFS)
如果不可能,当 Kafka 和 HDFS 都受 Kerberos 保护时,对于 Kafka-Flink-HDFS 数据流,什么是合理的解决方法?
注意
我对这里提到的大多数技术都很陌生。 如果 Flink 作业不需要使用需要 Kerberos 的主题,它可以写入 HDFS。在这种情况下,我在flink-conf.yaml
中将HDFS的信息指定为security.kerberos.login.keytab
和security.kerberos.login.principal
我正在使用HDFS Connector provided from Flink 写入 HDFS。
可以在两个主体之间手动切换 Kerberos 身份验证。在 krb5.conf
文件的 [realm] 部分,我指定了两个领域,一个用于 Kafka,另一个用于 HDFS。
kinit -kt path/to/hdfs.keytab [principal: xxx@XXX.XXX...]
kinit -kt path/to/kafka.keytab [principal: yyy@YYY.YYY...]
环境
Flink (v1.4.2) https://ci.apache.org/projects/flink/flink-docs-stable/ Kafka 客户端 (v0.10.X) HDFS(Hadoop 集群 HDP 2.6.X)感谢您的关注和反馈!
【问题讨论】:
“单独的 Kerberos 身份验证(因为它们属于完全不同的集群)” > 这不是一个很有说服力的论点。如果域之间存在“信任”(例如,用于集群服务的专用 MIT Kerberos 域与用于身份验证的全局 Active Directory 林),则可以在不同的目标域中授予相同的 Kerberos TGT(身份证明)Kerberos 服务票证 i>,或者如果两个领域都附加到相同的根领域(例如,同一林中的不同 AD 域)。这就是它在具有严格安全性的典型 Big Corp 环境中的工作方式。 但即使使用单个 Kerberos 主体/密钥表,您也可能会遇到问题,因为通常 (TBC with Flink) HDFS 客户端使用 static HadoopUserGroupInformation
绕过了 Kerberos 的标准 Java 实现的一部分,通常 (...) Kafka 客户端使用原始 JAAS 配置来实现标准 Java 实现。他们一起玩得不好。或者至少,几年前我在自定义 Java 代码中测试 HDFS + Hive JDBC 时他们没有;也许 Flink 会绕过 UGI 以避免副作用。
我也面临同样的问题。我们目前正在使用Hadoop Map-Reduce来处理这种情况。在map中启动子进程,使用主进程从kafka读取数据并发送到子进程,然后将数据发送到 HDFS。
【参考方案1】:
基于this very similar question的回答和评论
似乎没有明确的方法可以在单个 Flink 作业中使用两个凭据。
有希望的方法或解决方法:
建立信任 Kafka 和 HDFS 在同一个平台上共同安装 使用其他方法来弥补差距最后一点的例子:
您可以使用 NiFi 或 Streams Replication Manager 等工具将数据从源 Kafka 带到集群中的 Kafka。 NiFi 更加模块化,并且可以为每个步骤配置 kerberos 凭据。之后,您将处于 Flink 可以处理的单一上下文中。
全面披露:我是 Cloudera 的一名员工,是 NiFi、Kafka、HDFS、Streams Replication Manager 和最近 Flink 的推动力
【讨论】:
【参考方案2】:在我最初发帖三年后,我们的架构已经从独立的裸机服务器转移到 Mesos 上的 Docker 容器,但让我总结一下解决方法(针对 Flink 1.8):
将krb5.conf
与所有领域定义和领域-领域映射放在一起(例如在容器的/etc/
下)
放置 Hadoop krb5.keytab
(例如在 /kerberos/HADOOP_CLUSTER.ORG.EXAMPLE.COM/
下)
在flink-conf.yaml
中配置Flink的security.kerberos.login.*
属性
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.principal: username@HADOOP_CLUSTER.ORG.EXAMPLE.COM
security.kerberos.login.contexts
不应配置。这可确保 Flink 不会将 Hadoop 的凭据用于 Kafka 和 Zookeeper。
将 Kafka 的密钥表复制到容器内的单独目录中(例如在 /kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/
下)
定期运行自定义脚本以更新票证缓存
KINIT_COMMAND_1='kinit -kt /kerberos/HADOOP_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab username@HADOOP_CLUSTER.ORG.EXAMPLE.COM'
KINIT_COMMAND_2='kinit -kt /kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab username@KAFKA_CLUSTER.ORG.EXAMPLE.COM -c /tmp/krb5cc_kafka'
...
在将每个 FlinkKafkaConsumer 实例化为实际的 JAAS 配置字符串时,设置属性 sasl.jaas.config
。
绕过全局 JAAS 配置。如果我们在全局范围内设置它,我们就不能使用具有不同凭据的不同 Kafka 实例,或者将不安全的 Kafka 与安全的 Kafka 一起使用。
props.setProperty("sasl.jaas.config",
"com.sun.security.auth.module.Krb5LoginModule required " +
"refreshKrb5Config=true " +
"useKeyTab=true " +
"storeKey=true " +
"debug=true " +
"keyTab=\"/kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab\" " +
"principal=\"username@KAFKA_CLUSTER.ORG.EXAMPLE.COM\";")
【讨论】:
以上是关于flink写入hdfs的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Flink SQL 写入 kafka 自定义分区策略
大数据ClickHouse(十九):Flink 写入 ClickHouse API