Spark将列拆分为数组和聚合计算

Posted

技术标签:

【中文标题】Spark将列拆分为数组和聚合计算【英文标题】:Spark splitting a column into array and aggregate calculation 【发布时间】:2019-12-26 16:56:51 【问题描述】:

我有一个格式如下的 DataFrame:

COL_1    COL_2    COL_3
-----    -----    -----
TEXT1     TEXT2    ["www.a.com/abc", "www.b.com/dgh", "www.c.com/axy", "www.a.com/xyz"]
TEXT3     TEXT4    ["www.a.com/abc", "www.d.com/dgh", "www.a.com/axy", "www.f.com/xyz", "www.f.com/xyz", "www.a.com/xyz"]
TEXT5     TEXT6    ["www.v.com/abc", "www.c.com/axy"]

所有列都是字符串。我想在 spark 中做什么:

将 COL_3 拆分为单独的 URL。 提取域名,然后计算该行中来自域“a.com”的 URL 的百分比。 如果“a.com”的百分比超过一定数量在该行(例如,该行中 50% 的 URL 来自 a.com),我想映射每个 URL 路径到包含 COL_1 和 COL_2 的单独行。

对于上面的示例,输出将类似于以下内容:

COL_1    COL_2    COL_MAP_REDUCED
-----    -----    -----
TEXT1     TEXT2    abc
TEXT1     TEXT2    xyz
TEXT3     TEXT4    xyz
TEXT3     TEXT4    axy

我不是在寻找可以为我解决问题的人,我正在寻找有关如何开始的指导,因为我的 google-foo 让我失望了。

谢谢。

【问题讨论】:

【参考方案1】:
    val df = // your dataframe creation code
    val res = df.mapr=>
    val col3 = r.getAs[String](“col_3”)
    val col2= r.getAs[String](“col_2”)
    val col1= r.getAs[String](“col_1”)
    //operate on col3 as you wish
    val col4_ = yourFunc(col3)

    //if col4 is of type Seq or Array then you can flatten it by using flatMap or explode function

    val col4 = Seq("x","y")    
    var explodedResult = Seq[Tuple4[String,String,String,String]]()

    col4.foreach element =>
    explodedResult = explodedResult :+ (col1, col2, col3, element)
    explodedResult    
    .flatMap(identity(_))

新数据框“res”将包含所有现有列以及新的评估结果。

如果存储此数据框,则会复制大量数据。您也可以像(col1:String,col2:String,col3:String,col4:Seq[String]) 一样将结果存储为数组。如果您需要在单独的行中重复 col4 中的每个值,那么您可以使用 explode 函数,它将 col4 的每一行分解为整个数据框的一行。其语法为df.withColumn("col4", explode(col("col4")).show()

【讨论】:

感谢您的回答。对于每个 col4 输出(在我的示例中为 COL_MAP_REDUCED)有一个单独的行怎么样? 我已经更新了答案。如果这可行,那么请为答案投票。【参考方案2】:
scala> df.show(false)
+-----+-----+------------------------------------------------------------------------------------------------------+
|COL_1|COL_2|COL_3                                                                                                 |
+-----+-----+------------------------------------------------------------------------------------------------------+
|TEXT1|TEXT2|["www.a.com/abc", "www.b.com/dgh", "www.c.com/axy", "www.a.com/xyz"]                                  |
|TEXT3|TEXT4|["www.a.com/abc", "www.d.com/dgh", "www.a.com/axy", "www.f.com/xyz", "www.f.com/xyz", "www.a.com/xyz"]|
|TEXT5|TEXT6|["www.v.com/abc", "www.c.com/axy"]                                                                    |
+-----+-----+------------------------------------------------------------------------------------------------------+

将 COL_3 拆分为单独的 URL。

scala> val df1 = df.withColumn("COL_3", regexp_replace(col("COL_3"), "[\\] \" \\[]",""))
                   .withColumn("COL_3", explode(split(col("COL_3"), ","))) 

scala> df1.show(false)
+-----+-----+-------------+
|COL_1|COL_2|COL_3        |
+-----+-----+-------------+
|TEXT1|TEXT2|www.a.com/abc|
|TEXT1|TEXT2|www.b.com/dgh|
|TEXT1|TEXT2|www.c.com/axy|
|TEXT1|TEXT2|www.a.com/xyz|
|TEXT3|TEXT4|www.a.com/abc|
|TEXT3|TEXT4|www.d.com/dgh|
|TEXT3|TEXT4|www.a.com/axy|
|TEXT3|TEXT4|www.f.com/xyz|
|TEXT3|TEXT4|www.f.com/xyz|
|TEXT3|TEXT4|www.a.com/xyz|
|TEXT5|TEXT6|www.v.com/abc|
|TEXT5|TEXT6|www.c.com/axy|
+-----+-----+-------------+

提取域名,然后计算网址的百分比 来自该行中的域“a.com”。

scala> val df2 = df1.filter(col("COL_3").like("%a.com%"))

scala> df2.show
+-----+-----+-------------+
|COL_1|COL_2|        COL_3|
+-----+-----+-------------+
|TEXT1|TEXT2|www.a.com/abc|
|TEXT1|TEXT2|www.a.com/xyz|
|TEXT3|TEXT4|www.a.com/abc|
|TEXT3|TEXT4|www.a.com/axy|
|TEXT3|TEXT4|www.a.com/xyz|
+-----+-----+-------------+

如果“a.com”在该行中的百分比超过一定数量(50% 例如,该行中的 URL 来自 a.com),我想映射 每个 URL 路径使用 COL_1 和 COL_2 指向单独的行。

您可以通过从数据帧df1 获取col_1col_2 的组计数来获得百分比,并使用数据帧df2 以相同的计数进行计算。

为了你想要的输出

scala> df2.withColumn("COL_MAP_REDUCED", split(col("COL_3"),"/")(1)).drop("COL_3").show
+-----+-----+---------------+
|COL_1|COL_2|COL_MAP_REDUCED|
+-----+-----+---------------+
|TEXT1|TEXT2|            abc|
|TEXT1|TEXT2|            xyz|
|TEXT3|TEXT4|            abc|
|TEXT3|TEXT4|            axy|
|TEXT3|TEXT4|            xyz|
+-----+-----+---------------+

【讨论】:

以上是关于Spark将列拆分为数组和聚合计算的主要内容,如果未能解决你的问题,请参考以下文章

将列字符串拆分为多个列字符串

将列拆分为多行

将列拆分为多行

Spark中的拆分,操作和联合数据框

Pandas 用逗号将列拆分为多列

熊猫:将列中的列表拆分为多行[重复]