Spark Structured Streaming - 新批次上的空字典

Posted

技术标签:

【中文标题】Spark Structured Streaming - 新批次上的空字典【英文标题】:Spark Structured Streaming - Empty dictionary on new batch 【发布时间】:2018-01-06 20:43:44 【问题描述】:

在我的构造函数中,我初始化了一个空字典,然后在 udf 中,我使用从批处理中获得的新数据对其进行了更新。

我的问题是,在每个新批次中,字典又是空的。

如何绕过空步骤,以便新批次可以访问我已经添加到字典中的所有先前值?

import CharacteristicVector
import update_charecteristic_vector

class SomeClass(object):

    def __init__(self):
        self.grid_list = 

    def run_stream(self):   

        def update_grid_list(grid):
            if grid not in self.grid_list:
                grid_list[grid] = 
            if grid not in self.grid_list:
                self.grid_list[grid] = CharacteristicVector()
            self.grid_list[grid] = update_charecteristic_vector(self.grid_list[grid])
            return self.grid_list[grid].Density
        .
        .
        .

        udf_update_grid_list = udf(update_grid_list, StringType())
        grids_dataframe = hashed.select(
            hashed.grid.alias('grid'),
            update_list(hashed.grid).alias('Density')
        )

        query = grids_dataframe.writeStream.format("console").start()
        query.awaitTermination()

【问题讨论】:

【参考方案1】:

很遗憾,由于多种原因,此代码无法正常工作。即使使用单个批处理或批处理应用程序,它也只有在只有活动的 Python 工作进程时才能工作。此外,一般情况下,不可能有全局同步的统计信息,同时支持读取和写入。

您应该可以使用stateful transformations,但目前仅在 Java / Scala 中支持,接口仍处于试验阶段/发展阶段。

根据您的要求,您可以尝试在内存数据网格、键值存储或分布式缓存中使用。

【讨论】:

以上是关于Spark Structured Streaming - 新批次上的空字典的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming

Spark Structured Streaming

Spark Structured Streaming - 1

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录