SparkSQL - 直接读取镶木地板文件

Posted

技术标签:

【中文标题】SparkSQL - 直接读取镶木地板文件【英文标题】:SparkSQL - Read parquet file directly 【发布时间】:2017-05-06 08:37:44 【问题描述】:

我正在从 Impala 迁移到 SparkSQL,使用以下代码读取表:

my_data = sqlContext.read.parquet('hdfs://my_hdfs_path/my_db.db/my_table')

我如何调用上面的 SparkSQL,所以它可以返回如下内容:

'select col_A, col_B from my_table'

【问题讨论】:

【参考方案1】:

从 parquet 文件创建 Dataframe 后,您必须将其注册为临时表才能在其上运行 sql queries

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.parquet("src/main/resources/peopleTwo.parquet")

df.printSchema

// after registering as a table you will be able to run sql queries
df.registerTempTable("people")

sqlContext.sql("select * from people").collect.foreach(println)

【讨论】:

收集是否必要(或一个好主意)?因为如果数据很大,我们不想把所有的东西都收集给驱动? 它只是一个如何使用 sql 的示例。这取决于你想如何使用它。您也可以更改查询或执行 .take() 以获取驱动程序所需的数据 顺便说一句Symbol SQLContext is deprecated. Use SparkSession.builder instead【参考方案2】:

使用纯 SQL

可以查询 JSON、ORC、Parquet 和 CSV 文件无需在 Spark DataFrame 上创建表

//This Spark 2.x code you can do the same on sqlContext as well
val spark: SparkSession = SparkSession.builder.master("set_the_master").getOrCreate

spark.sql("select col_A, col_B from parquet.`hdfs://my_hdfs_path/my_db.db/my_table`")
   .show()

【讨论】:

我确实看到此错误“找不到文件。可能基础文件已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或重新创建涉及数据集/数据帧。”我该如何解决这个问题? 没有帮助 spark.sqlContext().setConf("spark.sql.parquet.cacheMetadata", "false"); 有效!只需将 hdfs://my_hdfs_path/my_db.db/my_table 替换为您的文件路径即可。 :) 这太棒了,你能指出一些关于这种行为的进一步文档吗? 我是从 github 的 spark 代码库中找到的。不确定它的文档。【参考方案3】:

假设您在 HDFS 中有 parquet 文件 ventas4

hdfs://localhost:9000/sistgestion/sql/ventas4

在这种情况下,步骤是:

    为 SQL 上下文充电:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    

    阅读拼花文件:

    val ventas=sqlContext.read.parquet("hdfs://localhost:9000/sistgestion/sql/ventas4")
    

    注册一个临时表:

    ventas.registerTempTable("ventas")
    

    执行查询(在这一行你可以使用toJSON来传递一个JSON格式也可以使用collect()):

    sqlContext.sql("select * from ventas").toJSON.foreach(println(_))
    
    sqlContext.sql("select * from ventas").collect().foreach(println(_))
    

【讨论】:

【参考方案4】:

在 intellij 中使用以下代码:

def groupPlaylistIds(): Unit =
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder.appName("FollowCount")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sqlContext

    val d = sc.read.format("parquet").load("/Users/CCC/Downloads/pq/file1.parquet")
    d.printSchema()

    val d1 = d.select("col1").filter(x => x!='-')
    val d2 = d1.filter(col("col1").startsWith("searchcriteria"));
    d2.groupBy("col1").count().sort(col("count").desc).show(100, false)
  

【讨论】:

问题是使用SparkSQL。这个答案似乎是结构化的API,而不是与问题一致。另外,我认为 Intellij 或任何 IDE 与此处无关

以上是关于SparkSQL - 直接读取镶木地板文件的主要内容,如果未能解决你的问题,请参考以下文章

读取列中具有混合数据类型的镶木地板文件

如何将镶木地板文件的 int64 数据类型列转换为 SparkSQL 数据框中的时间戳?

使用 SparkSQL 和 HiveContext 读取 Parquet 文件时出错

spark sql 无法在 S3 中查询镶木地板分区

如何在独立的 Java 代码中读取镶木地板文件? [关闭]

如何从 spark sql 访问本地镶木地板文件?