从 Spark 读取时分区 sql 表数据的问题
Posted
技术标签:
【中文标题】从 Spark 读取时分区 sql 表数据的问题【英文标题】:Issue with partioning sql table data when reading from Spark 【发布时间】:2019-07-30 13:47:44 【问题描述】:我编写了一个 Scala 程序,用于从 MS SQL Server 加载数据并将其写入 BigQuery。我在 Spark 集群(Google Dataproc)中执行此操作。我的问题是,即使我有一个 64 核的集群,并且我在运行作业时指定了执行器参数,并且我对正在读取的数据进行了分区,但 Spark 仅从单个执行器读取数据。当我开始工作时,我可以看到所有执行程序都在启动,并且在 SQL Server 上,我可以看到所有 4 个工作人员的连接,但在一分钟内,它们都再次关闭,只留下一个,然后运行了一个多小时整理。
数据集是 6500 万条记录,我正在尝试将其划分为 60 个分区。
这是我的集群:
gcloud dataproc clusters create my-cluster \
--properties dataproc:dataproc.conscrypt.provider.enable=false,spark:spark.executor.userClassPathFirst=true,spark:spark.driver.userClassPathFirst=true \
--region europe-north1 \
--subnet my-subnet \
--master-machine-type n1-standard-4 \
--worker-machine-type n1-highmem-16 \
--master-boot-disk-size 15GB \
--worker-boot-disk-size 500GB \
--image-version 1.4 \
--master-boot-disk-type=pd-ssd \
--worker-boot-disk-type=pd-ssd \
--num-worker-local-ssds=1 \
--num-workers=4
这就是我运行工作的方式:
gcloud dataproc jobs submit spark \
--cluster my-cluster \
--region europe-north1 \
--jars gs://mybucket/mycode.jar,gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar \
--class Main \
--properties \
spark.executor.memory=19g, \
spark.executor.cores=4, \
spark.executor.instances=11 \
-- yarn
这是我用来读取数据的代码:
val data = sqlQuery(ss,
serverName,
portNumber,
databaseName,
userName,
password,
tableName)
writeToBigQuery(
bqConfig,
data,
dataSetName,
replaceInvalidCharactersInTableName(r.getAs[String]("TableName")),
"WRITE_TRUNCATE")
def sqlQuery(ss: SparkSession,
hostName: String,
port: String,
databaseName: String,
user: String,
password: String,
query: String): DataFrame =
val result = ss.read.format("jdbc")
.option("url", getJdbcUrl(hostName, port, databaseName))
.option("dbtable", query)
.option("user", user)
.option("password", password)
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("numPartitions", 60)
.option("partitionColumn", "entityid")
.option("lowerBound", 1)
.option("upperBound", 198012).load()
result
def writeToBigQuery(bqConf: Configuration,
df: DataFrame,
dataset: String,
table: String,
writeDisposition: String = "WRITE_APPEND"): Unit =
//Convert illegal characters in column names
var legalColumnNamesDf = df
for (col <- df.columns)
legalColumnNamesDf = legalColumnNamesDf.withColumnRenamed(
col,
col
.replaceAll("-", "_")
.replaceAll("\\s", "_")
.replaceAll("æ", "ae")
.replaceAll("ø", "oe")
.replaceAll("å", "aa")
.replaceAll("Æ", "AE")
.replaceAll("Ø", "OE")
.replaceAll("Å", "AA")
)
val outputGcsPath = s"gs://$bucket/" + HardcodedValues.SparkTempFolderRelativePath + UUID
.randomUUID()
.toString
val outputTableId = s"$projectId:$dataset.$table"
//Apply explicit schema since to avoid creativity of BigQuery auto config
val uniqBqConf = new Configuration(bqConf)
BigQueryOutputConfiguration.configure(
uniqBqConf,
outputTableId,
s""""fields":$Json(DefaultFormats).write(
legalColumnNamesDf.schema.map(
f =>
Map(
"name" -> f.name,
"type" -> f.dataType.sql
.replace("BIGINT", "INT")
.replace("INT", "INT64")
.replaceAll("DECIMAL\\(\\d+,\\d+\\)", "NUMERIC"),
"mode" -> (if (f.nullable) "NULLABLE"
else "REQUIRED")
))
) """,
outputGcsPath,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
classOf[TextOutputFormat[_, _]]
)
uniqBqConf.set(
BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
if (Array("WRITE_APPEND", "WRITE_TRUNCATE") contains writeDisposition)
writeDisposition
else "WRITE_APPEND"
)
//Save to BigQuery
legalColumnNamesDf.rdd
.map(
row =>
(null,
Json(DefaultFormats).write(
ListMap(row.schema.fieldNames.toSeq.zip(row.toSeq): _*))))
.saveAsNewAPIHadoopDataset(uniqBqConf)
任何想法都将不胜感激。
【问题讨论】:
你也可以发writeToBigQuery
吗?
当然,我已经将它添加到上面的代码中了。
【参考方案1】:
如果您查看 Spark 用户界面,您会发现一项任务正在读取大部分数据是否存在很多偏差?我的猜测是您选择的分区键很差,所以大部分数据最终都在一个分区中。
此 *** 答案提供了详细说明:What is the meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?。我认为您的实体 ID 需要在 1 到 198012 之间均匀分布,才能成为一个很好的分区列。
【讨论】:
感谢您的建议,虽然它不是唯一键,但分布非常均匀。所以几乎没有任何偏差。 知道了——但是当您查看 Spark UI 时,您是否看到了歪斜? (您可以使用--enable-component-gateway
创建集群,从“Web 界面”选项卡 -> YARN UI -> GCP 控制台中的 Spark ApplicationMaster 跳转到 Spark UI)。
感谢您的努力。我试图停下来告诉 spark 运行多少个执行程序并进行动态分配,现在它可以工作了。我还在学习 Spark,所以也许我错过了其他东西,但无论如何,我的问题现在已经解决了。再次感谢。【参考方案2】:
最后,我试着停下来告诉 spark 运行多少个执行程序并进行动态分配,现在它可以工作了。我请求了 24 个分区,它动态分配了 8 个执行器,每个执行器有 3 个核心,并行运行 24 个任务。
【讨论】:
以上是关于从 Spark 读取时分区 sql 表数据的问题的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Spark Streaming DStream 制作为 SQL 表