pyspark:使用别名选择列

Posted

技术标签:

【中文标题】pyspark:使用别名选择列【英文标题】:pyspark: select column using an alias 【发布时间】:2017-03-07 21:22:05 【问题描述】:

我试图在 spark 1.6 中使用 SQLContext.sql 从别名中进行简单的选择。

sqlCtx = SQLContext(sc)
## Import CSV File
header = (sc.textFile("data.csv")
          .map(lambda line: [x for x in line.split(",")]))

## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'desc'])

## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))

headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.desc from headerTab as d").show()

我注意到这似乎在 Spark 2.0 中有效,但我目前仅限于 1.6。

这是我看到的错误消息。对于一个简单的选择,我可以删除别名,但最终我试图对具有相同列名的多个表进行连接。

Spark 1.6 错误

Traceback (most recent call last):
  File "/home/temp/text_import.py", line 49, in <module>
    head = sqlCtx.sql("select d.desc from headerTab as d").show()
  File "/home/pricing/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/context.py", line 580, in sql
  File "/home/pricing/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/home/pricing/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "/home/pricing/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.sql.
: java.lang.RuntimeException: [1.10] failure: ``*'' expected but `desc' found

Spark 2.0 回归

+--------+
|    desc|
+--------+
|    data|
|    data|
|    data|

【问题讨论】:

您的问题在于desc,因为desc 是用于排序的关键字。所以试试这个 head = sqlCtx.sql("select d.desc from headerTab as d").show() 我无法理解您的建议有何不同,但我确实理解您对关键字的意思。我将列名更改为 descTmp,它按设计工作。感谢您的帮助,我会发布答案。 【参考方案1】:

基本上你在列名中使用关键字desc,这是不合适的。您可以通过 2 种方式解决此问题,要么更改列名,要么在关键字 desc 周围使用符号 (`)。

方式一:-

sqlCtx = SQLContext(sc)
## Import CSV File
header = (sc.textFile("data.csv")
          .map(lambda line: [x for x in line.split(",")]))

## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'description'])

## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))

headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.description from headerTab as d").show()

方式2:-

sqlCtx = SQLContext(sc)
## Import CSV File
header = (sc.textFile("data.csv")
          .map(lambda line: [x for x in line.split(",")]))

## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'desc'])

## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))

headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.`desc` from headerTab as d").show()

【讨论】:

你有更好的答案。接受你的回答。谢谢! @JestonBlu 谢谢,您可以使用 rakeshkaswan8356@gmail.com 保持联系【参考方案2】:

正如问题下方的评论中所述,使用 desc 是不合适的,因为它是一个关键字。更改列的名称可以解决此问题。

## Convert RDD to DF, specify column names
headerDF = header.toDF(['header', 'adj', 'descTmp'])

## Convert Adj Column to numeric
headerDF = headerDF.withColumn("adj", headerDF['adj'].cast(DoubleType()))

headerDF.registerTempTable("headerTab")
head = sqlCtx.sql("select d.descTmp from headerTab as d").show()

+-----------+
|    descTmp|
+-----------+
|       data|
|       data|
|       data|

【讨论】:

伟人,2票给你

以上是关于pyspark:使用别名选择列的主要内容,如果未能解决你的问题,请参考以下文章

基于列 pyspark 的条件列选择

Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列

使用 spark-xml 从 pyspark 数据框中选择嵌套列

为每组 pyspark RDD/dataframe 选择随机列

连接后如何在 Pyspark Dataframe 中选择和排序多个列

Pyspark减去而不选择列[重复]