Spark Dataframe - 将行作为输入和数据框具有输出的方法
Posted
技术标签:
【中文标题】Spark Dataframe - 将行作为输入和数据框具有输出的方法【英文标题】:Spark Dataframe - Method to take row as input & dataframe has output 【发布时间】:2018-01-19 02:48:41 【问题描述】:我需要编写一个方法来迭代 DF2 中的所有行并根据某些条件生成一个 Dataframe。
这是输入 DF1 和 DF2:
val df1Columns = Seq("Eftv_Date","S_Amt","A_Amt","Layer","SubLayer")
val df2Columns = Seq("Eftv_Date","S_Amt","A_Amt")
var df1 = List(
List("2016-10-31","1000000","1000","0","1"),
List("2016-12-01","100000","950","1","1"),
List("2017-01-01","50000","50","2","1"),
List("2017-03-01","50000","100","3","1"),
List("2017-03-30","80000","300","4","1")
)
.map(row =>(row(0), row(1),row(2),row(3),row(4))).toDF(df1Columns:_*)
+----------+-------+-----+-----+--------+
| Eftv_Date| S_Amt|A_Amt|Layer|SubLayer|
+----------+-------+-----+-----+--------+
|2016-10-31|1000000| 1000| 0| 1|
|2016-12-01| 100000| 950| 1| 1|
|2017-01-01| 50000| 50| 2| 1|
|2017-03-01| 50000| 100| 3| 1|
|2017-03-30| 80000| 300| 4| 1|
+----------+-------+-----+-----+--------+
val df2 = List(
List("2017-02-01","0","400")
).map(row =>(row(0), row(1),row(2))).toDF(df2Columns:_*)
+----------+-----+-----+
| Eftv_Date|S_Amt|A_Amt|
+----------+-----+-----+
|2017-02-01| 0| 400|
+----------+-----+-----+
现在我需要编写一个方法,根据 DF2 每一行的 Eftv_Date 值过滤 DF1。 例如,df2.Eftv_date=Feb 01 2017 的第一行,所以需要过滤掉 Eftv_date 小于或等于 Feb 01 2017 记录的 df1。所以这将生成如下 3 条记录:
预期结果:
+----------+-------+-----+-----+--------+
| Eftv_Date| S_Amt|A_Amt|Layer|SubLayer|
+----------+-------+-----+-----+--------+
|2016-10-31|1000000| 1000| 0| 1|
|2016-12-01| 100000| 950| 1| 1|
|2017-01-01| 50000| 50| 2| 1|
+----------+-------+-----+-----+--------+
我已经编写了如下方法并使用map函数调用它。
def transformRows(row: Row ) =
val dateEffective = row.getAs[String]("Eftv_Date")
val df1LayerMet = df1.where(col("Eftv_Date").leq(dateEffective))
df1 = df1LayerMet
df1
val x = df2.map(transformRows)
但是在调用这个时我遇到了这个错误:
Error:(154, 24) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val x = df2.map(transformRows)
注意:我们可以使用 join 来实现这个,但是我需要实现一个自定义的 scala 方法来做到这一点,因为这涉及到很多转换。为简单起见,我只提到了一个条件。
【问题讨论】:
您不能在远程代码中使用数据帧(在 transformRows 中) 如果 df2 很小,您可以收集它(生成 Array[Row]),然后映射到 transformRows,然后合并所有数据帧。 【参考方案1】:看来你需要一个非 equi 连接:
df1.alias("a").join(
df2.select("Eftv_Date").alias("b"),
df1("Eftv_Date") <= df2("Eftv_Date") // non-equi join condition
).select("a.*").show
+----------+-------+-----+-----+--------+
| Eftv_Date| S_Amt|A_Amt|Layer|SubLayer|
+----------+-------+-----+-----+--------+
|2016-10-31|1000000| 1000| 0| 1|
|2016-12-01| 100000| 950| 1| 1|
|2017-01-01| 50000| 50| 2| 1|
+----------+-------+-----+-----+--------+
【讨论】:
我知道我们可以进行非 equi 连接。但我需要通过包含大量转换的“自定义方法调用”来处理 不能嵌套分布式数据集。通过像您所做的那样嵌套转换,对于每个地图,您都会返回一个在自然界中分布的数据框,这会导致问题。所以这取决于你想要做什么,你可能必须想出一种方法来使用连接或类似的东西来转换数据。 仅供参考,如果您想转换df2
中每一行的匹配,一种可能是为df2
创建一个行ID,加入df1
,然后按此分组id
并应用您的转换。
我没明白。基本上我需要循环 df1 的所有记录,根据 df2 的值执行一些转换。以上是关于Spark Dataframe - 将行作为输入和数据框具有输出的方法的主要内容,如果未能解决你的问题,请参考以下文章