通过 Spark 使用 BigQuery Storage API:请求多个分区但仅获得 1 个
Posted
技术标签:
【中文标题】通过 Spark 使用 BigQuery Storage API:请求多个分区但仅获得 1 个【英文标题】:Using BigQuery Storage API through Spark: Requested multiple partitions but getting only 1 【发布时间】:2019-11-08 12:43:59 【问题描述】:我正在使用 bigquery-spark-connector 从使用 BigQuery Storage API 的 BigQuer 中读取数据。我的脚本(自动)从 BigQuery Storage API 请求多个分区,但我收到警告:
警告 com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation:请求 2 个分区,但仅从 BigQuery Storage API 收到 1 个
Spark 作业需要很长时间,我认为这是因为它没有读取多个分区。如何确保 BigQuery Storage API 为我提供所需的所有分区?这里发生了什么,为什么无论我请求多少,它都只给我一个分区?
首先我创建一个 SparkSession:
SparkSession spark = SparkSession.builder()
.appName("XXX")
.getOrCreate();
这是导致 WARN 的代码:
Dataset<Row> data = spark.read()
.format("bigquery")
.option("table","project.dataset.table")
.load()
.cache();
【问题讨论】:
你的桌子有多大? 我尝试了多个表。它尝试创建 2 个分区的表是 977.25MB,如果我尝试使用更大的表,它会尝试更多的分区,但我总是从 Storage API 收到 1 个分区,无论我正在读取的表的大小如何. 【参考方案1】:在从 BigQuery 存储 API 请求分区时,spark-bigquery-connector 使用一些启发式方法进行询问。返回的分区是 BigQuery 使用的实际分区,可能低于启发式预测的结果。这是正常情况,因此对于这种情况,警告可能有点过于严重(我也与 BigQuery 团队讨论过这个问题)。有关更多上下文,请阅读 requestedStreams 参数 here 的描述。
第二个问题是 Spark 作业需要很长时间。如果增加资源 - 特别是执行器的数量没有帮助,请在 spark-bigquery-connector project 中使用实际流 ID 和火花配置的其余部分打开一个错误,以便连接器和 BoigQuery 团队能够检查它.
【讨论】:
感谢您的解释。我不确定这是否会导致作业运行缓慢,这是我正在调查的第一件事,因为 WARN。您认为这会导致作业运行缓慢还是我应该寻找其他地方? 集群的大小是多少?你有多少执行人?您的 Spark 作业在哪里运行 - 本地、Dataproc、EMR 还是其他?它靠近数据 - 同一区域还是另一个区域?是实际读取慢还是后面的数据处理? 我在 Dataproc 上运行它,只是默认集群 1 个 master,2 个 worker,每个 4 个 CPU,目前仅测试。你是对的,阅读实际上很快,我分别测试了所有步骤,结果是 .write 减慢了它。我将它与 .mode("append").partitionBy("column").csv("gs://output/path") 一起使用。关于加快速度的任何提示? 如果你只有一个分区,所有的写入都将串行完成。尝试将数据重新分区到默认分区,这样写入就会被分发。 我明白你的意思,这样写会更快。我使用 partitionBy("id_column") 为每个包含其所有值的 id 获取单独的输出文件。如果没有 partitionBy(),我不知道该怎么做。以上是关于通过 Spark 使用 BigQuery Storage API:请求多个分区但仅获得 1 个的主要内容,如果未能解决你的问题,请参考以下文章
有没有更好的方法通过 PySpark 集群(dataporc)将 spark df 加载到 BigQuery 中?
Google Spark-BigQuery-Connector如何利用BigQuery Storage API?
在Apache Spark中使用Bigquery Connector时如何设置分区数?