在 PySpark 数据框中拆分字符串

Posted

技术标签:

【中文标题】在 PySpark 数据框中拆分字符串【英文标题】:Split String in PySpark Dataframe 【发布时间】:2018-12-03 04:54:12 【问题描述】:

我想在 PySpark 数据框中拆分一列,该列(字符串类型)如下所示:

["quantity":25,"type":"coins","balance":35]
["balance":40,"type":"coins","quantity":25]
["quantity":2,"type":"column_breaker","balance":2,"quantity":2,"type":"row_breaker","balance":2,"quantity":2,"type":"single_block_breaker","balance":2,"quantity":1,"type":"rainbow","balance":1,"quantity":135,"type":"coins","balance":140]

所以他们中的一些人有一组"quantity, type, balance",而他们中的一些人有多个这样的条目。我尝试将其视为 JSON 变量并拆分:

schema = StructType(
[
    StructField('balance', StringType(), True),
    StructField('type', StringType(), True),
    StructField('quantity', StringType(), True)
 ]
 )

temp = merger.withColumn("data", 
from_json("items",schema)).select("items", col('data.*'))
display(temp)

但它只能将观察结果拆分为一组。我想要这样的输出

balance|quantity|type
   35  |   25   |coins
   40  |   25   |coins
.......

这样,一组观测值拆分为一个观测值,而多组观测值拆分为垂直放置的多个观测值。

另外,拆分成多行后,如何识别每个观察值?说,我有另一个变量是 ID,我该如何分配 ID?

【问题讨论】:

你能分享一下你想要的结果吗? 【参考方案1】:

如果每行有多个 JSON,则可以使用技巧将对象之间的逗号替换为换行符,并使用 explode 函数按换行符拆分。所以对于像这样的DF:

>>> df.show()
+-----------------+
|            items|
+-----------------+
|         "a": 1|
|"a": 2,"a": 3|
+-----------------+

这段代码完成了这项工作:

>>> from pyspark.sql.types import ArrayType, StringType
>>> from pyspark.sql.functions import udf, explode
>>> split_jsons = lambda jsons: jsons.replace(',', '\n').split('\n')
>>> df.withColumn('one_json_per_row', udf(split_jsons, ArrayType(StringType()))('items')) \
...    .select(explode('one_json_per_row').alias('item')).show()
+--------+
|    item|
+--------+
|"a": 1|
|"a": 2|
|"a": 3|
+--------+

那么就可以使用正则代码了

【讨论】:

非常感谢 cmets。我收到一条错误消息:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 47911.0 中的任务 0 失败 4 次,最近一次失败:阶段 47911.0 中丢失任务 0.3(TID 4961002、10.45.197.163、执行者1993):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后):【参考方案2】:

您可以使用 json 库并使用 rdd.flatMap() 将 json 字符串数组解析并分解为多行

import json

data = [("[\"quantity\":25,\"type\":\"coins\",\"balance\":35]",),
         ("[\"balance\":40,\"type\":\"coins\",\"quantity\":25]",),
    ("[\"quantity\":2,\"type\":\"column_breaker\",\"balance\":2,\"quantity\":2,\"type\":\"row_breaker\",\"balance\":2,\"quantity\":2,\"type\":\"single_block_breaker\",\"balance\":2,\"quantity\":1,\"type\":\"rainbow\",\"balance\":1,\"quantity\":135,\"type\":\"coins\",\"balance\":140]",)]

schema = StructType([StructField("items", StringType(), True)])
df = spark.createDataFrame(data,schema)

def transformRow(row):
    jsonObj = json.loads(row[0])
    rows = [Row(**item) for item in jsonObj]
    return rows

df.rdd.flatMap(transformRow).toDF().show()

输出

+-------+--------+--------------------+
|balance|quantity|                type|
+-------+--------+--------------------+
|     35|      25|               coins|
|     40|      25|               coins|
|      2|       2|      column_breaker|
|      2|       2|         row_breaker|
|      2|       2|single_block_breaker|
|      1|       1|             rainbow|
|    140|     135|               coins|
+-------+--------+--------------------+

【讨论】:

以上是关于在 PySpark 数据框中拆分字符串的主要内容,如果未能解决你的问题,请参考以下文章

如何拆分对象列表以分隔pyspark数据框中的列

将列表的列拆分为同一 PySpark 数据框中的多列

在单个 spark 数据框中减去两个字符串列的最佳 PySpark 实践是啥?

如何在 PySpark 中为数据框中的所有列替换字符串值与 NULL?

从 pyspark 数据框中的列中提取特定字符串

在 python 或 Pyspark 数据框中使用特殊字符重命名列