flink写入hdfs

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink写入hdfs相关的知识,希望对你有一定的参考价值。

参考技术A /**

* 该策略在以下三种情况下滚动处于 In-progress 状态的部分文件(part file):

*

* 它至少包含 10 分钟的数据

* 最近 5 分钟没有收到新的记录

* 文件大小达到 1GB (写入最后一条记录后)

* 部分文件(part file)可以处于以下三种状态之一:

*

* In-progress :当前文件正在写入中

* Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

* Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

*/

DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy

.builder()

.withMaxPartSize(1024*1024*1024)// 设置每个文件的最大大小 ,默认是128M。这里设置为1G

        .withRolloverInterval(TimeUnit.MINUTES.toMillis(5))// 至少包含10分钟的数据

        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))// 60s空闲,就滚动写入新的文件 近 5 分钟没有收到新的记录

        .build();

/**

* 输出配置

*/

OutputFileConfig config = OutputFileConfig

.builder()

.withPartPrefix("kefu_client_")//前缀

        .withPartSuffix(".log")//后缀

        .build();

StreamingFileSink sink = StreamingFileSink

.forRowFormat(new Path(properties.getProperty("etl_hadoop_url")

+ properties.getProperty("etl_storm_kefu_basepath")),new SimpleStringEncoder("UTF-8"))

.withRollingPolicy(rollingPolicy)

.withBucketAssigner(new BasePathBucketAssigner())

.withBucketCheckInterval(1000L)// 桶检查间隔,这里设置为1s

        .withOutputFileConfig(config)

.build();

如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos 密钥表(用于 Kafka 和 Hadoop HDFS)?

【中文标题】如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos 密钥表(用于 Kafka 和 Hadoop HDFS)?【英文标题】:How to use two Kerberos keytabs (for Kafka and Hadoop HDFS) from a Flink job on a Flink standalone cluster? 【发布时间】:2018-10-12 05:26:01 【问题描述】:

问题

在服务器上运行的 Flink 独立集群上,我正在使用 Scala 开发 Flink 流式作业。该作业使用来自 1 个以上 Kafka 主题的数据,(进行一些格式化)并将结果写入 HDFS。

Kafka 主题之一和 HDFS,它们都需要单独的 Kerberos 身份验证(因为它们属于完全不同的集群)。

我的问题是:

    是否可以(如果是,如何?)在服务器上运行的 Flink 集群上的 Flink 作业中使用两个 Kerberos 密钥表(一个用于 Kafka,另一个用于 HDFS)? (因此 Flink 作业可以同时从 Kafka 主题消费并写入 HDFS) 如果不可能,当 Kafka 和 HDFS 都受 Kerberos 保护时,对于 Kafka-Flink-HDFS 数据流,什么是合理的解决方法?

注意

我对这里提到的大多数技术都很陌生。 如果 Flink 作业不需要使用需要 Kerberos 的主题,它可以写入 HDFS。在这种情况下,我在flink-conf.yaml中将HDFS的信息指定为security.kerberos.login.keytabsecurity.kerberos.login.principal 我正在使用HDFS Connector provided from Flink 写入 HDFS。

可以在两个主体之间手动切换 Kerberos 身份验证。在 krb5.conf 文件的 [realm] 部分,我指定了两个领域,一个用于 Kafka,另一个用于 HDFS。

kinit -kt path/to/hdfs.keytab [principal: xxx@XXX.XXX...]

kinit -kt path/to/kafka.keytab [principal: yyy@YYY.YYY...]

环境

Flink (v1.4.2) https://ci.apache.org/projects/flink/flink-docs-stable/ Kafka 客户端 (v0.10.X) HDFS(Hadoop 集群 HDP 2.6.X)

感谢您的关注和反馈!

【问题讨论】:

