从 JDBC 源迁移数据时如何优化分区?

Posted

技术标签:

【中文标题】从 JDBC 源迁移数据时如何优化分区?【英文标题】:How to optimize partitioning when migrating data from JDBC source? 【发布时间】:2018-10-02 06:38:23 【问题描述】:

我正在尝试将数据从 PostgreSQL 表中的表移动到 HDFS 上的 Hive 表。为此,我想出了以下代码:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = 
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select $allColumns, 0 as $flagCol from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"($execQuery) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) 
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        
        finalDF
  
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION($prtn_String_columns) select * from preparedDF")

数据插入到基于prtn_String_columns: source_system_name, period_year, period_num动态分区的hive表中

使用 Spark 提交:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

执行器日志中会生成以下错误消息:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

我在日志中看到使用给定数量的分区正确执行读取,如下所示:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

下面是执行者分阶段的状态:

数据未正确分区。一个分区较小,而另一个分区变得很大。这里有一个倾斜问题。 将数据插入 Hive 表时,作业在以下行失败:spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION($prtn_String_columns) select * from preparedDF") 但我知道这是由于数据倾斜问题而发生的。

我尝试增加执行程序的数量,增加执行程序内存,驱动程序内存,尝试只保存为 csv 文件而不是将数据帧保存到 Hive 表中,但没有任何影响执行给出异常:

java.lang.OutOfMemoryError: GC overhead limit exceeded

我需要更正代码中的任何内容吗?谁能告诉我如何解决这个问题?

【问题讨论】:

实际运行了多少个执行器? 50, 48 跑了。 @cricket_007 你对这个问题有什么建议吗? 除了更多的执行器内存或更多的执行器,不是真的 @cricket_007 我将这些参数设置为: --num-executors 50 --executor-cores 8 --executor-memory 60g 但仍然遇到相同的异常。我需要避免代码中的任何操作吗? 【参考方案1】:

    根据输入数据量和集群资源确定您需要多少个分区。根据经验,除非绝对必要,否则最好将分区输入保持在 1GB 以下。并且严格小于块大小限制。

    您 previously stated 迁移了在不同帖子中使用的 1TB 数据值 (5 - 70) 可能会过低以确保顺利进行。

    尝试使用不需要更多repartitioning的值。

    了解您的数据。

    分析数据集中可用的列,以确定是否有任何具有高基数和均匀分布的列要分布在所需数量的分区中。这些是导入过程的良好候选者。此外,您应该确定一个准确的值范围。

    具有不同中心性和偏度度量的聚合以及直方图和基本按键计数是很好的探索工具。对于这部分,最好直接在数据库中分析数据,而不是将其提取到 Spark。

    根据 RDBMS,您可能可以使用 width_bucket(PostgreSQL、Oracle)或等效函数来大致了解在使用 partitionColumnlowerBoundupperBound 加载后数据将如何在 Spark 中分布, numPartitons.

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
    

    如果没有满足上述条件的列,请考虑:

    创建一个自定义并通过它公开它。一个看法。多个独立列上的散列通常是很好的候选者。请查阅您的数据库手册以确定可以在此处使用的函数(Oracle 中的DBMS_CRYPTO,PostgreSQL 中的pgcrypto)*。

    使用一组独立的列,它们一起提供足够高的基数。

    (可选)如果您要写入已分区的 Hive 表,则应考虑包括 Hive 分区列。它可能会限制以后生成的文件数量。

    准备分区参数

    如果在前面的步骤中选择或创建的列是数字 (or date / timestamp in Spark >= 2.4),则直接将其作为 partitionColumn 提供,并使用之前确定的范围值填充 lowerBoundupperBound

    如果绑定值不反映数据的属性(min(col) 对应于lowerBoundmax(col) 对应于upperBound)可能会导致严重的数据偏差,因此请谨慎处理。在最坏的情况下,当边界没有覆盖数据范围时,所有记录都将由一台机器获取,这并不比不分区好。

    如果在前面的步骤中选择的列是分类列或者是一组列,则生成一个完全覆盖数据的互斥谓词列表,其形式可用于SQL where 子句。

    例如,如果您有一列 A 的值为 a1a2a3,列 B 的值为 b1b2b3

    val predicates = for 
      a <- Seq("a1", "a2", "a3")
      b <- Seq("b1", "b2", "b3")
     yield s"A = $a AND B = $b"
    

    仔细检查条件不重叠并且所有组合都被覆盖。如果不满足这些条件,您将分别得到重复或丢失的记录。

    将数据作为predicates 参数传递给jdbc 调用。请注意,分区数将完全等于谓词数。

    将数据库置于只读模式(任何正在进行的写入都可能导致数据不一致。如果可能,您应该在开始整个过程​​之前锁定数据库,但如果不可能,请在您的组织中)。

    如果分区数匹配所需的输出负载数据而没有repartition 并直接转储到接收器,如果不是,您可以尝试按照与步骤 1 中相同的规则重新分区。

    如果您仍然遇到任何问题,请确保您已正确配置 Spark 内存和 GC 选项。

    如果以上都不起作用:

    考虑将您的数据转储到网络/使用COPY TO 等工具分配存储并直接从那里读取。

    请注意,您通常需要符合 POSIX 标准的文件系统或标准数据库实用程序,因此 HDFS 通常不会这样做。

    这种方式的好处是不用担心列属性,也不需要将数据置于只读模式,保证一致性。

    使用专用的批量传输工具,例如 Apache Sqoop,然后再对数据进行整形。


