从数据集中的现有列创建新列 - Apache Spark Java

Posted

技术标签:

【中文标题】从数据集中的现有列创建新列 - Apache Spark Java【英文标题】:Create new column from existing column in Dataset - Apache Spark Java 【发布时间】:2019-08-15 14:28:00 【问题描述】:

我是Spark ML 的新手,并且被困在一项需要一些数据规范化的任务中,而且网上可用于 Spark ML - Java 的文档非常少。任何帮助深表感谢。

问题描述:

我有一个Dataset,其中包含列 (ENCODED_URL) 中的编码 url,我想在现有数据集中创建包含解码版本 ENCODED_URL 的新列 (DECODED_URL)。

例如:当前数据集

ENCODED_URL https%3A%2F%2Fmy网站

新数据集

ENCODED_URL                         | DECODED_URL  
https%3A%2F%2Fmywebsite             | https://mywebsite

尝试使用withColumn,但不知道我应该将什么作为第二个参数传递

Dataset<Row> newDs = ds.withColumn("new_col",?);

在阅读了Spark documentation 之后,我了解到使用 SQLTransformer 可能是可行的,但无法弄清楚如何自定义它来解码 url。

这就是我从 CSV 读取信息的方式

Dataset<Row> urlDataset = s_spark.read().option("header", true).csv(CSV_FILE).persist(StorageLevel.MEMORY_ONLY());

【问题讨论】:

【参考方案1】:

火花入门

首先要知道的是 Spark Datasets 实际上是不可变的。每当您进行转换时,都会创建并返回一个新的数据集。要记住的另一件事是actionstransformations 之间的区别——动作导致Spark 实际上开始处理数字并计算您的DataFrame,而transformations 添加到DataFrame 的定义中,但除非有动作,否则不会计算叫做。一个动作的例子是DataFrame#count,而一个转换的例子是DataFrame#withColumn。请参阅Spark Scala documentation 中的操作和转换的完整列表。

解决方案

withColumn 允许您或者创建新列或替换Dataset 中的现有列(如果第一个参数是现有列的名称)。 withColumn 的文档会告诉您第二个参数应该是 Column 对象。不幸的是,Column 文档仅描述了可用于Column 对象的方法,但没有链接到创建 Column 对象的其他方法,因此您不知所措并不是您的错下一步做什么。

您要查找的是org.apache.spark.sql.functions#regexp_replace。综上所述,您的代码应如下所示:

...
import org.apache.spark.sql.functions

Dataset<Row> ds = ... // reading from your csv file
ds = ds.withColumn(
    "decoded_url", 
    functions.regexp_replace(functions.col("encoded_url"), "\\^https%3A%2F%2F", "https://"))

regexp_replace 要求我们将Column 对象作为第一个值传递,但没有任何要求它甚至存在于任何Dataset 上,因为Column 对象基本上是如何 计算的指令列,它们本身实际上并不包含任何真实数据。为了说明这个原理,我们可以把上面的 sn-p 写成:

...
import org.apache.spark.sql.functions

Dataset<Row> ds = ... // reading from your csv file
Column myColExpression = functions.regexp_replace(functions.col("encoded_url"), "\\^https%3A%2F%2F", "https://"))
ds = ds.withColumn("decoded_url", myColExpression)

如果您愿意,您可以在其他具有 encoded_url 列的数据集上重复使用 myColExpression

建议

如果您还没有,您应该熟悉org.apache.spark.sql.functions class。它是一个实用程序类,实际上是用于转换的 Spark 标准库。

【讨论】:

以上是关于从数据集中的现有列创建新列 - Apache Spark Java的主要内容,如果未能解决你的问题,请参考以下文章

Python:从现有列创建新列

使用 UDF 从 Apache Spark 中的其他列创建新列

尝试在 Jupyter Notebook 上使用 Pandas 从现有列创建新列时出现 NoneType 错误

使用正则表达式根据列的值在数据集中创建新列

Spark 根据现有列的映射值创建新列

如何创建创建新列并修改现有列的 UDF