Spark 中啥样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中

Posted

技术标签:

【中文标题】Spark 中啥样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中【英文标题】:what kind of RDDs in Spark can be saved to BigQuery table using saveAsNewAPIHadoopDatasetSpark 中什么样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中 【发布时间】:2017-10-07 14:20:50 【问题描述】:

以Using the BigQuery Connector with Spark为例

// Perform word count.
val wordCounts = (tableData
    .map(entry => convertToTuple(entry._2))
    .reduceByKey(_ + _))

// Write data back into a new BigQuery table.
// IndirectBigQueryOutputFormat discards keys, so set key to null.
(wordCounts
     .map(pair => (null, convertToJson(pair)))
     .saveAsNewAPIHadoopDataset(conf))

如果我删除.reduceByKey(_ + _) 部分,那么我将出现以下错误

org.apache.spark.SparkException:作业中止。 在 org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:107) 在 org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) 在 org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) 在 org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 在 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) ... 46 省略 原因:java.io.IOException:Schema 没有字段。表:test_output_40b400dc_1bfe_454a_9aa8_bf9562d54c3f_source 在 com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95) 在 com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:164) 在 com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57) 在 org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128) 在 org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101) ... 53 更多

在某些情况下,我不使用 reduceByKey,而是想将我的 RDD 保存在 BigQuery 中。

【问题讨论】:

你能添加完整的错误吗?该错误是因为您所做的代码更改吗? 是的,该错误仅在我的代码更改后发生。 【参考方案1】:

尝试使用架构:

object Schema 
  def apply(record: JsonObject): Schema = Schema (
      word = record.get ("word").getAsString,
      Count = record.get ("Count").getAsInt
    )

case class Schema(word String,
                  Count :Int
                  )

并像这样传递这个架构:

wordCounts.map(x=>Schema(x))  

希望对你有帮助

【讨论】:

【参考方案2】:

java.io.IOException: Schema has no fields 是错误,这意味着 BigQuery 无法自动检测架构。如果您指定类似

的架构
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("word").setType("STRING"));
fields.add(new TableFieldSchema().setName("word_count").setType("INTEGER"));
BigQueryOutputConfiguration.configure(conf, ..., new TableSchema().setFields(fields), ...);

您不应再遇到此问题。

我认为.reduceByKey(_ + _)隐藏这个问题的原因是:

Schema auto-detection 将随机选择一个文件并扫描最多 100 行数据。 tableData RDD 最初被划分为许多小碎片,每个碎片都不足以让 BigQuery 自动推断架构。 .reduceByKey(_ + _) 将 RDD 重新分区为更大的分片。

我的预感是,如果您将 .reduceByKey(_ + _) 替换为 .repartition(2),那么该作业也应该可以在没有明确提供架构的情况下工作。

【讨论】:

以上是关于Spark 中啥样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中的主要内容,如果未能解决你的问题,请参考以下文章

spring中啥样的异常会造成事务回滚?!

请问啥叫多路访问网络?OSPF中啥样的网络需要选举DR和BDR?(看题)

spark中啥是dataframe

请简要描述一下hadoop,spark,mpi三种计算框架的特点以及分别适用于啥样的场景

addSubview 方法是啥样的?

可以通过套接字发送啥样的数据?