Apache Beam GCP 在动态创建的目录中上传 Avro

Posted

技术标签:

【中文标题】Apache Beam GCP 在动态创建的目录中上传 Avro【英文标题】:Apache Beam GCP upload Avro in dynamically created directories 【发布时间】:2020-08-12 06:36:58 【问题描述】:

我想在 GCP 中创建一个流式 Apache Beam 管道,它从 Google Pub/Sub 读取数据并将其推送到 GCS。我有一点可以从 Pub/Sub 读取数据。 我当前的代码看起来像这样(从 GCP Apache 梁模板之一中提取)

pipeline.apply("Read PubSub Events",
  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
                .apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
                .apply(
                        options.getWindowDuration() + " Window",
                        Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
                .apply(
                        "Write File(s)",
                        AvroIO.write(AdEvent.class)
                                .to(
                                        new WindowedFilenamePolicy(
                                                options.getOutputDirectory(),
                                                options.getOutputFilenamePrefix(),
                                                options.getOutputShardTemplate(),
                                                options.getOutputFilenameSuffix()))
                                .withTempDirectory(NestedValueProvider.of(
                                        options.getAvroTempDirectory(),
                                        (SerializableFunction<String, ResourceId>) input ->
                                                FileBasedSink.convertToFileResourceIfPossible(input)))
                                .withWindowedWrites()
                                .withNumShards(options.getNumShards()));

它可以生成如下所示的文件 windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro

我想将 GCS 中的数据存储在动态创建的目录中。在以下目录中,2020-04-28/012020-04-28/02 等 - 0102 是子目录,表示数据流流处理管道处理数据的时间。

例子:

gs://data/2020-04-28/01/0000000.avro
gs://data/2020-04-28/01/0000001.avro
gs://data/2020-04-28/01/....

gs://data/2020-04-28/02/0000000.avro
gs://data/2020-04-28/02/0000001.avro
gs://data/2020-04-28/02/....

gs://data/2020-04-28/03/0000000.avro
gs://data/2020-04-28/03/0000001.avro
gs://data/2020-04-28/03/....
...

0000000、0000001 等是我用来说明的简单文件名,我不希望这些文件是顺序名称。 您认为这在 GCP 数据流流设置中可行吗?

【问题讨论】:

【参考方案1】:

您可以实现自己的FilenamePolicy(也许使用WindowedFilenamePolicy 作为起点)来使用自己的逻辑来定义输出路径。您可以根据需要在文件路径中使用/ 字符(顺便说一下,GCS 存储桶是"flat",它们实际上没有目录)。要获取日期/时间,windowedFilename 方法将窗口信息作为参数,因此您可以在返回值中使用它,但您认为合适。

【讨论】:

【参考方案2】:

您需要使用writeDynamic 而不是Write。不幸的是,正如here 所提到的,AvroIO 本身不支持 writeDynamic,而是您需要使用 FileIO。

以下是使用 Scio 在 Scala 中的示例实现

    val dynamicOutput: FileIO.Write[String, GenericRecord] = FileIO
      .writeDynamic[String, GenericRecord]()
      .by((input: GenericRecord) => 
        input.get("id").toString.toUpperCase  + "/"
      )
      .withDestinationCoder(StringUtf8Coder.of())
      .withNumShards(1) // Since input is small, restrict to one file per bucket
      .withNaming(
        new SerializableFunction[String, FileNaming] 
          override def apply(partitionCol: String): FileNaming = 
            FileIO.Write.defaultNaming(s"Id=$partitionCol", ".parquet")
          
        
      )
      .via(Contextful.fn[GenericRecord,GenericRecord](
          new SerializableFunction[GenericRecord,GenericRecord]
            override def apply(input: GenericRecord): GenericRecord = 
              val r = new GenericData.Record(outputSchema)
              r.put("amount",input.get("amount"))
              r.put("name",input.get("name"))
              r.put("type",input.get("type"))
              r
            
          
        ),
        ParquetIO.sink(outputSchema)
      )
      .to("gs://bucket-name/table-name")

在上面的示例中,我使用 GenericRecord 类型并指定模式并创建动态分区并以 Parquet 格式写入文件。您可以选择以任何格式写入数据。

【讨论】:

有趣,让我看看这个。谢谢你的建议 @Jayadeep 使用这种方法,您将如何处理倾斜数据和小文件问题(有没有办法根据大小指定文件翻转策略)【参考方案3】:

您可以使用Pub/Sub to Cloud Storage Avro template 是一个流式管道,它从 Pub/Sub 主题读取数据并将 Avro 文件写入指定的 Cloud Storage 存储桶。此管道支持可选的用户提供的窗口持续时间,用于执行窗口写入。

【讨论】:

所以我实际上得到了 github 模板的代码,它似乎部分工作,但我一直在数据流中遇到这个错误 - “java.lang.RuntimeException:org.apache.beam。 sdk.util.UserCodeException: java.lang.IllegalArgumentException: GCS 对象名称不能包含回车或换行字符。" --stagingLocation=gs://test/dataflow/pipelines/staging/ \ --tempLocation=gs://test/dataflow/pipelines/temp/ \ --runner=DataflowRunner \ -- windowDuration=1m \ --numShards=1 \ --inputTopic=projects/test/topics/testTopic \ --outputDirectory=gs://test/output/YYYY/MM/DD/ \ --outputFilenameSuffix=.avro --avroTempDirectory =gs://test/avro-temp-dir/ \ --outputShardTemplate=-SSS-NNN" 这些是我用来传递给作业以运行的参数 ^^

以上是关于Apache Beam GCP 在动态创建的目录中上传 Avro的主要内容,如果未能解决你的问题,请参考以下文章

使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表

请求的身份验证范围不足 - GCP 上的 Dataflow/Apache Beam

如何从 GCP 存储桶中读取 Apache Beam 中的多个文件

GCP Dataflow + Apache Beam - 缓存问题

从 Apache Beam(GCP 数据流)写入 ConfluentCloud

Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)