从数据集中的现有列创建新列 - Apache Spark Java
Posted
技术标签:
【中文标题】从数据集中的现有列创建新列 - Apache Spark Java【英文标题】:Create new column from existing column in Dataset - Apache Spark Java 【发布时间】:2019-08-15 14:28:00 【问题描述】:我是Spark ML
的新手,并且被困在一项需要一些数据规范化的任务中,而且网上可用于 Spark ML - Java 的文档非常少。任何帮助深表感谢。
问题描述:
我有一个Dataset
,其中包含列 (ENCODED_URL
) 中的编码 url,我想在现有数据集中创建包含解码版本 ENCODED_URL 的新列 (DECODED_URL
)。
例如:当前数据集
ENCODED_URL https%3A%2F%2Fmy网站
新数据集
ENCODED_URL | DECODED_URL
https%3A%2F%2Fmywebsite | https://mywebsite
尝试使用withColumn
,但不知道我应该将什么作为第二个参数传递
Dataset<Row> newDs = ds.withColumn("new_col",?);
在阅读了Spark documentation 之后,我了解到使用 SQLTransformer 可能是可行的,但无法弄清楚如何自定义它来解码 url。
这就是我从 CSV 读取信息的方式
Dataset<Row> urlDataset = s_spark.read().option("header", true).csv(CSV_FILE).persist(StorageLevel.MEMORY_ONLY());
【问题讨论】:
【参考方案1】:火花入门
首先要知道的是 Spark Datasets
实际上是不可变的。每当您进行转换时,都会创建并返回一个新的数据集。要记住的另一件事是actions
和transformations
之间的区别——动作导致Spark 实际上开始处理数字并计算您的DataFrame,而transformations
添加到DataFrame 的定义中,但除非有动作,否则不会计算叫做。一个动作的例子是DataFrame#count
,而一个转换的例子是DataFrame#withColumn
。请参阅Spark Scala documentation 中的操作和转换的完整列表。
解决方案
withColumn
允许您或者创建新列或替换Dataset
中的现有列(如果第一个参数是现有列的名称)。 withColumn
的文档会告诉您第二个参数应该是 Column
对象。不幸的是,Column 文档仅描述了可用于Column
对象的方法,但没有链接到创建 Column
对象的其他方法,因此您不知所措并不是您的错下一步做什么。
您要查找的是org.apache.spark.sql.functions#regexp_replace
。综上所述,您的代码应如下所示:
...
import org.apache.spark.sql.functions
Dataset<Row> ds = ... // reading from your csv file
ds = ds.withColumn(
"decoded_url",
functions.regexp_replace(functions.col("encoded_url"), "\\^https%3A%2F%2F", "https://"))
regexp_replace
要求我们将Column
对象作为第一个值传递,但没有任何要求它甚至存在于任何Dataset
上,因为Column
对象基本上是如何 计算的指令列,它们本身实际上并不包含任何真实数据。为了说明这个原理,我们可以把上面的 sn-p 写成:
...
import org.apache.spark.sql.functions
Dataset<Row> ds = ... // reading from your csv file
Column myColExpression = functions.regexp_replace(functions.col("encoded_url"), "\\^https%3A%2F%2F", "https://"))
ds = ds.withColumn("decoded_url", myColExpression)
如果您愿意,您可以在其他具有 encoded_url
列的数据集上重复使用 myColExpression
。
建议
如果您还没有,您应该熟悉org.apache.spark.sql.functions
class。它是一个实用程序类,实际上是用于转换的 Spark 标准库。
【讨论】:
以上是关于从数据集中的现有列创建新列 - Apache Spark Java的主要内容,如果未能解决你的问题,请参考以下文章
使用 UDF 从 Apache Spark 中的其他列创建新列