Java Apache Beam PCollections and how to make them work?

Posted: 2019-12-18 09:29:38

First, let me describe the scenario.

Step 1. I have to read a file line by line. The file is a .json file, and every line has the following format:


{
  "schema": { several keys that are to be deleted },
  "payload": {"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"USD","key5":"100"}
}

Step 2. Remove the schema object and end up with (adding more examples for the following steps):

{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"USD","key5":"100"}
{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"US","key5":"90"}
{"key1":2002,"key2":"cccc","key3":"hhhh","key4":"CN","key5":"80"}

Step 3. Split those values into a key and a value by parsing them as JSON in memory, using strings as both key and value, as in a map:

{"key1":20001,"key2":"aaaa","key3":"bbbb"} = {"key4":"USD","key5":"100"}
{"key1":20001,"key2":"aaaa","key3":"bbbb"} = {"key4":"US","key5":"90"}
{"key1":2002,"key2":"cccc","key3":"hhhh"} = {"key4":"CN","key5":"80"}
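The split in step 3 can be sketched with plain Java collections, independently of Beam. This is only an illustration of the logic, assuming the line has already been parsed into a flat `Map`; the class name `SplitSketch` is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitSketch {

    /** Splits a flat payload map into a grouping-key part and a value part. */
    static Map<String, String>[] split(Map<String, String> payload, List<String> mainKeys) {
        Map<String, String> keyPart = new LinkedHashMap<>();
        Map<String, String> valuePart = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : payload.entrySet()) {
            // Fields listed in mainKeys go to the key; everything else goes to the value.
            (mainKeys.contains(e.getKey()) ? keyPart : valuePart).put(e.getKey(), e.getValue());
        }
        @SuppressWarnings("unchecked")
        Map<String, String>[] parts = new Map[] { keyPart, valuePart };
        return parts;
    }

    public static void main(String[] args) {
        Map<String, String> payload = new LinkedHashMap<>();
        payload.put("key1", "20001");
        payload.put("key2", "aaaa");
        payload.put("key3", "bbbb");
        payload.put("key4", "USD");
        payload.put("key5", "100");
        Map<String, String>[] parts = split(payload, List.of("key1", "key2", "key3"));
        System.out.println(parts[0] + " = " + parts[1]);
        // Prints: {key1=20001, key2=aaaa, key3=bbbb} = {key4=USD, key5=100}
    }
}
```

Inside a Beam `DoFn` the same loop runs per element, with the two parts serialized back to JSON strings before building the `KV`.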

Step 4. Because of my lack of knowledge about PCollections, I can't get step 4 done. I need to take all the lines that were read and perform a GroupByKey, so that it ends up like:

{"key1":20001,"key2":"aaaa","key3":"bbbb"} = [
                                        {"key4":"USD","key5":"100"},
                                        {"key4":"US","key5":"90"}    ]
{"key1":2002,"key2":"cccc","key3":"hhhh"} = {"key4":"CN","key5":"80"}
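Outside of Beam, the GroupByKey in step 4 behaves like collecting values into a multimap: every value that shares a key ends up in one collection under that key. A minimal plain-Java sketch of those semantics, using the stringified example pairs (the class name `GroupByKeySketch` is hypothetical):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByKeySketch {

    /** Collects (key, value) string pairs into a multimap, mirroring what Beam's GroupByKey does. */
    static Map<String, List<String>> group(List<String[]> pairs) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] kv : pairs) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // The three example pairs from step 3, serialized to strings.
        List<String[]> pairs = List.of(
                new String[] { "{\"key1\":20001,\"key2\":\"aaaa\",\"key3\":\"bbbb\"}",
                               "{\"key4\":\"USD\",\"key5\":\"100\"}" },
                new String[] { "{\"key1\":20001,\"key2\":\"aaaa\",\"key3\":\"bbbb\"}",
                               "{\"key4\":\"US\",\"key5\":\"90\"}" },
                new String[] { "{\"key1\":2002,\"key2\":\"cccc\",\"key3\":\"hhhh\"}",
                               "{\"key4\":\"CN\",\"key5\":\"80\"}" });
        group(pairs).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In Beam the same result is a `PCollection<KV<String, Iterable<String>>>`, produced by applying `GroupByKey.<String, String>create()` to a `PCollection<KV<String, String>>`.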

Right now my code looks like this:

static void runSimplePipeline(PipelineOptionsCustom options) {
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
            .apply("TransformData", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Gson gson = new GsonBuilder().create();
                    ObjectMapper oMapper = new ObjectMapper();
                    JSONObject obj_key = new JSONObject();
                    JSONObject obj_value = new JSONObject();
                    List<String> listMainKeys = Arrays.asList(new String[] { "key1", "key2", "key3" });

                    HashMap<String, Object> parsedMap = gson.fromJson(c.element().toString(), HashMap.class);
                    parsedMap.remove("schema");

                    Map<String, String> map = oMapper.convertValue(parsedMap.get("payload"), Map.class);
                    for (Map.Entry<String, String> entry : map.entrySet()) {
                        if (listMainKeys.contains(entry.getKey())) {
                            obj_key.put(entry.getKey(), entry.getValue());
                        } else {
                            obj_value.put(entry.getKey(), entry.getValue());
                        }
                    }
                    KV objectKV = KV.of(obj_key.toJSONString(), obj_value.toJSONString());

                    System.out.print(obj_key.toString() + " : " + obj_value.toString() + "\n");
                }
            })); // <------- RIGHT HERE

        p.run().waitUntilFinish();
}

Now the obvious part is that where it says "RIGHT HERE" I should add another apply with GroupByKey, but that requires the complete PCollection, and that is the part I don't really understand.

Comments on the question:

- You already have KV pairs, so you only need to apply GroupByKey. Example
- I think that got it working, but I still don't understand PCollections well enough to know how to Println() these things.
- Never mind, got it. Thanks.

Answer 1:

Here is the code, thanks to the Github repo linked by Guillem Xercavins:

static void runSimplePipeline(PipelineOptionsCustom options) {
    Pipeline p = Pipeline.create(options);

    PCollection<Void> results = p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
            .apply("TransformData", ParDo.of(new DoFn<String, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Gson gson = new GsonBuilder().create();
                    ObjectMapper oMapper = new ObjectMapper();
                    JSONObject obj_key = new JSONObject();
                    JSONObject obj_value = new JSONObject();
                    List<String> listMainKeys = Arrays
                            .asList(new String[] { "EBELN", "AEDAT", "BATXT", "EKOTX", "Land1", "WAERS" });

                    HashMap<String, Object> parsedMap = gson.fromJson(c.element().toString(), HashMap.class);
                    parsedMap.remove("schema");

                    Map<String, String> map = oMapper.convertValue(parsedMap.get("payload"), Map.class);
                    for (Map.Entry<String, String> entry : map.entrySet()) {
                        if (listMainKeys.contains(entry.getKey())) {
                            obj_key.put(entry.getKey(), entry.getValue());
                        } else {
                            obj_value.put(entry.getKey(), entry.getValue());
                        }
                    }
                    KV<String, String> objectKV = KV.of(obj_key.toJSONString(), obj_value.toJSONString());
                    c.output(objectKV);
                }
            })).apply("Group By Key", GroupByKey.<String, String>create())
            .apply("Continue Processing", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    System.out.print(c.element());
                }
            }));

    p.run().waitUntilFinish();
}
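One of the comments above asks how to print the grouped results. After the GroupByKey, each element is a `KV<String, Iterable<String>>`, and the `Iterable` can be joined into one readable line. A sketch of such a formatting helper with plain Java types (the class name `FormatGrouped` is hypothetical; inside the final `DoFn`, `c.element().getKey()` and `c.element().getValue()` would supply the arguments):

```java
import java.util.List;
import java.util.StringJoiner;

public class FormatGrouped {

    /** Renders one grouped entry as  key = [ v1, v2 ]  — the shape shown in step 4. */
    static String render(String key, Iterable<String> values) {
        StringJoiner joiner = new StringJoiner(", ", "[ ", " ]");
        for (String v : values) {
            joiner.add(v);
        }
        return key + " = " + joiner;
    }

    public static void main(String[] args) {
        System.out.println(render("{\"key1\":2002,\"key2\":\"cccc\",\"key3\":\"hhhh\"}",
                List.of("{\"key4\":\"CN\",\"key5\":\"80\"}")));
    }
}
```

Note that with a distributed runner such as Dataflow the printed lines go to the workers' logs, not to a local console, so writing the rendered strings to a sink (e.g. `TextIO.write()`) is usually more practical than `System.out`.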
Comments on the answer:
