SparkSQL - 直接读取镶木地板文件
Posted
技术标签:
【中文标题】SparkSQL - 直接读取镶木地板文件【英文标题】:SparkSQL - Read parquet file directly 【发布时间】:2017-05-06 08:37:44 【问题描述】:我正在从 Impala 迁移到 SparkSQL,使用以下代码读取表:
my_data = sqlContext.read.parquet('hdfs://my_hdfs_path/my_db.db/my_table')
我如何调用上面的 SparkSQL,所以它可以返回如下内容:
'select col_A, col_B from my_table'
【问题讨论】:
【参考方案1】:从 parquet 文件创建 Dataframe 后,您必须将其注册为临时表才能在其上运行 sql queries
。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("src/main/resources/peopleTwo.parquet")
df.printSchema
// after registering as a table you will be able to run sql queries
df.registerTempTable("people")
sqlContext.sql("select * from people").collect.foreach(println)
【讨论】:
收集是否必要(或一个好主意)?因为如果数据很大,我们不想把所有的东西都收集给驱动? 它只是一个如何使用 sql 的示例。这取决于你想如何使用它。您也可以更改查询或执行 .take() 以获取驱动程序所需的数据 顺便说一句Symbol SQLContext is deprecated. Use SparkSession.builder instead
【参考方案2】:
使用纯 SQL
可以查询 JSON、ORC、Parquet 和 CSV 文件无需在 Spark DataFrame 上创建表。
//This Spark 2.x code you can do the same on sqlContext as well
val spark: SparkSession = SparkSession.builder.master("set_the_master").getOrCreate
spark.sql("select col_A, col_B from parquet.`hdfs://my_hdfs_path/my_db.db/my_table`")
.show()
【讨论】:
我确实看到此错误“找不到文件。可能基础文件已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或重新创建涉及数据集/数据帧。”我该如何解决这个问题? 没有帮助 spark.sqlContext().setConf("spark.sql.parquet.cacheMetadata", "false"); 有效!只需将hdfs://my_hdfs_path/my_db.db/my_table
替换为您的文件路径即可。 :)
这太棒了,你能指出一些关于这种行为的进一步文档吗?
我是从 github 的 spark 代码库中找到的。不确定它的文档。【参考方案3】:
假设您在 HDFS 中有 parquet 文件 ventas4:
hdfs://localhost:9000/sistgestion/sql/ventas4
在这种情况下,步骤是:
为 SQL 上下文充电:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
阅读拼花文件:
val ventas=sqlContext.read.parquet("hdfs://localhost:9000/sistgestion/sql/ventas4")
注册一个临时表:
ventas.registerTempTable("ventas")
执行查询(在这一行你可以使用toJSON来传递一个JSON格式也可以使用collect()):
sqlContext.sql("select * from ventas").toJSON.foreach(println(_))
sqlContext.sql("select * from ventas").collect().foreach(println(_))
【讨论】:
【参考方案4】:在 intellij 中使用以下代码:
def groupPlaylistIds(): Unit =
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder.appName("FollowCount")
.master("local[*]")
.getOrCreate()
val sc = spark.sqlContext
val d = sc.read.format("parquet").load("/Users/CCC/Downloads/pq/file1.parquet")
d.printSchema()
val d1 = d.select("col1").filter(x => x!='-')
val d2 = d1.filter(col("col1").startsWith("searchcriteria"));
d2.groupBy("col1").count().sort(col("count").desc).show(100, false)
【讨论】:
问题是使用SparkSQL。这个答案似乎是结构化的API,而不是与问题一致。另外,我认为 Intellij 或任何 IDE 与此处无关以上是关于SparkSQL - 直接读取镶木地板文件的主要内容,如果未能解决你的问题,请参考以下文章
如何将镶木地板文件的 int64 数据类型列转换为 SparkSQL 数据框中的时间戳?