从 Pyspark 中读取文件后模拟流数据

Posted

技术标签:

【中文标题】从 Pyspark 中读取文件后模拟流数据【英文标题】:Mimic Streaming data after Reading from File in Pyspark 【发布时间】:2019-10-11 09:53:45 【问题描述】:

所以我正在从文件中读取数据。类似的东西

 data = spark.read.format('orc').load('myfilepath')

我现在想遍历上面的每一行并创建两个数组。一种用于购买商品的客户,另一种用于销售商品的客户。我需要按他们购买商品的价格和购买时间以及如果有退货我想更新它/从数组中删除它来订购它。对于每个时间点,我都想查看库存。例如,假设我们有一个像这样的表的列表。

Item Bought Time | Item Price | Item Action        | Transaction Unique ID | Amount
     8.30             50          Bought               1                      2000
     8.31             51           Sold                2                      5000
     8.32             50       Bought Returned         1                      2000 
     8.33             52          Bought               3                      10000
     8.34             49          Bought               4                      3000 

所以我将以上内容作为数据框阅读。我想在每个时间点为购买的物品和出售的物品保留一个数组,并按价格和时间对它们进行排序,这样在任何给定的时间点,我都可以像上面的购买和出售的物品一样获得排序的数据。

类似

购买数组

    Time  Info
    8.30  [50,2000]
    8.31  [50,2000]
    8.32  []
    8.33  [52,10000]         
    8.34  [49,3000, 52,10000]

出售阵列

    Time  Info
    8.30  []
    8.31  [51,5000]
    8.32  [51,5000]
    8.33  [51,5000]         
    8.34  [51,5000]

每天大约有 500 万行,因此它也需要高性能。你能告诉我如何最好地做到这一点吗?

【问题讨论】:

您可以使用collect_list,但这一次只适用于一列。可以吗? 我会看看collect_list。也许会没事的 【参考方案1】:

您可以使用pyspark.sql.window 中的Window 并找到这些值的累积总和,然后调用collect_list

from pyspark.sql.window import Window
import pyspark.sql.functions as F

w = Window.partitionBy("Item Bought Time").orderBy("Item Bought Time")

df = df.withColumn("Cumsum_Price", F.sum("Item Price").over(w))
df_g = df.groupby("Item Bought Time").agg(F.collect_list("Cumsum_Price"))

如果您需要有关如何包含它的更多帮助,请指定确切的业务逻辑,这可以使用udf 完成

【讨论】:

我认为这实际上可能行不通。例如对于 ID2,它需要保留 ID1 中的所有内容,如果该项目已返回,则需要从列表中删除。因此有效地在 ID1 列表上添加了一些额外的逻辑并为 ID2 添加/删除。这可以实现吗? 那么你需要一个累计和吗?使用窗口函数相当容易。

以上是关于从 Pyspark 中读取文件后模拟流数据的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:我可以从数据块中读取来自谷歌云的文件吗?

Pyspark 从 csv 文件中读取 delta/upsert 数据集

PySpark 从目录中读取多个 txt 文件为 json 格式

从文件中读取规则并将这些规则应用于 pyspark 数据框行

从 pyspark 中的 HDFS 读取 70gb bson 文件然后将其索引到 Elastic 时出错

使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