从 JavaRDD<Row> 创建的 Spark DataFrame 将所有列数据复制到第一列

Posted

技术标签:

【中文标题】从 JavaRDD<Row> 创建的 Spark DataFrame 将所有列数据复制到第一列【英文标题】:Spark DataFrame created from JavaRDD<Row> copies all columns data into first column 【发布时间】:2015-07-22 19:59:26 【问题描述】:

我有一个 DataFrame,我需要将其转换为 JavaRDD&lt;Row&gt; 并返回 DataFrame 我有以下代码

DataFrame sourceFrame = hiveContext.read().format("orc").load("/path/to/orc/file");
//I do order by in above sourceFrame and then I convert it into JavaRDD
JavaRDD<Row> modifiedRDD = sourceFrame.toJavaRDD().map(new Function<Row,Row>(
    public Row call(Row row) throws Exception 
       if(row != null) 
           //updated row by creating new Row
           return RowFactory.create(updateRow);
       
      return null;
);
//now I convert above JavaRDD<Row> into DataFrame using the following
DataFrame modifiedFrame = sqlContext.createDataFrame(modifiedRDD,schema);

sourceFramemodifiedFrame 架构相同,当我调用 sourceFrame.show() 时,输出是预期的例如的第一列值假设源 DataFrame 有 3 列,如下所示

_col1    _col2    _col3
 ABC       10      DEF
 GHI       20      JKL

当我打印从 JavaRDD 转换而来的 modifiedFrame 时,它​​按以下顺序显示

_col1        _col2      _col3
ABC,10,DEF
GHI,20,JKL

如上所示,_col1 包含所有值,_col2 和 _col3 为空。不知道怎么回事。

【问题讨论】:

你的 updateRow 变量到底是什么? 我通过将 Row 对象转换为 java 对象集合列表来更新 Row,然后更新一个字段值,然后将该列表作为 RowFactory.create 的一部分返回 【参考方案1】:

正如我在问题评论中提到的那样;

这可能是因为将列表作为一个参数而发生的。

return RowFactory.create(updateRow);

在调查 Apache Spark 文档和源代码时;在那specifying schema example他们分别为所有列分配参数。只需大致调查一些源代码RowFactory.java 类,而 GenericRow 类没有分配那个参数。所以试着分别给行列的参数。

return RowFactory.create(updateRow.get(0),updateRow.get(1),updateRow.get(2)); // List Example

您可以尝试将列表转换为数组,然后作为参数传递。

YourObject[] updatedRowArray= new YourObject[updateRow.size()];
updateRow.toArray(updatedRowArray);
return RowFactory.create(updatedRowArray);

顺便说一句,RowFactory.create() 方法正在创建 Row 对象。 In Apache Spark documentation about Row object and RowFactory.create() method;

表示关系运算符的一行输出。允许通过序数进行通用访问,这将产生装箱开销 原语,以及本机原语访问。使用无效 用于检索 null 值的本机原始接口, 相反,用户必须在尝试检索之前检查 isNullAt 可能为 null 的值。

要创建一个新行,请在 Java 中使用 RowFactory.create() 或在 Java 中使用 Row.apply() 斯卡拉。

可以通过提供字段值来构造 Row 对象。示例:

导入 org.apache.spark.sql._

// 从值创建一个行。

行(value1, value2, value3, ...)

// 从值序列创建一行。

Row.fromSeq(Seq(value1, value2, ...))

根据文档;您还可以在分别创建 Row 对象时应用自己所需的算法来分隔行列。但我认为将列表转换为数组并将参数作为数组传递对您有用(我无法尝试请发布您的反馈,谢谢)。

【讨论】:

您好,感谢您的回复。实际上,为了简单起见,我有 45 个字段/列,我只显示了三列。 Row Factory.create() 接受变量参数,因此它接受列表 java 如果我手动传递所有值会有什么不同 您好,我刚刚根据您的要求改进了答案。 您好,非常感谢它转换 List int 数组。

以上是关于从 JavaRDD<Row> 创建的 Spark DataFrame 将所有列数据复制到第一列的主要内容,如果未能解决你的问题,请参考以下文章

将 JavaDStream<String> 转换为 JavaRDD<String>

如何将数据集转换为 JavaPairRDD?

从文字值创建 DataFrame 和 JavaRDD

为 Spark Rows 定义新模式

Spark SQL Java GenericRowWithSchema无法强制转换为java.lang.String

Spark 中的 JavaRDD<String>.foreach 之后 Arraylist 为空