如何使用 PySpark 从日常文件中加载滚动窗口?

Posted

技术标签:

【中文标题】如何使用 PySpark 从日常文件中加载滚动窗口?【英文标题】:How to use PySpark to load a rolling window from daily files? 【发布时间】:2017-04-27 13:54:16 【问题描述】:

我有大量相当大的日常文件存储在博客存储引擎(S3、Azure datalake exc.. exc..)data1900-01-01.csv, data1900-01-02.csv,....,data2017-04-27.csv。我的目标是执行滚动 N 天线性回归,但我在数据加载方面遇到了麻烦。如果没有嵌套 RDD's.,我不确定如何做到这一点 每个.csv 文件的架构都是相同的。

换句话说,对于每个日期d_t,我需要数据x_t 并加入数据(x_t-1, x_t-2,... x_t-N)

如何使用 PySpark 加载这些日常文件的 N 天窗口?我能找到的所有 PySpark 示例似乎都是从一个非常大的文件或数据集加载的。

这是我当前代码的示例:

dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]

p = sc.parallelize(dates)
def test_run(date_range):
    dt0 = date_range[-1] #get the latest date
    s = '/daily/data.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
    file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
    return 1

p.filter(test_run) 

p.map(test_run) #fails with same error as p.filter

我正在使用 PySpark 版本 '2.1.0'

我在 Azure HDInsight 集群 jupyter notebook 上运行它。

spark 这里是<class 'pyspark.sql.session.SparkSession'> 类型

一个更小更可重现的例子如下:

p = sc.parallelize([1, 2, 3])
def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1
p.filter(foo).count()

【问题讨论】:

在下面查看我的更新答案 @Pushkr 我再次更新它以使其更清晰,删除窗口函数调用并放置日期示例。只是那个例子对我来说失败了,我可以通过直接调用 test_run(dates[0]) 来运行它 您是否尝试过使用spark.read.csv(folder) 将所有数据直接加载到Dataframe 中,然后添加一个名为(file_name) .withColumn("filename", input_file_name()) 的新列然后只需使用基于此列的分组来进一步操作数据框可以进行 N 天线性回归吗? @Teodor-BogdanBarbieru 不,我没有尝试过,稍后会尝试,谢谢您的建议。 【参考方案1】:

您最好使用Dataframes 而不是RDD。 Dataframe 的 read.csv api 接受路径列表,例如 -

pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)

have a look at documentation for read.csv

您可以通过在 N 天的窗口上进行一些日期操作来形成数据文件的路径列表,例如 "path/to/data"+datetime.today().strftime("%Y-%m-%d"))+.csv" (这只会让您获得今天的文件名,但不难计算出日期N天)

但是请记住,所有日期 csv 的架构都应该相同才能使上述工作。

编辑:当您并行化日期列表(即p)时,每个日期都由不同的执行程序单独处理,因此 test_run2 的输入并不是真正的日期列表,它是一个单独的字符串,例如 1995-01-01

试试这个,看看这是否有效。

# Get the list of dates 
date_range = window(dates, N) 
s = '/daily/data.csv'

dt0 = date_range[-1] # most recent file
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM') 

# read previous files
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')

r, resid = computeLinearRegression(df0,df1)
r.write.save('daily/r.csv'.format(dt0))
resid.write.save('/daily/resid.csv'.format(dt0))

【讨论】:

我的理解是spark.read 本身就是一个RDD 操作,所以我不能从sc.parallelize 操作中调用spark.read 不,您可以在 sc.parallelize 中使用 spark.read。 sc.parallelize 创建 RDD,而 spark.read 创建 dataframe 如果您真的只希望这些文件为 RDD,您可以使用 df.rdd 将数据帧转换为 rdd,这将创建 Rdd with Rows 当我尝试这个***.com/questions/40470487/…时收到这里描述的错误@ 你能贴出你遇到这个错误时尝试的代码吗? 我在上面加了一个例子

以上是关于如何使用 PySpark 从日常文件中加载滚动窗口?的主要内容,如果未能解决你的问题,请参考以下文章

使用模式验证在 pyspark 中加载 geoJSON

ScrollView:如何从文档中加载图像

如何使用 JSON API 在滚动时在 recyclerview 中加载更多数据

在非 Spark 环境中加载 pyspark ML 模型

在滚动时在 recyclerview 中加载更多数据

如何在 QT 中加载 UI 表单?