从 Spark 读取时分区 sql 表数据的问题

Posted

技术标签:

【中文标题】从 Spark 读取时分区 sql 表数据的问题【英文标题】:Issue with partioning sql table data when reading from Spark 【发布时间】:2019-07-30 13:47:44 【问题描述】:

我编写了一个 Scala 程序,用于从 MS SQL Server 加载数据并将其写入 BigQuery。我在 Spark 集群(Google Dataproc)中执行此操作。我的问题是,即使我有一个 64 核的集群,并且我在运行作业时指定了执行器参数,并且我对正在读取的数据进行了分区,但 Spark 仅从单个执行器读取数据。当我开始工作时,我可以看到所有执行程序都在启动,并且在 SQL Server 上,我可以看到所有 4 个工作人员的连接,但在一分钟内,它们都再次关闭,只留下一个,然后运行了一个多小时整理。

数据集是 6500 万条记录,我正在尝试将其划分为 60 个分区。

这是我的集群:

    gcloud dataproc clusters create my-cluster \
  --properties dataproc:dataproc.conscrypt.provider.enable=false,spark:spark.executor.userClassPathFirst=true,spark:spark.driver.userClassPathFirst=true \
  --region europe-north1 \
  --subnet my-subnet \
  --master-machine-type n1-standard-4 \
  --worker-machine-type n1-highmem-16 \
  --master-boot-disk-size 15GB \
  --worker-boot-disk-size 500GB \
  --image-version 1.4 \
  --master-boot-disk-type=pd-ssd \
  --worker-boot-disk-type=pd-ssd \
  --num-worker-local-ssds=1 \
  --num-workers=4

这就是我运行工作的方式:

    gcloud dataproc jobs submit spark \
--cluster my-cluster \
--region europe-north1 \
--jars gs://mybucket/mycode.jar,gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar \
--class Main \
--properties \
spark.executor.memory=19g, \
spark.executor.cores=4, \
spark.executor.instances=11 \
-- yarn

这是我用来读取数据的代码:

val data = sqlQuery(ss,
                    serverName,
                    portNumber,
                    databaseName,
                    userName,
                    password,
                    tableName)

writeToBigQuery(
      bqConfig,
      data,
      dataSetName,
      replaceInvalidCharactersInTableName(r.getAs[String]("TableName")),
      "WRITE_TRUNCATE")

def sqlQuery(ss: SparkSession,
             hostName: String,
             port: String,
             databaseName: String,
             user: String,
             password: String,
             query: String): DataFrame = 
  val result = ss.read.format("jdbc")
    .option("url", getJdbcUrl(hostName, port, databaseName))
    .option("dbtable", query)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("numPartitions", 60)
    .option("partitionColumn", "entityid")
    .option("lowerBound", 1)
    .option("upperBound", 198012).load()

  result


def writeToBigQuery(bqConf: Configuration,
                    df: DataFrame,
                    dataset: String,
                    table: String,
                    writeDisposition: String = "WRITE_APPEND"): Unit = 

  //Convert illegal characters in column names
  var legalColumnNamesDf = df
  for (col <- df.columns) 
    legalColumnNamesDf = legalColumnNamesDf.withColumnRenamed(
      col,
      col
        .replaceAll("-", "_")
        .replaceAll("\\s", "_")
        .replaceAll("æ", "ae")
        .replaceAll("ø", "oe")
        .replaceAll("å", "aa")
        .replaceAll("Æ", "AE")
        .replaceAll("Ø", "OE")
        .replaceAll("Å", "AA")
    )
  

  val outputGcsPath = s"gs://$bucket/" + HardcodedValues.SparkTempFolderRelativePath + UUID
    .randomUUID()
    .toString
  val outputTableId = s"$projectId:$dataset.$table"

  //Apply explicit schema since to avoid creativity of BigQuery auto config
  val uniqBqConf = new Configuration(bqConf)

  BigQueryOutputConfiguration.configure(
    uniqBqConf,
    outputTableId,
    s""""fields":$Json(DefaultFormats).write(
      legalColumnNamesDf.schema.map(
        f =>
          Map(
            "name" -> f.name,
            "type" -> f.dataType.sql
              .replace("BIGINT", "INT")
              .replace("INT", "INT64")
              .replaceAll("DECIMAL\\(\\d+,\\d+\\)", "NUMERIC"),
            "mode" -> (if (f.nullable) "NULLABLE"
                       else "REQUIRED")
        ))
    ) """,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_, _]]
  )

  uniqBqConf.set(
    BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
    if (Array("WRITE_APPEND", "WRITE_TRUNCATE") contains writeDisposition)
      writeDisposition
    else "WRITE_APPEND"
  )

  //Save to BigQuery
  legalColumnNamesDf.rdd
    .map(
      row =>
        (null,
         Json(DefaultFormats).write(
           ListMap(row.schema.fieldNames.toSeq.zip(row.toSeq): _*))))
    .saveAsNewAPIHadoopDataset(uniqBqConf)


任何想法都将不胜感激。

【问题讨论】:

你也可以发writeToBigQuery 吗? 当然,我已经将它添加到上面的代码中了。 【参考方案1】:

如果您查看 Spark 用户界面,您会发现一项任务正在读取大部分数据是否存在很多偏差?我的猜测是您选择的分区键很差,所以大部分数据最终都在一个分区中。

此 *** 答案提供了详细说明:What is the meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?。我认为您的实体 ID 需要在 1 到 198012 之间均匀分布,才能成为一个很好的分区列。

【讨论】:

感谢您的建议,虽然它不是唯一键,但分布非常均匀。所以几乎没有任何偏差。 知道了——但是当您查看 Spark UI 时,您是否看到了歪斜? (您可以使用--enable-component-gateway 创建集群,从“Web 界面”选项卡 -> YARN UI -> GCP 控制台中的 Spark ApplicationMaster 跳转到 Spark UI)。 感谢您的努力。我试图停下来告诉 spark 运行多少个执行程序并进行动态分配,现在它可以工作了。我还在学习 Spark,所以也许我错过了其他东西,但无论如何,我的问题现在已经解决了。再次感谢。【参考方案2】:

最后,我试着停下来告诉 spark 运行多少个执行程序并进行动态分配,现在它可以工作了。我请求了 24 个分区,它动态分配了 8 个执行器,每个执行器有 3 个核心,并行运行 24 个任务。

【讨论】:

以上是关于从 Spark 读取时分区 sql 表数据的问题的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Spark Streaming DStream 制作为 SQL 表

从 spark sql 插入配置单元表

从 Spark 读取 Hive 表作为数据集

sparksql 表定义 存储在哪

Spark Sql 从 Hive orc 分区表中读取,给出数组越界异常

从源码看Spark读取Hive表数据小文件和分块的问题