Spark Dataframe - 将行作为输入和数据框具有输出的方法

Posted

技术标签:

【中文标题】Spark Dataframe - 将行作为输入和数据框具有输出的方法【英文标题】:Spark Dataframe - Method to take row as input & dataframe has output 【发布时间】:2018-01-19 02:48:41 【问题描述】:

我需要编写一个方法来迭代 DF2 中的所有行并根据某些条件生成一个 Dataframe。

这是输入 DF1 和 DF2:

val df1Columns = Seq("Eftv_Date","S_Amt","A_Amt","Layer","SubLayer")
val df2Columns = Seq("Eftv_Date","S_Amt","A_Amt")
var df1 = List(
      List("2016-10-31","1000000","1000","0","1"),
      List("2016-12-01","100000","950","1","1"),
      List("2017-01-01","50000","50","2","1"),
      List("2017-03-01","50000","100","3","1"),
      List("2017-03-30","80000","300","4","1")
    )
      .map(row =>(row(0), row(1),row(2),row(3),row(4))).toDF(df1Columns:_*)

+----------+-------+-----+-----+--------+
| Eftv_Date|  S_Amt|A_Amt|Layer|SubLayer|
+----------+-------+-----+-----+--------+
|2016-10-31|1000000| 1000|    0|       1|
|2016-12-01| 100000|  950|    1|       1|
|2017-01-01|  50000|   50|    2|       1|
|2017-03-01|  50000|  100|    3|       1|
|2017-03-30|  80000|  300|    4|       1|
+----------+-------+-----+-----+--------+

val df2 = List(
  List("2017-02-01","0","400")
).map(row =>(row(0), row(1),row(2))).toDF(df2Columns:_*)

+----------+-----+-----+
| Eftv_Date|S_Amt|A_Amt|
+----------+-----+-----+
|2017-02-01|    0|  400|
+----------+-----+-----+

现在我需要编写一个方法,根据 DF2 每一行的 Eftv_Date 值过滤 DF1。 例如,df2.Eftv_date=Feb 01 2017 的第一行,所以需要过滤掉 Eftv_date 小于或等于 Feb 01 2017 记录的 df1。所以这将生成如下 3 条记录:

预期结果:

+----------+-------+-----+-----+--------+
| Eftv_Date|  S_Amt|A_Amt|Layer|SubLayer|
+----------+-------+-----+-----+--------+
|2016-10-31|1000000| 1000|    0|       1|
|2016-12-01| 100000|  950|    1|       1|
|2017-01-01|  50000|   50|    2|       1|
+----------+-------+-----+-----+--------+

我已经编写了如下方法并使用map函数调用它。

def transformRows(row: Row ) = 
  val dateEffective = row.getAs[String]("Eftv_Date")
  val df1LayerMet    =  df1.where(col("Eftv_Date").leq(dateEffective))
  df1 = df1LayerMet
  df1
 

val x = df2.map(transformRows)

但是在调用这个时我遇到了这个错误:

Error:(154, 24) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
val x = df2.map(transformRows)

注意:我们可以使用 join 来实现这个,但是我需要实现一个自定义的 scala 方法来做到这一点,因为这涉及到很多转换。为简单起见,我只提到了一个条件。

【问题讨论】:

您不能在远程代码中使用数据帧(在 transformRows 中) 如果 df2 很小,您可以收集它(生成 Array[Row]),然后映射到 transformRows,然后合并所有数据帧。 【参考方案1】:

看来你需要一个非 equi 连接:

df1.alias("a").join(
    df2.select("Eftv_Date").alias("b"), 
    df1("Eftv_Date") <= df2("Eftv_Date")          // non-equi join condition
).select("a.*").show
+----------+-------+-----+-----+--------+
| Eftv_Date|  S_Amt|A_Amt|Layer|SubLayer|
+----------+-------+-----+-----+--------+
|2016-10-31|1000000| 1000|    0|       1|
|2016-12-01| 100000|  950|    1|       1|
|2017-01-01|  50000|   50|    2|       1|
+----------+-------+-----+-----+--------+

【讨论】:

我知道我们可以进行非 equi 连接。但我需要通过包含大量转换的“自定义方法调用”来处理 不能嵌套分布式数据集。通过像您所做的那样嵌套转换,对于每个地图,您都会返回一个在自然界中分布的数据框,这会导致问题。所以这取决于你想要做什么,你可能必须想出一种方法来使用连接或类似的东西来转换数据。 仅供参考,如果您想转换df2 中每一行的匹配,一种可能是为df2 创建一个行ID,加入df1,然后按此分组id 并应用您的转换。 我没明白。基本上我需要循环 df1 的所有记录,根据 df2 的值执行一些转换。

以上是关于Spark Dataframe - 将行作为输入和数据框具有输出的方法的主要内容,如果未能解决你的问题,请参考以下文章

使用Python Dataframe将行与下一行进行比较并从结果中创建一个新列?

Spark dataFrame在更新其列后显示时间过长

spark:如何将行合并到 jsons 数组

Spark中的迭代RDD / Dataframe处理

如何将行转换为pyspark中的字典列表?

将行值转换为列,其值来自 spark scala 中的另一列 [重复]