pyspark:使用别名选择列
Posted
技术标签:
【中文标题】pyspark:使用别名选择列【英文标题】:pyspark: select column using an alias 【发布时间】:2017-03-07 21:22:05 【问题描述】:我试图在 spark 1.6 中使用 SQLContext.sql 从别名中进行简单的选择。
sqlCtx = SQLContext(sc)
## Import CSV File
header = (sc.textFile("data.csv")
.map(lambda line: [x for x in line.split(",")]))
## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'desc'])
## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))
headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.desc from headerTab as d").show()
我注意到这似乎在 Spark 2.0 中有效,但我目前仅限于 1.6。
这是我看到的错误消息。对于一个简单的选择,我可以删除别名,但最终我试图对具有相同列名的多个表进行连接。
Spark 1.6 错误
Traceback (most recent call last):
File "/home/temp/text_import.py", line 49, in <module>
head = sqlCtx.sql("select d.desc from headerTab as d").show()
File "/home/pricing/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/context.py", line 580, in sql
File "/home/pricing/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/home/pricing/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/home/pricing/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.sql.
: java.lang.RuntimeException: [1.10] failure: ``*'' expected but `desc' found
Spark 2.0 回归
+--------+
| desc|
+--------+
| data|
| data|
| data|
【问题讨论】:
您的问题在于desc
,因为desc
是用于排序的关键字。所以试试这个 head = sqlCtx.sql("select d.desc
from headerTab as d").show()
我无法理解您的建议有何不同,但我确实理解您对关键字的意思。我将列名更改为 descTmp,它按设计工作。感谢您的帮助,我会发布答案。
【参考方案1】:
基本上你在列名中使用关键字desc
,这是不合适的。您可以通过 2 种方式解决此问题,要么更改列名,要么在关键字 desc 周围使用符号 (`)。
方式一:-
sqlCtx = SQLContext(sc)
## Import CSV File
header = (sc.textFile("data.csv")
.map(lambda line: [x for x in line.split(",")]))
## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'description'])
## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))
headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.description from headerTab as d").show()
方式2:-
sqlCtx = SQLContext(sc)
## Import CSV File
header = (sc.textFile("data.csv")
.map(lambda line: [x for x in line.split(",")]))
## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'desc'])
## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))
headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.`desc` from headerTab as d").show()
【讨论】:
你有更好的答案。接受你的回答。谢谢! @JestonBlu 谢谢,您可以使用 rakeshkaswan8356@gmail.com 保持联系【参考方案2】:正如问题下方的评论中所述,使用 desc 是不合适的,因为它是一个关键字。更改列的名称可以解决此问题。
## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'descTmp'])
## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))
headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.descTmp from headerTab as d").show()
+-----------+
| descTmp|
+-----------+
| data|
| data|
| data|
【讨论】:
伟人,2票给你以上是关于pyspark:使用别名选择列的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列
使用 spark-xml 从 pyspark 数据框中选择嵌套列
为每组 pyspark RDD/dataframe 选择随机列