PySpark 递归键搜索

Posted

技术标签:

【中文标题】PySpark 递归键搜索【英文标题】:PySpark recursive key search 【发布时间】:2018-06-14 15:25:49 【问题描述】:

我有一个深度嵌套的 json esque 结构,我需要在所有级别(最多 7 个)中搜索所有出现的给定键。级别 0 中始终存在数据,我需要将其与在任何级别找到的 search_key 的每次出现相关联。我尝试通过递归调用推送这些数据并在返回时附加它,但是当我将它从标准 Python 移动到 PySpark RDD 时,我遇到了堆和不可散列的类型问题。

我的搜索功能如下:

def search(input, search_key, results):

    if input:
        for i in input:
            if isinstance(i, list):
                search(i, search_key, results)

            elif isinstance(i, dict):
                for k, v in i.iteritems():
                    if k == search_key:
                        results.append(i)
                        continue
                    elif isinstance(v, list):
                        search(v, search_key, results)
                    elif isinstance(v, dict):
                        search(v, search_key, results)

        return results

我一直用以下方式称呼它:

origin_rdd = sc.parallelize(origin)
concept_lambda = lambda c: search(c, term, [])
results = origin_rdd.flatMap(concept_lambda)

谁能建议一种方法来捕获***数据并将其作为结果中每个对象的一部分?结果可能是 0 到 n,因此 7 个键的乘积始终出现在顶层,然后是搜索词集合的所有出现。然后,我想将生成的 RDD 中的每一行转换为 PySpark Row 以与 PySpark DataFrame 一起使用。我还没有找到从 DataFrame 开始而不是 RDD 或将搜索功能应用于 DataFrame 列的好方法,因为该结构在其架构中是高度动态的,但如果有人认为这是一条更好的路线,我很高兴听到建议。

【问题讨论】:

如果您知道您最多只有 7 个级别,为什么不尝试迭代解决方案而不是递归解决方案?此外,带有一些小样本数据和所需输出的minimal reproducible example 也会有所帮助。 谢谢@pault 我会努力获取示例。我从一个只有 3 层的集合开始,因为它已经从 2 层反弹到 7 层嵌套。不幸的是,我没有生成需要解析的输入数据,也没有更好地定义输入。 【参考方案1】:

当我在搜索中遇到问题时,我能够在使用 deepcopy 的同时通过切片和传递基础来解决我的问题。尝试做类似事情的其他人可以调整下面的切片。

origin_rdd = sc.parallelize(origin)
concept_lambda = lambda r: search(r[-1], r[0:9], term, [])
results = origin_rdd.flatMap(concept_lambda)

搜索功能

def search(input, row_base, search_key, results):    
    if input:
        for i in input:
            if isinstance(i, list):
                search(i, row_base, search_key, results)
            if isinstance(i, dict):
                for k, v in iteritems(i):
                    if k == search_key:
                        row = copy.deepcopy(row_base)
                        row.append(i)
                        results.append(row)
                        continue
                    elif isinstance(v, list):
                        search(v, row_base, search_key, results)
                    elif isinstance(v, dict):
                        search(v, row_base, search_key, results)

    return results

【讨论】:

以上是关于PySpark 递归键搜索的主要内容,如果未能解决你的问题,请参考以下文章

映射 dict(来自 rdd)以递归方式更改 Python/PySpark 中的列名

pyspark - 左连接,随机行匹配键

Pyspark - 每个键添加缺失值?

将嵌套字典键值转换为 pyspark 数据框

使用键名过滤pyspark中的字典

PySpark 计数 groupby 与 None 键