Apache Spark(Java)中列的自定义处理

Posted

技术标签:

【中文标题】Apache Spark(Java)中列的自定义处理【英文标题】:Custom processing on column in Apache Spark (Java) 【发布时间】:2016-07-04 17:37:52 【问题描述】:

我在 Spark 中加载了一个 JSON 文档,大致如下:

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
...
 |    |-- type_description: string (nullable = true)

我的 DF 正在把它变成:

df = df.withColumn("desc", df.col("fields.type_description"));

一切正常,但type_description 的值看起来像:“1 - 我的描述类型”。

理想情况下,我希望我的 df 仅包含文本部分,例如“我的描述类型”。我知道该怎么做,但我怎样才能通过 Spark 做到这一点?

我希望有一些类似的:

df = df.withColumn("desc", df.col("fields.type_description").call(/* some kind of transformation class / method*/));

谢谢!

【问题讨论】:

那么您到底在寻找什么?正则表达式?子串?您能否更新问题以反映这一点? 理想情况下它可以是任何东西......在这种情况下,我会使用子字符串和修剪(永远不会超过 2 位数字)来管理......但我还有其他更有趣的情况,比如解析、列之间的值连接、调用 joda 时间等。 【参考方案1】:

一般而言,Spark 提供了广泛的 SQL 函数集,这些函数从基本的字符串处理实用程序到日期/时间处理函数,再到不​​同的统计摘要。这是o.a.s.sql.functions 的一部分。在这种特殊情况下,您可能想要这样的东西:

import static org.apache.spark.sql.functions.*;

df.withColumn("desc",
  regexp_replace(df.col("fields.type_description"), "^[0-9]*\\s*-\\s*", "")
);

一般来说,在使用 Spark SQL 时,这些函数应该是您的首选。有 Catalyst 表达式支持,通常提供 codegen 实用程序。这意味着您可以充分受益于不同的 Spark SQL 优化。

另一种但效率较低的方法是实现自定义 UDF。参见例如Creating a SparkSQL UDF in Java outside of SQLContext

【讨论】:

太棒了 - 我看到我们可以用 Python 做 UDF,但我真的很高兴我们也可以用 Java 做这件事!发送!

以上是关于Apache Spark(Java)中列的自定义处理的主要内容,如果未能解决你的问题,请参考以下文章

我可以更改 Spark 数据框中列的可空性吗?

使用 Scala 从 Spark 中列的一系列值中汇总为一个新列

如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?

apache spark中的自定义分区器

获取Apache Spark Java中的整个数据集或仅列的摘要

计算两个连续日期之间的唯一 ID,它们是 PySpark 中列的值