Spark Python Pyspark 如何使用字典数组和嵌入式字典来展平列(sparknlp 注释器输出)

Posted

技术标签:

【中文标题】Spark Python Pyspark 如何使用字典数组和嵌入式字典来展平列(sparknlp 注释器输出)【英文标题】:Spark Python Pyspark How to flatten a column with an array of dictionaries and embedded dictionaries (sparknlp annotator output) 【发布时间】:2019-06-24 16:41:59 【问题描述】:

我正在尝试从 sparknlp 中提取输出(使用 Pretrained Pipeline 'explain_document_dl')。我花了很多时间寻找方法(UDF、爆炸等),但无法接近可行的解决方案。假设我想从entities 列中提取resultmetadata 下的值。在该列中有一个包含多个字典的数组

当我使用df.withColumn("entity_name", explode("entities.result")) 时,只提取第一个字典中的值。

“entities”列的内容是一个字典列表。

尝试提供可重现的示例/重新创建数据框(感谢下面@jonathan 提供的建议):

# content of one cell as an example:
d = ["annotatorType":"chunk","begin":2740,"end":2747,"result":"•Ability","metadata":"entity":"ORG","sentence":"8","chunk":"22","embeddings":[],"sentence_embeddings":[], "annotatorType":"chunk","begin":2740,"end":2747,"result":"Fedex","metadata":"entity":"ORG","sentence":"8","chunk":"22","embeddings":[],"sentence_embeddings":[]]

from pyspark.sql.types import StructType, StructField, StringType
from array import array
schema = StructType([StructField('annotatorType', StringType(), True),
                     StructField('begin', IntegerType(), True),
                     StructField('end', IntegerType(), True),
                     StructField('result', StringType(), True),
                     StructField('sentence', StringType(), True),
                     StructField('chunk', StringType(), True),
                     StructField('metadata', StructType((StructField('entity', StringType(), True),
                                                      StructField('sentence', StringType(), True),
                                                      StructField('chunk', StringType(), True)
                                                      )), True),
                     StructField('embeddings', StringType(), True),
                     StructField('sentence_embeddings', StringType(), True)
                    ]
                   )

df = spark.createDataFrame(d, schema=schema)
df.show()

在单个字典列表的情况下,它可以工作:

+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|  result|sentence|chunk|    metadata|embeddings|sentence_embeddings|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|        chunk| 2740|2747|•Ability|    null| null|[ORG, 8, 22]|        []|                 []|
|        chunk| 2740|2747|   Fedex|    null| null|[ORG, 8, 22]|        []|                 []|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+

但我不知道如何将其应用于一列,该列包含一些带有多个字典数组的单元格(因此原始单元格有多行)。

我尝试将相同的模式应用于entities 列,我必须先将该列转换为 json。

ent1 = ent1.withColumn("entities2", to_json("entities"))

它适用于具有 1 个字典数组的单元格,但将 null 提供给具有多个字典数组(第 4 行)的单元格:

ent1.withColumn("entities2", from_json("entities2", schema)).select("entities2.*").show()

+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|result|sentence|chunk|    metadata|embeddings|sentence_embeddings|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|        chunk|  166| 169|  Lyft|    null| null|[MISC, 0, 0]|        []|                 []|
|        chunk|   11|  14|  Lyft|    null| null|[MISC, 0, 0]|        []|                 []|
|        chunk|   52|  55|  Lyft|    null| null|[MISC, 1, 0]|        []|                 []|
|         null| null|null|  null|    null| null|        null|      null|               null|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+

想要的输出是

+-------------+-----+----+----------------+------------------------+----------+-------------------+
|annotatorType|begin| end|         result |    metadata            |embeddings|sentence_embeddings|
+-------------+-----+----+----------------+------------------------+----------+-------------------+
|        chunk|  166| 169|Lyft            |[MISC]                  |        []|                 []|
|        chunk|   11|  14|Lyft            |[MISC]                  |        []|                 []|
|        chunk|   52|  55|Lyft.           |[MISC]                  |        []|                 []|
|        chunk| [..]|[..]|[Lyft,Lyft,     |[MISC,MISC,MISC,        |        []|                 []| 
|             |     |    |FedEx Ground..] |ORG,LOC,ORG,ORG,ORG,ORG]|          |                   |     
+-------------+-----+----+----------------+------------------------+----------+-------------------+

