Flink SQL 获取FileSystem时,如果FileName发生更改在则会报错
Posted 青冬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL 获取FileSystem时,如果FileName发生更改在则会报错相关的知识,希望对你有一定的参考价值。
FLink SQL 在设定各类数据源和数据目标端的时候非常方便,可以说写个建表语句就能获取各种各样的数据。但是在使用FileSystem获取某目录下的数据时需要小心使用,比如:
CREATE TABLE `cfg_city`(
`provincecode` int, `city_id` int, `city_name` string
)
WITH (
'connector'='filesystem',
'path'='hdfs://pathroot/cfg_city',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true'
)
;
在程序正常运行时不会报错,但是如果程序fail and retry时,遇到hdfs://pathroot/cfg_city路径下的文件发生改变则可能报错。原因是因为Flink SQL FileSystem 是Bounded Source,在第一次运行时就记录了FileList,导致如果有新文件出现不会被读取/旧文件改变文件名会报错。
在社区汇报了该issue后,发现也有类似的需求将FileSystem Bounded Change to Stream,Link。最下面有不使用SQL的办法,但使用SQL的各位还没有类似的配置。
以上是关于Flink SQL 获取FileSystem时,如果FileName发生更改在则会报错的主要内容,如果未能解决你的问题,请参考以下文章
Flink 实战系列Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)
Flink 实战系列Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)
基础flink sqlhdfs到hive的逻辑处理一:通过正则获取日志数据
Flink实战之FileSystem-parquet支持ZSTD压缩