Apache Beam GCP 在动态创建的目录中上传 Avro
Posted
技术标签:
【中文标题】Apache Beam GCP 在动态创建的目录中上传 Avro【英文标题】:Apache Beam GCP upload Avro in dynamically created directories 【发布时间】:2020-08-12 06:36:58 【问题描述】:我想在 GCP 中创建一个流式 Apache Beam 管道,它从 Google Pub/Sub 读取数据并将其推送到 GCS。我有一点可以从 Pub/Sub 读取数据。 我当前的代码看起来像这样(从 GCP Apache 梁模板之一中提取)
pipeline.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
.apply(
"Write File(s)",
AvroIO.write(AdEvent.class)
.to(
new WindowedFilenamePolicy(
options.getOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>) input ->
FileBasedSink.convertToFileResourceIfPossible(input)))
.withWindowedWrites()
.withNumShards(options.getNumShards()));
它可以生成如下所示的文件
windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro
我想将 GCS 中的数据存储在动态创建的目录中。在以下目录中,2020-04-28/01
、2020-04-28/02
等 - 01
和 02
是子目录,表示数据流流处理管道处理数据的时间。
例子:
gs://data/2020-04-28/01/0000000.avro
gs://data/2020-04-28/01/0000001.avro
gs://data/2020-04-28/01/....
gs://data/2020-04-28/02/0000000.avro
gs://data/2020-04-28/02/0000001.avro
gs://data/2020-04-28/02/....
gs://data/2020-04-28/03/0000000.avro
gs://data/2020-04-28/03/0000001.avro
gs://data/2020-04-28/03/....
...
0000000、0000001 等是我用来说明的简单文件名,我不希望这些文件是顺序名称。 您认为这在 GCP 数据流流设置中可行吗?
【问题讨论】:
【参考方案1】:您可以实现自己的FilenamePolicy(也许使用WindowedFilenamePolicy
作为起点)来使用自己的逻辑来定义输出路径。您可以根据需要在文件路径中使用/
字符(顺便说一下,GCS 存储桶是"flat",它们实际上没有目录)。要获取日期/时间,windowedFilename
方法将窗口信息作为参数,因此您可以在返回值中使用它,但您认为合适。
【讨论】:
【参考方案2】:您需要使用writeDynamic
而不是Write
。不幸的是,正如here 所提到的,AvroIO 本身不支持 writeDynamic,而是您需要使用 FileIO。
以下是使用 Scio 在 Scala 中的示例实现
val dynamicOutput: FileIO.Write[String, GenericRecord] = FileIO
.writeDynamic[String, GenericRecord]()
.by((input: GenericRecord) =>
input.get("id").toString.toUpperCase + "/"
)
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1) // Since input is small, restrict to one file per bucket
.withNaming(
new SerializableFunction[String, FileNaming]
override def apply(partitionCol: String): FileNaming =
FileIO.Write.defaultNaming(s"Id=$partitionCol", ".parquet")
)
.via(Contextful.fn[GenericRecord,GenericRecord](
new SerializableFunction[GenericRecord,GenericRecord]
override def apply(input: GenericRecord): GenericRecord =
val r = new GenericData.Record(outputSchema)
r.put("amount",input.get("amount"))
r.put("name",input.get("name"))
r.put("type",input.get("type"))
r
),
ParquetIO.sink(outputSchema)
)
.to("gs://bucket-name/table-name")
在上面的示例中,我使用 GenericRecord 类型并指定模式并创建动态分区并以 Parquet 格式写入文件。您可以选择以任何格式写入数据。
【讨论】:
有趣,让我看看这个。谢谢你的建议 @Jayadeep 使用这种方法,您将如何处理倾斜数据和小文件问题(有没有办法根据大小指定文件翻转策略)【参考方案3】:您可以使用Pub/Sub to Cloud Storage Avro template 是一个流式管道,它从 Pub/Sub 主题读取数据并将 Avro 文件写入指定的 Cloud Storage 存储桶。此管道支持可选的用户提供的窗口持续时间,用于执行窗口写入。
【讨论】:
所以我实际上得到了 github 模板的代码,它似乎部分工作,但我一直在数据流中遇到这个错误 - “java.lang.RuntimeException:org.apache.beam。 sdk.util.UserCodeException: java.lang.IllegalArgumentException: GCS 对象名称不能包含回车或换行字符。" --stagingLocation=gs://test/dataflow/pipelines/staging/ \ --tempLocation=gs://test/dataflow/pipelines/temp/ \ --runner=DataflowRunner \ -- windowDuration=1m \ --numShards=1 \ --inputTopic=projects/test/topics/testTopic \ --outputDirectory=gs://test/output/YYYY/MM/DD/ \ --outputFilenameSuffix=.avro --avroTempDirectory =gs://test/avro-temp-dir/ \ --outputShardTemplate=-SSS-NNN" 这些是我用来传递给作业以运行的参数 ^^以上是关于Apache Beam GCP 在动态创建的目录中上传 Avro的主要内容,如果未能解决你的问题,请参考以下文章
使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表
请求的身份验证范围不足 - GCP 上的 Dataflow/Apache Beam
如何从 GCP 存储桶中读取 Apache Beam 中的多个文件
GCP Dataflow + Apache Beam - 缓存问题