Unable to create array literal in spark/pyspark
Posted: 2017-01-06 18:22:10

I'm having trouble removing rows from a dataframe based on a list of two-column value pairs to filter on. For example, given this dataframe:
df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
df.show()
+------+------+---+
|number|letter| id|
+------+------+---+
| 100| A|304|
| 200| B|305|
| 300| C|306|
+------+------+---+
I can easily remove rows on a single column using isin:
df.where(~col('number').isin([100, 200])).show()
+------+------+---+
|number|letter| id|
+------+------+---+
| 300| C|306|
+------+------+---+
But when I try to drop rows by two columns at once, I get an exception:
df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.functions.lit(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
After some investigation, I realized that the root cause of the problem is creating a literal from a non-primitive type. I tried the following in pyspark:
lit((100, 'A'))
lit([100, 'A'])
and the following in scala-spark:
lit((100, "A"))
lit(List(100, "A"))
lit(Seq(100, "A"))
lit(Array(100, "A"))
but with no luck... Does anyone know a way to create an array literal in spark/pyspark? Or is there another way to filter a dataframe by two columns?
Answer 1:

First of all, you probably want struct rather than array. Keep in mind that Spark SQL doesn't support heterogeneous arrays, so array(1, 'a') is cast to array<string>.
So the query could look like this:
from pyspark.sql.functions import lit, struct

choices = [(100, 'A'), (200, 'B')]
target = [
    struct(
        lit(number).alias("number").cast("long"),
        lit(letter).alias("letter").cast("string"))
    for number, letter in choices]
query = struct("number", "letter").isin(target)
This seems to generate a valid expression:
query
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'>
but for some reason it fails in the analyzer:
df.where(~query)
AnalysisException Traceback (most recent call last)
...
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n"
Strangely enough, the plain SQL equivalent fails as well:
df.createOrReplaceTempView("df")
spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))")
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n +- SubqueryAlias df\n +- LogicalRDD [number#0L, letter#1, id#2L]\n"
But when both sides are replaced with literals:
spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))")
DataFrame[number: bigint, letter: string, id: bigint]
it works fine, so it looks like a bug.
That being said, a left anti join should work just fine here:
from pyspark.sql.functions import broadcast

df.join(
    broadcast(spark.createDataFrame(choices, ("number", "letter"))),
    ["number", "letter"],
    "leftanti"
).show()
+------+------+---+
|number|letter| id|
+------+------+---+
| 300| C|306|
+------+------+---+
Answer 2:

To create an array literal in Spark, you need to build an array from a sequence of columns, each created with the lit function:
scala> array(lit(100), lit("A"))
res1: org.apache.spark.sql.Column = array(100, A)
Comments:
- The question is about pyspark, not scala.
- Thanks for the tip in this answer! from pyspark.sql.functions import *; array(lit(100), lit("A")) works in pyspark.
工作以上是关于无法在 spark/pyspark 中创建数组文字的主要内容,如果未能解决你的问题,请参考以下文章