如何将大型 Spark DataFrame(1.2 GB 14M 行)写入 MSSQL Server tbl?我目前的解决方案大约需要 10 个小时

Posted

技术标签:

【中文标题】如何将大型 Spark DataFrame(1.2 GB 14M 行)写入 MSSQL Server tbl?我目前的解决方案大约需要 10 个小时【英文标题】:How to write large Spark DataFrame(1.2 GB 14M rows) to an MSSQL Server tbl? My current solution takes around 10 hours 【发布时间】:2020-12-23 02:17:55 【问题描述】:

问题: 1.2 GB(1400 万条记录)存储在 apache spark 数据帧中。计算时间不到 1 分钟,但写入 MSSQL SERVER 表(非索引)需要 10 多个小时。 硬件:(1-VM 8-vcpus,64 GB 内存,固态硬盘)。问题:以下已尝试但没有成功,您有什么绝妙的想法、建议甚至是简单的帮助吗?谢谢

def FUNC_A1(df):  
    start = time.time()
    jdbc_url = f"jdbc:sqlserver://config.get('mydb',
    'host'):1434;database=config.get('mydb', 'database')"
    df.select("F1", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "F9")\ 
      .write.format("jdbc").mode("overwrite") \
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
      .option("url", jdbc_url) \
      .option("dbtable", "dbo.tblAFS") \
      .option("user", config.get('mydb', 'username'))\
      .option("password", config.get('mydb', 'password'))\
      .save()

前面的代码创建了表,但是在尝试插入记录时它失败并生成了以下错误消息;

选项 #1 (此代码需要 9 到 10 小时才能完成,它正在将实际结果(数据帧)写入表中。


     # Execute insert into tblAFS for each row in dataframe
     # Using fast execute

     start = time.time()
     df = df.select("F1", "F2", "F3", "F4", "F5",
          "F6", "F7", "F8", "F9").na.fill(0)
     conn = pyodbc.connect(shared.get_odbcconn(config))
     cursor = conn.cursor()
     cursor.fast_executemany = True
     collected = df.rdd.toLocalIterator()
     counter = 1
     for row in collected:
        cursor.execute("""
               INSERT INTO dbo.tblAFS 
               ([F1],[F2],[F3],[F4],[F5],[F6],[F7],[F8],[F9])
               VALUES (?,?,?,?,?,?,?,?,?)""",
                 row["F1"], row["F2"], row["F3"], row["F4"], row["F5"],
                 row["F6"], row["F7"], row["F8"], row["F9"])
        counter = counter + 1
    conn.commit()
    conn.close()

选项 #2 _ 花费的时间与选项 #1_ 一样多_

def FUNC_C1(df):
    # 1. Create csv file for each F2 partition
    # 2. Bulk insert all files generated in table tblAFS

    start = time.time()
    file_path = config.get('action', 'stats_results')
    df = df.select("F1", "F2", "F3", "F4", "F5",
                   "F6", "F7", "F8", "F70").na.fill(0)

    df.write.mode("overwrite").options(header=True).csv(file_path)
    conn = pyodbc.connect(shared.get_odbcconn(config))
    cursor = conn.cursor()
    cursor.fast_executemany = True
    files = glob(f"file_path/*.csv")
    counter = 1
    for file in files:
        cursor.execute(f"BULK INSERT dbo.tblAFS FROM 'getcwd()/file' 
             WITH (FORMAT = 'CSV', FIRSTROW = 2)")
        counter = counter+1
    conn.commit()
    conn.close()

已经考虑过以下想法,但尚未实施,从而大大减少了处理时间。


rdd.collect() 不应在这种情况下使用,因为它会收集所有 数据作为驱动中的数组,这是最简单的退出方式 内存。 rdd.coalesce(1).saveAsTextFile() 也不应该用作 上游阶段的并行性将丢失,无法在 单个节点,数据将从中存储。 rdd.coalesce(1, shuffle = true).saveAsTextFile() 是最简单的 选项,因为它将保持上游任务的处理并行和 然后只对一个节点执行 shuffle (rdd.repartition(1).saveAsTextFile() 是同义词)。 rdd.saveAsSingleTextFile() 如下提供的另外允许一个 将 rdd 存储在具有特定名称的单个文件中,同时保持 rdd.coalesce(1, shuffle = true).saveAsTextFile()。

【问题讨论】:

您所说的数字等于大约。每秒插入 389 行,这太低了。如果你在虚拟机上,也许你有吵闹的邻居? SQL Server 是否在主机上设置了内存预留?它应该..... 【参考方案1】:

有两种方法。首先,删除索引、主键和外键,使用 Spark 作业在该表中插入日期。作业完成后,重新创建索引、主键和外键。

或者,您可以创建一个新表,比如说 table-temp,其结构与原始表相同,但没有任何约束。在表临时表中插入您的数据并使用表临时表更新您的原始表。

使用第一种方式,几天前我在 40 分钟内插入了 500-8 亿行的 Oracle DB。

【讨论】:

不要出于明显的原因丢弃 PK 或 FK(另外,您可能会将 PK 与集群密钥混淆) 这取决于数据库中的目标表是空表还是已经包含数据。如果目标表已经包含日期,正确的做法是使用临时表。 在 SQL 数据库中插入数据不受集群键的影响。它受 Spark 执行器建立的连接数、该表使用的约束数和批处理大小的影响。 “在 SQL 数据库中插入数据不受集群键的影响” - 这是不正确的。 为什么“在SQL数据库中插入数据不受集群键影响”是错误的?

以上是关于如何将大型 Spark DataFrame(1.2 GB 14M 行)写入 MSSQL Server tbl?我目前的解决方案大约需要 10 个小时的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Scala Spark 中的 Excel (xls,xlsx) 文件构造数据框?

Spark-SQL——DataFrame与Dataset

pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()

Spark:如何将 DataFrame 更改为 LibSVM 并执行逻辑回归

Spark SQL - 如何将 DataFrame 写入文本文件?

如何将大型 Pyspark DataFrame 写入 DynamoDB