Flink 可以通过 java 代码使用 Web Ui 将多个作业附加到 Stream 本地环境吗?

Posted

技术标签:

【中文标题】Flink 可以通过 java 代码使用 Web Ui 将多个作业附加到 Stream 本地环境吗?【英文标题】:Can Flink attach multiple jobs to Stream local envirnoment with Web Ui by java code? 【发布时间】:2017-05-09 13:39:14 【问题描述】:

Flink 可以通过 java 代码将多个作业附加到带有 Web Ui 的 Stream 本地环境吗?

我的代码是这样的

env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); 
env.addSource(...); 
env.addSink(...); 
env.execute("stream job1"); 

我想要做的是循环并通过 web ui 管理将 job2 job3 附加到同一环境。这如何在java代码中完成?

我尝试使用env2 = StreamExecutionEnvironment.getExecutionEnvironment(), 但似乎新的 job2 没有附加到我之前创建的环境中。

请帮助我提前谢谢

【问题讨论】:

【参考方案1】:

您应该能够重新使用您存储在env 中的已创建StreamExecutionEnvironment

StreamExecutionEnvironment.createLocalEnvironmentWithWebUI 不会设置相应的上下文环境工厂。因此,StreamExecutionEnvironment.getExecutionEnvironment 不会返回创建的启用了 Web UI 的本地环境。

【讨论】:

是的,我确实尝试过恢复它,但我收到错误消息,说地址绑定已在使用中,我认为 env 出于某种原因尝试使用相同的 jobmanager 地址。我将很快用代码和错误消息更新我的问题。感谢您的帮助。 你是对的。问题是底层LocalStreamEnvironment 为每个execute 创建一个使用相同端口的新LocalFlinkMiniCluster。这可能会导致问题。您是否尝试在同一个本地集群上同时运行 Flink 作业? 是的,我正在尝试在一个本地集群中运行多个作业。 并发还是一个接一个? 同时运行它们将不起作用,因为 LocalFlinkMiniCluster 只会以第一个作业运行所需的插槽数启动。不过,您可以做的是启动LocalFlinkMiniCluster,然后使用RemoteStreamEnvironment 将作业提交到该集群。但是,请注意,您必须使用尽可能多的插槽来启动集群,以便所有作业同时运行。或者,您也可以将所有作业合二为一。然后运营商可以共享插槽。【参考方案2】:

似乎工作(单个LocalStreamEnvironment)一个做多个

DataStream stream = env.addSource(...);
...
stream.addSink(...);

最后

env.execute();

然而,这似乎是一项具有多个来源->汇流的工作,而不是多个工作。


好的。此处用于将作业提交到本地集群的 hack。

在主线程中,使用一些配置启动本地集群

Configuration configuration = new Configuration();
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 100);

// start cluster
LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
exec.start();
// sleep or wait for all job finishes
Thread.sleep(Long.MAX_VALUE);

然后在其他线程中,提交作业(使用主线程的exec

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream stream = env.addSource(...);
...
stream.addSink(...);

StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setJobName(name);
JobGraph jobGraph = streamGraph.getJobGraph();
exec.submitJobAndWait(jobGraph, true);

注意StreamExecutionEnvironment 仅用于生成JobGraphLocalFlinkMiniCluster 是执行的地方。

在最后一行,可以选择是否等待作业(submitJobDetached)。

【讨论】:

以上是关于Flink 可以通过 java 代码使用 Web Ui 将多个作业附加到 Stream 本地环境吗?的主要内容,如果未能解决你的问题,请参考以下文章

Flink 本地运行 Web UI

Java SPI 机制在 Flink 中的应用(源码分析)

Flink SQL 客户端如何使用

Flink SQL 客户端如何使用

使用Java lambda表达式实现Flink WordCount

应用安全 - Web框架 - Apache Flink - 漏洞汇总