Apache Spark(Java)中列的自定义处理
Posted
技术标签:
【中文标题】Apache Spark(Java)中列的自定义处理【英文标题】:Custom processing on column in Apache Spark (Java) 【发布时间】:2016-07-04 17:37:52 【问题描述】:我在 Spark 中加载了一个 JSON 文档,大致如下:
root
|-- datasetid: string (nullable = true)
|-- fields: struct (nullable = true)
...
| |-- type_description: string (nullable = true)
我的 DF 正在把它变成:
df = df.withColumn("desc", df.col("fields.type_description"));
一切正常,但type_description
的值看起来像:“1 - 我的描述类型”。
理想情况下,我希望我的 df 仅包含文本部分,例如“我的描述类型”。我知道该怎么做,但我怎样才能通过 Spark 做到这一点?
我希望有一些类似的:
df = df.withColumn("desc", df.col("fields.type_description").call(/* some kind of transformation class / method*/));
谢谢!
【问题讨论】:
那么您到底在寻找什么?正则表达式?子串?您能否更新问题以反映这一点? 理想情况下它可以是任何东西......在这种情况下,我会使用子字符串和修剪(永远不会超过 2 位数字)来管理......但我还有其他更有趣的情况,比如解析、列之间的值连接、调用 joda 时间等。 【参考方案1】:一般而言,Spark 提供了广泛的 SQL 函数集,这些函数从基本的字符串处理实用程序到日期/时间处理函数,再到不同的统计摘要。这是o.a.s.sql.functions
的一部分。在这种特殊情况下,您可能想要这样的东西:
import static org.apache.spark.sql.functions.*;
df.withColumn("desc",
regexp_replace(df.col("fields.type_description"), "^[0-9]*\\s*-\\s*", "")
);
一般来说,在使用 Spark SQL 时,这些函数应该是您的首选。有 Catalyst 表达式支持,通常提供 codegen 实用程序。这意味着您可以充分受益于不同的 Spark SQL 优化。
另一种但效率较低的方法是实现自定义 UDF。参见例如Creating a SparkSQL UDF in Java outside of SQLContext
【讨论】:
太棒了 - 我看到我们可以用 Python 做 UDF,但我真的很高兴我们也可以用 Java 做这件事!发送!以上是关于Apache Spark(Java)中列的自定义处理的主要内容,如果未能解决你的问题,请参考以下文章
使用 Scala 从 Spark 中列的一系列值中汇总为一个新列
如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?