如何使用 Java 和 Spark SQL 打印数据集中的行内容?

Posted

技术标签:

【中文标题】如何使用 Java 和 Spark SQL 打印数据集中的行内容?【英文标题】:How can I print the content of rows in a Dataset using Java and the Spark SQL? 【发布时间】:2018-07-31 20:15:49 【问题描述】:

我想做一个简单的 Spark SQL 代码,它读取一个名为 u.data 的文件,其中包含电影分级,创建一个 DatasetRows,然后打印数据集的第一行。

我以将文件读取到JavaRDD为前提,并根据ratingsObject映射RDD(该对象有两个参数,movieIDrating)。所以我只想打印这个数据集中的第一行。

我正在使用 Java 语言和 Spark SQL。

public static void main(String[] args)
    App obj = new App();
    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();

    Map<Integer,String> movieNames = obj.loadMovieNames();
    JavaRDD<String> lines = spark.read().textFile("hdfs:///ml-100k/u.data").javaRDD();

    JavaRDD<MovieRatings> movies = lines.map(line -> 
        String[] parts = line.split(" ");
        MovieRatings ratingsObject = new MovieRatings();
        ratingsObject.setMovieID(Integer.parseInt(parts[1].trim()));
        ratingsObject.setRating(Integer.parseInt(parts[2].trim()));
        return ratingsObject;
    );

    Dataset<Row> movieDataset = spark.createDataFrame(movies, MovieRatings.class);

    Encoder<Integer> intEncoder = Encoders.INT();
    Dataset<Integer> HUE = movieDataset.map(
            new MapFunction<Row, Integer>()

                private static final long serialVersionUID = -5982149277350252630L;

                @Override
                public Integer call(Row row) throws Exception
                    return row.getInt(0);
                
            , intEncoder
    );

    HUE.show();


    //stop the session
    spark.stop();

我尝试了很多可能的解决方案,但都遇到了同样的错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, localhost, executor 1): java.lang.ArrayIndexOutOfBoundsException: 1
at com.ericsson.SparkMovieRatings.App.lambda$main$1e634467$1(App.java:63)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这是u.data 文件的示例:

196 242 3   881250949
186 302 3   891717742
22  377 1   878887116
244 51  2   880606923
166 346 1   886397596
298 474 4   884182806
115 265 2   881171488
253 465 5   891628467
305 451 3   886324817
6   86  3   883603013
62  257 2   879372434
286 1014    5   879781125
200 222 5   876042340
210 40  3   891035994
224 29  3   888104457
303 785 3   879485318
122 387 5   879270459
194 274 2   879539794

其中第一列代表de UserID,第二列代表MovieID,第三列代表rating,最后一列是时间戳

【问题讨论】:

你能提供一个数据样本吗? 我已经用文件样本更新了帖子。 示例文件中的数据似乎是用\t而不是空格分隔的。 【参考方案1】:

如前所述,您的数据不是空格分隔的。 我将向您展示两种可能的解决方案,第一种基于 RDD,第二种基于 spark sql,一般来说,就性能而言,这是最好的解决方案。

    RDD(你应该使用内置类型来减少开销):

    public class SparkDriver 
    
        public static void main (String args[]) 
                   // Create a configuration object and set the name of 
                   // the application
                      SparkConf conf = new SparkConf().setAppName("application_name");
    
                   // Create a spark Context object
                      JavaSparkContext context = new JavaSparkContext(conf);
    
                  // Create final rdd (suppose you have a text file)
                     JavaPairRDD<Integer,Integer> movieRatingRDD = 
                                contextFile("u.data.txt")
                                .mapToPair(line -> (
                                    String[] tokens = line.split("\\s+");
                                    int movieID = Integer.parseInt(tokens[0]);
                                    int rating = Integer.parseInt(tokens[1]);
                                    return new Tuple2<Integer, Integer>(movieID, rating););
    
                 // Keep in mind that take operation takes the first n elements
                 // and the order is the order of the file.
                    ArrayList<Tuple2<Integer, Integer> list = new ArrayList<>(movieRatingRDD.take(10));
    
                    System.out.println("MovieID\tRating");
    
                    for(tuple : list) 
                       System.out.println(tuple._1 + "\t" + tuple._2);
                    
    
                    context.close();
                  
    

    SQL

    公共类 SparkDriver

    public static void main(String[] args) 
    
    // Create spark session
       SparkSession session = SparkSession.builder().appName("[Spark app sql version]").getOrCreate();
    
       Dataset<MovieRatings> personsDataframe = session.read()
                    .format("tct")
                    .option("header", false)
                    .option("inferSchema", true)
                    .option("delimiter", "\\s+")
                    .load("u.data.txt")
                    .map(row -> 
                       int movieID = row.getInteger(0);
                       int rating = row.getInteger(1);
                       return new MovieRatings(movieID, rating);
                     ).as(Encoders.bean(MovieRatings.class);
    
          // Stop session
             session.stop();
    
      
    
      
    

【讨论】:

不错的答案。 “contextFile”从何而来?

以上是关于如何使用 Java 和 Spark SQL 打印数据集中的行内容?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Java 的 Spark 和 Spark SQL 新手

使用 Apache Spark SQL 和 Java 直接运行 sql 查询

如何在 java 中使用 Apache spark 计算中位数和众数?

如何打印 spark dataframe

如何使用 JAVA 在 Spark SQL 中基于单列删除重复行

实例化 'org.apache.spark.sql.hive.HiveSessionState' 时出错:"