将 csv 数据发布/订阅到 Dataflow 到 BigQuery
Posted
技术标签:
【中文标题】将 csv 数据发布/订阅到 Dataflow 到 BigQuery【英文标题】:Pub/Sub csv data to Dataflow to BigQuery 【发布时间】:2021-04-09 01:26:16 【问题描述】:我的管道是 IoTCore -> Pub/Sub -> 数据流 -> BigQuery。最初我得到的数据是 Json 格式,并且管道工作正常。现在我需要转向 csv,问题是我使用的 Google 定义的数据流模板使用 Json 输入而不是 csv。有没有一种简单的方法可以通过数据流将 csv 数据从 pub/sub 传输到 bigquery。模板可能可以更改,但它是用我从未使用过的 Java 实现的,因此需要很长时间才能实现。我还考虑过在 python 中实现一个完整的自定义模板,但这会花费太长时间。 这是google提供的模板的链接: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java
示例:目前我的 pub/sub 消息是 JSON 并且可以正常工作
""Id":"123","Temperature":"50","Charge":"90""
但我需要将其更改为逗号分隔值
"123,50,90"
【问题讨论】:
【参考方案1】:很简单:什么都不做!!如果您查看this line,您会看到使用的消息类型是 PubSub 消息 JSON,而不是 JSON 中的内容。
所以,为了防止出现任何问题(查询和插入),写入另一个表,它应该可以正常工作!
【讨论】:
您指出的那一行确实为我指明了正确的方向,但我不明白您回答的第二部分。代码中的 cmets 提到了这个 github.com/GoogleCloudPlatform/DataflowTemplates/blob/… 那么是否可以创建一个将 csv 转换为 json 的 UDF,然后模板的其余部分可以像以前一样工作? 是的,您可以在 options 参数中定义一个 javascript 函数,该函数在参数中接受一个字符串并返回一个字符串。您的有效负载,即 PubSub 消息,不仅是 CSV,而且整个 PubSub JSON 消息都会提交给 JS 函数。【参考方案2】:您能否分享您现有的解析 JSON 格式数据和新旧数据示例的 Python 代码,以便可以相应地对其进行自定义。
另外你可以参考Python code这里,它已经对PCollection进行了字数转换逻辑,希望它可以给你一些参考来定制你的代码。
【讨论】:
我添加了新旧数据样本。我现在没有 python 代码,因为我使用的是上面链接的谷歌提供的 Java 代码。问题是这段代码只支持 Json 格式作为输入,但我需要转移到 csv。我只会将 Python 作为最后的手段,因为从头开始编写整个代码会非常耗时。以上是关于将 csv 数据发布/订阅到 Dataflow 到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章
在 Dataflow 中使用啥转换来合并具有不同列的 csv 文件,同时将它们加载到 BigQuery?
在 Dataflow 的云存储桶上哪里可以找到这个 pubsub 订阅?
Dataflow 是不是应该使用来自 Pub/Sub 主题或订阅的事件? [复制]
Dataflow/Apache Beam 在啥阶段确认发布/订阅消息?