将 PubSub 流保存到 GCS 中的分区拼花文件
Posted
技术标签:
【中文标题】将 PubSub 流保存到 GCS 中的分区拼花文件【英文标题】:Save PubSub stream to a partitioned parquet file in GCS 【发布时间】:2021-06-26 12:59:18 【问题描述】:我有一个 spark-streaming 应用程序,它从 pubsub 主题(例如 kafka)读取消息,对每个主题应用一些转换,并将它们保存为 GCS 中的 parquet 文件,由任意列分区。使用结构化流和 spark-gcs 连接器相对容易。 例如,每条消息如下所示:
"app_id": "app1",
"user_id": "u001",
"evt_timestamp": 1617105047,
"evt_data": ...
我将其读取为结构化流数据帧,然后将其划分为例如app_id
和 user_id
,然后将其保存到 GCS 存储桶中,然后看起来像这样:
gs://my-bucket/data/app_id=app1/user_id=u001/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u002/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u003/XXX.part
gs://my-bucket/data/app_id=app2/user_id=u001/XXX.part
...
我想将我的数据处理转移到 GCP,这样我就不必管理我的 Spark 基础架构。我可以重写我的应用程序以使用 DStreams 并在 Dataproc 上运行它,但重要的人不愿意使用 Spark。 我一直无法找到对数据进行分区的方法。 BigQuery 支持集群,这似乎是我需要的,但我仍然需要不断地将它保存到 GCS。它可以在 GCP 中轻松完成,还是我的用例被破坏了?
编辑:
正如接受的答案所建议的那样,我设法使用writeDynamic
和我的FileIO.Write.FileNaming
实现来实现这一点。
大致是这样的:
PCollection<String> pubsubMessages = ... // read json string messages from pubsub
PCollection<ParsedMessage> messages = pubsubMessages
.apply(ParDo.of(new ParseMessage())) // convert json pubsub message to a java bean
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(2))));
FileIO.Write<Partition, JsonMessage> writer = FileIO.<Partition, JsonMessage>writeDynamic()
.by(jsonMessage -> new Partition(/* some jsonMessage fields */))
.via(
Contextful.fn(JsonMessage::toRecord), // convert message to Sink type, in this case GenericRecord
ParquetIO.sink(OUT_SCHEMA)) // create a parquet sink
.withNaming(part -> new PartitionFileName(/* file name based on `part` fields */))
.withDestinationCoder(AvroCoder.of(Partition.class, Partition.SCHEMA))
.withNumShards(1)
.to("output");
PartitionFileName 可以是这样的
class PartFileName implements FileIO.Write.FileNaming
private final String[] partNames;
private final Serializable[] partValues;
public PartFileName(String[] partNames, Serializable[] partValues)
this.partNames = partNames;
this.partValues = partValues;
@Override
public String getFilename(
BoundedWindow window,
PaneInfo pane,
int numShards,
int shardIndex,
Compression compression)
StringBuilder dir = new StringBuilder();
for (int i = 0; i < this.partNames.length; i++)
dir
.append(partNames[i])
.append("=")
.append(partValues[i])
.append("/");
String fileName = String.format("%d_%d_%d.part", shardIndex, numShards, window.maxTimestamp().getMillis());
return String.format("%s/%s", dir.toString(), fileName);
这会产生类似的目录结构
output/date=20200301/app_id=1001/0_1_1617727449999.part
【问题讨论】:
【参考方案1】:我相信您正在寻找 Apache Beam/Google Cloud Dataflow streaming pipelines 的 Pubsub。
是的,它可以毫不费力地做你想做的事。您可以在流媒体上定义窗口,并使用Parquet IO 将其写入 GCS。
虽然不是 Parquet,但this example 从 Pubsub 读取并将文本文件写入 GCS。
要实现动态文件名功能,FileIO 的 writeDynamic
和您自己的 FilenamePolicy
应该可以正常工作。
【讨论】:
如果分区的数量(和值)事先已知,Apache Beam 允许分区。因此,通过例如分区app_id 和日期是不可能的。窗口化只是根据时间戳将传入的数据拆分为微批次。 您可以创建自己的 FilenamePolicy 来实现这一点。示例:gist.github.com/ryanmcdowell/40fe297ebf9576bf9ba14fd6645c82e6 您使用的是哪个 SDK?在 Python beam.apache.org/releases/pydoc/2.17.0/… 和 Java beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/… 中找到参考以上是关于将 PubSub 流保存到 GCS 中的分区拼花文件的主要内容,如果未能解决你的问题,请参考以下文章
尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错
无法从 Beam 中的 GCS 读取 PubSub gz 文件
我们可以使用单个 Google Cloud Dataflow 将来自多个 Pubsub(源)的数据写入多个 GCS(接收器)吗?