“单独的 Kerberos 身份验证(因为它们属于完全不同的集群)” > 这不是一个很有说服力的论点。如果域之间存在“信任”(例如,用于集群服务的专用 MIT Kerberos 域与用于身份验证的全局 Active Directory 林),则可以在不同的目标域中授予相同的 Kerberos TGT(身份证明)Kerberos 服务票证 i>,或者如果两个领域都附加到相同的根领域(例如,同一林中的不同 AD 域)。这就是它在具有严格安全性的典型 Big Corp 环境中的工作方式。 但即使使用单个 Kerberos 主体/密钥表,您也可能会遇到问题,因为通常 (TBC with Flink) HDFS 客户端使用 static Hadoop UserGroupInformation 绕过了 Kerberos 的标准 Java 实现的一部分,通常 (...) Kafka 客户端使用原始 JAAS 配置来实现标准 Java 实现。他们一起玩得不好。或者至少,几年前我在自定义 Java 代码中测试 HDFS + Hive JDBC 时他们没有;也许 Flink 会绕过 UGI 以避免副作用。 我也面临同样的问题。我们目前正在使用Hadoop Map-Reduce来处理这种情况。在map中启动子进程,使用主进程从kafka读取数据并发送到子进程,然后将数据发送到 HDFS。 【参考方案1】:

基于this very similar question的回答和评论

似乎没有明确的方法可以在单个 Flink 作业中使用两个凭据。

有希望的方法或解决方法:

建立信任 Kafka 和 HDFS 在同一个平台上共同安装 使用其他方法来弥补差距

最后一点的例子:

您可以使用 NiFi 或 Streams Replication Manager 等工具将数据从源 Kafka 带到集群中的 Kafka。 NiFi 更加模块化,并且可以为每个步骤配置 kerberos 凭据。之后,您将处于 Flink 可以处理的单一上下文中。

全面披露:我是 Cloudera 的一名员工,是 NiFi、Kafka、HDFS、Streams Replication Manager 和最近 Flink 的推动力

【讨论】:

【参考方案2】:

在我最初发帖三年后,我们的架构已经从独立的裸机服务器转移到 Mesos 上的 Docker 容器,但让我总结一下解决方法(针对 Flink 1.8):

krb5.conf 与所有领域定义和领域-领域映射放在一起(例如在容器的/etc/ 下)

放置 Hadoop krb5.keytab(例如在 /kerberos/HADOOP_CLUSTER.ORG.EXAMPLE.COM/ 下)

flink-conf.yaml中配置Flink的security.kerberos.login.*属性

security.kerberos.login.use-ticket-cache: true security.kerberos.login.principal: username@HADOOP_CLUSTER.ORG.EXAMPLE.COM security.kerberos.login.contexts 不应配置。这可确保 Flink 不会将 Hadoop 的凭据用于 Kafka 和 Zookeeper。

将 Kafka 的密钥表复制到容器内的单独目录中(例如在 /kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/ 下)

定期运行自定义脚本以更新票证缓存

KINIT_COMMAND_1='kinit -kt /kerberos/HADOOP_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab username@HADOOP_CLUSTER.ORG.EXAMPLE.COM'
KINIT_COMMAND_2='kinit -kt /kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab username@KAFKA_CLUSTER.ORG.EXAMPLE.COM -c /tmp/krb5cc_kafka'
...
在将每个 FlinkKafkaConsumer 实例化为实际的 JAAS 配置字符串时,设置属性 sasl.jaas.config。 绕过全局 JAAS 配置。如果我们在全局范围内设置它,我们就不能使用具有不同凭据的不同 Kafka 实例,或者将不安全的 Kafka 与安全的 Kafka 一起使用。
props.setProperty("sasl.jaas.config", 
    "com.sun.security.auth.module.Krb5LoginModule required " +
    "refreshKrb5Config=true " +
    "useKeyTab=true " +
    "storeKey=true " +
    "debug=true " +
    "keyTab=\"/kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab\" " +
    "principal=\"username@KAFKA_CLUSTER.ORG.EXAMPLE.COM\";")

【讨论】:

以上是关于flink写入hdfs的主要内容,如果未能解决你的问题,请参考以下文章

2021年最新最全Flink系列教程__Flink综合案例

Flink HDFS Connector

Flink实战系列Flink SQL 写入 kafka 自定义分区策略

大数据ClickHouse(十九):Flink 写入 ClickHouse API

flink自定义clickhouseSink写入到clickhouse

《从0到1学习Flink》—— Flink 写入数据到 Kafka