Spark SQL Java: GenericRowWithSchema cannot be cast to java.lang.String
I have an application that tries to read a set of CSVs from a cluster directory and write them out as Parquet files using Spark:
SparkSession sparkSession = createSession();
JavaRDD<Row> entityRDD = sparkSession.read()
        .csv(dataCluster + "measures/measures-*.csv")
        .javaRDD()
        .mapPartitionsWithIndex(removeHeader, false)
        .map((Function<String, Measure>) s -> {
            String[] parts = s.split(COMMA);
            Measure measure = new Measure();
            measure.setCobDate(parts[0]);
            measure.setDatabaseId(Integer.valueOf(parts[1]));
            measure.setName(parts[2]);
            return measure;
        });
Dataset<Row> entityDataFrame = sparkSession.createDataFrame(entityRDD, Measure.class);
entityDataFrame.printSchema();

// Create the Parquet file here
String parquetDir = dataCluster + "measures/parquet/measures";
entityDataFrame.write().mode(SaveMode.Overwrite).parquet(parquetDir);

sparkSession.stop();
The Measure class is a simple POJO that implements Serializable. The schema prints fine, so something must be going wrong when the DataFrame rows are converted to the Parquet file. This is the error I get:
Lost task 2.0 in stage 1.0 (TID 3, redlxd00006.nomura.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:244)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more
Ultimately my goal is to use Spark SQL to filter this data and join it with other CSVs containing other table data, then write the whole result out as Parquet. I have only found Scala-related questions, and they didn't solve my problem. Any help is much appreciated.
CSV:
cob_date, database_id, name
20181115,56459865,name1
20181115,56652865,name6
20181115,56459845,name32
20181115,15645936,name3
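For reference, a minimal sketch of what the Measure POJO presumably looks like, inferred from the setters used in the question (the question only says it is a simple POJO implementing Serializable):

import java.io.Serializable;

// Sketch of the bean inferred from the question; field types follow the setters used above.
public class Measure implements Serializable {
    private String cobDate;
    private Integer databaseId;
    private String name;

    public String getCobDate() { return cobDate; }
    public void setCobDate(String cobDate) { this.cobDate = cobDate; }
    public Integer getDatabaseId() { return databaseId; }
    public void setDatabaseId(Integer databaseId) { this.databaseId = databaseId; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}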
Answer
.map((Function<String, Measure>) s -> {

It looks like this should be

.map((Function<Row, Measure>) s -> {

since sparkSession.read().csv(...) returns a Dataset<Row>: after .javaRDD(), each element is already a parsed Row rather than a raw text line, so there is nothing to split.
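For illustration, a minimal sketch of the corrected lambda using positional Row getters (assuming the three CSV columns arrive in the order shown in the sample above; this is an illustrative rewrite, not the asker's code):

.map((Function<Row, Measure>) row -> {
    Measure measure = new Measure();
    measure.setCobDate(row.getString(0));                     // first CSV column
    measure.setDatabaseId(Integer.valueOf(row.getString(1))); // second column, parsed to Integer
    measure.setName(row.getString(2));                        // third column
    return measure;
});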
Another answer
Adding toDF() as Serge suggested and updating the map lambda solved my problem:
SparkSession sparkSession = createSession();
JavaRDD<Row> entityRDD = sparkSession.read()
        .csv(prismDataCluster + "measures/measures-*chop.csv")
        .toDF("cobDate", "databaseId", "name")
        .javaRDD()
        .mapPartitionsWithIndex(removeHeader, false)
        .map((Function<Row, Measure>) row -> {
            Measure measure = new Measure();
            measure.setCobDate(row.getString(row.fieldIndex("cobDate")));
            // the CSV column is read as a string, so parse it before handing it to the Integer setter
            measure.setDatabaseId(Integer.valueOf(row.getString(row.fieldIndex("databaseId"))));
            measure.setName(row.getString(row.fieldIndex("name")));
            return measure;
        });
TVM.
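As an aside, the RDD round-trip can be avoided entirely by letting Spark parse the header and mapping the columns straight onto the bean. A minimal sketch, assuming the sample header names shown above and that the bean encoder fits the Measure properties (the column names and the cast are illustrative):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;

Dataset<Measure> measures = sparkSession.read()
        .option("header", "true") // first line supplies column names, so no removeHeader pass is needed
        .csv(dataCluster + "measures/measures-*.csv")
        .select(
                col("cob_date").as("cobDate"),               // rename to match the bean property
                col("database_id").cast("int").as("databaseId"), // CSV values are strings; cast for the Integer field
                col("name"))
        .as(Encoders.bean(Measure.class));

measures.write().mode(SaveMode.Overwrite).parquet(parquetDir);

Staying in the Dataset API also makes the filtering and joins against the other CSVs mentioned above straightforward, since they can be expressed directly in Spark SQL.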