Flink 的批处理执行模式如何实现 BOUNDED source?
Posted
技术标签:
【中文标题】Flink 的批处理执行模式如何实现 BOUNDED source?【英文标题】:How to implement a BOUNDED source for Flink's batch execution mode? 【发布时间】:2022-01-09 16:26:53 【问题描述】:我正在尝试执行 Flink (1.12.1) 批处理作业,步骤如下:
自定义 SourceFunction 以连接 MongoDB 做任何平面图和地图来转换一些数据 在其他 MongoDB 中下沉我正在尝试使用 RuntimeExexutionMode.BATCH 在 StreamExecutionEnvironment 中运行它,但应用程序抛出异常,因为检测到我的源为 UNBOUNDED...而且我无法将它设置为 BOUNDED(它必须在收集所有文档后完成在 mongo 集合中)
例外:
exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33)
一些代码:
执行环境
public static StreamExecutionEnvironment getBatch()
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.addSource(new MongoSource()).print();
return env;
蒙古来源:
public class MongoSource extends RichSourceFunction<Document>
private static final long serialVersionUID = 8321722349907219802L;
private MongoClient mongoClient;
private MongoCollection mc;
@Override
public void open(Configuration con)
mongoClient = new MongoClient(
new MongoClientURI("mongodb://localhost:27017/database"));
mc=mongoClient.getDatabase("database").getCollection("collection");
@Override
public void run(SourceContext<Document> ctx) throws Exception
MongoCursor<Document> itr=mc.find(Document.class).cursor();
while(itr.hasNext())
ctx.collect(itr.next());
this.cancel();
@Override
public void cancel()
mongoClient.close();
谢谢!
【问题讨论】:
【参考方案1】:与RuntimeExecutionMode.BATCH
一起使用的源必须实现Source
而不是SourceFunction
。并且接收器应该实现Sink
而不是SinkFunction
。
有关这些新接口的介绍,请参阅Integrating Flink into your ecosystem - How to build a Flink connector from scratch 。它们在FLIP-27: Refactor Source Interface 和FLIP-143: Unified Sink API 中有描述。
【讨论】:
谢谢!!!!!!以上是关于Flink 的批处理执行模式如何实现 BOUNDED source?的主要内容,如果未能解决你的问题,请参考以下文章
Flink-DataStream流处理应用(Local模式下)运行流程-源码分析
Flink使用Flink实现索引数据到Elasticsearch