PySpark 递归键搜索
Posted
技术标签:
【中文标题】PySpark 递归键搜索【英文标题】:PySpark recursive key search 【发布时间】:2018-06-14 15:25:49 【问题描述】:我有一个深度嵌套的 json esque 结构,我需要在所有级别(最多 7 个)中搜索所有出现的给定键。级别 0 中始终存在数据,我需要将其与在任何级别找到的 search_key
的每次出现相关联。我尝试通过递归调用推送这些数据并在返回时附加它,但是当我将它从标准 Python 移动到 PySpark RDD 时,我遇到了堆和不可散列的类型问题。
我的搜索功能如下:
def search(input, search_key, results):
if input:
for i in input:
if isinstance(i, list):
search(i, search_key, results)
elif isinstance(i, dict):
for k, v in i.iteritems():
if k == search_key:
results.append(i)
continue
elif isinstance(v, list):
search(v, search_key, results)
elif isinstance(v, dict):
search(v, search_key, results)
return results
我一直用以下方式称呼它:
origin_rdd = sc.parallelize(origin)
concept_lambda = lambda c: search(c, term, [])
results = origin_rdd.flatMap(concept_lambda)
谁能建议一种方法来捕获***数据并将其作为结果中每个对象的一部分?结果可能是 0 到 n,因此 7 个键的乘积始终出现在顶层,然后是搜索词集合的所有出现。然后,我想将生成的 RDD 中的每一行转换为 PySpark Row 以与 PySpark DataFrame 一起使用。我还没有找到从 DataFrame 开始而不是 RDD 或将搜索功能应用于 DataFrame 列的好方法,因为该结构在其架构中是高度动态的,但如果有人认为这是一条更好的路线,我很高兴听到建议。
【问题讨论】:
如果您知道您最多只有 7 个级别,为什么不尝试迭代解决方案而不是递归解决方案?此外,带有一些小样本数据和所需输出的minimal reproducible example 也会有所帮助。 谢谢@pault 我会努力获取示例。我从一个只有 3 层的集合开始,因为它已经从 2 层反弹到 7 层嵌套。不幸的是,我没有生成需要解析的输入数据,也没有更好地定义输入。 【参考方案1】:当我在搜索中遇到问题时,我能够在使用 deepcopy 的同时通过切片和传递基础来解决我的问题。尝试做类似事情的其他人可以调整下面的切片。
origin_rdd = sc.parallelize(origin)
concept_lambda = lambda r: search(r[-1], r[0:9], term, [])
results = origin_rdd.flatMap(concept_lambda)
搜索功能
def search(input, row_base, search_key, results):
if input:
for i in input:
if isinstance(i, list):
search(i, row_base, search_key, results)
if isinstance(i, dict):
for k, v in iteritems(i):
if k == search_key:
row = copy.deepcopy(row_base)
row.append(i)
results.append(row)
continue
elif isinstance(v, list):
search(v, row_base, search_key, results)
elif isinstance(v, dict):
search(v, row_base, search_key, results)
return results
【讨论】:
以上是关于PySpark 递归键搜索的主要内容,如果未能解决你的问题,请参考以下文章