如何从程序中停止 flink 流作业
Posted
技术标签:
【中文标题】如何从程序中停止 flink 流作业【英文标题】:How to stop a flink streaming job from program 【发布时间】:2017-11-10 11:44:48 【问题描述】:我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用 FlinkKafkaProducer09
和 FlinkKafkaConsumer09
从同一 kafka 主题读取数据。我正在通过产品中的测试数据:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
并检查来自消费者的数据是否与:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
使用TestListResultSink
。
通过打印流,我能够按预期看到来自消费者的数据。但无法获得 Junit 测试结果,因为即使在消息完成后消费者仍会继续运行。所以它没有来测试部分。
Flink
或 FlinkKafkaConsumer09
中是否有任何方法可以停止进程或运行特定时间?
【问题讨论】:
【参考方案1】:潜在的问题是流媒体程序通常不是有限的并且可以无限期地运行。
至少目前最好的方法是在您的流中插入一个特殊的控制消息,让源正确终止(只需通过离开读取循环来停止读取更多数据)。这样 Flink 会告诉所有下游操作者,他们可以在消耗完所有数据后停止。
或者,您可以在源中抛出一个特殊异常(例如,在一段时间后),这样您就可以区分“正确”终止与失败情况(通过检查错误原因)。在源中抛出异常将使程序失败。
【讨论】:
您好@TillRohrmann,感谢您的回复。在处理完所有 3 个元素后,我尝试在 map 函数中抛出一些异常。但是在我不想要的那种情况下,JUnit 测试显示为失败。如果你能给我举个例子,那就太好了。非常感谢! 我设法让我的 Flink 作业在单元测试中停止,通过以下方法:1)在代码中添加一个仅测试标志,2)在图表的某个阶段,@987654321 @然后抛出一个已知消息的异常,3)try env.execute(); catch ...
并检查异常消息,如果是已知消息,吞下异常;否则再次抛出异常。【参考方案2】:
关注@TillRohrman
如果您使用 EmbeddedKafka 实例,您可以结合特殊异常方法并在单元测试中处理它,然后读取 EmbeddedKafka 主题并断言消费者值。
我发现https://github.com/asmaier/mini-kafka/blob/master/src/test/java/de/am/KafkaProducerIT.java 在这方面非常有用。
唯一的问题是您将丢失触发异常的元素,但您始终可以调整测试数据以解决此问题。
【讨论】:
【参考方案3】:你不能在反序列化器中使用 isEndOfStream 覆盖来停止从 Kafka 获取吗?如果我没看错,flink/Kafka09Fetcher 在它的 run 方法中有以下代码,它会破坏事件循环
if (deserializer.isEndOfStream(value))
// end of stream signaled
running = false;
break;
我的想法是使用 Till Rohrmann 的控制消息的想法结合这个 isEndOfStream 方法来告诉 KafkaConsumer 停止阅读。
有什么不可行的原因吗?或者也许我忽略了一些极端情况?
https://github.com/apache/flink/blob/07de86559d64f375d4a2df46d320fc0f5791b562/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L146
【讨论】:
【参考方案4】:在您的测试中,您可以在单独的线程中启动作业执行,等待一段时间允许它进行数据处理,取消线程(它将中断作业)并进行断言。
CompletableFuture<Void> handle = CompletableFuture.runAsync(() ->
try
environment.execute(jobName);
catch (Exception e)
e.printStackTrace();
);
try
handle.get(seconds, TimeUnit.SECONDS);
catch (TimeoutException e)
handle.cancel(true); // this will interrupt the job execution thread, cancel and close the job
// Make assertions here
【讨论】:
以上是关于如何从程序中停止 flink 流作业的主要内容,如果未能解决你的问题,请参考以下文章