Spark 1.6 scala 创建数据行

Posted

技术标签:

【中文标题】Spark 1.6 scala 创建数据行【英文标题】:Spark 1.6 scala create data rows 【发布时间】:2016-11-26 19:27:47 【问题描述】:

我有以下代码。

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val baseDF = sqlContext.read.json(fileFullPath)

我的 json 有 2 个感兴趣的字段:ProductId 和 Quantity。我在寻找什么


    "sales": 
        "saledate": "17Mar2008",
        "sale": [
            "productid": 1,
            "quantity": 10
        , 
            "productid": 2,
            "quantity": 1
        , 
            "productid": 3,
            "quantity": 3
        , 
            "productid": 4,
            "quantity": 5
        ]
    

我想将其更改为具有 2 列、productid 和数量但基于数量的多行的 spark RDD 或 DF。我想要每个数量 1 个。

在上面的示例中,产品 1 有 10 行,产品 2 有 1 行,产品 3 有 3 行,产品 4 有 5 行,总共 19 行,即 # rows = sum(quantity)。

任何帮助表示赞赏。我正在使用 spark 1.6.2 和 scala。

【问题讨论】:

请重新格式化您的问题,目前完全无法阅读 对不起......堆栈上的第一篇文章......谢谢:@gasparms 没问题 - 我写了它,因为其他人可以对问题投反对票,因为格式不好;) 【参考方案1】:

这应该做的事情:

import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._

val baseDF = sqlContext.read.json(fileFullPath)
val listFromQuantity = udf  quantity: Int => List.fill(quantity)(quantity) 

baseDF.select(explode($"sales.sale")).select($"col.productId", explode(listFromQuantity($"col.quantity"))).show()

返回:

+---------+--------+
|productId|quantity|
+---------+--------+
|        1|      10|
|        1|      10|
|        1|      10|
|        1|      10|
|        1|      10|
|        1|      10|
|        1|      10|
|        1|      10|
|        1|      10|
|        1|      10|
|        2|       1|
|        3|       3|
|        3|       3|
|        3|       3|
|        4|       5|
|        4|       5|
|        4|       5|
|        4|       5|
|        4|       5|
+---------+--------+

如果您希望在第二列中有一个数量(例如,值 1 而不是 5),您应该将 List.fill(quantity)(quantity) 替换为 List.fill(quantity)(1)

【讨论】:

以上是关于Spark 1.6 scala 创建数据行的主要内容,如果未能解决你的问题,请参考以下文章

窗口函数/scala/spark 1.6

Apache Spark:迭代数据帧行并通过 MutableList (Scala) 创建新数据帧

通过在spark中使用scala加载csv文件来创建数据帧

无法使用 Spark/Scala 从 JSON 嵌套键值对创建列和值

Spark/Scala 1.6 如何使用 dataframe groupby agg 来实现以下逻辑?

通过读取具有不同数据类型的 Scala 序列来创建 Spark 数据帧