spark 基本操作

Posted jason-dong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 基本操作相关的知识,希望对你有一定的参考价值。

1.dataframe 基本操作

 def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("test")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    val people = spark.read.format("json").load("people.json")
    people.show()
    /*    +----+-------+
        | age|   name|
        +----+-------+
        |null|Michael|
          |  30|   Andy|
          |  19| Justin|
          +----+-------+   */
    people.printSchema()
    /*root
    |-- age: long (nullable = true)
    |-- name: string (nullable = true)*/
    people.select($"name").show()
    /*    +-------+
        |   name|
        +-------+
        |Michael|
        |   Andy|
        | Justin|
        +-------+*/
    people.select($"name", $"age".cast("string").as("age")).printSchema()
    /*    root
        |-- name: string (nullable = true)
        |-- age: string (nullable = true)*/
    people.select($"name", ($"age" + 1).as("age")).show()
    /*    +-------+----+
        |   name| age|
        +-------+----+
        |Michael|null|
          |   Andy|  31|
          | Justin|  20|
          +-------+----+*/
    people.filter($"age" > 21).show()
    //    +---+----+
    //    |age|name|
    //    +---+----+
    //    | 30|Andy|
    //      +---+----+
    people.groupBy("age").count().show()
    //    +----+-----+
    //    | age|count|
    //    +----+-----+
    //    |  19|    1|
    //      |null|    1|
    //      |  30|    1|
    //      +----+-----+    
    spark.stop()
  }

 

以上是关于spark 基本操作的主要内容,如果未能解决你的问题,请参考以下文章

python+spark程序代码片段

Spark闭包与序列化

Spark:如何加速 foreachRDD?

spark 例子wordcount topk

Spark——RDD算子

控制 spark-sql 和数据帧中的字段可空性