Flink SQL 获取FileSystem时,如果FileName发生更改在则会报错

Posted 青冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL 获取FileSystem时,如果FileName发生更改在则会报错相关的知识,希望对你有一定的参考价值。

FLink SQL 在设定各类数据源和数据目标端的时候非常方便,可以说写个建表语句就能获取各种各样的数据。但是在使用FileSystem获取某目录下的数据时需要小心使用,比如:

CREATE TABLE `cfg_city`(
    `provincecode` int, `city_id` int, `city_name` string
)
WITH (
    'connector'='filesystem',
    'path'='hdfs://pathroot/cfg_city',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
)
;

在程序正常运行时不会报错,但是如果程序fail and retry时,遇到hdfs://pathroot/cfg_city路径下的文件发生改变则可能报错。原因是因为Flink SQL FileSystem 是Bounded Source,在第一次运行时就记录了FileList,导致如果有新文件出现不会被读取/旧文件改变文件名会报错。

在社区汇报了该issue后,发现也有类似的需求将FileSystem Bounded Change to Stream,Link。最下面有不使用SQL的办法,但使用SQL的各位还没有类似的配置。

以上是关于Flink SQL 获取FileSystem时,如果FileName发生更改在则会报错的主要内容,如果未能解决你的问题,请参考以下文章

Flink 实战系列Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)

Flink 实战系列Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)

基础flink sqlhdfs到hive的逻辑处理一:通过正则获取日志数据

Flink实战之FileSystem-parquet支持ZSTD压缩

Flink: FlieSystem SQL Connector

95-910-165-源码-FlinkSQL-Flink SQL 中的时间属性