Spark 中啥样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中
Posted
技术标签:
【中文标题】Spark 中啥样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中【英文标题】:what kind of RDDs in Spark can be saved to BigQuery table using saveAsNewAPIHadoopDatasetSpark 中什么样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中 【发布时间】:2017-10-07 14:20:50 【问题描述】:以Using the BigQuery Connector with Spark为例
// Perform word count.
val wordCounts = (tableData
.map(entry => convertToTuple(entry._2))
.reduceByKey(_ + _))
// Write data back into a new BigQuery table.
// IndirectBigQueryOutputFormat discards keys, so set key to null.
(wordCounts
.map(pair => (null, convertToJson(pair)))
.saveAsNewAPIHadoopDataset(conf))
如果我删除.reduceByKey(_ + _)
部分,那么我将出现以下错误
org.apache.spark.SparkException:作业中止。 在 org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:107) 在 org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) 在 org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) 在 org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 在 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) ... 46 省略 原因:java.io.IOException:Schema 没有字段。表:test_output_40b400dc_1bfe_454a_9aa8_bf9562d54c3f_source 在 com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95) 在 com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:164) 在 com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57) 在 org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128) 在 org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101) ... 53 更多
在某些情况下,我不使用 reduceByKey,而是想将我的 RDD 保存在 BigQuery 中。
【问题讨论】:
你能添加完整的错误吗?该错误是因为您所做的代码更改吗? 是的,该错误仅在我的代码更改后发生。 【参考方案1】:尝试使用架构:
object Schema
def apply(record: JsonObject): Schema = Schema (
word = record.get ("word").getAsString,
Count = record.get ("Count").getAsInt
)
case class Schema(word String,
Count :Int
)
并像这样传递这个架构:
wordCounts.map(x=>Schema(x))
希望对你有帮助
【讨论】:
【参考方案2】:java.io.IOException: Schema has no fields
是错误,这意味着 BigQuery 无法自动检测架构。如果您指定类似
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("word").setType("STRING"));
fields.add(new TableFieldSchema().setName("word_count").setType("INTEGER"));
BigQueryOutputConfiguration.configure(conf, ..., new TableSchema().setFields(fields), ...);
您不应再遇到此问题。
我认为.reduceByKey(_ + _)
隐藏这个问题的原因是:
tableData
RDD 最初被划分为许多小碎片,每个碎片都不足以让 BigQuery 自动推断架构。
.reduceByKey(_ + _)
将 RDD 重新分区为更大的分片。
我的预感是,如果您将 .reduceByKey(_ + _)
替换为 .repartition(2)
,那么该作业也应该可以在没有明确提供架构的情况下工作。
【讨论】:
以上是关于Spark 中啥样的 RDD 可以使用 saveAsNewAPIHadoopDataset 保存到 BigQuery 表中的主要内容,如果未能解决你的问题,请参考以下文章
请问啥叫多路访问网络?OSPF中啥样的网络需要选举DR和BDR?(看题)