Pyspark - 循环通过 structType 和 ArrayType 在 structfield 中进行类型转换
Posted
技术标签:
【中文标题】Pyspark - 循环通过 structType 和 ArrayType 在 structfield 中进行类型转换【英文标题】:Pyspark - Looping through structType and ArrayType to do typecasting in the structfield 【发布时间】:2019-11-04 16:24:51 【问题描述】:我对 pyspark 很陌生,这个问题让我很困惑。基本上,我正在寻找一种 可扩展 方法来循环通过 structType 或 ArrayType 进行类型转换。
我的数据架构示例:
root
|-- _id: string (nullable = true)
|-- created: timestamp (nullable = true)
|-- card_rates: struct (nullable = true)
| |-- rate_1: integer (nullable = true)
| |-- rate_2: integer (nullable = true)
| |-- rate_3: integer (nullable = true)
| |-- card_fee: integer (nullable = true)
| |-- payment_method: string (nullable = true)
|-- online_rates: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- rate_1: integer (nullable = true)
| | |-- rate_2: integer (nullable = true)
| | |-- online_fee: double (nullable = true)
|-- updated: timestamp (nullable = true)
正如您在此处看到的,card_rates
是结构体,online_rates
是结构体数组。我正在寻找遍历上述所有字段并有条件地对它们进行类型转换的方法。理想情况下,如果它应该是数字,它应该被转换为双精度,如果它应该是字符串,它应该被转换为字符串。我需要循环,因为那些 rate_*
字段可能会随着时间增长。
但是现在,我对能够循环它们并将它们全部类型转换为字符串感到满意,因为我对 pyspark 非常陌生,并且仍在尝试感受它。
我想要的输出模式:
root
|-- _id: string (nullable = true)
|-- created: timestamp (nullable = true)
|-- card_rates: struct (nullable = true)
| |-- rate_1: double (nullable = true)
| |-- rate_2: double (nullable = true)
| |-- rate_3: double (nullable = true)
| |-- card_fee: double (nullable = true)
| |-- payment_method: string (nullable = true)
|-- online_rates: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- rate_1: double (nullable = true)
| | |-- rate_2: double (nullable = true)
| | |-- online_fee: double (nullable = true)
|-- updated: timestamp (nullable = true)
我已经没有如何做到这一点的想法了。
我从这里得到参考:PySpark convert struct field inside array to string
但此解决方案对字段进行硬编码,并不会真正循环遍历字段。
请帮忙。
【问题讨论】:
你解决了这个@addicted 吗? 一点也不 :( @AlexandrosBiratsis 有什么想法吗? 我想我可能有一个解决方案,不是你能想到的最优雅的解决方案,但它可以工作 @AlexandrosBiratsis hm 随时欢迎任何解决方案。 【参考方案1】:这是一种借助 StructType.simpleString
和 _parse_datatype_string
内置函数的解决方案:
from pyspark.sql.types import *
df_schema = StructType([
StructField("_id", StringType(), True),
StructField("created", TimestampType(), True),
StructField("card_rates", StructType([
StructField("rate_1", IntegerType(), True),
StructField("rate_2", IntegerType(), True),
StructField("rate_3", IntegerType(), True),
StructField("card_fee", IntegerType(), True),
StructField("card_fee", IntegerType(), True)])),
StructField("online_rates", ArrayType(
StructType(
[
StructField("rate_1", IntegerType(),True),
StructField("rate_2", IntegerType(),True),
StructField("online_fee", DoubleType(),True)
]),True),True),
StructField("updated", TimestampType(), True)])
schema_str = df_schema.simpleString() # this gives -> struct<_id:string,created:timestamp,card_rates:struct<rate_1:int,rate_2:int,rate_3:int,card_fee:int, card_fee:int>,online_rates:array<struct<rate_1:int,rate_2:int,online_fee:double>>,updated:timestamp>
double_schema = schema_str.replace(':int', ':double')
# convert back to StructType
final_schema = _parse_datatype_string(double_schema)
final_schema
-
首先使用
schema.simpleString
将您的架构转换为一个简单的字符串
然后将所有:int
替换为:double
最后将修改后的字符串模式转换为带有_parse_datatype_string
的StructType
更新:
为了避免@jxc 指出的反引号问题,更好的解决方案是递归扫描元素,如下所示:
def transform_schema(schema):
if schema == None:
return StructType()
updated = []
for f in schema.fields:
if isinstance(f.dataType, IntegerType):
# if IntegerType convert to DoubleType
updated.append(StructField(f.name, DoubleType(), f.nullable))
elif isinstance(f.dataType, ArrayType):
# if ArrayType unpack the array type(elementType), do recursion then wrap results with ArrayType
updated.append(StructField(f.name, ArrayType(transform_schema(f.dataType.elementType))))
elif isinstance(f.dataType, StructType):
# if StructType do recursion
updated.append(StructField(f.name, transform_schema(f.dataType)))
else:
# else handle all the other cases i.e TimestampType, StringType etc
updated.append(StructField(f.name, f.dataType, f.nullable))
return StructType(updated)
# call the function with your schema
transform_schema(df_schema)
解释:该函数遍历模式 (StructType) 上的每个项目,并尝试将 int 字段 (StructField) 转换为双精度字段。最后将转换后的模式(StructType)传递给上层(父 StructType)。
输出:
StructType(List(
StructField(_id,StringType,true),
StructField(created,TimestampType,true),
StructField(card_rates,
StructType(List(StructField(rate_1,DoubleType,true),
StructField(rate_2,DoubleType,true),
StructField(rate_3,DoubleType,true),
StructField(card_fee,DoubleType,true),
StructField(card_fee,DoubleType,true))),true),
StructField(online_rates,ArrayType(
StructType(List(
StructField(rate_1,DoubleType,true),
StructField(rate_2,DoubleType,true),
StructField(online_fee,DoubleType,true))),true),true),
StructField(updated,TimestampType,true)))
更新:(2020-02-02)
这是一个关于如何将新模式与现有数据框一起使用的示例:
updated_schema = transform_schema(df.schema)
# cast each column to the new type
select_expr = [df[f.name].cast(f.dataType) for f in updated_schema.fields]
df.select(*select_expr).show()
【讨论】:
如果下个月我有rate_3和rate_4,这能适应这种变化吗?问题是我不是创建rate_3
和rate_4
的人。其他一些人这样做了,我永远不知道一行数据可以有多少速率。这就是为什么它必须减少硬编码。
否,因为上面的脚本根本不依赖于列名。最初你有一个模式,这个模式应该是一个实例 StructType。我们将每个 int 字段转换为 double 而不检查任何有关列名的内容。您可以尝试逐行运行脚本并查看输出。如果您遇到任何问题,请告诉我
@AlexandrosBiratsis:_parse_datatype_string("struct<my.col:string,b1:int>")
,而你需要_parse_datatype_string("struct<`my.col`:string,b1:int>")
。注意字段名称中的点和df.schema.simpleString()
不会添加的反引号。
我同意@jxc,为了更正确,我更愿意实现递归解决方案。
嗨@alexandro 抱歉没有再次回复。我会在今天或明天尝试并通知您。以上是关于Pyspark - 循环通过 structType 和 ArrayType 在 structfield 中进行类型转换的主要内容,如果未能解决你的问题,请参考以下文章
使用 pyspark 将 StructType、ArrayType 转换/转换为 StringType(单值)
Spark 2.1.1 上的 Pyspark,StructType 中的 StructFields 始终可以为空
如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果
pySpark:如何在数据框中的 arrayType 列中获取 structType 中的所有元素名称?
PySpark:TypeError:StructType 不能接受类型为 <type 'unicode'> 或 <type 'str'> 的对象