PySpark DataFrame在使用explode之前将字符串的列更改为数组

Posted

技术标签:

【中文标题】PySpark DataFrame在使用explode之前将字符串的列更改为数组【英文标题】:PySpark DataFrame change column of string to array before using explode 【发布时间】:2018-11-27 10:05:07 【问题描述】:

我的 spark DataFrame 中有一个名为 event_data 的列,格式为 json,在使用 from_json 阅读后,我得到了这个架构:

root
 |-- user_id: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- af_content_id: string (nullable = true)
 |    |-- af_currency: string (nullable = true)
 |    |-- af_order_id: long (nullable = true)

我只需要本专栏中的af_content_id。该属性可以是不同的格式:

一个字符串 一个整数 Int 和 Str 的列表。例如['ghhjj23','123546',12356] 无(有时 event_data 不包含 af_content_id

我想使用explode 函数来为af_content_id 中格式为List 的每个元素返回一个新行。但是当我应用它时,我得到一个错误:

from pyspark.sql.functions import explode

def get_content_id(column):
    return column.af_content_id

df_transf_1 = df_transf_1.withColumn(
    "products_basket", 
    get_content_id(df_transf_1.event_data)
)

df_transf_1 = df_transf_1.withColumn(
    "product_id",
    explode(df_transf_1.products_basket)
)

由于数据类型不匹配,无法解析 'explode(products_basket)':explode 函数的输入应该是数组或映射类型,而不是 StringType;

我知道原因,是因为af_content_id字段可能包含的类型不同,但我不知道如何解决。直接在列上使用pyspark.sql.functions.array() 是行不通的,因为它变成了array 的array,并且explode 不会产生预期的结果。

重现我坚持的步骤的示例代码:

import pandas as pd

arr = [
    ['b5ad805c-f295-4852-82fc-961a88',12732936],
    ['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
    ['0E3D17EA-BEEF-4931-8104','12909841'],
    ['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
]

df = pd.DataFrame(arr, columns = ['user_id','products_basket'])

df = df[['user_id','products_basket']].astype(str)
df_transf_1 = spark.createDataFrame(df)

我正在寻找一种将 products_basket 转换为唯一可能的格式的方法:Array,这样当我应用 explode 时,它会包含一个 id行。

【问题讨论】:

原因是af_content_id1StringType,不能爆。它可能看起来有时像一个列表,有时像一个字符串,但它实际上始终是一个字符串。可能有解决方法 - 请提供一个小的 reproducible example 与您想要的输出。 【参考方案1】:

如果您从以下 DataFrame 开始:

df_transf_1.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id                         |products_basket               |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88  |12732936                      |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |['Gklb38', '123655']          |
#|0E3D17EA-BEEF-4931-8104         |12909841                      |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+

其中products_basket 列是StringType

df.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: string (nullable = true)

您不能在products_basket 上调用explode,因为它不是数组或映射。

一种解决方法是删除所有前导/尾随方括号,然后在", " 上拆分字符串(逗号后跟一个空格)。这会将字符串转换为字符串数组。

from pyspark.sql.functions import col, regexp_replace, split
df_transf_new= df_transf_1.withColumn(
    "products_basket",
    split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
)

df_transf_new.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id                         |products_basket               |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |
#|0E3D17EA-BEEF-4931-8104         |[12909841]                    |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+

正则表达式模式匹配以下任何一项:

(^\[): 字符串开头的方括号 (\]$):字符串末尾的右方括号 ('):任何单引号(因为你的字符串被引用了)

并用空字符串替换它们。

这假设您的数据在product_basket 内不包含任何需要的单引号或方括号。

split之后,新DataFrame的schema是:

df_transf_new.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: array (nullable = true)
# |    |-- element: string (containsNull = true)

现在您可以拨打explode

from pyspark.sql.functions import explode
df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
#+--------------------------------+------------------------------+----------+
#|user_id                         |products_basket               |product_id|
#+--------------------------------+------------------------------+----------+
#|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |12732936  |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |Gklb38    |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |123655    |
#|0E3D17EA-BEEF-4931-8104         |[12909841]                    |12909841  |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715  |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
#+--------------------------------+------------------------------+----------+

【讨论】:

您必须在regexp_replace 之后调用split()。这里不适合打电话给array 如果你将array() 应用到一个字符串,它将变成一个包含一个元素(字符串)的数组。 array() 函数不知道逗号应该用作分隔符。如果调用 split ,它会将字符串拆分为多个元素并返回一个数组。这有意义吗? 一个可能令人困惑的方面是,如果您调用 show()array("products_basket", regexp_replace(r"(^\[)|(\]$)|(')", ""))split("products_basket", regexp_replace(r"(^\[)|(\]$)|(')", ""), " ,") 将以相同的方式打印到控制台,但底层数据不同.后者是你想要的,而前者只是一个包含一个字符串的数组。 @SarahData 可能是因为我在 ", " 上拆分,而您只想拆分 ","(没有空格)。没有看到您的实际数据很难说,但似乎字符串没有被拆分,因为模式不匹配。您还可以传入正则表达式模式来拆分,如",(\s+)?",这意味着逗号后跟可选的空格。 是的,你说得对,我再次回到 split() 函数参数并验证了它需要什么作为输入和列上一步(仅应用 regexp_replace 之后)并理解它应该是"," 而不是", "。谢谢!

以上是关于PySpark DataFrame在使用explode之前将字符串的列更改为数组的主要内容,如果未能解决你的问题,请参考以下文章

我应该在 PySpark 中选择 RDD 还是 DataFrame 之一?

使用 PySpark 删除 Dataframe 的嵌套列

如何在 jupyter 中像 pandas Dataframe 一样打印 Pyspark Dataframe

在 zeppelin 中使用从 %pyspark 到 %python 的 Dataframe

在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)

如何在 PySpark 中仅打印 DataFrame 的某一列?