集群中的 Apache Flink 流不会将作业与工作人员分开

Posted

技术标签:

【中文标题】集群中的 Apache Flink 流不会将作业与工作人员分开【英文标题】:Apache Flink streaming in cluster does not split jobs with workers 【发布时间】:2015-11-23 03:22:10 【问题描述】:

我的目标是使用 Kafka 作为源和 Flink 作为流处理引擎来设置一个高吞吐量的集群。这就是我所做的。

我已经在 master 和 worker 上设置了一个 2 节点集群,配置如下。

掌握 flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 256

taskmanager.heap.mb: 512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

Worker flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 512 #256

taskmanager.heap.mb: 1024 #512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

Master 节点上的slaves 文件如下所示:

<WORKER_IP_ADDR>
localhost

两个节点上的 flink 设置位于同名文件夹中。我通过运行在主服务器上启动集群

bin/start-cluster-streaming.sh

这将启动 Worker 节点上的任务管理器。

我的输入源是 Kafka。这是sn-p。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = 
    env.addSource(
    new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");

这是我的 Sink 函数

public class MySink implements SinkFunction<String> 

    private static final long serialVersionUID = 1L;

    public void invoke(String arg0) throws Exception 
        processMessage(arg0);
        System.out.println("Processed Message");
    

这是我的 pom.xml 中的 Flink 依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>

然后我在master上用这个命令运行打包好的jar

bin/flink run flink-test-jar-with-dependencies.jar

但是,当我将消息插入 Kafka 主题时,我能够仅在主节点上考虑来自我的 Kafka 主题的所有消息(通过我的SinkFunction 实现的调用方法中的调试消息)。

在作业管理器 UI 中,我可以看到 2 个任务管理器,如下所示:

仪表板也看起来像这样: 问题:

    为什么工作节点没有收到任务? 我是否缺少某些配置?

【问题讨论】:

感谢您发布这么好的问题!您如何在主服务器上获取这些调试消息?在 master (JobManager) 上执行用户代码是不可能的,你使用的是 Flink 0.9.0 还是 0.10-SNAPSHOT?你的stringSinkFunction 怎么样? (它只是打印到标准输出吗?) @rmetzger,不客气。我已经更新了这个问题。任何帮助将不胜感激。 @SudarshanShubakar,从屏幕截图中可以看出,您已经注册了 2 个 TM,每个 TM 有 50 个插槽。这也与您的配置相对应。此外,看起来您的工作执行得很好。在 100 个插槽中的每一个中,都部署了任务 Custom Source -&gt; Stream Sink (x/100)。因此,我想知道什么对您不起作用。会不会是您的主题少于100 分区?由于 Flink 在 Kafka 分区和源任务之间创建了映射,因此会有一些任务没有收到任何输入。 ok@TillRohrmann 这可能是问题所在。我相信关于 Kafka 主题的分区少于 100 个。让我在更改后报告。 好吧@TillRohrmann 评论解决了这个问题。你能把你的评论变成答案吗? 【参考方案1】:

在 Flink 中从 Kafka 源读取时,源任务的最大并行度受给定 Kafka 主题的分区数限制。 Kafka 分区是 Flink 中源任务可以消耗的最小单位。如果分区多于源任务,那么有些任务会消耗多个分区。

因此,为了向所有 100 个任务提供输入,您应该确保 Kafka 主题至少有 100 个分区。

如果您无法更改主题的分区数,则还可以使用 setParallelism 方法使用较低并行度从 Kafka 进行初始读取。或者,您可以使用 rebalance 方法,该方法将在前面操作的所有可用任务中随机播放您的数据。

【讨论】:

接受了这个答案,因为它解决了我的问题。但是,我想进一步了解您在答案的最后一部分中提到的内容。当您说“最初使用 setParallelism 方法使用较低并行度从 Kafka 读取”时,您到底是什么意思?设置后如何重置?另外我必须尝试重新平衡,只是将它添加到代码中似乎对我没有这个问题。 您可以基于算子设置并行度(例如 map、filter、reduce 函数)。然后将使用给定的并行度执行运算符。 setParallelism 方法覆盖默认并行度或您在 ExecutionEnvironment 处指定的并行度。

以上是关于集群中的 Apache Flink 流不会将作业与工作人员分开的主要内容,如果未能解决你的问题,请参考以下文章

如何从程序中停止 flink 流作业

字节跳动Jstorm 到Apache Flink 的迁移实践

官宣|Apache Flink 1.13.0 正式发布,流处理应用更加简单高效!

官宣|Apache Flink 1.13.0 正式发布,流处理应用更加简单高效!

Apache Flink——侧输出流(side output)

Apache 流框架 Flink,Spark Streaming,Storm对比分析 - Part2