如何使用 PySpark 从日常文件中加载滚动窗口?
Posted
技术标签:
【中文标题】如何使用 PySpark 从日常文件中加载滚动窗口?【英文标题】:How to use PySpark to load a rolling window from daily files? 【发布时间】:2017-04-27 13:54:16 【问题描述】:我有大量相当大的日常文件存储在博客存储引擎(S3、Azure datalake exc.. exc..)data1900-01-01.csv, data1900-01-02.csv,....,data2017-04-27.csv
。我的目标是执行滚动 N 天线性回归,但我在数据加载方面遇到了麻烦。如果没有嵌套 RDD's.
,我不确定如何做到这一点
每个.csv
文件的架构都是相同的。
换句话说,对于每个日期d_t
,我需要数据x_t
并加入数据(x_t-1, x_t-2,... x_t-N)
。
如何使用 PySpark 加载这些日常文件的 N 天窗口?我能找到的所有 PySpark 示例似乎都是从一个非常大的文件或数据集加载的。
这是我当前代码的示例:
dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]
p = sc.parallelize(dates)
def test_run(date_range):
dt0 = date_range[-1] #get the latest date
s = '/daily/data.csv'
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
return 1
p.filter(test_run)
p.map(test_run) #fails with same error as p.filter
我正在使用 PySpark 版本 '2.1.0'
我在 Azure HDInsight 集群 jupyter notebook 上运行它。
spark
这里是<class 'pyspark.sql.session.SparkSession'>
类型
一个更小更可重现的例子如下:
p = sc.parallelize([1, 2, 3])
def foo(date_range):
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
return 1
p.filter(foo).count()
【问题讨论】:
在下面查看我的更新答案 @Pushkr 我再次更新它以使其更清晰,删除窗口函数调用并放置日期示例。只是那个例子对我来说失败了,我可以通过直接调用test_run(dates[0])
来运行它
您是否尝试过使用spark.read.csv(folder)
将所有数据直接加载到Dataframe 中,然后添加一个名为(file_name) .withColumn("filename", input_file_name())
的新列然后只需使用基于此列的分组来进一步操作数据框可以进行 N 天线性回归吗?
@Teodor-BogdanBarbieru 不,我没有尝试过,稍后会尝试,谢谢您的建议。
【参考方案1】:
您最好使用Dataframes
而不是RDD
。 Dataframe 的 read.csv
api 接受路径列表,例如 -
pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)
have a look at documentation for read.csv
您可以通过在 N 天的窗口上进行一些日期操作来形成数据文件的路径列表,例如 "path/to/data"+datetime.today().strftime("%Y-%m-%d"))+.csv"
(这只会让您获得今天的文件名,但不难计算出日期N天)
但是请记住,所有日期 csv 的架构都应该相同才能使上述工作。
编辑:当您并行化日期列表(即p
)时,每个日期都由不同的执行程序单独处理,因此 test_run2 的输入并不是真正的日期列表,它是一个单独的字符串,例如 1995-01-01
试试这个,看看这是否有效。
# Get the list of dates
date_range = window(dates, N)
s = '/daily/data.csv'
dt0 = date_range[-1] # most recent file
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
# read previous files
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
r, resid = computeLinearRegression(df0,df1)
r.write.save('daily/r.csv'.format(dt0))
resid.write.save('/daily/resid.csv'.format(dt0))
【讨论】:
我的理解是spark.read
本身就是一个RDD
操作,所以我不能从sc.parallelize
操作中调用spark.read
?
不,您可以在 sc.parallelize 中使用 spark.read。 sc.parallelize
创建 RDD,而 spark.read 创建 dataframe
如果您真的只希望这些文件为 RDD
,您可以使用 df.rdd
将数据帧转换为 rdd,这将创建 Rdd with Rows
。
当我尝试这个***.com/questions/40470487/…时收到这里描述的错误@
你能贴出你遇到这个错误时尝试的代码吗?
我在上面加了一个例子以上是关于如何使用 PySpark 从日常文件中加载滚动窗口?的主要内容,如果未能解决你的问题,请参考以下文章