我也尝试将每一行转换为 json,但我忘记了原来的行,并得到了扁平的儿子:

new_df = sqlContext.read.json(ent2.rdd.map(lambda r: r.entities2))
new_df.show()
+-------------+-----+----------+----+------------+----------------+-------------------+
|annotatorType|begin|embeddings| end|    metadata|          result|sentence_embeddings|
+-------------+-----+----------+----+------------+----------------+-------------------+
|        chunk|  166|        []| 169|[0, MISC, 0]|            Lyft|                 []|
|        chunk|   11|        []|  14|[0, MISC, 0]|            Lyft|                 []|
|        chunk|   52|        []|  55|[0, MISC, 1]|            Lyft|                 []|
|        chunk|    0|        []|  11| [0, ORG, 0]|    FedEx Ground|                 []|
|        chunk|  717|        []| 720| [1, LOC, 4]|            Dock|                 []|
|        chunk|  811|        []| 816| [2, ORG, 5]|          Parcel|                 []|
|        chunk| 1080|        []|1095| [3, ORG, 6]|Parcel Assistant|                 []|
|        chunk| 1102|        []|1108| [4, ORG, 7]|         • Daily|                 []|
|        chunk| 1408|        []|1417| [5, ORG, 8]|      Assistants|                 []|
+-------------+-----+----------+----+------------+----------------+-------------------+

我尝试应用 UDF 来遍历“实体”中的数组列表:

def flatten(my_dict):
    d_result = defaultdict(list)
    for sub in my_dict:
        val = sub['result']
        d_result["result"].append(val)
    return d_result["result"]
ent = ent.withColumn('result', flatten(df.entities))

TypeError: Column is not iterable

我发现这篇帖子Apache Spark Read JSON With Extra Columns 与我的问题非常相似,但是在将entities 列转换为json 之后,我仍然无法通过该帖子中提供的解决方案来解决它。

感谢任何帮助!理想情况下,python 中的解决方案,但 scala 中的示例也很有帮助!

【问题讨论】:

请阅读how to create reproducible Spark example和edit您的问题。 Spark NLP 使用大量自定义模式,了解上游步骤在这里至关重要。 我更新了我尝试重新创建 spark 数据帧,但我只能得到 NULL。我不熟悉火花数据框,所以可能缺少一些东西。我提供了架构作为参考。 【参考方案1】:

获得null 的原因是因为schema 变量并不完全代表您作为数据传入的字典列表

    from pyspark.shell import *
    from pyspark.sql.types import *

    schema = StructType([StructField('result', StringType(), True),
                 StructField('metadata', StructType((StructField('entity', StringType(), True),
                                                     StructField('sentence', StringType(), True),
                                                     StructField('chunk', StringType(), True))), True)])

    df = spark.createDataFrame(d1, schema=schema)
    df.show()

如果您更喜欢定制的解决方案,您可以尝试纯 python/pandas 方法

    import pandas as pd
    from pyspark.shell import *

    result = []
    metadata_entity = []
    for row in d1:
        result.append(row.get('result'))
        metadata_entity.append(row.get('metadata').get('entity'))

    schema = 'result': [result], 'metadata.entity': [metadata_entity]
    pandas_df = pd.DataFrame(schema)

    df = spark.createDataFrame(pandas_df)
    df.show()

    # specific columns
    df.select('result','metadata.entity').show()

编辑

恕我直言,在阅读了您尝试过的所有方法后,我认为 sc.parallelize 仍然可以解决相当复杂的情况。我没有您的原始变量,但我可以对您的图像进行 OCR 并从那里获取——尽管不再有 Clas-s-room TeacherInstructional 值。希望它对所有有用的人都有用。

您始终可以使用您需要的结构和get 其架构创建一个 mock 数据框

