通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery

Posted

技术标签:

【中文标题】通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery【英文标题】:Insert PubSub messages into BigQuery through Google Cloud Dataflow 【发布时间】:2015-12-14 10:57:58 【问题描述】:

我想使用 Google Cloud Dataflow 将来自某个主题的 PubSub 消息数据插入到 BigQuery 表中。 一切正常,但在 BigQuery 表中,我可以看到不可读的字符串,如“ ?”。 这是我的管道:

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
     .withSchema(schema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))

而我的简单 StringToRowConverter 函数是:

class StringToRowConverter extends DoFn<String, TableRow> 
private static final long serialVersionUID = 0;

@Override
public void processElement(ProcessContext c) 
    for (String word : c.element().split(",")) 
      if (!word.isEmpty()) 
          System.out.println(word);
        c.output(new TableRow().set("data", word));
      
    


这是我通过 POST 请求发送的消息:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish

 "messages": [
  
   "attributes":
"key": "tablet, smartphone, desktop",
"value": "eng"
   ,
   "data": "34gf5ert"
  
 ]

我错过了什么? 谢谢!

【问题讨论】:

This 是一个开源软件,可用于将发布/订阅定向到 BQ 【参考方案1】:

根据https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage,pubsub 消息的 JSON 有效负载是 base64 编码的。默认情况下,Dataflow 中的 PubsubIO 使用字符串 UTF8 编码器。您提供的示例字符串“34gf5ert”,在经过 base64 解码然后解释为 UTF-8 字符串时,准确地给出了“哇”。

【讨论】:

【参考方案2】:

这就是我解压缩 pubsub 消息的方式:

@Override
public void processElement(ProcessContext c) 

    String json = c.element();

    HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>().getType());
    String unpacked = items.get("JsonKey");

希望对你有用。

【讨论】:

以上是关于通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表

通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery

用于 NRT 数据应用的 Google Cloud DataFlow

通过Dataflow管道写入Cloud SQL非常慢

Google Cloud Dataflow 和 Google Cloud Dataproc 有啥区别?

在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名