如何解压缩字符串格式的列表列表?

Posted

技术标签:

【中文标题】如何解压缩字符串格式的列表列表?【英文标题】:How to unpack list of lists that are in string format? 【发布时间】:2020-12-25 21:39:44 【问题描述】:

我有一个 PySpark 数据框,其中有一列包含一个字符串类型的 StructField,它具有一个动态长度列表。

df.schema: StructType(List(StructField(id,StringType,true),StructField(recs,StringType,true)))

|id     | recs |

|ABC|[66, [["AB", 10]]]
|XYZ|[66, [["XY", 10], ["YZ", 20]]]
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]  

我正在尝试将列表扁平化为这样的内容

|id | like_id
|ABC|AB|
|XYZ|XY|
|XYZ|YZ|
|DEF|DE|
|DEF|EF|
|DEF|FG|

我做了什么:

我尝试使用数组表达式,它给我一个错误,因为 recs 是 StringType 的预期

我可以在 pandas 中使用 json 加载和 itertools 来处理这个问题,但我需要在 spark 中进行这个处理,因为数据帧很大,大约 3000 万,结果将是 10 倍。

df["recs"].apply(
        lambda x: [rec_id[0] for rec_id in json.loads(x)[1:][0]]
    )
    for i, row in df.iterrows():
        ....

【问题讨论】:

【参考方案1】:

IIUC,您可以使用模式, (?=\[\[)|\]$ 拆分 recs 列中的字符串,找到第二个元素,然后使用 from_json 检索数组数组:

from pyspark.sql import functions as F

df1 = df.withColumn('recs1', F.split('recs', ', (?=\[\[)|\]$')[1]) \
    .withColumn('recs2', F.from_json('recs1', 'array<array<string>>'))

其中:拆分模式, (?=\[\[)|\]$ 包含两个子模式:

逗号后跟空格,空格后面必须跟两个左括号, (?=\[\[) 在字符串\]$ 的末尾加上右括号

结果:

df1.show(truncate=False)
+---+------------------------------------------+------------------------------------+------------------------------+
|id |recs                                      |recs1                               |recs2                         |
+---+------------------------------------------+------------------------------------+------------------------------+
|ABC|[66, [["AB", 10]]]                        |[["AB", 10]]                        |[[AB, 10]]                    |
|XYZ|[66, [["XY", 10], ["YZ", 20]]]            |[["XY", 10], ["YZ", 20]]            |[[XY, 10], [YZ, 20]]          |
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]|[["DE", 10], ["EF", 20], ["FG", 30]]|[[DE, 10], [EF, 20], [FG, 30]]|
+---+------------------------------------------+------------------------------------+------------------------------+

那么你就可以使用explode得到想要的结果了:

df1.selectExpr("id", "explode_outer(recs2) as recs") \
    .selectExpr("id", "recs[0] as like_id") \
    .show()
+---+-------+
| id|like_id|
+---+-------+
|ABC|     AB|
|XYZ|     XY|
|XYZ|     YZ|
|DEF|     DE|
|DEF|     EF|
|DEF|     FG|
+---+-------+

总之,我们可以把上面的代码写成如下:

df_new = df.selectExpr("id", r"explode_outer(from_json(split(recs, ', (?=\\[\\[)|\\]$')[1], 'array<array<string>>')) as recs") \
    .selectExpr("id", "recs[0] as like_id")

【讨论】:

附带说明,from_json with schema=array&lt;array&lt;string&gt;&gt; 仅适用于 Spark 2.4+。在这个版本以下,你可以使用 split + regexp_replace + explode。【参考方案2】:

解决它的方法是清理列内容并拆分:

记得导入:

import pyspark.sql.functions as f
# Remove any non string character
df = df.withColumn('only_ids', f.trim(f.regexp_replace('recs', r'[^a-zA-Z\s:]', '')))

# Change blank spaces to commas
df = df.withColumn('clear_blank_spaces', f.regexp_replace('only_ids', r'\W+', ','))

# Split the values by comma
df = df.withColumn('like_ids', f.split('clear_blank_spaces', ','))

# Just for DEBUG
df.show(truncate=False)

# Explode like_ids to transform array to rows
df = df.select('id', f.explode('like_ids').alias('like_ids'))

# Final result
df.show(truncate=False)

第一个输出:

+---+------------------------------------------+----------+------------------+------------+
|id |recs                                      |only_ids  |clear_blank_spaces|like_ids    |
+---+------------------------------------------+----------+------------------+------------+
|ABC|[66, [["AB", 10]]]                        |AB        |AB                |[AB]        |
|XYZ|[66, [["XY", 10], ["YZ", 20]]]            |XY  YZ    |XY,YZ             |[XY, YZ]    |
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]|DE  EF  FG|DE,EF,FG          |[DE, EF, FG]|
+---+------------------------------------------+----------+------------------+------------+

第二次输出:

+---+--------+
|id |like_ids|
+---+--------+
|ABC|AB      |
|XYZ|XY      |
|XYZ|YZ      |
|DEF|DE      |
|DEF|EF      |
|DEF|FG      |
+---+--------+

【讨论】:

【参考方案3】:

如果您的数据真的看起来很干净,那么您可以通过 " 进行拆分,并在结果数组中获取具有奇数索引的条目。

import pyspark.sql.functions as F

df2 = df.select(
    'id',
    F.posexplode(F.split('recs', '"')).alias('pos', 'like_id')
).filter('cast(pos % 2 as boolean)').drop('pos')

df2.show()
+---+-------+
| id|like_id|
+---+-------+
|ABC|     AB|
|XYZ|     XY|
|XYZ|     YZ|
|DEF|     DE|
|DEF|     EF|
|DEF|     FG|
+---+-------+

【讨论】:

以上是关于如何解压缩字符串格式的列表列表?的主要内容,如果未能解决你的问题,请参考以下文章

linux zip命令 tar命令 压缩解压缩参数列表:

将十六进制值的列表/字符串解压缩为整数

如何在 Python 中解压缩 RDD 中每个项目的值(列表)?

011列表操作解压缩,字典压缩的是Key

将字典列表解压缩到 Pandas 中的单独列中

解压缩python列表时*做啥? [复制]