Spark Structured Streaming - 新批次上的空字典
Posted
技术标签:
【中文标题】Spark Structured Streaming - 新批次上的空字典【英文标题】:Spark Structured Streaming - Empty dictionary on new batch 【发布时间】:2018-01-06 20:43:44 【问题描述】:在我的构造函数中,我初始化了一个空字典,然后在 udf 中,我使用从批处理中获得的新数据对其进行了更新。
我的问题是,在每个新批次中,字典又是空的。
如何绕过空步骤,以便新批次可以访问我已经添加到字典中的所有先前值?
import CharacteristicVector
import update_charecteristic_vector
class SomeClass(object):
def __init__(self):
self.grid_list =
def run_stream(self):
def update_grid_list(grid):
if grid not in self.grid_list:
grid_list[grid] =
if grid not in self.grid_list:
self.grid_list[grid] = CharacteristicVector()
self.grid_list[grid] = update_charecteristic_vector(self.grid_list[grid])
return self.grid_list[grid].Density
.
.
.
udf_update_grid_list = udf(update_grid_list, StringType())
grids_dataframe = hashed.select(
hashed.grid.alias('grid'),
update_list(hashed.grid).alias('Density')
)
query = grids_dataframe.writeStream.format("console").start()
query.awaitTermination()
【问题讨论】:
【参考方案1】:很遗憾,由于多种原因,此代码无法正常工作。即使使用单个批处理或批处理应用程序,它也只有在只有活动的 Python 工作进程时才能工作。此外,一般情况下,不可能有全局同步的统计信息,同时支持读取和写入。
您应该可以使用stateful transformations,但目前仅在 Java / Scala 中支持,接口仍处于试验阶段/发展阶段。
根据您的要求,您可以尝试在内存数据网格、键值存储或分布式缓存中使用。
【讨论】:
以上是关于Spark Structured Streaming - 新批次上的空字典的主要内容,如果未能解决你的问题,请参考以下文章
Spark Structured Streaming - 1
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?