如何进行外连接:Spark Scala SQLContext

Posted

技术标签:

【中文标题】如何进行外连接:Spark Scala SQLContext【英文标题】:How to do outer joins : Spark Scala SQLContext 【发布时间】:2016-07-01 04:57:42 【问题描述】:

我正在尝试获取 Total(count of all) 和 Top Elements (count after filters),以便我可以找到所有 jsons (top/total) 中每个 placeName 的百分位数,评分 > 3:

  // sc : An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val df = sqlContext.jsonFile("temp.txt")
    //df.show()


    val res =  df.withColumn("visited", explode($"visited"))

    val result = res.groupBy($"customerId", $"visited.placeName")

Tried with joins :
val result1 =res.groupBy($"customerId", $"visited.placeName").agg(count("*").alias("total"))

val result2 = res
.filter($"visited.rating" < 4)
  .groupBy($"requestId", $"visited.placeName")  
  .agg(count("*").alias("top"))

result1.show()

result2.show()
percentile = result1.join(result2, List("placeName","customerId"), "outer")
 sqlContext.sql("select top/total as percentile from temp groupBy placeName") 

但给了我错误。

我可以在 udf 中这样做吗:

 val result1 =  result.withColumn("Top", getCount(res , true))
                    .withColumn("Total",getCount(result, false)).show()


    def getCount(df: DataFrame, flag: Boolean): Int 
            if (flag == "true") return df.filter($"visited.rating" < 3).groupBy($"customerId", $"visited.placeName").agg(count("*"))
            else return  df.agg(count("*"))
          

我的架构:

 
        "country": "France",
        "customerId": "France001",
        "visited": [
            
                "placeName": "US",
                "rating": "2",
                "famousRest": "N/A",
                "placeId": "AVBS34"

            ,
              
                "placeName": "US",
                "rating": "3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"

            ,
              
                "placeName": "Canada",
                "rating": "3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"

                    
    ]


US top = 1 count = 3
Canada top = 1 count = 3



        "country": "Canada",
        "customerId": "Canada012",
        "visited": [
            
                "placeName": "UK",
                "rating": "3",
                "famousRest": "N/A",
                "placeId": "XSdce2"

            ,


    ]

UK top = 1 count = 1



        "country": "France",
        "customerId": "France001",
        "visited": [
            
                "placeName": "US",
                "rating": "4.3",
                "famousRest": "N/A",
                "placeId": "AVBS34"

            ,
              
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"

            ,
              
                "placeName": "Canada",
                "rating": "4.3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"

                    
    ]


US top = 2 count = 3
Canada top = 1 count = 3

所以最后我需要类似的东西:

PlaceName  percentile
US         57.14            (1+1+2)/(3+1+3) *100
Canada     33.33            (1+1)/(3+3) *100
UK         100               1*100

架构:

root
|-- country: string(nullable=true)
|-- customerId:string(nullable=true)
|-- visited: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |   |-- placeId: string (nullable = true)
|    |   |-- placeName: string (nullable = true) 
|    |   |-- famousRest: string (nullable = true)
|    |   |-- rating: string (nullable = true)

【问题讨论】:

【参考方案1】:

鉴于您提供的代码,尚不清楚源代码的结构以及为什么会出现此特定错误,但通常此代码甚至远程有效。

getCount 不是 UDF - 不是关键但重要的区别。 getCount 不是有效函数,因为范围内没有 col 类型。除非您出于某种原因将其用作 o.a.s.sql.DataFrame 的类型别名,否则它甚至无法编译! 即使类型匹配,Spark 也不支持嵌套操作/转换,因此您无法使用 UDF 在 Spark DataFrame 上执行查询或聚合。

【讨论】:

我只是想添加我打算做的事情。我尝试使用单独的查询并使用连接: val res = df.withColumn("visited", explode($"visited")) val result1 =res.groupBy($"customerId", $"visited.placeName").agg(count("*").alias("total")) val result2 = res .filter($"visited.rating" &lt; 4) .groupBy($"requestId", $"visited.placeName") .agg(count("*").alias("top")) result1.show() result2.show() 并使用连接:percentile = result1.join(result2, List("placeName","customerId"), "outer") sqlContext.sql("select top/total as percentile from temp groupBy placeName") 但它也会出错。这就是为什么想尝试 pdf (添加计数(顶部和总数)。你能告诉我如何处理这个来获得百分位数

以上是关于如何进行外连接:Spark Scala SQLContext的主要内容,如果未能解决你的问题,请参考以下文章

Spark 2.2 空安全左外连接空指针异常

如何在 Windows 中使用 Scala 将 Cassandra 与 Spark 连接起来

使用 spark 和 scala 进行连接计数时获得性能的最佳方法

Scala(Spark)连接数据框中的列[重复]

如何在scala spark中将数据框的特定列与另一个列连接[重复]

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集