为啥针对 S3 的 pyspark sql 查询返回空值

Posted

技术标签:

【中文标题】为啥针对 S3 的 pyspark sql 查询返回空值【英文标题】:Why is pyspark sql query against S3 returning nulls为什么针对 S3 的 pyspark sql 查询返回空值 【发布时间】:2019-01-19 01:14:32 【问题描述】:

在 Athena 中针对 S3 源运行相同查询与在 EMR (1 x 10) 集群上的 pyspark 脚本中执行相同查询时,我得到了不同的结果。我从 Athena 取回数据,但我得到的只是脚本的空值。关于原因的任何建议/想法/猜测?

这是 Athena 查询:

SELECT <real_col1> as reg_plate, <real_col1> as model_num
FROM <my Athena table name> 
WHERE partition_datetime LIKE '2019-01-01-14' 
limit 10

返回这个结果:

reg_plate   model_num
   515355  961-824
   515355  961-824
   515355  961-824
   515355  961-824
   341243  047-891
   727027  860-403
   619656  948-977
   576345  951-657
   576345  951-657
   113721  034-035

但是,当我将此查询作为脚本运行时,针对相同的 S3 源:

# Define SQL query
load_qry = """SELECT <real_col1> as reg_plate, <real_col2> as model_num
FROM s3_table
WHERE partition_datetime LIKE '2019-01-01-14'
limit 10  """

df1 = spark.read.parquet("<s3:path to my data>")
df1.createOrReplaceTempView("s3_table")

sqlDF = spark.sql(load_qry)
sqlDF.show(10)

我只得到空值,像这样

+---------+---------+
|reg_plate|model_num|
+---------+---------+
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
+---------+---------+

这是我的集群上的配置,即 1 个主 r3.xlarge 和 10 个 r3.xlarge 工作者:

这是我用来启动 spark 作业的命令字符串:PYSPARK_PYTHON=/opt/miniconda/bin/python nohup spark-submit --driver- memory 8g --executor-memory 30g --conf spark.executor.cores=8 --conf spark.driver.maxResultSize=8g --conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER --conf spark.debug.maxToStringFields=100 --conf spark.sql.hive.convertMetastoreParquet=false stk_overflow.py &gt; stk_oflow0120.txt 2&gt;&amp;1

【问题讨论】:

我无法复制错误。 df.show() 有什么显示吗? df.show 为每列显示 10 行“null”。 OP中的图像是屏幕截图。还编辑了我的帖子以包含集群配置。 或者对不起,我的意思是df1.show()。只是想看看 spark.read.parquet 是否可以读取任何内容 我在 df1 和 sqlDF 上都做了一个 printSchema(),它们都显示了相同的列和数据类型,所以我确信至少模式是正确的。我会回去看看 df1.show() 是否显示并在此处发布。感谢您对此的帮助。 df1.show() 给了我完全填充的行。我与 Athena 返回的数据进行了比较,发现源数据的值为“NULL”时,df1.show() 将该值正确返回为 NULL,但源数据在列/行中为空白,该列/行交集不是作为空白返回,而是在 df1.show() 中返回为“null”。我进一步确认源数据中这些列的数据具有实际值(如上面的 Athena qry 所示)并且不是空白的。 【参考方案1】:

我找到了一个简单的解决方案。

代替

load_qry = """SELECT <real_col1> as reg_plate, <real_col2> as model_num 
FROM s3_table WHERE partition_datetime LIKE '2019-01-01-14' limit 10 """ 
df1 = spark.read.parquet("<s3:path to my data>") 
df1.createOrReplaceTempView("s3_table") 

我用过

load_qry = """SELECT <real_col1> as reg_plate, <real_col2> as model_num
FROM <my_athena_db>.table WHERE partition_datetime LIKE '2019-01-01-14' 
df1 = spark.sql(load_qry)

这很有效,因为 Glue 知道如何访问“my_athena_db.table”

【讨论】:

以上是关于为啥针对 S3 的 pyspark sql 查询返回空值的主要内容,如果未能解决你的问题,请参考以下文章

为啥 pyspark.sql 下层函数不接受文字列名和长度函数呢?

为啥 pyspark sql 不能正确计算 group by 子句?

PYSPARK - 如何读取 S3 中所有子文件夹中的所有 csv 文件?

PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?

Pyspark:将 sql 查询转换为 pyspark?

为啥来自 s3 的 dask read_csv 保留了这么多内存?