Spark java Issue creating row with java.util.Map type

Posted: 2017-05-22 23:42:41

Question:

Using Spark 2.1, I am creating a DataSet with a Map DataType inside it:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
});
Map<String,Integer> abc = new HashMap<String,Integer>();
abc.put("abc", 1);
Row r = RowFactory.create(0, "Hi these are words ", 1, abc);
List<Row> data = Arrays.asList(r);
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
wordDataFrame.show();
The code above works fine.

But when I try to call a map function on this DataSet (replacing the map column with a new HashMap), it fails:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        Map<String, Integer> newMap = new HashMap<String, Integer>();
        newMap.put("Transformed string", 1);
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), newMap);
    }
}, encoder);
return output;
Error stack trace:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:410)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
What am I missing here? Why do I get "java.util.HashMap is not a valid external type for schema of map<string,int>"?

Edit:

I tried the java.util.List data type:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        List<String> xyz = Arrays.asList("Hi", "how", "now");
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), xyz);
    }
}, encoder);
and got a similar error message:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: java.util.Arrays$ArrayList is not a valid external type for schema of array<string>
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:221)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.String works fine:

StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.StringType, false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        String xyz = Arrays.asList("Please", "work", "now").toString();
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), xyz);
    }
}, encoder);
So it looks like primitive data types work fine!
Comments:

Well done; but you should post your solution as an answer to your own question. Then you can accept it!

Answer 1:

If you look at row.getMap(3), it is returning a scala.collection.Map:
scala.collection.Map<Object, Object> map = row.getMap(3);
So it seems you need to use scala.collection.JavaConverters:
JavaConverters.mapAsScalaMapConverter(newMap).asScala();
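Putting that suggestion together, a minimal sketch of the fixed map call might look like the following. This is my reconstruction, not code from the answer; it assumes that the external type Spark 2.1 expects for a MapType column is any scala.collection.Map, so the mutable map returned by asScala() is accepted without a further toMap conversion:

import scala.collection.JavaConverters;

Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        // Build the new map in Java as before...
        Map<String, Integer> newMap = new HashMap<String, Integer>();
        newMap.put("Transformed string", 1);
        // ...but hand Spark a scala.collection.Map rather than a java.util.HashMap
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2),
                JavaConverters.mapAsScalaMapConverter(newMap).asScala());
    }
}, encoder);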
Answer 2:

The following was actually found by the asker; I have pulled it out of the question so that others can find the answer in the right place.

Solution: this worked for me.

I used [Converting Java HashMap to Scala Map][1], and the modified code is as follows:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        HashMap<String, Integer> newMap = new HashMap<String, Integer>();
        newMap.put("Transformed string", 1);
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), ToScalaExample.toScalaMap(newMap));
    }
}, encoder);
return output;
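The ToScalaExample.toScalaMap helper is not shown here; it comes from the linked question. A hypothetical Java sketch of such a helper, assuming Scala 2.11 (the version bundled with Spark 2.1), where Predef.conforms() is available to satisfy toMap's implicit evidence parameter:

import scala.Tuple2;
import scala.collection.JavaConverters;

public class ToScalaExample {
    // Hypothetical helper: converts a java.util.Map into an immutable scala.collection.immutable.Map.
    public static <A, B> scala.collection.immutable.Map<A, B> toScalaMap(java.util.Map<A, B> javaMap) {
        return JavaConverters.mapAsScalaMapConverter(javaMap).asScala().toMap(
                scala.Predef.<Tuple2<A, B>>conforms());
    }
}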
I think that for primitive data types, Spark implicitly converts the Java types to Scala types. For the others, we need to convert them explicitly.
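Presumably the same reasoning covers the array case from the edit: the external type Spark expects for an ArrayType column is scala.collection.Seq, so the java.util.List would also need an explicit conversion, along these lines (my assumption; the original post does not test this):

// Convert the java.util.List into a scala.collection.Seq before building the Row
List<String> xyz = Arrays.asList("Hi", "how", "now");
return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2),
        JavaConverters.asScalaBufferConverter(xyz).asScala());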