PySpark DataFrame在使用explode之前将字符串的列更改为数组
Posted
技术标签:
【中文标题】PySpark DataFrame在使用explode之前将字符串的列更改为数组【英文标题】:PySpark DataFrame change column of string to array before using explode 【发布时间】:2018-11-27 10:05:07 【问题描述】:我的 spark DataFrame 中有一个名为 event_data 的列,格式为 json
,在使用 from_json
阅读后,我得到了这个架构:
root
|-- user_id: string (nullable = true)
|-- event_data: struct (nullable = true)
| |-- af_content_id: string (nullable = true)
| |-- af_currency: string (nullable = true)
| |-- af_order_id: long (nullable = true)
我只需要本专栏中的af_content_id
。该属性可以是不同的格式:
['ghhjj23','123546',12356]
无(有时 event_data 不包含 af_content_id
)
我想使用explode
函数来为af_content_id
中格式为List 的每个元素返回一个新行。但是当我应用它时,我得到一个错误:
from pyspark.sql.functions import explode
def get_content_id(column):
return column.af_content_id
df_transf_1 = df_transf_1.withColumn(
"products_basket",
get_content_id(df_transf_1.event_data)
)
df_transf_1 = df_transf_1.withColumn(
"product_id",
explode(df_transf_1.products_basket)
)
由于数据类型不匹配,无法解析 'explode(
products_basket
)':explode 函数的输入应该是数组或映射类型,而不是 StringType;
我知道原因,是因为af_content_id
字段可能包含的类型不同,但我不知道如何解决。直接在列上使用pyspark.sql.functions.array()
是行不通的,因为它变成了array 的array,并且explode 不会产生预期的结果。
重现我坚持的步骤的示例代码:
import pandas as pd
arr = [
['b5ad805c-f295-4852-82fc-961a88',12732936],
['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
['0E3D17EA-BEEF-4931-8104','12909841'],
['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
]
df = pd.DataFrame(arr, columns = ['user_id','products_basket'])
df = df[['user_id','products_basket']].astype(str)
df_transf_1 = spark.createDataFrame(df)
我正在寻找一种将 products_basket 转换为唯一可能的格式的方法:Array,这样当我应用 explode
时,它会包含一个 id行。
【问题讨论】:
原因是af_content_id1
是StringType
,不能爆。它可能看起来有时像一个列表,有时像一个字符串,但它实际上始终是一个字符串。可能有解决方法 - 请提供一个小的 reproducible example 与您想要的输出。
【参考方案1】:
如果您从以下 DataFrame 开始:
df_transf_1.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id |products_basket |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88 |12732936 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |['Gklb38', '123655'] |
#|0E3D17EA-BEEF-4931-8104 |12909841 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+
其中products_basket
列是StringType
:
df.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: string (nullable = true)
您不能在products_basket
上调用explode
,因为它不是数组或映射。
一种解决方法是删除所有前导/尾随方括号,然后在", "
上拆分字符串(逗号后跟一个空格)。这会将字符串转换为字符串数组。
from pyspark.sql.functions import col, regexp_replace, split
df_transf_new= df_transf_1.withColumn(
"products_basket",
split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
)
df_transf_new.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id |products_basket |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88 |[12732936] |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |
#|0E3D17EA-BEEF-4931-8104 |[12909841] |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+
正则表达式模式匹配以下任何一项:
(^\[)
: 字符串开头的方括号
(\]$)
:字符串末尾的右方括号
(')
:任何单引号(因为你的字符串被引用了)
并用空字符串替换它们。
这假设您的数据在product_basket
内不包含任何需要的单引号或方括号。
split
之后,新DataFrame的schema是:
df_transf_new.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: array (nullable = true)
# | |-- element: string (containsNull = true)
现在您可以拨打explode
:
from pyspark.sql.functions import explode
df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
#+--------------------------------+------------------------------+----------+
#|user_id |products_basket |product_id|
#+--------------------------------+------------------------------+----------+
#|b5ad805c-f295-4852-82fc-961a88 |[12732936] |12732936 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |Gklb38 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |123655 |
#|0E3D17EA-BEEF-4931-8104 |[12909841] |12909841 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837 |
#+--------------------------------+------------------------------+----------+
【讨论】:
您必须在regexp_replace
之后调用split()
。这里不适合打电话给array
。
如果你将array()
应用到一个字符串,它将变成一个包含一个元素(字符串)的数组。 array()
函数不知道逗号应该用作分隔符。如果调用 split ,它会将字符串拆分为多个元素并返回一个数组。这有意义吗?
一个可能令人困惑的方面是,如果您调用 show()
,array("products_basket", regexp_replace(r"(^\[)|(\]$)|(')", ""))
和 split("products_basket", regexp_replace(r"(^\[)|(\]$)|(')", ""), " ,")
将以相同的方式打印到控制台,但底层数据不同.后者是你想要的,而前者只是一个包含一个字符串的数组。
@SarahData 可能是因为我在 ", "
上拆分,而您只想拆分 ","
(没有空格)。没有看到您的实际数据很难说,但似乎字符串没有被拆分,因为模式不匹配。您还可以传入正则表达式模式来拆分,如",(\s+)?"
,这意味着逗号后跟可选的空格。
是的,你说得对,我再次回到 split() 函数参数并验证了它需要什么作为输入和列上一步(仅应用 regexp_replace
之后)并理解它应该是","
而不是", "
。谢谢!以上是关于PySpark DataFrame在使用explode之前将字符串的列更改为数组的主要内容,如果未能解决你的问题,请参考以下文章
我应该在 PySpark 中选择 RDD 还是 DataFrame 之一?
如何在 jupyter 中像 pandas Dataframe 一样打印 Pyspark Dataframe
在 zeppelin 中使用从 %pyspark 到 %python 的 Dataframe