如何在Apache Beam / Google Dataflow中使用ParseJsons?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Apache Beam / Google Dataflow中使用ParseJsons?相关的知识,希望对你有一定的参考价值。

java新手在这里。我很难理解如何在我的Apache Beam管道中使用ParseJsons来将字符串PCollection解析为对象PCollection。

我的理解是我需要首先定义一个匹配json结构的类,然后使用ParseJsons将json字符串映射到该类的对象。

但是,ParseJsons文档看起来很神秘。我不确定如何使用Apache Beam实际执行转换。有人能给我一个如何解析行分隔的json字符串的快速而肮脏的例子吗?

这是我做过的尝试之一,但不幸的是语法不正确。

class Product {
  private String name = null;
  private String url = null;
}

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
 .apply(new ParseJsons.of(Product))
 .apply("WriteCounts", TextIO.write().to(options.getOutput()));
答案

我想你想要:

PCollectoion<Product> = 
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
   .apply(new ParseJsons.of(Product.class))
   .setCoder(SerializableCoder.of(MyPojo.class));

以上是关于如何在Apache Beam / Google Dataflow中使用ParseJsons?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?

Apache Beam 不会将文件写入本地环境或 Google 存储

通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证

确认 Apache Beam 上的 Google Pub/Sub 消息

Apache Beam/Google Dataflow - 将数据从 Google Datastore 导出到 Cloud Storage 中的文件

尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错