在远程集群上运行 Flink 作业而不提供 .jar

Posted

技术标签:

【中文标题】在远程集群上运行 Flink 作业而不提供 .jar【英文标题】:Run a Flink job on a remote cluster without providing a .jar 【发布时间】:2018-07-10 22:10:40 【问题描述】:

我有以下问题:我想在本地机器上的 IntelliJ 中创建一个 Flink 作业,并在位于 VM 中的远程集群上运行它。我使用了 createRemoteEnvironment 函数,但匿名类和 lambda 表达式需要 jar 文件(如果我没有遗漏任何内容)。有没有一种(或另一种)方法可以在不提供 jar 文件的情况下在远程集群上运行 Flink 作业?以下代码是我想在远程集群上运行的一个简单的 Flink 作业(没有 jar)。

public static void main(String[] args) throws Exception 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.56.101", 6123);

    DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("192.168.56.102", 8080)
        .flatMap((String sentence, Collector<Tuple2<String, Integer>> out) ->
            
                for (String word: sentence.split(" ")) 
                    out.collect(new Tuple2<String, Integer>(word, 1));
                
            );


    // Alternative approach with an anonymous class
    /*DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 8080)
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() 
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) 
        throws Exception 
            for (String word: sentence.split(" ")) 
                out.collect(new Tuple2<String, Integer>(word, 1));
            
        
    );*/

    dataStream.print();

    env.execute("Window WordCount");

感谢您的帮助!

【问题讨论】:

当您尝试提交时遇到什么异常? @DawidWysakowicz 如果我使用 Lambda 表达式运行代码,我会得到一个“ClassCastException”Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance - 带有匿名类的代码我会得到一个“StreamTaskException”:Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.mymaster.test.WindowWordCount$1 - 两个版本都可以正常工作如果我包括一个罐子 【参考方案1】:

根据here 的描述,该错误可能不那么直观,但它本质上意味着您需要一个依赖项。

从概念上讲,这意味着您需要提供依赖项。

这通常是通过在罐子中提供它来完成的。

因此,根据依赖项的一般工作方式,我认为答案是您确实必须提供 Jar 才能运行需要依赖项的代码。

【讨论】:

以上是关于在远程集群上运行 Flink 作业而不提供 .jar的主要内容,如果未能解决你的问题,请参考以下文章

Flink:在Flink集群上执行Jar文件

Flink集群模式部署及案例执行

Flink 作业提交到 YARN 集群是如何启动的?源码详解!

Flink 本地运行 Web UI

Flink 本地运行 Web UI

Flink运行时架构及各部署模式下作业提交流程