Spark UDF:如何在每一行上编写一个 UDF 以提取嵌套结构中的特定值?

Posted

技术标签:

【中文标题】Spark UDF:如何在每一行上编写一个 UDF 以提取嵌套结构中的特定值?【英文标题】:Spark UDF: How to write a UDF on each row to extract a specific value in a nested struct? 【发布时间】:2019-12-04 13:19:42 【问题描述】:

我在 Java 中使用 Spark 来处理 XML 文件。来自databricks的spark-xml包用于将xml文件读入dataframe。

示例 xml 文件是:

<RowTag>
    <id>1</id>
    <name>john</name>
    <expenses>
        <travel>
            <details>
                <date>20191203</date>
                <amount>400</amount>
            </details>
        </travel>
    </expenses>
</RowTag>
<RowTag>
    <id>2</id>
    <name>joe</name>
    <expenses>
        <food>
            <details>
                <date>20191204</date>
                <amount>500</amount>
            </details>
        </food>
    </expenses>
</RowTag>

结果sparkDataset&lt;Row&gt; df如下图,每一行代表一个xml文件。

+--+------+----------------+
|id| name |expenses        |
+---------+----------------+
|1 | john |[[20191203,400]]|
|2 | joe  |[[20191204,500]]|
+--+------+----------------+

df.printSchema(); 显示如下:

root
|-- id: int(nullable = true)
|-- name: string(nullable = true)
|-- expenses: struct (nullable = true)
|    |-- travel: struct (nullable = true)
|    |    |-- details: struct (nullable = true)
|    |    |    |-- date: string (nullable = true)
|    |    |    |-- amount: int (nullable = true)
|    |-- food: struct (nullable = true)
|    |    |-- details: struct (nullable = true)
|    |    |    |-- date: string (nullable = true)
|    |    |    |-- amount: int (nullable = true)

所需的输出数据框如下:

+--+------+-------------+
|id| name |expenses_date|
+---------+-------------+
|1 | john |20191203     |
|2 | joe  |20191204     |
+--+------+-------------+

基本上我想要一个通用的解决方案来从具有以下结构的 xml 中获取日期,其中只有标签 &lt;X&gt; 会有所不同。

<RowTag>
    <id>1</id>
    <name>john</name>
    <expenses>
        **<X>**
            <details>
                <date>20191203</date>
                <amount>400</amount>
            </details>
        **</X>**
    </expenses>
</RowTag>

我尝试过的:

spark.udf().register("getDate",(UDF1 <Row, String>) (Row row) -> 
            return row.getStruct(0).getStruct(0).getAs("date").toString();
        , DataTypes.StringType);

df.select(callUDF("getDate",df.col("expenses")).as("expenses_date")).show();

但它不起作用,因为 row.getStruct(0) 路由到 &lt;travel&gt;,但是对于 row joe,&lt;expenses&gt; 下没有 &lt;travel&gt; 标签,所以它返回了一个 java.lang.NullPointerException。我想要的是一个通用的解决方案,对于每一行,它可以自动获取下一个标签名称,例如row.getStruct(0) 路由到 &lt;travel&gt; 用于 row john 和 &lt;food&gt; 用于 row joe。

所以我的问题是:我应该如何重新制定我的 UDF 来实现这一点?

提前致谢!! :)

【问题讨论】:

如果您提到到目前为止您尝试过的所有事情都会有所帮助。以便您的代码/尝试可以改进或类似的东西。 @VarunJain 谢谢!现在就去做。 【参考方案1】:

spark-xml 包允许您直接在选择表达式中访问嵌套字段。为什么要寻找 UDF?

df.selectExpr("id", "name", "COALESCE(`expenses`.`food`.`details`.`date`, `expenses`.`travel`.`details`.`date`) AS expenses_date" ).show()

输出:

+---+----+-------------+
| id|name|expenses_date|
+---+----+-------------+
|  1|john|     20191203|
|  2| joe|     20191204|
+---+----+-------------+

编辑

如果唯一改变的标签是expenses 结构之后的标签,那么您可以搜索expenses 下的所有字段,然后搜索coalesce 列:expenses.X.details.date。在 Spark 中是这样的:

val expenses_fields = df.select(col("expenses.*")).columns
val date_cols = expenses_fields.map(f => col(s"`expenses`.`$f`.`details`.`date`"))

df.select(col("id"), col("name"), coalesce(date_cols: _*).alias("expenses_date")).show()

不过,您不需要使用 UDF!

【讨论】:

您好,谢谢您的回答。对不起,我没有说清楚。因为我必须处理我不知道标签 下接下来会发生什么的情况,它可能是 或任何其他标签名称,在这种情况下,我需要一个 UDF 函数(或任何其他可能的方式)来概括这部分的解决方案。 谢谢!可以将其翻译成Java吗?我已经尝试了解决方案,.columns 函数返回一个字符串[],我无法使用它的 map 函数... 在 Java 8 中你也可以使用 map。没试过这样的东西:Arrays.stream(expenses_fields).map(f -&gt; col(s"expenses.$f.details.date")).collect(Collectors.toList()) 我也试过了,但是 collect 函数会返回 Java 中的 List 而不是 spark 列... 这实际上会返回List&lt;Column&gt; 。 “Java 中的列表而不是火花列”是什么意思?

以上是关于Spark UDF:如何在每一行上编写一个 UDF 以提取嵌套结构中的特定值?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Udf 中传递地图?

如何使用 uniroot 解决数据框中的用户定义函数 (UDF)?

一套 Spark UDF 实践教程(文末专栏抽奖)

如何在 Spark 中创建有状态的 UDF?

如何使用反射从scala调用spark UDF?

如何在spark shell中注册Java SPark UDF?