通过迭代另一个数据框中的列表列来创建数据框

Posted

技术标签:

【中文标题】通过迭代另一个数据框中的列表列来创建数据框【英文标题】:Create a dataframe by iterating over column of list in another dataframe 【发布时间】:2021-10-07 15:30:33 【问题描述】:

在 pyspark 中,我有一个 DataFrame,其中有一列包含要通过的 有序节点列表

osmDF.schema
Out[1]:
 StructType(List(StructField(id,LongType,true),
                         StructField(nodes,ArrayType(LongType,true),true),
                         StructField(tags,MapType(StringType,StringType,true),true)))
osmDF.head(3)
Out[2]:
|     id    |                         nodes                       |         tags        |
|-----------|-----------------------------------------------------|---------------------|
| 62960871  | [783186590,783198852]                               | """foo"":""bar""" |
| 211528816 | [2215187080,2215187140,2215187205,2215187256]       | """foo"":""boo""" |
| 62960872  | [783198772,783183397,783167527,783169067,783198772] | """foo"":""buh""" |

我需要为节点列表中每个连续的 2 个节点组合创建一个数据框,然后将其保存为镶木地板。

预期结果的长度为 n-1,每行有 n 个len(nodes)。它看起来像这样(我将添加其他列):

|           id          |    from    |      to    |         tags        |
|-----------------------|------------|------------|---------------------|
| 783186590_783198852   | 783186590  | 783198852  | """foo"":""bar""" |
| 2215187080_2215187140 | 2215187080 | 2215187140 | """foo"":""boo""" |
| 2215187140_2215187205 | 2215187140 | 2215187205 | """foo"":""boo""" |
| 2215187205_2215187256 | 2215187205 | 2215187256 | """foo"":""boo""" |
| 783198772_783183397   | 783198772  | 783183397  | """foo"":""buh""" |
| 783183397_783167527   | 783183397  | 783167527  | """foo"":""buh""" |
| 783167527_783169067   | 783167527  | 783169067  | """foo"":""buh""" |
| 783169067_783198772   | 783169067  | 783198772  | """foo"":""buh""" |

我尝试使用以下方式启动

from pyspark.sql.functions import udf

def split_ways_into_arcs(row):
    arcs = []
    for node in range(len(row['nodes']) - 1):
      arc = dict()
      
      arc['id'] = str(row['nodes'][node]) + "_" + str(row['nodes'][node + 1])
      
      arc['from'] = row['nodes'][node]
      arc['to'] = row['nodes'][node + 1]
      arc['tags'] = row['tags']
      
      arcs.append(arc)

    return arcs

# Declare function as udf
split = udf(lambda row: split_ways_into_arcs(row.asDict()))

我遇到的问题是我不知道原始 DataFrame 的每一行中有多少个节点。 我知道如何应用 udf 向现有 DataFrame 添加一列,但不知道如何从 dicts 列表中创建一个新列。

【问题讨论】:

【参考方案1】:

之后使用transform 和explode 数组遍历nodes 数组:

from pyspark.sql import functions as F

df = ...

df.withColumn("nodes", F.expr("transform(nodes, (n,i) -> named_struct('from', nodes[i], 'to', nodes[i+1]))")) \
  .withColumn("nodes", F.explode("nodes")) \
  .filter("not nodes.to is null") \
  .selectExpr("concat_ws('_', nodes.to, nodes.from) as id", "nodes.*", "tags") \
  .show(truncate=False)

输出:

+---------------------+----------+----------+-----------------+
|id                   |from      |to        |tags             |
+---------------------+----------+----------+-----------------+
|783198852_783186590  |783186590 |783198852 |""foo"":""bar""|
|2215187140_2215187080|2215187080|2215187140|""foo"":""boo""|
|2215187205_2215187140|2215187140|2215187205|""foo"":""boo""|
|2215187256_2215187205|2215187205|2215187256|""foo"":""boo""|
|783183397_783198772  |783198772 |783183397 |""foo"":""buh""|
|783167527_783183397  |783183397 |783167527 |""foo"":""buh""|
|783169067_783167527  |783167527 |783169067 |""foo"":""buh""|
|783198772_783169067  |783169067 |783198772 |""foo"":""buh""|
+---------------------+----------+----------+-----------------+

【讨论】:

感谢您的回复。这里的想法是节点是要经过的有序点的列表。所以我正在寻找的是从第一个节点显示到第二个节点的第一行,从第二个节点显示到第三个节点的第二行,依此类推。所以预期的结果是 n - 1 行,每行有 n len(nodes)。 好的,我明白了。您的问题说明对于 2 个节点的每个组合。您可能需要改写这句话,以便其他回答者更清楚 好的,谢谢我正在编辑。 这就是为什么我在循环for node in range(len(row['nodes']) - 1) 我看到了你的更改,我已经更新了我的代码

以上是关于通过迭代另一个数据框中的列表列来创建数据框的主要内容,如果未能解决你的问题,请参考以下文章

迭代列表并通过函数传递结果并将结果保存在数据框中

循环转换/提取pandas DataFrame中的json数据不起作用

通过迭代另一列来创建一列

在数据框中迭代并保存每个股票历史数据,而无需在 CSV 中下载

根据另一个数据框/列表对数据框中的列进行子集化

通过数据框中的列表 len 复制行并将它们添加为索引