Spark Sql 映射问题
Posted
技术标签:
【中文标题】Spark Sql 映射问题【英文标题】:Spark Sql mapping issue 【发布时间】:2017-01-06 00:03:05 【问题描述】:Sparks2/Java8 Cassandra2 尝试从 Cassandra 读取一些数据,然后在 sparks 中通过查询运行一个组。我的 DF 中只有 2 列 转日期(日期),原点(字符串)
Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY (origin,transdate) ORDER BY cnt DESC LIMIT 1"); `
获取错误:
`Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'origins.`origin`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value)`
按问题分组已解决,删除分组中的()如下
完整代码:(尝试获取某个来源/位置的最大翻译次数)
JavaRDD<TransByDate> originDateRDD = javaFunctions(sc).cassandraTable("trans", "trans_by_date", CassandraJavaUtil.mapRowTo(TransByDate.class))
.select(CassandraJavaUtil.column("origin"), CassandraJavaUtil.column("trans_date").as("transdate")).limit((long)100) ;
Dataset<Row> originDF = sparks.createDataFrame(originDateRDD, TransByDate.class);
String[] columns = originDF.columns();
System.out.println("originDF columns: "+columns[0]+" "+columns[1]) ; -> transdate origin
originDF.createOrReplaceTempView("origins");
Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY origin,transdate ORDER BY cnt DESC LIMIT 1");
List list = maxOrigindate.collectAsList(); -> Exception here
int j = list.size();
originDF 列:转换原点
`public static class TransByDate implements Serializable
private String origin;
private Date transdate;
public TransByDate()
public TransByDate (String origin, Date transdate)
this.origin = origin;
this.transdate= transdate;
public String getOrigin() return origin;
public void setOrigin(String origin) this.origin = origin;
public Date getTransdate() return transdate;
public void setTransdate(Date transdate) this.transdate = transdate;
架构
root
|-- transdate: struct (nullable = true)
| |-- date: integer (nullable = false)
| |-- day: integer (nullable = false)
| |-- hours: integer (nullable = false)
| |-- minutes: integer (nullable = false)
| |-- month: integer (nullable = false)
| |-- seconds: integer (nullable = false)
| |-- time: long (nullable = false)
| |-- timezoneOffset: integer (nullable = false)
| |-- year: integer (nullable = false)
|-- origin: string (nullable = true)
例外 错误执行程序:阶段 2.0 (TID 12) 中任务 0.0 中的异常 scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) …… 线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:阶段 2.0 中的任务 0 失败 1 次,最近一次失败:阶段 2.0 中丢失任务 0.0(TID 12,本地主机):scala.MatchError : Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) ... 驱动程序堆栈跟踪: 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) ... 在 org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2184) 在 org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559) 在 org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2184) 在 spark.SparkTest.sqlMaxCount(SparkTest.java:244) -> List list = maxOrigindate.collectAsList();
原因:scala.MatchError:Sun Jan 01 00:00:00 PST 2012(class java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
【问题讨论】:
只需按表达式从组中删除括号,即按原点分组,transdate @Rajat - thx,克服了那个错误,但是下一行List list = maxOrigindate.collectAsList();
一大堆异常:scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) .....
我认为 HellowWorld 应该很简单,但是
添加代码细节,查询行现在没有异常,但是下一行collectAsList()
因此对 DF(计数、收集)的任何操作都会给出异常:原因:scala.MatchError: Sun Jan 01 00:00:00 PST 2012(属于 java.util 类.日期) .但在 DF 上我可以:列、printSchema、createOrReplaceTempView。
由于 Saprk 被懒惰地评估,当您进行计数或收集时,正在调用操作并且实际上正在创建您的数据框。您的架构和存在的数据之间必须存在数据类型不匹配。
【参考方案1】:
您遇到了以下错误。
Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at
这个错误是因为Spark sql 支持java.sql.Date
类型。请查看 Spark 文档here。也可以参考SPARK-2562。
【讨论】:
是的,这行得通。 将类中的数据类型更改为 java.sql.Date 在 long saga 中唯一仍然打开的问题是 sql [基本问题] [1] [1]:***.com/questions/41473949/spark-sql-query-fails/…【参考方案2】:将查询改为
Dataset<Row> maxOrigindate = sparks.sql("SELECT origin,
transdate,
COUNT(*) AS cnt FROM origins GROUP BY origin,transdate
ORDER BY cnt DESC LIMIT 1");
这会起作用。
【讨论】:
以上是关于Spark Sql 映射问题的主要内容,如果未能解决你的问题,请参考以下文章
所有 Spark SQL DataType 的 Scala 类型映射是啥
Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射
如何使用 Spark SQL 在 Parquet 文件中选择嵌套数组和映射
如何解决“不能使用 null 作为映射键!”使用 Group_Map 在 Python 3 中出现 Spark.SQL 错误