使用 spark-sql cli 将 csv 数据直接加载到 parquet 表中

Posted

技术标签:

【中文标题】使用 spark-sql cli 将 csv 数据直接加载到 parquet 表中【英文标题】:Use spark-sql cli to load csv data directly into parquet table 【发布时间】:2021-10-13 01:40:14 【问题描述】:

我有一个 csv 文件,想将它加载到我硬盘上的 parquet 文件中,然后使用 spark-sql CLI 对其运行 SQL 查询。是否有一两个 spark-sql 命令可以做到这一点?

【参考方案1】:
package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col, trim

object csv2parquet extends App 
  val spark = SparkSession.builder()
    .master("local")
    .appName("CSV-Parquet")
    .getOrCreate()

  import spark.implicits._
  val sourceFile = "/<path file>/test.csv" // bad data in file
  val targetFile = "/<path file>/testResult.parquet"
  // read csv file
  val df1 = spark.read.option("header", false).csv(sourceFile)
  df1.show(false)
  //    +-------+-------+----------+-----------+
  //    |_c0    |_c1    |_c2       |_c3        |
  //    +-------+-------+----------+-----------+
  //    |Header |TestApp|2020-01-01|null       |
  //    |name   | dept  | age      | batchDate |
  //    |john   | dept1 | 33       | 2020-01-01|
  //    |john   | dept1 | 33       | 2020-01-01|
  //    |john   | dept1 | 33       | 2020-01-01|
  //    |john   | dept1 | 33       | 2020-01-01|
  //    |Trailer|count  |4         |null       |
  //    +-------+-------+----------+-----------+

  // write data to parquet. 
  df1.write.mode("append").parquet(targetFile)

  val resDF = spark.read.parquet(targetFile)
  resDF.show(false)
  //          +-------+-------+----------+-----------+
  //          |_c0    |_c1    |_c2       |_c3        |
  //          +-------+-------+----------+-----------+
  //          |Header |TestApp|2020-01-01|null       |
  //          |name   | dept  | age      | batchDate |
  //          |john   | dept1 | 33       | 2020-01-01|
  //          |john   | dept1 | 33       | 2020-01-01|
  //          |john   | dept1 | 33       | 2020-01-01|
  //          |john   | dept1 | 33       | 2020-01-01|
  //          |Trailer|count  |4         |null       |
  //          +-------+-------+----------+-----------+
  // try sql
  resDF
    .filter(trim(col("_c2")).equalTo(33))
    .select(col("_c2"))
    .show(false)
    //          +---+
    //          |_c2|
    //          +---+
    //          | 33|
    //          | 33|
    //          | 33|
    //          | 33|
    //          +---+

【讨论】:

您能否解释一下为什么要多次编写输出 parquet 文件?还是更新代码?谢谢。 1.例如,仅用于收到许多行。 2. 你只需要一个命令 df1.write.mode("append").parquet(targetFile) 或者如果你使用 table 你可以使用 ....format("parquet").saveAsTable(....)

以上是关于使用 spark-sql cli 将 csv 数据直接加载到 parquet 表中的主要内容,如果未能解决你的问题,请参考以下文章

Spark-sql CLI 在运行查询时仅使用 1 个执行程序

Spark-SQL CLI:未调用 SupportsPushDownFilters.pushFilters

spark-sql(spark sql cli)客户端集成hive

php CLI php脚本使用直接访问magento集合将客户从magento导出到CSV。

php CLI php脚本使用直接访问magento集合将客户从magento导出到CSV。

尝试使用带有反斜杠作为转义字符的Google Bigquery CLI导入CSV时出错