对于具有嵌套数据类型的复杂情况,您可以使用 SparkContext 并读取生成的 JSON 格式

    import itertools

    from pyspark.shell import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    # assume two lists in two dictionary keys to make four cells
    # since I don't have but entities2, I can just replicate it
    sample = 
        'single_list': ['annotatorType': 'chunk', 'begin': '166', 'end': '169', 'result': 'Lyft',
                         'metadata': 'entity': 'MISC', 'sentence': '0', 'chunk': '0', 'embeddings': [],
                         'sentence_embeddings': [],
                        'annotatorType': 'chunk', 'begin': '11', 'end': '14', 'result': 'Lyft',
                         'metadata': 'entity': 'MISC', 'sentence': '0', 'chunk': '0', 'embeddings': [],
                         'sentence_embeddings': [],
                        'annotatorType': 'chunk', 'begin': '52', 'end': '55', 'result': 'Lyft',
                         'metadata': 'entity': 'MISC', 'sentence': '1', 'chunk': '0', 'embeddings': [],
                         'sentence_embeddings': []],
        'frankenstein': [
            'annotatorType': 'chunk', 'begin': '0', 'end': '11', 'result': 'FedEx Ground',
             'metadata': 'entity': 'ORG', 'sentence': '0', 'chunk': '0', 'embeddings': [],
             'sentence_embeddings': [],
            'annotatorType': 'chunk', 'begin': '717', 'end': '720', 'result': 'Dock',
             'metadata': 'entity': 'LOC', 'sentence': '4', 'chunk': '1', 'embeddings': [],
             'sentence_embeddings': [],
            'annotatorType': 'chunk', 'begin': '811', 'end': '816', 'result': 'Parcel',
             'metadata': 'entity': 'ORG', 'sentence': '5', 'chunk': '2', 'embeddings': [],
             'sentence_embeddings': [],
            'annotatorType': 'chunk', 'begin': '1080', 'end': '1095', 'result': 'Parcel Assistant',
             'metadata': 'entity': 'ORG', 'sentence': '6', 'chunk': '3', 'embeddings': [],
             'sentence_embeddings': [],
            'annotatorType': 'chunk', 'begin': '1102', 'end': '1108', 'result': '* Daily',
             'metadata': 'entity': 'ORG', 'sentence': '7', 'chunk': '4', 'embeddings': [],
             'sentence_embeddings': [],
            'annotatorType': 'chunk', 'begin': '1408', 'end': '1417', 'result': 'Assistants',
             'metadata': 'entity': 'ORG', 'sentence': '8', 'chunk': '5', 'embeddings': [],
             'sentence_embeddings': []]
    

    # since they are structurally different, get two dataframes
    df_single_list = spark.read.json(sc.parallelize(sample.get('single_list')))
    df_frankenstein = spark.read.json(sc.parallelize(sample.get('frankenstein')))

    # print better the table first border
    print('\n')

    # list to create a dataframe schema
    annotatorType = []
    begin = []
    embeddings = []
    end = []
    metadata = []
    result = []
    sentence_embeddings = []

    # PEP8 here to have an UDF instead of lambdas
    # probably a dictionary with actions to avoid IF statements
    function_metadata = lambda x: [x.entity]
    for k, i in enumerate(df_frankenstein.columns):
        if i == 'annotatorType':
            annotatorType.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'begin':
            begin.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'embeddings':
            embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'end':
            end.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'metadata':
            _temp = list(map(function_metadata, df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect()))
            metadata.append(list(itertools.chain.from_iterable(_temp)))
        if i == 'result':
            result.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'sentence_embeddings':
            sentence_embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())

    # headers
    annotatorType_header = 'annotatorType'
    begin_header = 'begin'
    embeddings_header = 'embeddings'
    end_header = 'end'
    metadata_header = 'metadata'
    result_header = 'result'
    sentence_embeddings_header = 'sentence_embeddings'
    metadata_entity_header = 'metadata.entity'

    frankenstein_schema = StructType(
        [StructField(annotatorType_header, ArrayType(StringType())),
         StructField(begin_header, ArrayType(StringType())),
         StructField(embeddings_header, ArrayType(StringType())),
         StructField(end_header, ArrayType(StringType())),
         StructField(metadata_header, ArrayType(StringType())),
         StructField(result_header, ArrayType(StringType())),
         StructField(sentence_embeddings_header, ArrayType(StringType()))
         ])

    # list of lists of lists of lists of ... lists
    frankenstein_list = [[annotatorType, begin, embeddings, end, metadata, result, sentence_embeddings]]
    df_frankenstein = spark.createDataFrame(frankenstein_list, schema=frankenstein_schema)

    print(df_single_list.schema)
    print(df_frankenstein.schema)

    # let's see how it is
    df_single_list.select(
        annotatorType_header,
        begin_header,
        end_header,
        result_header,
        array(metadata_entity_header),
        embeddings_header,
        sentence_embeddings_header).show()

    # let's see again
    df_frankenstein.select(
        annotatorType_header,
        begin_header,
        end_header,
        result_header,
        metadata_header,
        embeddings_header,
        sentence_embeddings_header).show()

