Apache Spark - 分组

Posted

技术标签:

【中文标题】Apache Spark - 分组【英文标题】:Apache Spark - GROUP BY 【发布时间】:2020-08-25 14:48:07 【问题描述】:

我希望使用 Scala 在 Apache Spark 中实现以下功能。我有一组这样的示例记录。

请注意,EffectiveDate 列有时为空白,并且对于相同的 CustomerID 也不同。

现在,作为输出,我希望每个 CustomerID 有一条记录,其中 EffectiveDate 为 MAX。因此,对于上面的示例,我希望突出显示的记录如下所示。

任何人都可以在 Spark(使用 scala)中提供一个脚本来实现上述输出。提前致谢

【问题讨论】:

能否请您删除该图像并将数据作为文本输入?到目前为止,您尝试了哪些,哪些没有奏效? 首先您可以过滤带空白的有效日期记录,然后是 groupBy("customerId").agg(max_("EffectiveDate"))) .... 【参考方案1】:

来了。如果您的 EffectiveDate 列已经是日期,则省略强制转换为日期步骤。

           val df = spark.createDataFrame(
                Seq(
                    (1, "John", 28, "M", "1-Jan-15" ),
                    (1, "John", 28, "M", "1-Feb-15" ),
                    (1, "John", 28, "M", "" ),
                    (1, "John", 28, "M", "1-Mar-14" ),
                    (2, "Jane", 25, "F", "5-Mar-14" ),
                    (2, "Jane", 25, "F", "5-Jun-15" ),
                    (2, "Jane", 25, "F", "3-Feb-14" )
                    
                )
            ).toDF(
                "CustomerID", 
                "CustomerName",
                "Age",
                "Gender",
                "EffectiveDate"
            ).withColumn(
                "EffectiveDate",
                from_unixtime(
                    unix_timestamp(
                        col("EffectiveDate"), "dd-MMM-yy"), 
                    "YYYY-MM-dd").cast("date")
)               
            
            df.show()
        /*
        +----------+------------+---+------+-------------+
        |CustomerID|CustomerName|Age|Gender|EffectiveDate|
        +----------+------------+---+------+-------------+
        |         1|        John| 28|     M|   2015-01-01|
        |         1|        John| 28|     M|   2015-02-01|
        |         1|        John| 28|     M|         null|
        |         1|        John| 28|     M|   2014-03-01|
        |         2|        Jane| 25|     F|   2014-03-05|
        |         2|        Jane| 25|     F|   2015-06-05|
        |         2|        Jane| 25|     F|   2014-02-03|
        +----------+------------+---+------+-------------+
        */
            
            val df_agg = df.groupBy("CustomerID", "CustomerName","Age", "Gender" ).agg(max("EffectiveDate"))  
            df_agg.show()
        
        /*
        +----------+------------+---+------+------------------+
        |CustomerID|CustomerName|Age|Gender|max(EffectiveDate)|
        +----------+------------+---+------+------------------+
        |         2|        Jane| 25|     F|        2015-06-05|
        |         1|        John| 28|     M|        2015-02-01|
        +----------+------------+---+------+------------------+
        */

【讨论】:

以上是关于Apache Spark - 分组的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件

Spark:求出分组内的TopN

Apache Scala/Python Spark 2.4.4:按年份分组数据以生成/分析新功能

spark分组取 topN

使用dataframe解决spark TopN问题:分组排序取TopN

Spark中分组后的TopN,以及Spark的优化(重点)