SparkSQL + Java:使用数据集时将 Pojo 转换为表格格式

Posted

技术标签:

【中文标题】SparkSQL + Java:使用数据集时将 Pojo 转换为表格格式【英文标题】:SparkSQL + Java: Pojo to Tabular Format while working with Datasets 【发布时间】:2017-10-02 10:53:15 【问题描述】:

我对 Spark SQL 还是很陌生。在执行其中一项训练任务时,我遇到了以下问题并且找不到答案(以下所有示例都有些愚蠢,但出于演示目的应该仍然可以)。

我的应用读取 parquet 文件并根据其内容创建数据集:

DataFrame input = sqlContext.read().parquet("src/test/resources/integration/input/source.gz.parquet");
Dataset<Row> dataset = input.as(RowEncoder$.MODULE$.apply(input.schema()));

dataset.show() 调用导致:

+------------+----------------+--------+
+    Names   +       Gender   +   Age  +
+------------+----------------+--------+
| Jack, Jill |  Male, Female  | 30, 25 |

然后我将数据集转换成一个新的数据集,其中包含 Person 类型:

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) 
    return rawData
            .flatMap((Row sourceRow) -> 
                // code to parse an input row and split person data goes here
                Person person1 = new Person(name1, gender1, age1);
                Person person2 = new Person(name2, gender2, age2);
                return Arrays.asList(person1, person2);
            , Encoders.bean(Person.class));

在哪里

public abstract class Human implements Serializable 
   protected String name;
   protected String gender;
   // getters/setters go here
   // default constructor + constructor with the name and gender params
 
 public class Person extends Human 
   private String age;
   // getters/setters for the age param go here
   // default constructor + constructor with the age, name and gender params
   // overriden toString() method which returns the string: (<name>, <gender>, <age>)
 

最后,当我展示我希望看到的数据集内容时

 +------------+----------------+--------+
 +    name    +       gender   +   age  +
 +------------+----------------+--------+
 |     Jack   |     Male       |   30   |
 |     Jill   |     Femail     |   25   |

但是,我明白了

+-------------------+----------------+--------+
+      name         +       gender   +   age  +
+-------------------+----------------+--------+
|(Jack, Male, 30)   |                |        |
|(Jill, Femail, 25) |                |        |

这是 toString() 方法的结果,而标题是正确的。 我认为编码器有问题,就好像我使用它显示的 Encoders.javaSerizlization(T) 或 Encoders.kryo(T) 一样

+------------------+
+        value     +
+------------------+
|(Jack, Male, 30)  |
|(Jill, Femail, 25)|

最让我担心的是编码器的错误使用可能会导致错误的 SerDe 和/或性能损失。 在我能找到的所有 Spark Java 示例中,我看不出有什么特别之处……

您能否建议我做错了什么?

更新 1

这是我的项目的依赖项:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

解决方案

正如 abaghel 建议的那样,我将版本升级到 2.0.2(请注意,在 2.0.0 版本上有 the bug for Windows),在我的代码中到处使用 Dataset 而不是 DataFrames(似乎 DataFrames 不是 Apache 的一部分Spark starting from 2.0.0),并使用基于迭代器的 flatMap 函数从 Row 转换为 Person。

只是为了分享,在版本 1.6.2 中使用 TraversableOnce-based flatMap 的方法对我不起作用,因为它引发了“MyPersonConversion$function1 not Serializable”异常。

现在一切都按预期进行。

【问题讨论】:

【参考方案1】:

您使用的是什么版本的 Spark?您提供的 flatMap 方法未使用 2.2.0 版编译。所需的返回类型是Iterator&lt;Person&gt;。请使用下面的 FlatMapFunction,您将获得所需的输出。

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) 
    return rawData.flatMap(row -> 
        String[] nameArr = row.getString(0).split(",");
        String[] genArr = row.getString(1).split(",");
        String[] ageArr = row.getString(2).split(",");
        Person person1 = new Person(nameArr[0], genArr[0], ageArr[0]);
        Person person2 = new Person(nameArr[1], genArr[1], ageArr[1]);
        return Arrays.asList(person1, person2).iterator();
    , Encoders.bean(Person.class));


//Call function
Dataset<Person> dataset1 = transformToPerson(dataset);
dataset1.show();

【讨论】:

阿巴赫尔,谢谢!请参阅原始问题中的“更新 1”:我添加了详细信息 感谢@Dmitry 的更新。 Spark 1.6.2 版本中的 bean 编码器似乎存在一些问题。请尝试使用 Spark 2.0 及更高版本。

以上是关于SparkSQL + Java:使用数据集时将 Pojo 转换为表格格式的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL - 从 oracle 导入时将 oracle 日期数据类型错误转换为时间戳(java.sql)

转储数据集时将数据从 Hive 加载到 Pig 错误

如何防止 Azure ML Studio 在导入数据集时将特征列转换为 DateTime

将数据库代码移动到单独的程序集时将连接字符串放在哪里

使用 SparkSQL 读取多个 parquet 文件时将子文件夹作为列获取

sparksql写入表中bigint类型显示null