将rdd保存到镶木地板文件scala

Posted

技术标签:

【中文标题】将rdd保存到镶木地板文件scala【英文标题】:Pair rdd save to parquet file scala 【发布时间】:2020-04-24 19:05:40 【问题描述】:

我有 RDD[Map[String, String]] ,需要转换为 datframe 以便我可以将数据保存在 parquet 文件中,其中映射键是列名。

例如:

 val inputRdf = spark.sparkContext.parallelize(List(Map("city" -> "", "ip" -> "42.106.1.102", "source" -> "PlayStore","createdDate"->"2020-04-21"),
          Map("city" -> "delhi", "ip" -> "42.1.15.102", "source" -> "PlayStore","createdDate"->"2020-04-21"),
           Map("city" -> "", "ip" -> "42.06.15.102", "source" -> "PlayStore","createdDate"->"2020-04-22")))

输出:

City | ip
Delhi| 1.234

【问题讨论】:

【参考方案1】:

我提供了一些指导来解决您的问题

import org.apache.log4j.Level, Logger
import org.apache.spark.sql.SparkSession

object MapToDfParquet 

  val spark = SparkSession
    .builder()
    .appName("MapToDfParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","MapToDfParquet") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val sqlContext = spark.sqlContext

  def main(args: Array[String]): Unit = 

    Logger.getRootLogger.setLevel(Level.ERROR)

    try 

      import spark.implicits._

      val data = Seq(Map("city" -> "delhi", "ip" -> "42.1.15.102", "source" -> "PlayStore","createdDate"->"2020-04-21"),
                     Map("city" -> "", "ip" -> "42.06.15.102", "source" -> "PlayStore","createdDate"->"2020-04-22"))
        .map( seq => seq.values.mkString(","))

      val df = sc.parallelize(data)
        .map(str => str.split(","))
        .map(arr => (arr(0),arr(1),arr(2),arr(3)))
        .toDF("city", "ip","source","createdDate")

      df.show(truncate = false)

      // by default writes it will write as parquet with snappy compression
      // we change this behavior and save as parquet uncompressed
      sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed")
      
      df
        .write
        .parquet("hdfs://quickstart.cloudera/user/cloudera/parquet")

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
     finally 
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    
  

预期输出

+-----+------------+---------+-----------+
|city |ip          |source   |createdDate|
+-----+------------+---------+-----------+
|delhi|42.1.15.102 |PlayStore|2020-04-21 |
|     |42.06.15.102|PlayStore|2020-04-22 |
+-----+------------+---------+-----------+

【讨论】:

嘿,感谢您的努力。我所拥有的是 RDD[Map[String, String]],这种类型的数据我需要保存在 parquet 文件中,其中键成为列名。 嗨!此代码将您的地图保存在镶木地板文件中,键和值是列名。你还需要别的吗? 是的。是的 。输出应该是这样的:+--------+--------------+-------------+---- ---------+ |城市|知识产权 |来源 |创建日期 | +--------+-------------+------------+------------ -+ |德里| 42.1.15.102 |游戏商店 | 2020-04-21 | | | 42.06.15.102 |游戏商店 | 2020-04-22 | +----+------------+---------+---------+------------ --(表格格式)并且我有 RDD[Map[String, String]] 需要转换为数据帧,这样我才能获得高于输出的结果,并且保存在镶木地板中可能很棒。 现在它被保存为未压缩的镶木地板,并具有预期的输出。 yes 完美,但它改变了大型数据集上的数据顺序,但密钥是固定的,这很好,但不明白为什么它不断将数据位置更改为不同的列。

以上是关于将rdd保存到镶木地板文件scala的主要内容,如果未能解决你的问题,请参考以下文章

无法将数据框保存到镶木地板 pyspark

是否可以将巨大的 dask 数据框保存到镶木地板中?

如何将 RDD 保存到单个镶木地板文件中?

无法将数据附加到镶木地板 [FileAlreadyExists 异常]

使用 pyarrow 如何附加到镶木地板文件?

如何附加到镶木地板文件以及它如何影响分区?