Dynamically creating dataframes in Spark Scala

Posted: 2019-08-26 16:52:04

Question: I have several columns of data coming from Dataframe 1, inside a loop (from different rows). I want to create a Dataframe 2 with all of this different row/column data.
Below is the sample data; I tried using a Seq:
var DF1 = Seq(
("11111111", "0101","6573","X1234",12763),
("44444444", "0148","8382","Y5678",-2883),
("55555555", "0154","5240","Z9011", 8003))
I want to add the 2 dynamic rows below to the Seq above, and then create a Dataframe from the final Seq:
("88888888", "1333","7020","DEF34",500)
("99999999", "1333","7020","GHI56",500)
The final Seq or Dataframe should look like this:
var DF3 = Seq(
("11111111", "0101","6573","X1234",12763),
("44444444", "0148","8382","Y5678",-2883),
("55555555", "0154","5240","Z9011", 8003),
("88888888", "1333","7020","DEF34",500),
("99999999", "1333","7020","GHI56",500))
I tried the code below using a Seq, and created a case class so I could use it if possible. The problem is that appending a new row to a Seq returns a new Seq with the row added, leaving the original unchanged. How do I get the Seq itself updated with the new rows? If a Seq is not the right choice, would using an ArrayBuffer be a good idea?
case class CreateDFTestCaseClass(ACCOUNT_NO: String, LONG_IND: String, SHORT_IND: String,SECURITY_ID: String, QUANTITY: Integer)
val sparkSession = SparkSession
.builder()
.appName("AllocationOneViewTest")
.master("local")
.getOrCreate()
val sc = sparkSession.sparkContext
import sparkSession.sqlContext.implicits._
def main(args: Array[String]): Unit = {
var acctRulesPosDF = Seq(
("11111111", "0101","6573","X1234",12763),
("44444444", "0148","8382","Y5678",-2883),
("55555555", "0154","5240","Z9011", 8003))
acctRulesPosDF:+ ("88888888", "1333","7020","DEF34",500)
acctRulesPosDF:+ ("99999999", "1333","7020","GHI56",500))
var DF3 = acctRulesPosDF.toDF
DF3.show()
Answer 1:

This is not the most elegant way, but to keep your code as similar to the original as possible, you just need to assign the result back to your variable:
var acctRulesPosDF = Seq(
("11111111", "0101","6573","X1234",12763),
("44444444", "0148","8382","Y5678",-2883),
("55555555", "0154","5240","Z9011", 8003))
acctRulesPosDF = acctRulesPosDF:+ ("88888888", "1333","7020","DEF34",500)
acctRulesPosDF = acctRulesPosDF:+ ("99999999", "1333","7020","GHI56",500)
A quick example in the spark-shell:
scala> var acctRulesPosDF = Seq(
| ("11111111", "0101","6573","X1234",12763),
| ("44444444", "0148","8382","Y5678",-2883),
| ("55555555", "0154","5240","Z9011", 8003))
acctRulesPosDF: Seq[(String, String, String, String, Int)] = List((11111111,0101,6573,X1234,12763), (44444444,0148,8382,Y5678,-2883), (55555555,0154,5240,Z9011,8003))
scala> acctRulesPosDF = acctRulesPosDF:+ ("88888888", "1333","7020","DEF34",500)
acctRulesPosDF: Seq[(String, String, String, String, Int)] = List((11111111,0101,6573,X1234,12763), (44444444,0148,8382,Y5678,-2883), (55555555,0154,5240,Z9011,8003), (88888888,1333,7020,DEF34,500))
scala> acctRulesPosDF = acctRulesPosDF:+ ("99999999", "1333","7020","GHI56",500)
acctRulesPosDF: Seq[(String, String, String, String, Int)] = List((11111111,0101,6573,X1234,12763), (44444444,0148,8382,Y5678,-2883), (55555555,0154,5240,Z9011,8003), (88888888,1333,7020,DEF34,500), (99999999,1333,7020,GHI56,500))
scala> var DF3 = acctRulesPosDF.toDF
DF3: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 3 more fields]
scala> DF3.show()
+--------+----+----+-----+-----+
| _1| _2| _3| _4| _5|
+--------+----+----+-----+-----+
|11111111|0101|6573|X1234|12763|
|44444444|0148|8382|Y5678|-2883|
|55555555|0154|5240|Z9011| 8003|
|88888888|1333|7020|DEF34| 500|
|99999999|1333|7020|GHI56| 500|
+--------+----+----+-----+-----+
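One follow-up note: toDF without arguments produces the default column names _1 … _5 seen above. If you want the field names from the case class in the question instead, you can either pass names to toDF or map the tuples onto the case class first. A minimal sketch (the val names here are just for illustration), assuming the spark session, implicits and CreateDFTestCaseClass from the question are in scope:

// Option 1: name the columns explicitly
val named = acctRulesPosDF.toDF("ACCOUNT_NO", "LONG_IND", "SHORT_IND", "SECURITY_ID", "QUANTITY")

// Option 2: derive the schema from the case class fields
val typed = acctRulesPosDF.map {
  case (acct, lng, shrt, sec, qty) => CreateDFTestCaseClass(acct, lng, shrt, sec, qty)
}.toDF()

named.show()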
Answer 2:

The reason you get the same old Seq back even though you appended a new row is that, by default, the Seq in scope is scala.collection.immutable.Seq (it cannot be changed), unless you explicitly import the mutable one, scala.collection.mutable.Seq. So either use the mutable Seq by setting the import explicitly in your Scala code, or do as @SCouto suggests in the other answer.
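To illustrate the mutable route (the ArrayBuffer idea from the question), here is a minimal sketch, assuming the same spark session and implicits as in the question:

import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer(
  ("11111111", "0101", "6573", "X1234", 12763),
  ("44444444", "0148", "8382", "Y5678", -2883),
  ("55555555", "0154", "5240", "Z9011", 8003))

// += mutates the buffer in place, so there is nothing to reassign
buf += (("88888888", "1333", "7020", "DEF34", 500))
buf += (("99999999", "1333", "7020", "GHI56", 500))

// copy to an immutable Seq before handing it to Spark
val DF3 = buf.toSeq.toDF()
DF3.show()

Either approach ends up with the same DataFrame; reassigning an immutable Seq, as in the other answer, is usually the more idiomatic Scala choice.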