在 Pyspark 中旋转时无法解析列名

Posted

技术标签:

【中文标题】在 Pyspark 中旋转时无法解析列名【英文标题】:Cannot resolve column name when pivoting in Pyspark 【发布时间】:2021-09-01 08:09:42 【问题描述】:

这是我的数据集

reloadmonthly

DataFrame[year: string, month: string, msisdn: string, reload_min: double, reload_max: double, reload_avg: double, reload_sum: double, rembal_min: string, rembal_max: string, rembal_avg: double, rembal_sum: double, period: string, application_type: string, periodloan: string, ix: string, last_x_month: double]

reloadmonthly.show(2)

+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
|year|month|       msisdn|reload_min|reload_max|reload_avg|reload_sum|rembal_min|rembal_max|rembal_avg|rembal_sum|period|application_type|periodloan| ix|last_x_month|
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
|2019|   10| 628176789488|    5000.0|    5000.0|    5000.0|    5000.0|    5189.0|    5189.0|    5189.0|    5189.0|201910|              10|    202001|  1|         1.0|
|2019|   10|6281802031321|   25000.0|   25000.0|   25000.0|   25000.0|   25633.0|   25633.0|   25633.0|   25633.0|201910|             100|    202001|  1|         2.0|
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
only showing top 2 rows

这是我的代码

reloadid = reloadmonthly.dropDuplicates(["msisdn"])
reloadid = reloadid.join(

    packetmonthly.withColumn("p", F.expr("concat('reload_sum_l', last_x_month)"))
    .groupBy("msisdn")
    .pivot("p")
    .sum("reload_sum"),

    on=["msisdn"],
    how="left_outer",
)

这是错误信息

AnalysisException: 'Cannot resolve column name "reload_sum" among (year, month, msisdn, packet_min, packet_max, packet_avg, packet_sum, period, application_type, periodloan, ix, last_x_month, p);'

【问题讨论】:

@darkman 这是一个字符串 就像pandas中的add_prefix 【参考方案1】:

您在join 之前(内部)执行pivot。因此,您正在尝试旋转packetmonthly,它显然不包含任何列reload_sum(此列出现在reloadmonthly 中)。我编辑了您的代码以阐明您在联接中执行枢轴的部分。

也许,您只需要在枢轴之前进行连接 - 我无法真正测试,因为您没有给出 packetmonthly 的定义,但代码应该如下所示:

reloadid = (
    reloadid.join(
        packetmonthly,
        on=["msisdn"],
        how="left_outer",
    )
    .withColumn("p", F.expr("concat('reload_sum_l', last_x_month)"))
    .groupBy("msisdn")
    .pivot("p")
    .sum("reload_sum")
)

【讨论】:

我也是在你回答这个问题前几分钟才意识到我的错误 我很确定这一切都可以在单个 groupby 中完成,而不是 groubpy + join + pivot。 @darkman 这只是一个 join + pivot (group by 是 pivot 语法的一部分)。 我们不知道真实数据的大小,所以我们不能说它只是一个join + pivot + groupby。它可以通过使用数据集 api 进行改进。【参考方案2】:
package somePackage.notepad

import somePackage.SparkSessionTestWrapper
import org.apache.spark.sql.DataFrame
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.apache.spark.sql.functions.col, expr

class NotepadSpec extends AnyFunSuite with SparkSessionTestWrapper with Matchers  

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  test ("Notepad test") 
    val inputData: Seq[NotepadInput] = Seq(
      NotepadInput(
        year = "2019",
        month =  "10",
        msisdn = "628176789488",
        reload_min = 5000.0,
        reload_max = 5000.0,
        reload_avg = 5000.0,
        reload_sum = 5000.0,
        rembal_min = "5189.0",
        rembal_max = "5189.0",
        rembal_avg = 5189.0,
        rembal_sum = 5189.0,
        period = "201910",
        application_type = "10",
        periodloan = "202001",
        ix = "1",
        last_x_month = 1.0,
      ),
      NotepadInput(
        year = "2019",
        month =  "10",
        msisdn = "6281802031321",
        reload_min = 25000.0,
        reload_max = 25000.0,
        reload_avg = 25000.0,
        reload_sum = 25000.0,
        rembal_min = "25633.0",
        rembal_max = "25633.0",
        rembal_avg = 25633.0,
        rembal_sum = 25633.0,
        period = "201910",
        application_type = "10",
        periodloan = "202001",
        ix = "1",
        last_x_month = 2.0,
      )
    )

    val df: DataFrame = inputData.toDF.dropDuplicates("msisdn")

    val pivotedDf: DataFrame =
      df
        .withColumn("p", expr("concat('reload_sum', last_x_month)"))
        .groupBy("msisdn")
        .pivot("p")
        .sum("reload_sum")

    val outDf: DataFrame = df
      .join(pivotedDf, Seq("msisdn"), "left_outer")

    println(outDf.show(false))
  




+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+
|msisdn       |year|month|reload_min|reload_max|reload_avg|reload_sum|rembal_min|rembal_max|rembal_avg|rembal_sum|period|application_type|periodloan|ix |last_x_month|reload_sum1.0|reload_sum2.0|
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+
|6281802031321|2019|10   |25000.0   |25000.0   |25000.0   |25000.0   |25633.0   |25633.0   |25633.0   |25633.0   |201910|10              |202001    |1  |2.0         |null         |25000.0      |
|628176789488 |2019|10   |5000.0    |5000.0    |5000.0    |5000.0    |5189.0    |5189.0    |5189.0    |5189.0    |201910|10              |202001    |1  |1.0         |5000.0       |null         |
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+

【讨论】:

我不明白您是如何得出这个解决方案的。 packetmonthly 甚至没有在问题中定义......它是 Scala F.expr 表明它是 Pyspark,而 Pyspark 只是 Spark 的一个 API,它是用 Scala 编写的,因此我在 Scala 中做出了回应。你现在明白了吗? 这里没有定义 packetmonthly 没关系。重要的是示例方法和加入流程以获得正确的输出。只要您使用正确的架构,您就可以使用任何您想要的名称。 我不明白你为什么给我减号纯粹是因为你缺乏知识不知道 Pyspark 与 Scala 的关系。 reload_sum_l 不是packetmonthly 的一部分。在阅读"concat('reload_sum_l', last_x_month)" 时,您会看到reload_sum_l 是一个简单的字符串,而不是一个列。他正在一个字符串和last_x_month 列之间进行连接。但是你可以假设任何你想要的,并回答任何看起来正确的事情......

以上是关于在 Pyspark 中旋转时无法解析列名的主要内容,如果未能解决你的问题,请参考以下文章

AnalysisException,pyspark 无法解析数据框查询中的变量

使用 pyspark 解析 JSON 时嵌套动态模式不起作用

pyspark delta湖优化 - 无法解析SQL

两个 Spark DataFrame 的简单连接因“org.apache.spark.sql.AnalysisException:无法解析列名”而失败

org.apache.spark.sql.AnalysisException:给定pyspark中的输入列,无法解析'sub_tot`'

PYSPARK org.apache.spark.sql.AnalysisException:无法解析给定输入列的“INPUT__FILE__NAME”