Flink`textInputFormat`不处理来自aws S3`Source`文件系统的GZ压缩文件

Posted

技术标签:

【中文标题】Flink`textInputFormat`不处理来自aws S3`Source`文件系统的GZ压缩文件【英文标题】:Flink `textInputFormat` does not process GZ compressed files from aws S3 `Source` file system 【发布时间】:2022-01-22 16:06:12 【问题描述】:

我关注 (ZIP compressed input for Apache Flink) 并编写了以下代码片段,以使用简单的 TextInputFormat 处理目录中的 .gz 日志文件。它适用于我的本地测试目录,扫描并自动打开.gz 文件内容。但是,当我使用 s3 存储桶源运行它时,它不会处理 .gz 压缩文件。不过,这个 Flink 作业仍然会打开 s3 存储桶上的 .log 文件。似乎它只是不解压缩 .gz 文件。如何在 s3 文件系统上解决此问题?

public static void main(String[] args) throws Exception 

    final ParameterTool params = ParameterTool.fromArgs(args);
    final String sourceLogDirPath = params.get("source_log_dir_path", "s3://my-test-bucket-logs/"); // "/Users/my.user/logtest/logs"
    final Long checkpointInterval = Long.parseLong(params.get("checkpoint_interval", "60000"));
    
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().setGlobalJobParameters(params);

    TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
    textInputFormat.setNestedFileEnumeration(true);

    DataStream<String> stream = env.readFile(
            textInputFormat, sourceLogDirPath,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

    stream.print();
    env.execute();

这是我的类路径 jar flink 库:

/opt/flink/lib/flink-csv-1.13.2.jar:/opt/flink/lib/flink-json-1.13.2.jar:/opt/flink/lib/flink-shaded-zookeeper- 3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.2.jar:/opt/flink/lib/flink-table_2.12-1.13.2.jar:/opt/flink /lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/ opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/sentry_log4j2_deploy.jar:/opt/flink/lib/flink-dist_2.12-1.13.2.jar:::

附:我也尝试了s3a://&lt;bucket&gt;/,但没有成功。

【问题讨论】:

【参考方案1】:

可能你可以把日志改成调试模式,观察文件拆分时是否过滤掉文件。

默认情况下,以“.”或“_”开头的文件将被过滤掉

【讨论】:

这个答案真的很有帮助。查看调试日志后,我注意到文件路径中有 // (基本上是一个空的目录 / 没有名称),因此 Flink 无法在该目录中打开和扫描。这个双斜杠文件路径是vector(从数据狗到管道日志的服务)配置在设置 s3 接收器时的常见问题。

以上是关于Flink`textInputFormat`不处理来自aws S3`Source`文件系统的GZ压缩文件的主要内容,如果未能解决你的问题,请参考以下文章

在 spark 中设置 textinputformat.record.delimiter

Hadoop TextInputFormat源码分析

Flink 复杂事件处理

4 weekend110的textinputformat对切片规划的源码分析 + 倒排索引的mr实现 + 多个job在同一个main方法中提交

Flink流处理- 数据流操作

Flink学习