Flink:在DataStream API的批处理模式下左连接相当于Dataset API?

Posted

技术标签:

【中文标题】Flink:在DataStream API的批处理模式下左连接相当于Dataset API?【英文标题】:Flink: Left join equivalent of Dataset API in Batch mode of DataStream API? 【发布时间】:2021-09-29 18:37:47 【问题描述】:

Flink 文档中已经提到 DataSet API will be deprecated 将来。因此,我正在考虑在 Batch Mode(我相信它现在处于 Beta 版)迁移中将此 Dataset API 原型设计为 DataStream API。

我们的代码库中有这个(类似的)代码,它在数据集上使用 leftOuterJoin

 DataSet<SomeOutType> joined_out =  datasetA.
                leftOuterJoin(datasetB, JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
                .where((left) -> coalesce(left.getId(), -9999999L))
                .equalTo((right) -> right.company_id).with((JoinFunction<SomeTypeA, SomeTypeB, SomeOutType>) (left, right) -> 
                    SomeOutType recNew = SomeOutType.newBuilder().build();
                    recNew.setCustomerId(left.getCustomerId());
                    recNew.setCustomerName((right != null && right.cust_name != null) ? right.cust_name : "Blank");
                    ....
                    ....
                    ....
                    return  recNew;

                );

问题是我无法在 Datastream API 文档 - Join 中找到 Left JoinLeft Outer Join 等效项。

由于他们正在考虑完全弃用 DataSet API,我假设现在应该有一种方法可以在 DataStream API 中执行此 Left Outer Join。

有人可以指导我以正确的方式做到这一点吗? TIA

【问题讨论】:

【参考方案1】:

DataSet 上的关系操作(例如连接)已被弃用,取而代之的是使用 Table/SQL API 提供的关系操作,该 API 可与 DataStream API 完全互操作。

请参阅 https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/tableapi/#joins 和 https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/。

您可以使用表连接器并直接在它们生成的表上执行连接,或者在执行连接之前将数据流转换为表。如果需要进一步处理,您可以从表转换回数据流。鉴于表/流的二元性,这些“转换”实际上并不需要任何成本。见https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/data_stream_api/。

FWIW,Flink 1.14 刚刚发布,其中包含许多与该主题相关的改进。特别是,只有在 1.14(及更高版本)中,您才能将 Table API 与 DataStream API 以批处理模式结合起来。

【讨论】:

嗨,大卫。感谢您的回复,我也将研究 Table API。从昨天开始,在这里的问题之后,我做了一些搜索,找到了Process Function部分。你认为像左连接这样的东西也可以用这个来模拟吗? 您当然可以将左连接实现为 KeyedCoProcessFunction,但工作量会更大。 感谢大卫的帮助!

以上是关于Flink:在DataStream API的批处理模式下左连接相当于Dataset API?的主要内容,如果未能解决你的问题,请参考以下文章

Flink学习之DataStream API(python版本)

Flink学习之DataStream API(python版本)

第三章 flink流处理API - map和flatmap

我可以在同一个 Flink 作业中使用 DataSet API 和 DataStream API 吗?

Apache Flink -Streaming(DataStream API)

Flink DataStream API