Having count(distinct) not working with HiveContext query in Spark 1.6

Posted: 2016-11-02 16:10:37

Question:

We recently upgraded Spark from version 1.3 to 1.6. After the upgrade, queries with a HAVING count(distinct ...) condition stopped working, and we get the error below.

Query:

hiveContext.sql(  "select A1.x,  A1.y, A1.z from (select concat(g,h) as x,  y,  z   from raw_parquet where f = '') A1   group by A1.x,  A1.y,A1.z  having count(distinct(A1.z)) > 1").show()

Queries with HAVING count(*) work fine. For example:

hiveContext.sql("select A1.x, A1.y, A1.z from (select concat(g,h) as x, y, z from raw_parquet where f = '') A1 group by A1.x, A1.y, A1.z having count(*) > 1").show()

Please let us know if there is any solution. Thanks a lot.

Error:

org.apache.spark.sql.AnalysisException: resolved attribute(s) gid#687,z#688 missing from x#685,y#252,z#255 in operator !Aggregate [x#685,y#252], [cast(((count(if ((gid#687 = 1)) z#688 else null),mode=Complete,isDistinct=false) > cast(1 as bigint)) as boolean) AS havingCondition#686,x#685,y#252];
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:120)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:120)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
        at $iwC$$iwC$$iwC.<init>(<console>:44)
        at $iwC$$iwC.<init>(<console>:46)
        at $iwC.<init>(<console>:48)
        at <init>(<console>:50)
        at .<init>(<console>:54)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
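
The "resolved attribute(s) gid#687,z#688 missing" message comes from the analyzer pass that rewrites distinct aggregates in Spark 1.6: the rewrite introduces a synthetic gid column, and when the distinct aggregate appears in a HAVING clause the rewritten plan seems to lose track of the grouping attributes. A workaround that often sidesteps that code path is to compute the distinct count in the SELECT list of a subquery and filter in an outer query. A minimal sketch, assuming the same raw_parquet table as above (the dz alias is introduced here for illustration):

// sketch: same semantics as the failing query, but the distinct count
// (aliased as dz) is filtered outside the aggregation instead of in HAVING
hiveContext.sql(
  "select x, y, z from (" +
  "select A1.x, A1.y, A1.z, count(distinct A1.z) as dz " +
  "from (select concat(g,h) as x, y, z from raw_parquet where f = '') A1 " +
  "group by A1.x, A1.y, A1.z) t " +
  "where t.dz > 1").show()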


Answer 1:

Try something like this:

 df.groupBy("x").count().filter($"count" >= 1).show()

import org.apache.spark.sql.functions.count
df.groupBy("x").agg(count("*").alias("cnt")).where($"cnt"  > 1)