* 不要使用伪列 - Pseudocolumn in Spark JDBC。

【讨论】:

代码正在运行。但我在这里看到了一个问题。一些表具有复合主键,即两列被称为“主键”。例如: ref_id(integer)、header_id(integer)、source_system_name(String) 被称为主键。在这种情况下如何准备 partitionColumn ? 确实没有要求partitionColumn 是主键,因此只要各个组件列具有上述属性,您就可以安全地使用其中的任何一个。如果出于某种原因你想要两者都可以,例如计算ref_id &lt;&lt; 32 | header_id 之类的东西并将结果用作分区列。或者使用其他一些按位运算,例如ref_id ⊕ header_id。要将其传递给 Spark,您需要数据库视图或 subquery 好的。最后一件事。该代码仅针对具有 1GB 数据的表运行,该表具有一个主键列(整数数据类型),下限和上限是该列的最小值和最大值。但是当我在大小为 400gb 的表上尝试相同的操作时,作业再次失败并出现 GC 开销异常。我将在下面的评论中发布 spark-jdbc。 val dataDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(select * from schema.table where period_year=2016 ) as year2016") .option("user", usrName).option("password", pwd) .option("partitionColumn","header_id") .option("lowerBound", 3275L).option("upperBound", 1152921494159532424L) .option("numPartitions",100).load() 我增加了分区、执行程序和驱动程序内存的数量,但仍然面临同样的异常。如果这也不起作用,有什么我可以尝试申请的吗? header_id 是均匀分布的吗(在应用period_year=2016 谓词之后)?你增加了多少分区?【参考方案2】:

根据我的经验,有 4 种内存设置会有所不同:

A) [1] 用于存储处理原因的数据的内存 VS [2] 用于保存程序堆栈的堆空间

B) [1] Driver VS [2] executor memory

到目前为止,我始终能够通过增加适当类型的内存来成功运行我的 Spark 作业:

因此,A2-B1 将是驱动程序上可用于保存程序堆栈的内存。等等。

属性名称如下:

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

请记住,所有 *-B1 的总和必须小于您的工作节点上的可用内存,并且所有 *-B2 的总和必须小于您的驱动程序节点上的内存。

我敢打赌,罪魁祸首是粗体标记的堆设置之一。

【讨论】:

【参考方案3】:

你的另一个问题被路由到这里作为重复

 'How to avoid data skewing while reading huge datasets or tables into spark? 
  The data is not being partitioned properly. One partition is smaller while the 
  other one becomes huge on read.
  I observed that one of the partition has nearly 2million rows and 
  while inserting there is a skew in partition. '

如果问题是处理读取后在数据帧中分区的数据,您是否尝试过增加“numPartitions”值?

.option("numPartitions",50)

lowerBound, upperBound 为生成的 WHERE 子句表达式形成分区步长,而 numpartitions 决定了拆分的数量。

例如,sometable 有列 - ID(我们选择它为partitionColumn);我们在表中看到的列-ID 的值范围是从 1 到 1000,我们希望通过运行 select * from sometable 来获取所有记录, 所以我们使用 lowerbound = 1 & upperbound = 1000 和 numpartition = 4

这将通过基于我们的提要 (lowerbound = 1 &amp; upperbound = 1000 and numpartition = 4) 构建 sql 生成一个包含 4 个分区的数据框,其中包含每个查询的结果

select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750

如果我们表中的大部分记录都在ID(500,750) 的范围内怎么办。这就是你所处的情况。

当我们增加 numpartition 时,拆分会进一步发生,这会减少同一分区中的记录量,但这 不是一个好镜头。

不是根据我们提供的边界对partitioncolumn进行火花拆分,如果您想自己提供拆分,数据可以均匀 分裂。您需要切换到另一个 JDBC 方法,而不是 (lowerbound,upperbound &amp; numpartition) 我们可以提供 直接谓词。

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 

Link

【讨论】:

您提供的链接无效。你能给我一个合适的吗? 明白。看你提到的方法,边界我就不用提了,但我这里具体给出的是什么:“谓词:每个分区的where子句中的条件”。数组中的 partitionColumn、lowBound、upperBound 是一样的吗? 类似这样,val query = Array[String]("column >= value1 and column = value2 and column = value3 and column 好的。但是我在表中有 22Bill 行,该分区列中有各种值,我无法为这么多值提供谓词。

以上是关于从 JDBC 源迁移数据时如何优化分区?的主要内容,如果未能解决你的问题,请参考以下文章

执行 Core Data 迁移时,如何从不同实体的源存储中继承旧值?

使用 pg_dump 将 Postgres 从 Windows 迁移到 Linux 时如何选择正确的排序规则来创建数据库?

oracle数据迁移到mysql中去,数据类型不一致

kvm迁移及优化

Flyway jdbc迁移连接问题

Flyway迁移,无法从DataSource获取Jdbc连接