Flink 使用另一个 ObjectA 流中的 List<ObjectB> 创建新的 ObjectB 流
Posted
技术标签:
【中文标题】Flink 使用另一个 ObjectA 流中的 List<ObjectB> 创建新的 ObjectB 流【英文标题】:Flink Create new ObjectB Stream using List<ObjectB> which is in another ObjectA stream 【发布时间】:2021-11-07 01:15:01 【问题描述】:我有一个SingleOutputStreamOperator<ObjectA>
objAStream,只需要将ObjectA
流中的List<ObjectB>
列表对象下沉到kafka。
public class ObjectA
public int id;
public List<ObjectB> objBList;
public ObjectA()
SingleOutputStreamOperator<ObjectA> objAStream = someStream.map(new SomeMapper());
//required to sink only the member `ObjectB` objects in `objAStream` ObjectA
我是 flink 的新手,是否可以使用 FlapMap 或任何其他方式进行操作。请给你的cmets。
【问题讨论】:
【参考方案1】:我相信你需要这些方面的东西:
DataStream<List<ObjectB>> objBStream = objAStream
.map(a -> a.objBList)
.returns(TypeInformation.of(new TypeHint<List<ObjectB>>()));
objBStream.addSink(...);
您可以尝试不使用returns(...)
子句,但我预计它会在运行时失败。问题是 Flink 需要能够序列化正在流式传输的数据,并且由于类型擦除,泛型类型信息(在本例中为 ObjectB)不可用。
【讨论】:
谢谢大卫,是的,返回(...)它在运行时失败。如果没有返回,我似乎需要根据抛出的异常实现“ResultTypeQueryable”接口。 “由于类型擦除,无法自动确定。您可以通过对转换调用的结果使用返回(...)方法或让您的函数实现 'ResultTypeQueryable' 接口来提供类型信息提示。” 您可以通过returns(...) 方法或通过ResultTypeQueryable 接口提供必要的类型信息。我已经修改了我的答案以建议.returns(TypeInformation.of(Types.LIST(ObjectB.class)))
,但这可能不是完全需要指定的方式。这部分取决于 ObjectB 是否是 Flink 的 pov 中的正确 POJO。以上是关于Flink 使用另一个 ObjectA 流中的 List<ObjectB> 创建新的 ObjectB 流的主要内容,如果未能解决你的问题,请参考以下文章
Apache flink:使用keyBy / connect维护流中的消息输入顺序
在另一个 objectB 中创建 objectA 时,objectA 是不是是 objectS 的本地对象,并且 objectS 是不是存在于对象实例化之外?