如何从程序中停止 flink 流作业

Posted

技术标签:

【中文标题】如何从程序中停止 flink 流作业【英文标题】:How to stop a flink streaming job from program 【发布时间】:2017-11-10 11:44:48 【问题描述】:

我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用 FlinkKafkaProducer09FlinkKafkaConsumer09 从同一 kafka 主题读取数据。我正在通过产品中的测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

并检查来自消费者的数据是否与:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);

使用TestListResultSink

通过打印流,我能够按预期看到来自消费者的数据。但无法获得 Junit 测试结果,因为即使在消息完成后消费者仍会继续运行。所以它没有来测试部分。

FlinkFlinkKafkaConsumer09 中是否有任何方法可以停止进程或运行特定时间?

【问题讨论】:

【参考方案1】:

潜在的问题是流媒体程序通常不是有限的并且可以无限期地运行。

至少目前最好的方法是在您的流中插入一个特殊的控制消息,让源正确终止(只需通过离开读取循环来停止读取更多数据)。这样 Flink 会告诉所有下游操作者,他们可以在消耗完所有数据后停止。

或者,您可以在源中抛出一个特殊异常(例如,在一段时间后),这样您就可以区分“正确”终止与失败情况(通过检查错误原因)。在源中抛出异常将使程序失败。

【讨论】:

您好@TillRohrmann,感谢您的回复。在处理完所有 3 个元素后,我尝试在 map 函数中抛出一些异常。但是在我不想要的那种情况下,JUnit 测试显示为失败。如果你能给我举个例子,那就太好了。非常感谢! 我设法让我的 Flink 作业在单元测试中停止,通过以下方法:1)在代码中添加一个仅测试标志,2)在图表的某个阶段,@987654321 @然后抛出一个已知消息的异常,3)try env.execute(); catch ...并检查异常消息,如果是已知消息,吞下异常;否则再次抛出异常。【参考方案2】:

关注@TillRohrman

如果您使用 EmbeddedKafka 实例,您可以结合特殊异常方法并在单元测试中处理它,然后读取 EmbeddedKafka 主题并断言消费者值。

我发现https://github.com/asmaier/mini-kafka/blob/master/src/test/java/de/am/KafkaProducerIT.java 在这方面非常有用。

唯一的问题是您将丢失触发异常的元素,但您始终可以调整测试数据以解决此问题。

【讨论】:

【参考方案3】:

你不能在反序列化器中使用 isEndOfStream 覆盖来停止从 Kafka 获取吗?如果我没看错,flink/Kafka09Fetcher 在它的 run 方法中有以下代码,它会破坏事件循环

    if (deserializer.isEndOfStream(value)) 
                        // end of stream signaled
                        running = false;
                        break;
                    

我的想法是使用 Till Rohrmann 的控制消息的想法结合这个 isEndOfStream 方法来告诉 KafkaConsumer 停止阅读。

有什么不可行的原因吗?或者也许我忽略了一些极端情况?

https://github.com/apache/flink/blob/07de86559d64f375d4a2df46d320fc0f5791b562/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L146

【讨论】:

【参考方案4】:

在您的测试中,您可以在单独的线程中启动作业执行,等待一段时间允许它进行数据处理,取消线程(它将中断作业)并进行断言。

CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> 
    try 
        environment.execute(jobName);
     catch (Exception e) 
        e.printStackTrace();
    
);
try 
    handle.get(seconds, TimeUnit.SECONDS);
 catch (TimeoutException e) 
    handle.cancel(true); // this will interrupt the job execution thread, cancel and close the job


// Make assertions here

【讨论】:

以上是关于如何从程序中停止 flink 流作业的主要内容,如果未能解决你的问题,请参考以下文章

Flink运行时之流处理程序生成流图

Flink运行时之流处理程序生成流图

Flink执行时之流处理程序生成流图

如何将 SPARK/Flink 流数据处理创建为微服务(REST API)

Flink 是如何统一批流引擎的

使用Flink进行高吞吐,低延迟和Exactly-Once语义流处理