在 Apache Spark Dataset<Row> 上应用 flatMap 操作时出现意外的编码器行为

Posted

技术标签:

【中文标题】在 Apache Spark Dataset<Row> 上应用 flatMap 操作时出现意外的编码器行为【英文标题】:Unexpected encoder behaviour when applying a flatMap operation on a Apache Spark Dataset<Row> 【发布时间】:2018-04-05 15:29:14 【问题描述】:

我正在尝试将实际包含双精度值的 csv 字符串转换为 spark-ml 兼容数据集。由于我事先不知道预期的功能数量,我决定使用一个帮助类“实例”,它已经包含分类器使用的正确数据类型,并且在其他一些情况下已经按预期工作:

public class Instance implements Serializable 
    /**
     * 
     */
    private static final long serialVersionUID = 6091606543088855593L;
    private Vector indexedFeatures;
    private double indexedLabel;
    ...getters and setters for both fields...

我得到意外行为的部分是这个:

    Encoder<Instance> encoder = Encoders.bean(Instance.class);
    System.out.println("encoder.schema()");
    encoder.schema().printTreeString();
    Dataset<Instance> dfInstance = df.select("value").as(Encoders.STRING())
              .flatMap(s -> 
                String[] splitted = s.split(",");

                int length = splitted.length;
                double[] features = new double[length-1];
                for (int i=0; i<length-1; i++) 
                    features[i] = Double.parseDouble(splitted[i]);
                

                if (length < 2) 
                    return Collections.emptyIterator();
                 else 
                    return Collections.singleton(new Instance( 
                        Vectors.dense(features), 
                        Double.parseDouble(splitted[length-1])
                        )).iterator();
                
              , encoder);

    System.out.println("dfInstance");
    dfInstance.printSchema();
    dfInstance.show(5);

我在控制台上得到以下输出:

encoder.schema()
root
 |-- indexedFeatures: vector (nullable = true)
 |-- indexedLabel: double (nullable = false)

dfInstance
root
 |-- indexedFeatures: struct (nullable = true)
 |-- indexedLabel: double (nullable = true)

+---------------+------------+
|indexedFeatures|indexedLabel|
+---------------+------------+
|             []|         0.0|
|             []|         0.0|
|             []|         1.0|
|             []|         0.0|
|             []|         1.0|
+---------------+------------+
only showing top 5 rows

编码器架构正确地将 indexedFeatures 行数据类型显示为向量。但是当我应用编码器并进行转换时,它会给我一行结构类型,不包含真实对象。

我想了解,为什么 Spark 会为我提供结构类型而不是正确的向量类型。

【问题讨论】:

【参考方案1】:

实际上,我的回答不是解释为什么你会得到一个结构类型。但是基于previous question,我可能可以提供一种解决方法。

原始输入用DataFrameReader's csv function解析,再次使用VectorAssembler:

Dataset<Row> csv = spark.read().option("inferSchema", "true")
  .csv(inputDf.select("value").as(Encoders.STRING()));
String[] fieldNames = csv.schema().fieldNames();    
VectorAssembler assembler = new VectorAssembler().setInputCols(
  Arrays.copyOfRange(fieldNames, 0, fieldNames.length-1))
  .setOutputCol("indexedFeatures");
Dataset<Row> result = assembler.transform(csv)
  .withColumn("indexedLabel", functions.col(fieldNames[fieldNames.length-1]))
  .select("indexedFeatures", "indexedLabel");

【讨论】:

非常感谢!那正在解决我的实际问题。你能给我一个提示,我可以在哪里学习这种火花编码?我努力使用官方文档,但大部分 java 部分的文档很少。你能推荐一本书、课程或任何我可以学习和学习的地方吗? 我同意 java 部分的文档不是最好的。通常,我使用 Scala 文档和代码示例,然后尝试将其转换为 Java。 我也尝试过,但有时并不那么容易。不过,非常感谢大家的帮助。

以上是关于在 Apache Spark Dataset<Row> 上应用 flatMap 操作时出现意外的编码器行为的主要内容,如果未能解决你的问题,请参考以下文章

如何在 apache spark 中同时使用 dataset.select 和 selectExpr

如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?

Apache Spark :org.apache.spark.sql.Dataset.drop(String... colNames) 方法用于 Java

Apache Spark:SparkSQL的使用

使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet

Apache Spark 2.0三种API的传说:RDDDataFrame和Dataset