将 PubSub 流保存到 GCS 中的分区拼花文件

Posted

技术标签:

【中文标题】将 PubSub 流保存到 GCS 中的分区拼花文件【英文标题】:Save PubSub stream to a partitioned parquet file in GCS 【发布时间】:2021-06-26 12:59:18 【问题描述】:

我有一个 spark-streaming 应用程序,它从 pubsub 主题(例如 kafka)读取消息,对每个主题应用一些转换,并将它们保存为 GCS 中的 parquet 文件,由任意列分区。使用结构化流和 spark-gcs 连接器相对容易。 例如,每条消息如下所示:


  "app_id": "app1", 
  "user_id": "u001", 
  "evt_timestamp": 1617105047, 
  "evt_data":  ... 

我将其读取为结构化流数据帧,然后将其划分为例如app_iduser_id,然后将其保存到 GCS 存储桶中,然后看起来像这样:

gs://my-bucket/data/app_id=app1/user_id=u001/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u002/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u003/XXX.part
gs://my-bucket/data/app_id=app2/user_id=u001/XXX.part
...

我想将我的数据处理转移到 GCP,这样我就不必管理我的 Spark 基础架构。我可以重写我的应用程序以使用 DStreams 并在 Dataproc 上运行它,但重要的人不愿意使用 Spark。 我一直无法找到对数据进行分区的方法。 BigQuery 支持集群,这似乎是我需要的,但我仍然需要不断地将它保存到 GCS。它可以在 GCP 中轻松完成,还是我的用例被破坏了?

编辑:

正如接受的答案所建议的那样,我设法使用writeDynamic 和我的FileIO.Write.FileNaming 实现来实现这一点。 大致是这样的:

PCollection<String> pubsubMessages = ... // read json string messages from pubsub
PCollection<ParsedMessage> messages = pubsubMessages
    .apply(ParDo.of(new ParseMessage())) // convert json pubsub message to a java bean
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(2))));

FileIO.Write<Partition, JsonMessage> writer = FileIO.<Partition, JsonMessage>writeDynamic()
    .by(jsonMessage -> new Partition(/* some jsonMessage fields */))
    .via(
        Contextful.fn(JsonMessage::toRecord), // convert message to Sink type, in this case GenericRecord
        ParquetIO.sink(OUT_SCHEMA)) // create a parquet sink
    .withNaming(part -> new PartitionFileName(/* file name based on `part` fields */))
    .withDestinationCoder(AvroCoder.of(Partition.class, Partition.SCHEMA))
    .withNumShards(1)
    .to("output");

PartitionFileName 可以是这样的

class PartFileName implements FileIO.Write.FileNaming 
  private final String[] partNames;
  private final Serializable[] partValues;


  public PartFileName(String[] partNames, Serializable[] partValues) 
    this.partNames = partNames;
    this.partValues = partValues;
  

  @Override
  public String getFilename(
      BoundedWindow window,
      PaneInfo pane,
      int numShards,
      int shardIndex,
      Compression compression) 

    StringBuilder dir = new StringBuilder();
    for (int i = 0; i < this.partNames.length; i++)   
       dir
        .append(partNames[i])
        .append("=")
        .append(partValues[i])
        .append("/");
    

    String fileName = String.format("%d_%d_%d.part", shardIndex, numShards, window.maxTimestamp().getMillis());

    return String.format("%s/%s", dir.toString(), fileName);
  

这会产生类似的目录结构

output/date=20200301/app_id=1001/0_1_1617727449999.part

【问题讨论】:

【参考方案1】:

我相信您正在寻找 Apache Beam/Google Cloud Dataflow streaming pipelines 的 Pubsub。

是的,它可以毫不费力地做你想做的事。您可以在流媒体上定义窗口,并使用Parquet IO 将其写入 GCS。

虽然不是 Parquet,但this example 从 Pubsub 读取并将文本文件写入 GCS。

要实现动态文件名功能,FileIO 的 writeDynamic 和您自己的 FilenamePolicy 应该可以正常工作。

【讨论】:

如果分区的数量(和值)事先已知,Apache Beam 允许分区。因此,通过例如分区app_id 和日期是不可能的。窗口化只是根据时间戳将传入的数据拆分为微批次。 您可以创建自己的 FilenamePolicy 来实现这一点。示例:gist.github.com/ryanmcdowell/40fe297ebf9576bf9ba14fd6645c82e6 您使用的是哪个 SDK?在 Python beam.apache.org/releases/pydoc/2.17.0/… 和 Java beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/… 中找到参考

以上是关于将 PubSub 流保存到 GCS 中的分区拼花文件的主要内容,如果未能解决你的问题,请参考以下文章

从 Impala 分区拼花表创建文本表

在 AzureML 上保存分区拼花

尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错

无法从 Beam 中的 GCS 读取 PubSub gz 文件

我们可以使用单个 Google Cloud Dataflow 将来自多个 Pubsub(源)的数据写入多个 GCS(接收器)吗?

编写拼花文件时如何避免空文件?