在 PySpark 数据框中拆分字符串
Posted
技术标签:
【中文标题】在 PySpark 数据框中拆分字符串【英文标题】:Split String in PySpark Dataframe 【发布时间】:2018-12-03 04:54:12 【问题描述】:我想在 PySpark 数据框中拆分一列,该列(字符串类型)如下所示:
["quantity":25,"type":"coins","balance":35]
["balance":40,"type":"coins","quantity":25]
["quantity":2,"type":"column_breaker","balance":2,"quantity":2,"type":"row_breaker","balance":2,"quantity":2,"type":"single_block_breaker","balance":2,"quantity":1,"type":"rainbow","balance":1,"quantity":135,"type":"coins","balance":140]
所以他们中的一些人有一组"quantity, type, balance"
,而他们中的一些人有多个这样的条目。我尝试将其视为 JSON 变量并拆分:
schema = StructType(
[
StructField('balance', StringType(), True),
StructField('type', StringType(), True),
StructField('quantity', StringType(), True)
]
)
temp = merger.withColumn("data",
from_json("items",schema)).select("items", col('data.*'))
display(temp)
但它只能将观察结果拆分为一组。我想要这样的输出
balance|quantity|type
35 | 25 |coins
40 | 25 |coins
.......
这样,一组观测值拆分为一个观测值,而多组观测值拆分为垂直放置的多个观测值。
另外,拆分成多行后,如何识别每个观察值?说,我有另一个变量是 ID,我该如何分配 ID?
【问题讨论】:
你能分享一下你想要的结果吗? 【参考方案1】:如果每行有多个 JSON,则可以使用技巧将对象之间的逗号替换为换行符,并使用 explode
函数按换行符拆分。所以对于像这样的DF:
>>> df.show()
+-----------------+
| items|
+-----------------+
| "a": 1|
|"a": 2,"a": 3|
+-----------------+
这段代码完成了这项工作:
>>> from pyspark.sql.types import ArrayType, StringType
>>> from pyspark.sql.functions import udf, explode
>>> split_jsons = lambda jsons: jsons.replace(',', '\n').split('\n')
>>> df.withColumn('one_json_per_row', udf(split_jsons, ArrayType(StringType()))('items')) \
... .select(explode('one_json_per_row').alias('item')).show()
+--------+
| item|
+--------+
|"a": 1|
|"a": 2|
|"a": 3|
+--------+
那么就可以使用正则代码了
【讨论】:
非常感谢 cmets。我收到一条错误消息:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 47911.0 中的任务 0 失败 4 次,最近一次失败:阶段 47911.0 中丢失任务 0.3(TID 4961002、10.45.197.163、执行者1993):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后):【参考方案2】:您可以使用 json 库并使用 rdd.flatMap() 将 json 字符串数组解析并分解为多行
import json
data = [("[\"quantity\":25,\"type\":\"coins\",\"balance\":35]",),
("[\"balance\":40,\"type\":\"coins\",\"quantity\":25]",),
("[\"quantity\":2,\"type\":\"column_breaker\",\"balance\":2,\"quantity\":2,\"type\":\"row_breaker\",\"balance\":2,\"quantity\":2,\"type\":\"single_block_breaker\",\"balance\":2,\"quantity\":1,\"type\":\"rainbow\",\"balance\":1,\"quantity\":135,\"type\":\"coins\",\"balance\":140]",)]
schema = StructType([StructField("items", StringType(), True)])
df = spark.createDataFrame(data,schema)
def transformRow(row):
jsonObj = json.loads(row[0])
rows = [Row(**item) for item in jsonObj]
return rows
df.rdd.flatMap(transformRow).toDF().show()
输出
+-------+--------+--------------------+
|balance|quantity| type|
+-------+--------+--------------------+
| 35| 25| coins|
| 40| 25| coins|
| 2| 2| column_breaker|
| 2| 2| row_breaker|
| 2| 2|single_block_breaker|
| 1| 1| rainbow|
| 140| 135| coins|
+-------+--------+--------------------+
【讨论】:
以上是关于在 PySpark 数据框中拆分字符串的主要内容,如果未能解决你的问题,请参考以下文章
在单个 spark 数据框中减去两个字符串列的最佳 PySpark 实践是啥?