Flink 使用另一个 ObjectA 流中的 List<ObjectB> 创建新的 ObjectB 流

Posted

技术标签:

【中文标题】Flink 使用另一个 ObjectA 流中的 List<ObjectB> 创建新的 ObjectB 流【英文标题】:Flink Create new ObjectB Stream using List<ObjectB> which is in another ObjectA stream 【发布时间】:2021-11-07 01:15:01 【问题描述】:

我有一个SingleOutputStreamOperator&lt;ObjectA&gt; objAStream,只需要将ObjectA 流中的List&lt;ObjectB&gt; 列表对象下沉到kafka。

    public class ObjectA 
        public int id;
        public List<ObjectB> objBList;

        public ObjectA() 
        
    

SingleOutputStreamOperator<ObjectA> objAStream = someStream.map(new SomeMapper());

//required to sink only the member `ObjectB` objects in `objAStream` ObjectA

我是 flink 的新手,是否可以使用 FlapMap 或任何其他方式进行操作。请给你的cmets。

【问题讨论】:

【参考方案1】:

我相信你需要这些方面的东西:

DataStream<List<ObjectB>> objBStream = objAStream
  .map(a -> a.objBList)
  .returns(TypeInformation.of(new TypeHint<List<ObjectB>>()));

objBStream.addSink(...);

您可以尝试不使用returns(...) 子句,但我预计它会在运行时失败。问题是 Flink 需要能够序列化正在流式传输的数据,并且由于类型擦除,泛型类型信息(在本例中为 ObjectB)不可用。

【讨论】:

谢谢大卫,是的,返回(...)它在运行时失败。如果没有返回,我似乎需要根据抛出的异常实现“ResultTypeQueryable”接口。 “由于类型擦除,无法自动确定。您可以通过对转换调用的结果使用返回(...)方法或让您的函数实现 'ResultTypeQueryable' 接口来提供类型信息提示。” 您可以通过returns(...) 方法或通过ResultTypeQueryable 接口提供必要的类型信息。我已经修改了我的答案以建议 .returns(TypeInformation.of(Types.LIST(ObjectB.class))),但这可能不是完全需要指定的方式。这部分取决于 ObjectB 是否是 Flink 的 pov 中的正确 POJO。

以上是关于Flink 使用另一个 ObjectA 流中的 List<ObjectB> 创建新的 ObjectB 流的主要内容,如果未能解决你的问题,请参考以下文章

Apache flink:使用keyBy / connect维护流中的消息输入顺序

Flink 窗口分配器 WindowAssigner

Flink 窗口分配器 WindowAssigner

在另一个 objectB 中创建 objectA 时,objectA 是不是是 objectS 的本地对象,并且 objectS 是不是存在于对象实例化之外?

flink入门-流式计算概念

Flink中的算子操作