在 pyspark 中通过检查加载数据框给了我空的数据框

Posted

技术标签:

【中文标题】在 pyspark 中通过检查加载数据框给了我空的数据框【英文标题】:Loading a dataframe with check in pyspark is giving me empty dataframe 【发布时间】:2020-01-17 13:33:30 【问题描述】:

我正在尝试使用 pyspark 在数据框中加载数据。这些文件采用镶木地板格式。我正在使用以下代码

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,BooleanType,DateType,TimestampType,LongType,FloatType,DoubleType,ArrayType,ShortType
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
import datetime
from pyspark import SparkContext
from pyspark import SQLContext
from datetime import datetime
from datetime import *
from datetime import date, timedelta as td
import datetime
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import HiveContext

import pandas as pd
daterange = pd.date_range('2019-12-01','2019-12-31')

df = sqlContext.createDataFrame(sc.emptyRDD())

for process_date in daterange:
try:
    name = 's3://location/process_date='.format(process_date.strftime("%Y-%m-%d"))+'/'
    print(name)
    x = spark.read.parquet(name)
    x = x.withColumn('process_date',lit(process_date.strftime("%Y-%m-%d")))
    x.show()
    df = df.union(x)
except:
    print("File doesnt exist for"+str(process_date.strftime("%Y-%m-%d")))

但是当我运行这段代码时, 我得到的输出 df 是一个空数据集,尽管有某些日期的数据,但我在所有日期范围内都收到异常打印消息。 谁能指导我做错了什么?

【问题讨论】:

How to create an empty DataFrame with a specified schema? name 变量的输出是什么?是否与 s3 文件夹名称匹配 是的,我已经检查了路径。路径与名称变量匹配 【参考方案1】:

我认为问题在于联合和一个过于宽泛的 except 子句。 仅当要联合的数据框的架构相同时,联合才会起作用。 因此,emptyDF.union(nonEmtpy) 会引发您在 except 子句中捕获的错误。

【讨论】:

架构是一致的,所以联合不应该引起问题。写入异常以查看该位置是否存在文件。因此,我的用例是在 df 中加载一系列日期的数据,同时检查数据是否存在 如果与空白 df 联合导致此问题,我可以创建一个结构架构并使用它来创建一个带有架构的空白 df 是的,看起来联合与空白是问题所在。您是否尝试过使用正确的架构创建空 df? 感谢您的建议。一旦我提供了架构,它就可以工作

以上是关于在 pyspark 中通过检查加载数据框给了我空的数据框的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中通过 JDBC 实现 SQL Server

无法在 wamp 中通过 Web 表单插入,但它在 xampp 中有效

无法在 MySQLi 中通过引用传递参数 [重复]

在 Pyspark 中合并 DataFrame

在MYSQL中获取10km距离的记录

错误 - 在 windows 10/8.1 机器中通过 anaconda 使用 python pyspark