Spark将列拆分为数组和聚合计算
Posted
技术标签:
【中文标题】Spark将列拆分为数组和聚合计算【英文标题】:Spark splitting a column into array and aggregate calculation 【发布时间】:2019-12-26 16:56:51 【问题描述】:我有一个格式如下的 DataFrame:
COL_1 COL_2 COL_3
----- ----- -----
TEXT1 TEXT2 ["www.a.com/abc", "www.b.com/dgh", "www.c.com/axy", "www.a.com/xyz"]
TEXT3 TEXT4 ["www.a.com/abc", "www.d.com/dgh", "www.a.com/axy", "www.f.com/xyz", "www.f.com/xyz", "www.a.com/xyz"]
TEXT5 TEXT6 ["www.v.com/abc", "www.c.com/axy"]
所有列都是字符串。我想在 spark 中做什么:
将 COL_3 拆分为单独的 URL。 提取域名,然后计算该行中来自域“a.com”的 URL 的百分比。 如果“a.com”的百分比超过一定数量在该行(例如,该行中 50% 的 URL 来自 a.com),我想映射每个 URL 路径到包含 COL_1 和 COL_2 的单独行。对于上面的示例,输出将类似于以下内容:
COL_1 COL_2 COL_MAP_REDUCED
----- ----- -----
TEXT1 TEXT2 abc
TEXT1 TEXT2 xyz
TEXT3 TEXT4 xyz
TEXT3 TEXT4 axy
我不是在寻找可以为我解决问题的人,我正在寻找有关如何开始的指导,因为我的 google-foo 让我失望了。
谢谢。
【问题讨论】:
【参考方案1】: val df = // your dataframe creation code
val res = df.mapr=>
val col3 = r.getAs[String](“col_3”)
val col2= r.getAs[String](“col_2”)
val col1= r.getAs[String](“col_1”)
//operate on col3 as you wish
val col4_ = yourFunc(col3)
//if col4 is of type Seq or Array then you can flatten it by using flatMap or explode function
val col4 = Seq("x","y")
var explodedResult = Seq[Tuple4[String,String,String,String]]()
col4.foreach element =>
explodedResult = explodedResult :+ (col1, col2, col3, element)
explodedResult
.flatMap(identity(_))
新数据框“res”将包含所有现有列以及新的评估结果。
如果存储此数据框,则会复制大量数据。您也可以像(col1:String,col2:String,col3:String,col4:Seq[String])
一样将结果存储为数组。如果您需要在单独的行中重复 col4 中的每个值,那么您可以使用 explode
函数,它将 col4 的每一行分解为整个数据框的一行。其语法为df.withColumn("col4", explode(col("col4")).show()
【讨论】:
感谢您的回答。对于每个 col4 输出(在我的示例中为 COL_MAP_REDUCED)有一个单独的行怎么样? 我已经更新了答案。如果这可行,那么请为答案投票。【参考方案2】:scala> df.show(false)
+-----+-----+------------------------------------------------------------------------------------------------------+
|COL_1|COL_2|COL_3 |
+-----+-----+------------------------------------------------------------------------------------------------------+
|TEXT1|TEXT2|["www.a.com/abc", "www.b.com/dgh", "www.c.com/axy", "www.a.com/xyz"] |
|TEXT3|TEXT4|["www.a.com/abc", "www.d.com/dgh", "www.a.com/axy", "www.f.com/xyz", "www.f.com/xyz", "www.a.com/xyz"]|
|TEXT5|TEXT6|["www.v.com/abc", "www.c.com/axy"] |
+-----+-----+------------------------------------------------------------------------------------------------------+
将 COL_3 拆分为单独的 URL。
scala> val df1 = df.withColumn("COL_3", regexp_replace(col("COL_3"), "[\\] \" \\[]",""))
.withColumn("COL_3", explode(split(col("COL_3"), ",")))
scala> df1.show(false)
+-----+-----+-------------+
|COL_1|COL_2|COL_3 |
+-----+-----+-------------+
|TEXT1|TEXT2|www.a.com/abc|
|TEXT1|TEXT2|www.b.com/dgh|
|TEXT1|TEXT2|www.c.com/axy|
|TEXT1|TEXT2|www.a.com/xyz|
|TEXT3|TEXT4|www.a.com/abc|
|TEXT3|TEXT4|www.d.com/dgh|
|TEXT3|TEXT4|www.a.com/axy|
|TEXT3|TEXT4|www.f.com/xyz|
|TEXT3|TEXT4|www.f.com/xyz|
|TEXT3|TEXT4|www.a.com/xyz|
|TEXT5|TEXT6|www.v.com/abc|
|TEXT5|TEXT6|www.c.com/axy|
+-----+-----+-------------+
提取域名,然后计算网址的百分比 来自该行中的域“a.com”。
scala> val df2 = df1.filter(col("COL_3").like("%a.com%"))
scala> df2.show
+-----+-----+-------------+
|COL_1|COL_2| COL_3|
+-----+-----+-------------+
|TEXT1|TEXT2|www.a.com/abc|
|TEXT1|TEXT2|www.a.com/xyz|
|TEXT3|TEXT4|www.a.com/abc|
|TEXT3|TEXT4|www.a.com/axy|
|TEXT3|TEXT4|www.a.com/xyz|
+-----+-----+-------------+
如果“a.com”在该行中的百分比超过一定数量(50% 例如,该行中的 URL 来自 a.com),我想映射 每个 URL 路径使用 COL_1 和 COL_2 指向单独的行。
您可以通过从数据帧df1
获取col_1
和col_2
的组计数来获得百分比,并使用数据帧df2
以相同的计数进行计算。
为了你想要的输出
scala> df2.withColumn("COL_MAP_REDUCED", split(col("COL_3"),"/")(1)).drop("COL_3").show
+-----+-----+---------------+
|COL_1|COL_2|COL_MAP_REDUCED|
+-----+-----+---------------+
|TEXT1|TEXT2| abc|
|TEXT1|TEXT2| xyz|
|TEXT3|TEXT4| abc|
|TEXT3|TEXT4| axy|
|TEXT3|TEXT4| xyz|
+-----+-----+---------------+
【讨论】:
以上是关于Spark将列拆分为数组和聚合计算的主要内容,如果未能解决你的问题,请参考以下文章