如何使用 spark/scala 检查是不是存在大查询表

Posted

技术标签:

【中文标题】如何使用 spark/scala 检查是不是存在大查询表【英文标题】:How to check if big query table exists with spark/scala如何使用 spark/scala 检查是否存在大查询表 【发布时间】:2020-01-30 11:21:36 【问题描述】:

我需要对一组 BQ 表进行某些操作,但当且仅当我确定所有 BQ 表都存在时,我才想做该操作。

我检查了谷歌大查询包,它有一个从 BQ 表中读取数据的示例 - 很好但是如果我的表真的很大怎么办?我无法加载所有表以进行存在性检查,因为这会花费太多时间并且似乎是多余的。

还有其他方法可以实现吗?如果我能在正确的方向上得到一些指示,我会很高兴。

提前谢谢你。

高拉夫

【问题讨论】:

【参考方案1】:

您可以使用方法 tables.get

https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get

否则,您可以在 bash 脚本中运行 BG CLI 命令,该脚本可以从您的 spark 程序中调用。

【讨论】:

谢谢。让我检查一下这个选项。似乎很有希望。 :)。我很快就会回来。【参考方案2】:

spark.read.option(...).load 不会将所有对象加载到数据框中。 spark.read.option(...) 返回 DataFrameReader。当您在其上调用load 时,它将测试连接并发出类似的查询

SELECT * FROM (select * from objects) SPARK_GEN_SUBQ_11 WHERE 1=0

查询不会扫描任何记录,当表不存在时会出错。我不确定 BigQuery 驱动程序,但 jdbc 驱动程序会在此处引发 java 异常,您需要在 try catch 块中进行处理。

因此,您可以调用 load、捕获异常并检查是否可以实例化所有数据帧。这是一些示例代码

def query(q: String) = 
   val reader = spark.read.format("bigquery").option("query", q)
  try 
    Some(reader.load())
   catch 
    case e: Exception => None
  


val dfOpts =  Seq(
  query("select * from foo"),
  query("select * from bar"),
  query("select * from baz")
)


if(dfOpts.exists(_.isEmpty))
  println("Some table is missing");


【讨论】:

我看到谷歌没有官方的 spark sql 连接器。有一个,但它处于测试阶段github.com/GoogleCloudDataproc/spark-bigquery-connector 但是即使有官方驱动程序。如果表不存在,它不会在sc.newAPIHadoopRDD 上出错吗?如果是这样,您可以在这里使用类似的方法,通过捕获这些异常并仅在没有引发时继续

以上是关于如何使用 spark/scala 检查是不是存在大查询表的主要内容,如果未能解决你的问题,请参考以下文章

Spark Scala,如何检查数据框中是不是存在嵌套列

如何在 spark scala 中检查与其关联的列名和数据是不是匹配

如何检查列数据 Spark scala 上的 isEmpty

如何在 Spark Scala 的 UDF 中将列作为值传递以检查条件

如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?

如何在 spark scala 中重命名 S3 文件而不是 HDFS