在 pyspark 中保存批次的数据

Posted

技术标签:

【中文标题】在 pyspark 中保存批次的数据【英文标题】:Saving batches' data in pyspark 【发布时间】:2016-12-01 22:11:23 【问题描述】:

我需要从批次中积累一段时间的数据以供后期处理。我正在使用 Spark 1.6.3。 我需要以(tag, [[time, value],..] 的形式累积它们。 到目前为止我已经尝试过updateStateByKey:

time = [0]
def updateFunc(new_values, last_sum,time):
    time[0] += 5
    if time == 10:
        time = 0
        return None
    return (last_sum or []) + new_values

data = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, ['t','t1'])) \
                  .updateStateByKey(lambda x,y :updateFunc(x,y,time))
data.pprint()

正在添加数据。但是,尝试在 10 秒后刷新数据不起作用。 (我做错了)

我也尝试过使用window:

data= lines.flatMap(lambda lime: line.split(' ')\
    .map(lambda tag: (tag: ['time', 'value']))\
    .window(10, 2)\
    .reduceByKey(lambda x,y : y + x)`

但是,这会产生一个一维的长列表。这是没有用的。 有什么线索吗?谢谢。

【问题讨论】:

【参考方案1】:
items = lines.flatMap(lambda x: list(x)).map(lambda x: (x, [('time', 'value')]))
counts = items.reduceByKeyAndWindow(lambda x, y: x + y, invFunc=None, windowDuration=3, slideDuration=2)

试试这个

【讨论】:

以上是关于在 pyspark 中保存批次的数据的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 朴素贝叶斯批量使用拟合

在 Pyspark 中列出保存顺序的数据框列

Pyspark 无法保存包含大量列的数据框

如何使用pyspark将数据框保存在“.txt”文件中

如何在新数据到来时重新训练 pyspark 中保存的线性回归 ML 模型

找不到pyspark数据框保存到配置单元表