spark-sql将Rdd转换为DataFrame进行操作的两种方法

Posted soft.push("zzq")

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark-sql将Rdd转换为DataFrame进行操作的两种方法相关的知识,希望对你有一定的参考价值。

   SparkConf sparkConf = new SparkConf()
                .setMaster("local").setAppName("ClzMap");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> line_str = javaSparkContext.textFile("C:\\Users\\Administrator\\Desktop\\stud.txt");

        JavaRDD<KK> line_kk = line_str.map(new Function<String, KK>() {
            @Override
            public KK call(String s) throws Exception {
                String attr[] = s.split(",");
                KK k = new KK();
                k.setName(attr[0]);
                k.setAge(Integer.parseInt(attr[1]));
                k.setYear(attr[2]);
                return k;
            }
        });

        SQLContext sqlContext = new SQLContext(javaSparkContext);

        DataFrame df = sqlContext.createDataFrame(line_kk, KK.class);

        //在这理由两种方法进行数据过滤(1:使用DataFrame的javaApi,2:使用临时表的sql查询方式)

        //-------------------------第1种-----------------------
        DataFrame df_filter = df.filter(df.col("age").geq(19));
        //-------------------------end-----------------------

        //-------------------------第2种-----------------------
        DataFrame df_filter1 = df.filter(df.col("age").geq(19));
        df_filter1.registerTempTable("KK");//创建一个临时表,参数为表名
        sqlContext.sql("select  * from KK where age>=19");
        //-------------------------end-----------------------

        JavaRDD<Row> df_row = df_filter1.javaRDD();//将DataFrame转化成RDD

        JavaRDD<KK> df_kk = df_row.map(new Function<Row, KK>() {
            @Override
            public KK call(Row row) throws Exception {//row的顺序和原来的文件输入可能有不同
                KK k = new KK();
                k.setAge(row.getInt(0));
                k.setName(row.getString(1));
                k.setYear(row.getString(2));
                return k;
            }
        });

        df_kk.foreach(new VoidFunction<KK>() {
            @Override
            public void call(KK kk) throws Exception {
                System.out.println("getAge->" + kk.getAge());
                System.out.println("getYear->" + kk.getYear());
                System.out.println("getName->" + kk.getName());
                System.out.println("=============");
            }
        });

文本文件的内容:

   由上述代码可以看出,KK是一个实体类型并且可序列化(Serializable)!

zzq,19,2016
yyu,18,2016
uui,90,2015

 



以上是关于spark-sql将Rdd转换为DataFrame进行操作的两种方法的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Sql之DataFrame实战详解

将 Dataframe 转换为 RDD 减少了分区

将 RDD 转换为 DataFrame 并再次转换回来的开销是多少?

将 RDD 转换为 Dataframe Spark

为啥 list 应该先转换为 RDD 再转换为 Dataframe?有啥方法可以将列表转换为数据框?

将带有数组的 RDD 转换为 DataFrame