Spark Sql 映射问题

Posted

技术标签:

【中文标题】Spark Sql 映射问题【英文标题】:Spark Sql mapping issue 【发布时间】:2017-01-06 00:03:05 【问题描述】:

Sparks2/Java8 Cassandra2 尝试从 Cassandra 读取一些数据,然后在 sparks 中通过查询运行一个组。我的 DF 中只有 2 列 转日期(日期),原点(字符串)

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins  GROUP BY (origin,transdate) ORDER BY cnt DESC LIMIT 1"); `

获取错误:

 `Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'origins.`origin`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value)`

按问题分组已解决,删除分组中的()如下

完整代码:(尝试获取某个来源/位置的最大翻译次数)

JavaRDD<TransByDate> originDateRDD = javaFunctions(sc).cassandraTable("trans", "trans_by_date", CassandraJavaUtil.mapRowTo(TransByDate.class))
                    .select(CassandraJavaUtil.column("origin"), CassandraJavaUtil.column("trans_date").as("transdate")).limit((long)100) ;
Dataset<Row> originDF = sparks.createDataFrame(originDateRDD, TransByDate.class);
String[] columns = originDF.columns();
System.out.println("originDF columns: "+columns[0]+" "+columns[1]) ; -> transdate origin
originDF.createOrReplaceTempView("origins");

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins  GROUP BY origin,transdate ORDER BY cnt DESC LIMIT 1"); 
List list = maxOrigindate.collectAsList(); -> Exception here
int j = list.size();

originDF :转换原点

`public static class TransByDate implements Serializable 
        private String origin;
        private Date transdate;

        public TransByDate()  

        public TransByDate (String origin, Date transdate)  
            this.origin = origin;
            this.transdate= transdate;

        

        public String getOrigin()  return origin; 
        public void setOrigin(String origin)  this.origin = origin; 

        public Date getTransdate()  return transdate; 
        public void setTransdate(Date transdate)  this.transdate = transdate; 

    

架构

root
 |-- transdate: struct (nullable = true)
 |    |-- date: integer (nullable = false)
 |    |-- day: integer (nullable = false)
 |    |-- hours: integer (nullable = false)
 |    |-- minutes: integer (nullable = false)
 |    |-- month: integer (nullable = false)
 |    |-- seconds: integer (nullable = false)
 |    |-- time: long (nullable = false)
 |    |-- timezoneOffset: integer (nullable = false)
 |    |-- year: integer (nullable = false)
 |-- origin: string (nullable = true)

例外 错误执行程序:阶段 2.0 (TID 12) 中任务 0.0 中的异常 scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) …… 线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:阶段 2.0 中的任务 0 失败 1 次,最近一次失败:阶段 2.0 中丢失任务 0.0(TID 12,本地主机):scala.MatchError : Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) ... 驱动程序堆栈跟踪: 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) ... 在 org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2184) 在 org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559) 在 org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2184) 在 spark.SparkTest.sqlMaxCount(SparkTest.java:244) -> List list = maxOrigindate.collectAsList();

原因:scala.MatchError:Sun Jan 01 00:00:00 PST 2012(class java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)

【问题讨论】:

只需按表达式从组中删除括号,即按原点分组,transdate @Rajat - thx,克服了那个错误,但是下一行 List list = maxOrigindate.collectAsList(); 一大堆异常:scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) ..... 我认为 HellowWorld 应该很简单,但是 添加代码细节,查询行现在没有异常,但是下一行collectAsList() 因此对 DF(计数、收集)的任何操作都会给出异常:原因:scala.MatchError: Sun Jan 01 00:00:00 PST 2012(属于 java.util 类.日期) .但在 DF 上我可以:列、printSchema、createOrReplaceTempView。 由于 Saprk 被懒惰地评估,当您进行计数或收集时,正在调用操作并且实际上正在创建您的数据框。您的架构和存在的数据之间必须存在数据类型不匹配。 【参考方案1】:

您遇到了以下错误。

Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at 

这个错误是因为Spark sql 支持java.sql.Date 类型。请查看 Spark 文档here。也可以参考SPARK-2562。

【讨论】:

是的,这行得通。 将类中的数据类型更改为 java.sql.Date 在 long saga 中唯一仍然打开的问题是 sql [基本问题] [1] [1]:***.com/questions/41473949/spark-sql-query-fails/…【参考方案2】:

将查询改为

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, 
transdate, 
COUNT(*) AS cnt FROM origins  GROUP BY origin,transdate 
ORDER BY cnt DESC LIMIT 1"); 

这会起作用。

【讨论】:

以上是关于Spark Sql 映射问题的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL仅映射一列DataFrame

所有 Spark SQL DataType 的 Scala 类型映射是啥

在 Spark SQL 中将结构转换为映射

Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射

如何使用 Spark SQL 在 Parquet 文件中选择嵌套数组和映射

如何解决“不能使用 null 作为映射键!”使用 Group_Map 在 Python 3 中出现 Spark.SQL 错误