Apache Spark 中的数据集

Posted

技术标签:

【中文标题】Apache Spark 中的数据集【英文标题】:Datasets in Apache Spark 【发布时间】:2018-04-29 19:58:23 【问题描述】:
Dataset<Tweet> ds = sc.read().json("path").as(Encoders.bean(Tweet.class));
ds.show();
JavaRDD<Tweet> dstry = ds.toJavaRDD();
System.out.println(dstry.first().getClass());
Caused by: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1369)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:197)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1325)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1322)
    at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:90)
    at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1435)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1494)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)

当我仔细观察时,我唯一提出的疑问是:

未找到适用于实际参数“org.apache.spark.unsafe.types.UTF8String”的构造函数/方法;候选人是:“public void sparkSQL.Tweet.setId(long)”

【问题讨论】:

推文定义:- 它是一个具有长 id、字符串名称、字符串文本的 getter 和 setter 类 【参考方案1】:

正如@user9718686 所写,您的 id 字段有不同的类型:您的 json 文件中的 String 和您的类定义中的 long。当您将其读入Dataset&lt;Row&gt; 时,Spark 会从文件中推断出架构并检测到 id 的类型为String,这就是为什么当您尝试打印它时它会起作用(正如您在其中一个文件中要求的那样)厘米)。如果您想将数据框设为Dataset&lt;Tweet&gt;,那么您必须将您的 json 文件更改为使用long ids 而不是String,或者您可以让 Spark 在您尝试执行任何 action operation 时投射此 id数据框。

Dataset<Row> rowDataset = sc.read().json("path");
Dataset<Tweet> tweetDataset = rowDataset
                .withColumn("id", rowDataset.col("id").cast(DataTypes.LongType))
                .as(Encoders.bean(Tweet.class));
tweetDataset.printSchema();
System.out.println(tweetDataset.head().getId());

【讨论】:

我终于明白了,耶【参考方案2】:

由于类型不匹配,它会给你一个错误:

Tweet 类将id 字段定义为Long。 您的数据有 idString

您必须转换输入或调整类定义。

【讨论】:

是的,它起作用了,我更改了类定义,然后对其进行了类型转换,但为什么它与数据帧一起使用数据帧的代码:- Dataset df = sc.read().json(path) ; JavaRDD dftry= df.javaRDD().map(s->s.toString()); System.out.println(dftry.take(2)); 谁能回答我?

以上是关于Apache Spark 中的数据集的主要内容,如果未能解决你的问题,请参考以下文章

获取Apache Spark Java中的整个数据集或仅列的摘要

Apache Spark SQL数据集groupBy具有max函数和另一列中的不同值

合并在Apache spark中具有不同列名的两个数据集

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

Apache Spark Java - 如何遍历行数据集并删除空字段

如何在Apache Spark Java中将数组类型的数据集转换为字符串类型