在一个 flink 作业中使用 collect() 和 env.execute()

Posted

技术标签:

【中文标题】在一个 flink 作业中使用 collect() 和 env.execute()【英文标题】:use collect() and env.execute() in one flink job 【发布时间】:2017-11-28 11:29:30 【问题描述】:

我正在尝试在 Flink 中编写一个需要两个阶段的计算。

在第一阶段,我创建一个 Graph 并获取它的顶点 ID:

List<String> ids = graph.getVertexIds().collect();

在第二阶段,我想使用这些 id 为每个顶点运行 SingleSourceShortestPath。

for (String id: ids)
        System.out.println("Source Id: "+id);
        graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
    

它在本地工作(在 IntelliJ IDE 和命令行中使用 ./bin/flink run ...),但是当我使用其 WebUI 在 Flink 上提交作业时,程序只会执行到 collect() 方法并且不运行程序的其余部分(对于声明和print())。

有什么问题?

这是我的代码:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.SingleSourceShortestPaths;

import java.util.ArrayList;
import java.util.List;

public class Main 
    public static void main(String[] args) throws Exception 

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Edge<String, Double> e1 = new Edge<String, Double>("1", "2", 0.5);
        Edge<String, Double> e2 = new Edge<String, Double>("2", "3", 0.5);
        Edge<String, Double> e3 = new Edge<String, Double>("4", "5", 0.5);
        Edge<String, Double> e4 = new Edge<String, Double>("5", "6", 0.5);
        Edge<String, Double> e5 = new Edge<String, Double>("7", "8", 0.5);


        List<Edge<String, Double>> edgeList = new ArrayList<Edge<String, Double>>();
        edgeList.add(e1);
        edgeList.add(e2);
        edgeList.add(e3);
        edgeList.add(e4);
        edgeList.add(e5);


        Graph<String, String, Double> graph = Graph.fromCollection(edgeList,
                new MapFunction<String, String>() 
                    public String map(String value) 
                        return value;
                    
                , env);

        List<String> ids = graph.getVertexIds().collect();

        for (String id: ids)
            System.out.println("Source Id: "+id);
            graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
        
    

【问题讨论】:

【参考方案1】:

基于这个link,Flink 转换是惰性的,这意味着它们在调用 sink 操作 之前不会执行。

Flink 中的sink 操作 触发流的执行以产生程序所需的结果,例如将结果保存到文件系统或打印到标准输出

Dataset.collect()Dataset.Count()Dataset.print()等方法是触发实际数据转换的sink操作。

【讨论】:

以上是关于在一个 flink 作业中使用 collect() 和 env.execute()的主要内容,如果未能解决你的问题,请参考以下文章

是否可以在 Flink 的 Job Manager 中运行一个简单的作业?

我可以在同一个 Flink 作业中使用 DataSet API 和 DataStream API 吗?

如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos 密钥表(用于 Kafka 和 Hadoop HDFS)?

运行 Apache Flink 作业时链接失败

如何通过在 Apache Flink 中使用上传的 jar 来提交作业?

Flink作业使用windows的nc命令产生socket流