Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)相关的知识,希望对你有一定的参考价值。

我想做什么:

  1. 按格式连续读取和解压缩GZ文件(约3000个文件,解压缩后每个文件分别具有1.2MB和9 MB)>
  2. 替换每个CSV文件中的某些字符序列
  3. 将CSV文件压缩到GZ,并将修改后的文件保存到其自己的路径。
  4. 实际代码:

static void run(final BeeswaxDataflowOptions options) 
final Pipeline pipeline = Pipeline.create(options);
final PCollection<MatchResult.Metadata> matches =
    pipeline.apply(
        "Read",
        FileIO.match()
            .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
            .continuously(
                Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));

matches
    .apply(FileIO.readMatches().withCompression(GZIP))
    .apply(
        Window.<FileIO.ReadableFile>into(
                FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO)
            .triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
    .apply(
        "Uncompress",
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(
                file -> 
                  final String filePath = file.getMetadata().resourceId().toString();
                  try 
                    return KV.of(filePath, file.readFullyAsUTF8String());
                   catch (final IOException e) 
                    return KV.of(filePath, "");
                  
                ))
    .apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
    .apply(
        "Save results",
        FileIO.<String, KV<String, String>>writeDynamic()
            .withCompression(GZIP)
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .withNumShards(options.getShards())
            .to(options.getOutputPath())
            .withTempDirectory(options.getTempLocation())
            .withNaming(AbsoluteNaming::new));

pipeline.run().waitUntilFinish();

问题出在OutOfMemory异常(是的,我知道readFullyAsUTF8String对此可疑)。如何处理这种情况?

我的观察结果是,在“解压缩”步骤中读取并收集了所有〜3000个文件。因此,在进行“准备导入BigQuery”和“保存结果”之前,它已经以某种方式累积并读取到RAM中。

最好以某种方式将这个管道排队-像最多50个元素要经过步骤并等待结果,然后再开始。这可能吗?如果没有,如何处理不同]

我想做的事情:按模式连续读取和解压缩GZ文件(约3000个文件),解压缩后每个文件分别有1.2MB和9 MB替换每个CSV文件中的某些字符序列...

答案
您可以在这里做几件事。

以上是关于Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)的主要内容,如果未能解决你的问题,请参考以下文章

Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)

Spring Cloud Dataflow 与 Apache Beam/GCP 数据流说明

JAVA - Apache BEAM- GCP:GroupByKey 与 Direct Runner 一起工作正常,但与 Dataflow runner 一起失败

从 Apache Beam(GCP 数据流)写入 ConfluentCloud

使用 Apache Beam 在 GCP DataflowRunner 上没有名为“IPython”的模块

在 mac zsh 终端上安装 apache-beam[gcp] 时出错 - “zsh: no match found: apache-beam[gcp]”