输出:

    StructType(List(StructField(annotatorType,StringType,true),StructField(begin,StringType,true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,StringType,true),StructField(metadata,StructType(List(StructField(chunk,StringType,true),StructField(entity,StringType,true),StructField(sentence,StringType,true))),true),StructField(result,StringType,true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))
    StructType(List(StructField(annotatorType,ArrayType(StringType,true),true),StructField(begin,ArrayType(StringType,true),true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,ArrayType(StringType,true),true),StructField(metadata,ArrayType(StringType,true),true),StructField(result,ArrayType(StringType,true),true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))

    +-------------+-----+---+------+----------------------+----------+-------------------+
    |annotatorType|begin|end|result|array(metadata.entity)|embeddings|sentence_embeddings|
    +-------------+-----+---+------+----------------------+----------+-------------------+
    |        chunk|  166|169|  Lyft|                [MISC]|        []|                 []|
    |        chunk|   11| 14|  Lyft|                [MISC]|        []|                 []|
    |        chunk|   52| 55|  Lyft|                [MISC]|        []|                 []|
    +-------------+-----+---+------+----------------------+----------+-------------------+
    +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    |       annotatorType|               begin|                 end|              result|            metadata|          embeddings| sentence_embeddings|
    +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    |[[chunk, chunk, c...|[[0, 717, 811, 10...|[[11, 720, 816, 1...|[[FedEx Ground, D...|[[ORG, LOC, ORG, ...|[[[], [], [], [],...|[[[], [], [], [],...|
    +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

您必须分别从每个数据帧中进行选择,因为它们的数据类型不同,但内容已准备好(如果我从输出中了解您的要求)可以使用

( ͡° ͜ʖ ͡°)

【讨论】:

谢谢乔纳森!我还发现我可以 json_schema = spark.read.json(df.rdd.map(lambda row: row.entities2)).schema 而不是写出来。但我的挑战仍然存在 - 对于具有多个字典的单元格,我无法使用 UDF 或使用 from_json 函数迭代它来获取值......有什么想法吗? 欢迎佩吉。我对sc.parallelize 方法做了一个小的更新。最后一部分只是将列显示为列表的一种快速(非标准)方式,但我相信它与事先开发的真正解决方案无关。我只是想重现输出。如果我不明白,请告诉我(: 您在我的屏幕截图上使用了 OCR 来重新创建我的示例?你是我的英雄。在您使用的示例中,sample 实际上只是数据框中的一行(在entities 列下)。所以我需要取那个单元格,展开到中间数据框(第一个output),然后把它放回去(第二个output)以匹配原始行。我可以将这些步骤合并到 UDF 中吗?我在帖子中更新了所需的输出。 大声笑没什么好吹嘘的。它只是一个基本的 PyTesseract 并用右方括号修剪/替换最后 9 个字符(:我将相应地更新答案。你的“期望输出”是最后一个吗?或者你需要另一列吗?我可以创建随机内容并使用UDF将它们全部组合。如果它不是敏感信息并且您认为可以共享它,那么请告诉我最终的输出及其结构。无论如何,同时我会用随机复杂的结构和一些模拟它无意义的数据 让我们看看这个自定义架构是否对您继续后续步骤有用。可悲的是,集合论中的联合定义一直存在这种限制。在 Spark 中它是相同的,这使得每列链接两个数据帧变得困难,因为数据类型不同。希望对你有帮助

以上是关于Spark Python Pyspark 如何使用字典数组和嵌入式字典来展平列(sparknlp 注释器输出)的主要内容,如果未能解决你的问题,请参考以下文章

Spark实践|如何让CDSW的PySpark自动适配Python版本

如何安装 pyspark 以在独立脚本中使用?

Spark Python Pyspark 如何使用字典数组和嵌入式字典来展平列(sparknlp 注释器输出)

如何在 Pyspark 中运行 Python 脚本

如何在 PySpark 中读取 Avro 文件

Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?