Data loss when streaming Kafka data to Hudi
Posted by Mr.zhou_Zxy
1. The error
Caused by: java.lang.IllegalStateException: Cannot fetch offset 196 (GroupId: spark-kafka-source-6f1df211-fdcb-4bcc-813d-55c4f9661c9d-1732697149-executor, TopicPartition: news-0).
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss0(KafkaDataConsumer.scala:642)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss(KafkaDataConsumer.scala:448)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:269)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
The full error as reported in the application's final status:

Final application status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, hadoop, executor 1): java.lang.IllegalStateException: Cannot fetch offset 196 (GroupId: spark-kafka-source-e2868915-6d7a-4aef-99a8-3d1c5ef45147-1732697149-executor, TopicPartition: news-0).
Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false".)
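What the message means in practice: the streaming query's checkpoint says the next record to read from news-0 is offset 196, but the broker no longer has that offset, usually because the log segments were deleted by retention (retention.ms / retention.bytes) before the job got to them, for example after the job had been stopped for a while, or because the topic was dropped and recreated. A quick way to confirm which case you are in is to compare the checkpointed offset with the partition's current earliest offset. The following is a minimal sketch, not from the original post, assuming the broker is reachable at hadoop:9092 (the port is a guess) and kafka-clients is on the classpath:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "hadoop:9092")  // assumed broker address
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
try {
  val tp = new TopicPartition("news", 0)
  val partitions = java.util.Arrays.asList(tp)
  // Earliest offset the broker still has for news-0
  val earliest = consumer.beginningOffsets(partitions).get(tp)
  // Latest offset (log end offset)
  val latest = consumer.endOffsets(partitions).get(tp)
  println(s"news-0 earliest=$earliest latest=$latest")
  // If the checkpointed offset (196 in the error) is below `earliest`,
  // that range of records has already been removed by retention.
} finally {
  consumer.close()
}

If earliest is greater than 196, retention removed that range; if earliest is back at 0 with a small latest, the topic was probably recreated and the old offsets simply no longer exist.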
2. Add the option suggested by the error message -> option("failOnDataLoss", "false")
// 5. Read the Kafka source data
val df: DataFrame = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", params.brokerList)
  .option("subscribe", params.topic)
  .option("startingOffsets", "latest")
  .option("kafka.consumer.commit.groupid", "action-log-group01")
  .option("failOnDataLoss", "false")
  .load()
tips: I don't think adding this option is really the right fix, since it only stops the query from failing: the offsets that are no longer in Kafka are skipped, so those records are still lost. I just haven't found a better approach yet.
If any blogger knows a better way, I'd appreciate some pointers.
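For completeness, the error text itself names the usual culprit ("the data was aged out by Kafka"), so one direction, offered only as a hedged sketch and not as the original author's solution, is to make sure the topic keeps records long enough for the streaming job to catch up after downtime. Assuming a Kafka 2.3+ cluster at hadoop:9092 and a 7-day retention target (both assumptions), the retention can be raised with the AdminClient:

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put("bootstrap.servers", "hadoop:9092")  // assumed broker address

val admin = AdminClient.create(props)
try {
  val topic = new ConfigResource(ConfigResource.Type.TOPIC, "news")
  // Keep records for 7 days (604800000 ms); the value is an assumption, size it to your expected lag
  val op = new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET)
  val configs = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
  configs.put(topic, java.util.Arrays.asList(op))
  admin.incrementalAlterConfigs(configs).all().get()
} finally {
  admin.close()
}

With enough retention, the checkpointed offsets stay readable across restarts, and failOnDataLoss can stay at its default of true so that genuine data loss still fails the query instead of being skipped silently. On brokers older than 2.3 the same change can be made with the kafka-configs.sh CLI.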