将列表中的缺失元素作为每个时间窗口组的行插入到 DataFrame

Posted

技术标签:

【中文标题】将列表中的缺失元素作为每个时间窗口组的行插入到 DataFrame【英文标题】:Insert missing elements in list as Rows per Time-Window group to DataFrame 【发布时间】:2019-06-06 19:56:05 【问题描述】:

试图以编程方式解决这个问题......似乎是一个难题......基本上,如果传感器项目没有在时间序列时间戳间隔源数据中捕获,那么想要为每个丢失的传感器项目附加一行,并带有每个时间戳窗口的 NULL 值

# list of sensor items [have 300 plus; only showing 4 as example]
list = ["temp", "pressure", "vacuum", "burner"]

# sample data
df = spark.createDataFrame([('2019-05-10 7:30:05', 'temp', '99'),\
                            ('2019-05-10 7:30:05', 'burner', 'TRUE'),\
                            ('2019-05-10 7:30:10', 'vacuum', '.15'),\
                            ('2019-05-10 7:30:10', 'burner', 'FALSE'),\
                            ('2019-05-10 7:30:10', 'temp', '75'),\
                            ('2019-05-10 7:30:15', 'temp', '77'),\
                            ('2019-05-10 7:30:20', 'pressure', '.22'),\
                            ('2019-05-10 7:30:20', 'temp', '101'),], ["date", "item", "value"])
# current dilemma => all sensor items are not being captured / only updates to sensors are being captured in current back-end design streaming devices
+------------------+--------+-----+
|              date|    item|value|
+------------------+--------+-----+
|2019-05-10 7:30:05|    temp|   99|
|2019-05-10 7:30:05|  burner| TRUE|

|2019-05-10 7:30:10|  vacuum|  .15|
|2019-05-10 7:30:10|  burner|FALSE|
|2019-05-10 7:30:10|    temp|   75|

|2019-05-10 7:30:15|    temp|   77|

|2019-05-10 7:30:20|pressure|  .22|
|2019-05-10 7:30:20|    temp|  101|
+------------------+--------+-----+

想要捕获每个时间戳的每个传感器项目,以便可以在旋转数据帧之前执行前向填充插补 [300 多列上的前向填充导致 scala 错误 =>

Spark Caused by: java.lang.***Error Window Function?

# desired output
+------------------+--------+-----+
|              date|    item|value|
+------------------+--------+-----+
|2019-05-10 7:30:05|    temp|   99|
|2019-05-10 7:30:05|  burner| TRUE|
|2019-05-10 7:30:05|  vacuum| NULL|
|2019-05-10 7:30:05|pressure| NULL|

|2019-05-10 7:30:10|  vacuum|  .15|
|2019-05-10 7:30:10|  burner|FALSE|
|2019-05-10 7:30:10|    temp|   75|
|2019-05-10 7:30:10|pressure| NULL|

|2019-05-10 7:30:15|    temp|   77|
|2019-05-10 7:30:15|pressure| NULL|
|2019-05-10 7:30:15|  burner| NULL|
|2019-05-10 7:30:15|  vacuum| NULL|

|2019-05-10 7:30:20|pressure|  .22|
|2019-05-10 7:30:20|    temp|  101|
|2019-05-10 7:30:20|  vacuum| NULL|
|2019-05-10 7:30:20|  burner| NULL|
+------------------+--------+-----+

【问题讨论】:

这是一个想法(不确定效率如何)。使用所有项目的不同日期和数据框的 crossJoin 创建一个临时数据框:df.select('date').distinct().crossJoin(broadcast(spark.createDataFrame([('temp',), ('burner',), ('vacuum',), ('pressure',)], ["item"])))。现在将您的原始数据框正确连接到这个临时数据框 【参考方案1】:

扩展至my comment:

您可以将 DataFrame 与不同日期的笛卡尔积和sensor_list 正确连接。由于sensor_list很小,你可以broadcast它。

from pyspark.sql.functions import broadcast

sensor_list = ["temp", "pressure", "vacuum", "burner"]

df.join(
    df.select('date')\
        .distinct()\
        .crossJoin(broadcast(spark.createDataFrame([(x,) for x in sensor_list], ["item"]))),
    on=["date", "item"],
    how="right"
).sort("date", "item").show()
#+------------------+--------+-----+
#|              date|    item|value|
#+------------------+--------+-----+
#|2019-05-10 7:30:05|  burner| TRUE|
#|2019-05-10 7:30:05|pressure| null|
#|2019-05-10 7:30:05|    temp|   99|
#|2019-05-10 7:30:05|  vacuum| null|
#|2019-05-10 7:30:10|  burner|FALSE|
#|2019-05-10 7:30:10|pressure| null|
#|2019-05-10 7:30:10|    temp|   75|
#|2019-05-10 7:30:10|  vacuum|  .15|
#|2019-05-10 7:30:15|  burner| null|
#|2019-05-10 7:30:15|pressure| null|
#|2019-05-10 7:30:15|    temp|   77|
#|2019-05-10 7:30:15|  vacuum| null|
#|2019-05-10 7:30:20|  burner| null|
#|2019-05-10 7:30:20|pressure|  .22|
#|2019-05-10 7:30:20|    temp|  101|
#|2019-05-10 7:30:20|  vacuum| null|
#+------------------+--------+-----+

【讨论】:

这很有意义!非常感谢!我会在大数据集上试一试,让你知道它是怎么回事。 这种连接方法运行良好......因为它是广播的,所以没有发生随机播放,所以性能看起来是可行的。谢谢! 我说得太早了...性能还可以,但是explain plan 显示nestedBroadcastLoopJoin 我听说过关于...的恐怖故事...有什么见解吗?

以上是关于将列表中的缺失元素作为每个时间窗口组的行插入到 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

如何在sql表中查找缺失的元素

将列表的元素作为最后元素插入到python列表中的列表中

如何使用 OCaml 将两个列表中的每个单独元素压缩到一个列表中

将列表作为单个元素插入到元组中

在每个第 n 个元素之后插入 Python 列表中的元素

列表与元组的基本操作