Flink Gelly 扩展边缘类并在 DataSet 中使用

Posted

技术标签:

【中文标题】Flink Gelly 扩展边缘类并在 DataSet 中使用【英文标题】:Flink Gelly extending edge class and using it in DataSet 【发布时间】:2016-10-12 20:00:39 【问题描述】:

在 Gelly 中,我正在尝试制作一个称为 Temporal edge 的特殊 Edge,为了使这更容易,我创建了一个名为 Temporaledgev3 的类:

public class TemporalEdgev3<K, V> extends Edge<K, Tuple3<V,Integer,Integer>> 

/*
Creates new temporaledge with only null values
 */
public TemporalEdgev3() 

/*
* Constructor to make a temporal edge version 2, has 5 input values but makes a
* typle 3 which is compatible with Gelly
* */
public TemporalEdgev3(K src, K trg, V val, Integer start, Integer end) 
    this.f0 = src;
    this.f1 = trg;
    this.f2 = new Tuple3<V,Integer,Integer>(val,start,end);

现在我正在尝试将这些边添加到 Flink DataSet 中,以便它可以在图形中使用,但我似乎无法弄清楚如何。但是,当我使用具有相同构造函数的 Edge 类时,它可以工作。

代码如下,最后一行报错

// a temporal set created with Flink, now we need to make it into a temporal set into gelly
DataSet<Tuple5<Long,Long, Double,Integer, Integer>> temporalset = env.readCsvFile("./datasets/testdata")
        .fieldDelimiter(",")  // node IDs are separated by spaces
        .ignoreComments("%")  // comments start with "%"
        .types(Long.class,Long.class,Double.class,Integer.class,Integer.class); // read the node IDs as Longs

DataSet<TemporalEdgev3<Long,Double>> edgeset3 = temporalset.map(new MapFunction<Tuple5<Long, Long, Double, Integer, Integer>, TemporalEdgev3<Long, Double>>() 
    @Override
    public TemporalEdgev3<Long, Double> map(Tuple5<Long, Long, Double, Integer, Integer> value) throws Exception 

        return new TemporalEdgev3<Long, Double>(value.f0,value.f1,value.f2,value.f3,value.f4);
    
);

DataSet<Edge<Long,Tuple3<Double,Integer,Integer>>> edgeset4 = temporalset.map(new MapFunction<Tuple5<Long, Long, Double, Integer, Integer>, Edge<Long, Tuple3<Double, Integer, Integer>>>() 
    @Override
    public Edge<Long, Tuple3<Double, Integer, Integer>> map(Tuple5<Long, Long, Double, Integer, Integer> value) throws Exception 
        return new Edge<Long, Tuple3<Double, Integer, Integer>>(value.f0,value.f1, new Tuple3<Double, Integer, Integer>(value.f2,value.f3,value.f4));
    
);

Graph<Long, NullValue, Tuple3<Double,Integer,Integer>> temporalgraph = Graph.fromDataSet(edgeset4,env);

Graph<Long,NullValue, Tuple3<Double,Integer,Integer>> temporalgraph2 = Graph.fromDataSet(edgeset3,env);

错误: 第一个参数类型错误。找到:'org.apache.flink.api.java.DataSet>',需要:'org.apache.flink.api.java.DataSet>' 少...

fromDataSet
(org.apache.flink.api.java.DataSet<org.apache.flink.graph.Edge<K,EV>>,
ExecutionEnvironment)
in Graph cannot be applied
to
(org.apache.flink.api.java.DataSet<flink.gelly.school.TemporalEdgev3<java.lang.Long,java.lang.Double>>,
ExecutionEnvironment)
 
 reason: no instance(s) of type variable(s) EV, K exist so that TemporalEdgev3<Long, Double> conforms to Edge<K, EV>

也许我只是不知道如何使用泛型类型

【问题讨论】:

【参考方案1】:

您不需要扩展Edge 类型。您可以简单地使用Tuple3 或自定义边缘值类型。

您的图形声明 Graph&lt;Long, NullValue, Tuple3&lt;Double,Integer,Integer&gt;&gt; 需要 DataSet&lt;Edge&lt;Long, Tuple3&lt;Double,Integer,Integer&gt;&gt;&gt; 作为边输入。这就是为什么您的 temporalgraph2 声明有效但 temporalgraph 无效的原因。

【讨论】:

以上是关于Flink Gelly 扩展边缘类并在 DataSet 中使用的主要内容,如果未能解决你的问题,请参考以下文章

使用Flink和Gelly无法实现高CPU利用率

使用 Flink 和 Gelly 无法实现高 CPU 利用率

Flink Gelly 在计算期间更新图

Gelly Library 可以用于 Flink 中类似于 Spark 中的 Graph Frame 的图形查询吗

如何导入 Apache Flink SNAPSHOT 工件?

你可以扩展 SQLAlchemy 查询类并在同一个会话中使用不同的类吗?