在远程集群上运行 Flink 作业而不提供 .jar
Posted
技术标签:
【中文标题】在远程集群上运行 Flink 作业而不提供 .jar【英文标题】:Run a Flink job on a remote cluster without providing a .jar 【发布时间】:2018-07-10 22:10:40 【问题描述】:我有以下问题:我想在本地机器上的 IntelliJ 中创建一个 Flink 作业,并在位于 VM 中的远程集群上运行它。我使用了 createRemoteEnvironment 函数,但匿名类和 lambda 表达式需要 jar 文件(如果我没有遗漏任何内容)。有没有一种(或另一种)方法可以在不提供 jar 文件的情况下在远程集群上运行 Flink 作业?以下代码是我想在远程集群上运行的一个简单的 Flink 作业(没有 jar)。
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.56.101", 6123);
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("192.168.56.102", 8080)
.flatMap((String sentence, Collector<Tuple2<String, Integer>> out) ->
for (String word: sentence.split(" "))
out.collect(new Tuple2<String, Integer>(word, 1));
);
// Alternative approach with an anonymous class
/*DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 8080)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>()
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out)
throws Exception
for (String word: sentence.split(" "))
out.collect(new Tuple2<String, Integer>(word, 1));
);*/
dataStream.print();
env.execute("Window WordCount");
感谢您的帮助!
【问题讨论】:
当您尝试提交时遇到什么异常? @DawidWysakowicz 如果我使用 Lambda 表达式运行代码,我会得到一个“ClassCastException”Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance
- 带有匿名类的代码我会得到一个“StreamTaskException”:Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.mymaster.test.WindowWordCount$1
- 两个版本都可以正常工作如果我包括一个罐子
【参考方案1】:
根据here 的描述,该错误可能不那么直观,但它本质上意味着您需要一个依赖项。
从概念上讲,这意味着您需要提供依赖项。
这通常是通过在罐子中提供它来完成的。
因此,根据依赖项的一般工作方式,我认为答案是您确实必须提供 Jar 才能运行需要依赖项的代码。
【讨论】:
以上是关于在远程集群上运行 Flink 作业而不提供 .jar的主要内容,如果未能解决你的问题,请参考以下文章