如何最好地将 SparkSQL Dataframe Array[String] 列转换为新的 [String] 列

Posted

技术标签:

【中文标题】如何最好地将 SparkSQL Dataframe Array[String] 列转换为新的 [String] 列【英文标题】:How to best transform a SparkSQL Dataframe Array[String] column to a new [String] column 【发布时间】:2019-04-05 10:45:27 【问题描述】:

我是 Spark 的新手,我有一个 Apache SparkSQL DataFrame df 有 4 列,具有以下架构:

root
 |-- _id: string (nullable = false)
 |-- _title: string (nullable = false)
 |-- _published-at: date (nullable = false)
 |-- p: array (nullable = true)
 |    |-- element: string (containsNull = true)

df 包含大量(一百万左右)新闻文章,每条记录的列包含:唯一 id (_id)、标题 (_title)、发布日期 (_published-at) 和一个字符串数组每篇文章中的文本段落 (p)。

我现在想将“p”列从文章段落的当前格式Array[String] 转换为全文文本的融合String,其中转换是一个简单的映射,其中段落元素与它们之间的空格 (" "),从而导致将新的第五列 String 添加到 df。 IE。像这样:

df.withColumn(df.(col"p").map(_.mkString(" ")).alias("fullarticle"))

这不起作用。然而,这似乎是一个微不足道的问题,但我一定有什么问题。在 Spark 的functions 包中,可以找到很多功能,但似乎没有一个适合这里。我必须以某种方式使用“用户定义函数”(UDF)吗?如果可能,最好避免它。

可以通过以下方式将其转换为String,从而生成一个新的Dataset[String] dsFullArticles

dsFullArticles = df.select(col("p").as[Array[String]]).map(_.mkString(" ")).alias("fullarticle")

(似乎需要.as[Array[String]] 来解开WrappedArray,它实际上包装了“p”列中的每个Array[String] 元素)。但是如何将dsFullArticles 作为新的 附加到df

之后,我也想在“fullarticle”一栏中找到每篇文章最长单词的长度,并将其作为第六列添加到df

// Split each article in an array of its words
val dsFullArticlesArrayOfWords = dsFullArticles.map(s => s.split(" "))
// Find number of characters of longest word in article, 0 if article is empty
val dsMaxWordLength =
  dsFullArticlesArrayOfWords.map(s => (s.map(w => w.length()) match 
    case x if x.isEmpty => 0  
    case x => x.max
  ))

上面的代码也可以工作,生成一个Dataset[int],但是如何将它作为一个列添加到df?这里同样的问题。当所有都在同一个DataFrame df 中时,很容易进行各种 SQL 选择、过滤等。

【问题讨论】:

【参考方案1】:

你可以使用concat_ws函数:

concat_ws(sep, [str | array(str)]+) - 返回由 sep 分隔的字符串的串联。

在你的情况下:

df.withColumn("fullarticle", concat_ws(" ",col("p")))

【讨论】:

感谢@galando,它成功了!我以前看过concat_ws 函数,但我只认为它可以用来水平连接Columns,比如几个String 列到一个新的String 列,而不是一个Array[String] 列到一个@ 987654328@专栏。

以上是关于如何最好地将 SparkSQL Dataframe Array[String] 列转换为新的 [String] 列的主要内容,如果未能解决你的问题,请参考以下文章

如何动态地将列添加到 DataFrame?

Spark SQL - 如何将 DataFrame 写入文本文件?

RDD和SparkSQL综合应用

Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF

SparkSQL使用IDEA创建DataFrame

DataFrame,SparkSQL