在 PySpark 中将 URI 查询字符串转换为结构键值数组

Posted

技术标签:

【中文标题】在 PySpark 中将 URI 查询字符串转换为结构键值数组【英文标题】:Convert a URI query string to an Array of Struct key-value in PySpark 【发布时间】:2020-03-05 22:25:49 【问题描述】:

我在 PySpark 中有一个 DataFrame,其中有一列 URI 查询字符串 (StringType),如下所示:

+--------------+ 
| cs_uri_query |
+--------------+
| a=1&b=2&c=3  |
+--------------+
| d&e=&f=4     |
+--------------+

我需要将此列转换为具有以下结构的 StructField 元素的 ArrayType:

ArrayType(StructType([StructField('key', StringType(), nullable=False),
                      StructField('value', StringType(), nullable=True)]))

我期望的专栏是这样的:

+------------------------------------------------------------+ 
| cs_uri_query                                               |
+------------------------------------------------------------+
| [key=a, value=1,key=b, value=2,key=c, value=3]       |
+------------------------------------------------------------+
| [key=d, value=null,key=e, value=null,key=f, value=4] |
+------------------------------------------------------------+

UDF 是我发现实现这一目标的唯一方法。 我正在使用纯 Spark 函数,如果可能的话,我想避免使用 UDF ...... 与在 Scala 语言上使用 Spark 不同,UDF 在 PySpark 上的性能非常差。

这是我使用 UDF 的代码:

def parse_query(query):
    args = None
    if query:
        args = []
        for arg in query.split("&"):
            if arg:
                if "=" in arg:
                    a = arg.split("=")
                    if a[0]:
                        v = a[1] if a[1] else None
                        args.append("key": a[0], "value": v)
                else:
                    args.append("key": arg, "value": None)
    return args

uri_query = ArrayType(StructType([StructField('key', StringType(), nullable=True),
                                  StructField('value', StringType(), nullable=True)]))

udf_parse_query = udf(lambda args: parse_query(args), uri_query)

df = df.withColumn("cs_uri_query", udf_parse_query(df["cs_uri_query"]))

谁能用一个惊人的解决方案让我大开眼界?

【问题讨论】:

【参考方案1】:

对于 Spark 2.4+,您可以通过 & split 然后使用 transform 函数将每个元素 key=value 转换为 struct(key, value)

from pyspark.sql.functions import expr

df = spark.createDataFrame([("a=1&b=2&c=3",), ("d&e=&f=4",)], ["cs_uri_query"])

transform_expr = """transform(split(cs_uri_query, '&'),
                 x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
                 )
                 """

df.withColumn("cs_uri_query", expr(transform_expr)).show(truncate=False)

#+------------------------+
#|cs_uri_query            |
#+------------------------+
#|[[a, 1], [b, 2], [c, 3]]|
#|[[d,], [e, ], [f, 4]]   |
#+------------------------+

编辑

如果你想过滤掉 null 或空的键,那么你可以使用filter 和上面的转换表达式:

transform_expr = """filter(transform(split(cs_uri_query, '&'),
                                     x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
                           ),
                           x -> ifnull(x.key, '') <> ''
                    )
                 """

【讨论】:

你是我的英雄!关于空源值的另一个问题......我如何跳过添加预期的最终数组源值,如生成“key”=“”和“value”=null的“”或“=”或“=&”? 您对管理上述案例限制有什么想法吗?我从未使用过变换功能。 TY

以上是关于在 PySpark 中将 URI 查询字符串转换为结构键值数组的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中将字符串列转换为ArrayType

pyspark 在循环中将数组转换为字符串

在pyspark中将字符串价格值转换为double类型

在 Android 中将字符串转换为 Uri

如何在pyspark中将字符串值转换为arrayType

如何在 PySpark 中将字符串转换为字典 (JSON) 的 ArrayType