spark sql 基本用法

Posted 永不停歇—胡章诚

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sql 基本用法相关的知识,希望对你有一定的参考价值。

 一、通过结构化数据创建DataFrame:

publicstaticvoid main(String[] args) {
   SparkConf conf = new SparkConf()

.setAppName("DataFrameCreate").setMaster("local");  

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        
        DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");  //结构化数据直接加载为DataFrame
        
        df.show();  
    }

 

二、通过RDD创建DataFrame的两种创建方式

 (数据源students.txt的数据截图)

2.1通过已知类型的schema创建DataFrame,代码如下:

public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setMaster("local")
            .setAppName("RDD2DataFrameReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("D://students.txt");
        
        //将lines转换成 JavaRDD<Student>
        JavaRDD<Student> students = lines.map(new Function<String, Student>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Student call(String line) throws Exception {
                // TODO Auto-generated method stub
                String[] strPlits = line.split(",");
                Student stu = new Student();
                
                stu.setId(Integer.valueOf(strPlits[0]));
                stu.setName(strPlits[1]);
                stu.setAge(Integer.valueOf(strPlits[2]));
                
                return stu;
            }
            
        });
                
        // 使用反射方式,将RDD转换为DataFrame
        
// 这里要求,JavaBean必须实现Serializable接口,是可序列化的

        
//根据student的schema 和 RDD创建DataFrame
        DataFrame studentsDF = sqlContext.createDataFrame(students, Student.class);
        studentsDF.show();
    }

 

2.2手动创建schema的方式创建DataFrame

 public static void main(String[] args) {

        //...  省略创建sqlContext的过程

      

JavaRDD<String> lines = sc.textFile("D://students.txt");
        
        //将普通RDD装换成JavaRDD<Row>
        JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String line) throws Exception {
                String[] strArray = line.split(",");
                Row row= RowFactory.create(
                        Integer.valueOf(strArray[0]),    //id
                        strArray[1],    //name
                        Integer.valueOf(strArray[2]));    //age
            
                return row;
            }
        });
        
        //第二步 创建元类型, 即创建schema
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));  
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));  
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
        StructType structType = DataTypes.createStructType(structFields);

        //根据元数据类型将JavaRDD<Row>转化成DataFrame
        DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);

        studentDF.show();
    }

 

-》DataFrame、RDD、List互转

JavaRDD<Row> rows = studentDF.javaRDD();

List<Row> studentList = rows.collect(); 

 

三、DataFrame基本用法

        // 打印DataFrame中所有的数据(select * from ...)

df.show();
        // 打印DataFrame的元数据(Schema)
        df.printSchema();
        // 查询某列所有的数据
        df.select("name").show();  
        // 查询某几列所有的数据,并对列进行计算
        df.select(df.col("name"), df.col("age").plus(1)).show();
        // 根据某一列的值进行过滤
        df.filter(df.col("age").gt(18)).show();
        // 根据某一列进行分组,然后进行聚合
        df.groupBy(df.col("age")).count().show();

 

  DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);
        studentDF.show();

        studentDF.registerTempTable("students"); //将DataFrame注册为零时表,取名students
        
        //对students零时表做sql查询
        DataFrame oldStudentDF = sqlCotnext.sql("select * from students where age>18");

        
        oldStudentDF.show();

 

以上是关于spark sql 基本用法的主要内容,如果未能解决你的问题,请参考以下文章

SQL Cursor 基本用法

[转帖]PG语法解剖--基本sql语句用法入门

mybatis 基本用法

linux中mariadb基本用法详解(企业级)_sql_19

SQL Cursor 基本用法

游标SQL Cursor 基本用法