forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员

Posted

技术标签:

【中文标题】forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员【英文标题】:Error in forEach Spark Scala : value select is not a member of org.apache.spark.sql.Row 【发布时间】:2016-06-30 08:29:47 【问题描述】:

我正在尝试获取文件中所有 json 对象的平均评分。我加载了文件并转换为数据框,但在解析 avg 时出错。 样品请求:


        "country": "France",
        "customerId": "France001",
        "visited": [
            
                "placeName": "US",
                "rating": "2.3",
                "famousRest": "N/A",
                "placeId": "AVBS34"

            ,
              
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"

            ,
              
                "placeName": "Canada",
                "rating": "4.3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"

                    
    ]

所以对于这个 json,美国平均评分将为 (2.3 + 3.3)/2 = 2.8


        "country": "Egypt",
        "customerId": "Egypt009",
        "visited": [
            
                "placeName": "US",
                "rating": "1.3",
                "famousRest": "McDonald",
                "placeId": "Dedcf3"

            ,
              
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "EagleNest",
                "placeId": "CDfet3"

            ,





        "country": "Canada",
        "customerId": "Canada012",
        "visited": [
            
                "placeName": "UK",
                "rating": "3.3",
                "famousRest": "N/A",
                "placeId": "XSdce2"

            ,


    ]

对于我们的这个平均值 = (3.3 +1.3)/2 = 2.3

因此,总体而言,平均评分将为:(2.8 + 2.3)/2 = 2.55(只有两个请求在其访问列表中包含“US”)

我的架构:

root
|-- country: string(nullable=true)
|-- customerId:string(nullable=true)
|-- visited: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |   |-- placeId: string (nullable = true)
|    |   |-- placeName: string (nullable = true) 
|    |   |-- famousRest: string (nullable = true)
|    |   |-- rating: string (nullable = true)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
df.show() 

做的时候:

val app = df.select("strategies"); app.registerTempTable("app"); app.printSchema(); app.show()
app.foreach(
  t =>  t.select("placeName", "rating").where(t("placeName") == "US")
).show()

I am getting : 
<console>:31: error: value select is not a member of org.apache.spark.sql.Row t => t.select("placeName", "rating").where(t("placeName") == "US") ^

谁能告诉我我在这里做错了什么?

【问题讨论】:

【参考方案1】:

假设appDataframe(您的代码示例无法理解...您创建一个df 变量并查询一个app 变量),您不应该调用foreach从中选择:

app.select("placeName", "rating").where(t("placeName") == "US")

foreach 会在每条记录(Row 类型)上调用一个函数。这主要用于调用一些副作用(例如打印到控制台/发送到外部服务等)。大多数情况下,您不会使用它来选择/转换数据框。

更新

至于如何计算仅美国访问的平均值的原始问题:

// explode to make a record out of each "visited" Array item, 
// taking only "placeName" and "rating" columns
val exploded: DataFrame = df.explode(df("visited")) 
  case Row(visits: Seq[Row]) => 
    visits.map(r => (r.getAs[String]("placeName"), r.getAs[String]("rating")))


// make some order: rename columns named _1, _2 (since we used a tuple),
// and cast ratings to Double:
val ratings: DataFrame = exploded
  .withColumnRenamed("_1", "placeName")
  .withColumn("rating", exploded("_2").cast(DoubleType))
  .select("placeName", "rating")

ratings.printSchema()
ratings.show()
/* prints:
root
 |-- placeName: string (nullable = true)
 |-- rating: double (nullable = true)

+---------+------+
|placeName|rating|
+---------+------+
|       US|   1.3|
|       US|   3.3|
|       UK|   3.3|
+---------+------+
 */

// now filter US only and get average rating:
val avg = ratings
  .filter(ratings("placeName") === "US")
  .select(mean("rating"))

avg.show()
/* prints:
 +-----------+
 |avg(rating)|
 +-----------+
 |        2.3|
 +-----------+
  */

【讨论】:

val app = df.select("strategies"); app.registerTempTable("app"); app.printSchema(); app.show() 该应用程序提供的表格由访问列表(这是一个结构数组)组成,这就是为什么每个都用于查看列表 你能指出正确的方向以找到所有 placeName = US 的平均值 我厌倦了这个:df.registerTempTable("people") sqlContext.sql("select avg(expResults.rank) from people LATERAL VIEW explode(visited)people AS expResults where expResults.placeName = 'US ' ").collect().foreach(println) 但出现错误 您应该使用所有这些信息(创建 app 的代码和您尝试过的代码)更新问题 - 我会尝试回答这个问题,但更新问题会帮助其他人这样做太

以上是关于forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员的主要内容,如果未能解决你的问题,请参考以下文章

Scala Spark 中的 udf 运行时错误

如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?

Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化

带有scala的spark2.2.0中的Regexp_extract抛出错误

Spark Scala:任务不可序列化错误

Spark/scala 中的 SQL 查询