在 Pyspark 中旋转时无法解析列名
Posted
技术标签:
【中文标题】在 Pyspark 中旋转时无法解析列名【英文标题】:Cannot resolve column name when pivoting in Pyspark 【发布时间】:2021-09-01 08:09:42 【问题描述】:这是我的数据集
reloadmonthly
DataFrame[year: string, month: string, msisdn: string, reload_min: double, reload_max: double, reload_avg: double, reload_sum: double, rembal_min: string, rembal_max: string, rembal_avg: double, rembal_sum: double, period: string, application_type: string, periodloan: string, ix: string, last_x_month: double]
reloadmonthly.show(2)
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
|year|month| msisdn|reload_min|reload_max|reload_avg|reload_sum|rembal_min|rembal_max|rembal_avg|rembal_sum|period|application_type|periodloan| ix|last_x_month|
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
|2019| 10| 628176789488| 5000.0| 5000.0| 5000.0| 5000.0| 5189.0| 5189.0| 5189.0| 5189.0|201910| 10| 202001| 1| 1.0|
|2019| 10|6281802031321| 25000.0| 25000.0| 25000.0| 25000.0| 25633.0| 25633.0| 25633.0| 25633.0|201910| 100| 202001| 1| 2.0|
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
only showing top 2 rows
这是我的代码
reloadid = reloadmonthly.dropDuplicates(["msisdn"])
reloadid = reloadid.join(
packetmonthly.withColumn("p", F.expr("concat('reload_sum_l', last_x_month)"))
.groupBy("msisdn")
.pivot("p")
.sum("reload_sum"),
on=["msisdn"],
how="left_outer",
)
这是错误信息
AnalysisException: 'Cannot resolve column name "reload_sum" among (year, month, msisdn, packet_min, packet_max, packet_avg, packet_sum, period, application_type, periodloan, ix, last_x_month, p);'
【问题讨论】:
@darkman 这是一个字符串 就像pandas中的add_prefix 【参考方案1】:您在join
之前(内部)执行pivot
。因此,您正在尝试旋转packetmonthly
,它显然不包含任何列reload_sum
(此列出现在reloadmonthly
中)。我编辑了您的代码以阐明您在联接中执行枢轴的部分。
也许,您只需要在枢轴之前进行连接 - 我无法真正测试,因为您没有给出 packetmonthly
的定义,但代码应该如下所示:
reloadid = (
reloadid.join(
packetmonthly,
on=["msisdn"],
how="left_outer",
)
.withColumn("p", F.expr("concat('reload_sum_l', last_x_month)"))
.groupBy("msisdn")
.pivot("p")
.sum("reload_sum")
)
【讨论】:
我也是在你回答这个问题前几分钟才意识到我的错误 我很确定这一切都可以在单个 groupby 中完成,而不是 groubpy + join + pivot。 @darkman 这只是一个 join + pivot (group by 是 pivot 语法的一部分)。 我们不知道真实数据的大小,所以我们不能说它只是一个join + pivot + groupby。它可以通过使用数据集 api 进行改进。【参考方案2】:package somePackage.notepad
import somePackage.SparkSessionTestWrapper
import org.apache.spark.sql.DataFrame
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.apache.spark.sql.functions.col, expr
class NotepadSpec extends AnyFunSuite with SparkSessionTestWrapper with Matchers
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
test ("Notepad test")
val inputData: Seq[NotepadInput] = Seq(
NotepadInput(
year = "2019",
month = "10",
msisdn = "628176789488",
reload_min = 5000.0,
reload_max = 5000.0,
reload_avg = 5000.0,
reload_sum = 5000.0,
rembal_min = "5189.0",
rembal_max = "5189.0",
rembal_avg = 5189.0,
rembal_sum = 5189.0,
period = "201910",
application_type = "10",
periodloan = "202001",
ix = "1",
last_x_month = 1.0,
),
NotepadInput(
year = "2019",
month = "10",
msisdn = "6281802031321",
reload_min = 25000.0,
reload_max = 25000.0,
reload_avg = 25000.0,
reload_sum = 25000.0,
rembal_min = "25633.0",
rembal_max = "25633.0",
rembal_avg = 25633.0,
rembal_sum = 25633.0,
period = "201910",
application_type = "10",
periodloan = "202001",
ix = "1",
last_x_month = 2.0,
)
)
val df: DataFrame = inputData.toDF.dropDuplicates("msisdn")
val pivotedDf: DataFrame =
df
.withColumn("p", expr("concat('reload_sum', last_x_month)"))
.groupBy("msisdn")
.pivot("p")
.sum("reload_sum")
val outDf: DataFrame = df
.join(pivotedDf, Seq("msisdn"), "left_outer")
println(outDf.show(false))
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+
|msisdn |year|month|reload_min|reload_max|reload_avg|reload_sum|rembal_min|rembal_max|rembal_avg|rembal_sum|period|application_type|periodloan|ix |last_x_month|reload_sum1.0|reload_sum2.0|
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+
|6281802031321|2019|10 |25000.0 |25000.0 |25000.0 |25000.0 |25633.0 |25633.0 |25633.0 |25633.0 |201910|10 |202001 |1 |2.0 |null |25000.0 |
|628176789488 |2019|10 |5000.0 |5000.0 |5000.0 |5000.0 |5189.0 |5189.0 |5189.0 |5189.0 |201910|10 |202001 |1 |1.0 |5000.0 |null |
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+
【讨论】:
我不明白您是如何得出这个解决方案的。packetmonthly
甚至没有在问题中定义......它是 Scala
F.expr 表明它是 Pyspark,而 Pyspark 只是 Spark 的一个 API,它是用 Scala 编写的,因此我在 Scala 中做出了回应。你现在明白了吗?
这里没有定义 packetmonthly 没关系。重要的是示例方法和加入流程以获得正确的输出。只要您使用正确的架构,您就可以使用任何您想要的名称。
我不明白你为什么给我减号纯粹是因为你缺乏知识不知道 Pyspark 与 Scala 的关系。
reload_sum_l
不是packetmonthly
的一部分。在阅读"concat('reload_sum_l', last_x_month)"
时,您会看到reload_sum_l
是一个简单的字符串,而不是一个列。他正在一个字符串和last_x_month
列之间进行连接。但是你可以假设任何你想要的,并回答任何看起来正确的事情......以上是关于在 Pyspark 中旋转时无法解析列名的主要内容,如果未能解决你的问题,请参考以下文章
AnalysisException,pyspark 无法解析数据框查询中的变量
使用 pyspark 解析 JSON 时嵌套动态模式不起作用
两个 Spark DataFrame 的简单连接因“org.apache.spark.sql.AnalysisException:无法解析列名”而失败
org.apache.spark.sql.AnalysisException:给定pyspark中的输入列,无法解析'sub_tot`'
PYSPARK org.apache.spark.sql.AnalysisException:无法解析给定输入列的“INPUT__FILE__NAME”