createDataFrame() throws exception when passing a JavaRDD that contains an ArrayType column in Spark 2.1

Posted: 2017-04-14 12:25:59

[Question]

I want to create a DataFrame (also called a Dataset&lt;Row&gt; in Spark 2.1) with createDataFrame(). Everything works fine when I pass a List&lt;Row&gt;, but when I pass a JavaRDD&lt;Row&gt; it throws an exception.

[Code]

SparkSession ss = SparkSession.builder().appName("Spark Test").master("local[4]").getOrCreate();

List<Row> data = Arrays.asList(
        RowFactory.create(Arrays.asList("a", "b", "c")),
        RowFactory.create(Arrays.asList("A", "B", "C"))
);

StructType schema = new StructType(new StructField[]{
        DataTypes.createStructField("col_1", DataTypes.createArrayType(DataTypes.StringType), false)
});

When I run this code, everything works as expected:

ss.createDataFrame(data, schema).show();

+---------+
|    col_1|
+---------+
|[a, b, c]|
|[A, B, C]|
+---------+

But when I pass a JavaRDD as the first argument, it throws an exception:

JavaRDD<Row> rdd = JavaSparkContext.fromSparkContext(ss.sparkContext()).parallelize(data);

ss.createDataFrame(rdd, schema).show(); // throws exception

[Exception]

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 3, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Arrays$ArrayList is not a valid external type for schema of array<string>
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType), true), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1), ArrayType(StringType,true))) AS col_1#0
+- mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType), true), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1), ArrayType(StringType,true)))
   :- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType), true)
   :  +- validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType)
   :     +- lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object))
   +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1), ArrayType(StringType,true))
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1)
         +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
            +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:293)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.Arrays$ArrayList is not a valid external type for schema of array<string>
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
    ... 17 more

Any help would be appreciated.

[Comments]:

If you try rdd.toDF, do you get the same error?

[Answer 1]

You are running into this because Spark cannot convert the ArrayList inside each Row into the String[] type that the array&lt;string&gt; column expects.
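The class named in the error message is exactly what Arrays.asList produces, which is easy to confirm outside of Spark (a small illustrative check, not part of the original question):

List<String> values = Arrays.asList("a", "b", "c");
// Prints "java.util.Arrays$ArrayList": the class the row encoder rejects
// as an external type for an array<string> column.
System.out.println(values.getClass().getName());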

I would change the Rows so that they hold a String[] instead. Try this code:

// Requires: import java.util.stream.Collectors;
List<Row> data = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("A", "B", "C")
).stream().map(r -> {
    // Convert each List<String> into a String[] before wrapping it in a Row.
    String[] arr = r.toArray(new String[r.size()]);
    return RowFactory.create(new Object[]{arr});
}).collect(Collectors.toList());
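With the rows built this way, the RDD-based overload from the question should go through as well; here is a quick check, reusing the ss, schema, and rdd construction from the question:

JavaRDD<Row> rdd = JavaSparkContext.fromSparkContext(ss.sparkContext()).parallelize(data);

// Each Row now carries a String[] rather than a java.util.Arrays$ArrayList,
// so the encoder accepts it for the array<string> column.
ss.createDataFrame(rdd, schema).show();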

