如何将大型 Spark DataFrame(1.2 GB 14M 行)写入 MSSQL Server tbl?我目前的解决方案大约需要 10 个小时
Posted
技术标签:
【中文标题】如何将大型 Spark DataFrame(1.2 GB 14M 行)写入 MSSQL Server tbl?我目前的解决方案大约需要 10 个小时【英文标题】:How to write large Spark DataFrame(1.2 GB 14M rows) to an MSSQL Server tbl? My current solution takes around 10 hours 【发布时间】:2020-12-23 02:17:55 【问题描述】:问题: 1.2 GB(1400 万条记录)存储在 apache spark 数据帧中。计算时间不到 1 分钟,但写入 MSSQL SERVER 表(非索引)需要 10 多个小时。 硬件:(1-VM 8-vcpus,64 GB 内存,固态硬盘)。问题:以下已尝试但没有成功,您有什么绝妙的想法、建议甚至是简单的帮助吗?谢谢
def FUNC_A1(df):
start = time.time()
jdbc_url = f"jdbc:sqlserver://config.get('mydb',
'host'):1434;database=config.get('mydb', 'database')"
df.select("F1", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "F9")\
.write.format("jdbc").mode("overwrite") \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
.option("url", jdbc_url) \
.option("dbtable", "dbo.tblAFS") \
.option("user", config.get('mydb', 'username'))\
.option("password", config.get('mydb', 'password'))\
.save()
前面的代码创建了表,但是在尝试插入记录时它失败并生成了以下错误消息;
选项 #1 (此代码需要 9 到 10 小时才能完成,它正在将实际结果(数据帧)写入表中。
# Execute insert into tblAFS for each row in dataframe
# Using fast execute
start = time.time()
df = df.select("F1", "F2", "F3", "F4", "F5",
"F6", "F7", "F8", "F9").na.fill(0)
conn = pyodbc.connect(shared.get_odbcconn(config))
cursor = conn.cursor()
cursor.fast_executemany = True
collected = df.rdd.toLocalIterator()
counter = 1
for row in collected:
cursor.execute("""
INSERT INTO dbo.tblAFS
([F1],[F2],[F3],[F4],[F5],[F6],[F7],[F8],[F9])
VALUES (?,?,?,?,?,?,?,?,?)""",
row["F1"], row["F2"], row["F3"], row["F4"], row["F5"],
row["F6"], row["F7"], row["F8"], row["F9"])
counter = counter + 1
conn.commit()
conn.close()
选项 #2 _ 花费的时间与选项 #1_ 一样多_
def FUNC_C1(df):
# 1. Create csv file for each F2 partition
# 2. Bulk insert all files generated in table tblAFS
start = time.time()
file_path = config.get('action', 'stats_results')
df = df.select("F1", "F2", "F3", "F4", "F5",
"F6", "F7", "F8", "F70").na.fill(0)
df.write.mode("overwrite").options(header=True).csv(file_path)
conn = pyodbc.connect(shared.get_odbcconn(config))
cursor = conn.cursor()
cursor.fast_executemany = True
files = glob(f"file_path/*.csv")
counter = 1
for file in files:
cursor.execute(f"BULK INSERT dbo.tblAFS FROM 'getcwd()/file'
WITH (FORMAT = 'CSV', FIRSTROW = 2)")
counter = counter+1
conn.commit()
conn.close()
已经考虑过以下想法,但尚未实施,从而大大减少了处理时间。
rdd.collect() 不应在这种情况下使用,因为它会收集所有 数据作为驱动中的数组,这是最简单的退出方式 内存。 rdd.coalesce(1).saveAsTextFile() 也不应该用作 上游阶段的并行性将丢失,无法在 单个节点,数据将从中存储。 rdd.coalesce(1, shuffle = true).saveAsTextFile() 是最简单的 选项,因为它将保持上游任务的处理并行和 然后只对一个节点执行 shuffle (rdd.repartition(1).saveAsTextFile() 是同义词)。 rdd.saveAsSingleTextFile() 如下提供的另外允许一个 将 rdd 存储在具有特定名称的单个文件中,同时保持 rdd.coalesce(1, shuffle = true).saveAsTextFile()。
【问题讨论】:
您所说的数字等于大约。每秒插入 389 行,这太低了。如果你在虚拟机上,也许你有吵闹的邻居? SQL Server 是否在主机上设置了内存预留?它应该..... 【参考方案1】:有两种方法。首先,删除索引、主键和外键,使用 Spark 作业在该表中插入日期。作业完成后,重新创建索引、主键和外键。
或者,您可以创建一个新表,比如说 table-temp,其结构与原始表相同,但没有任何约束。在表临时表中插入您的数据并使用表临时表更新您的原始表。
使用第一种方式,几天前我在 40 分钟内插入了 500-8 亿行的 Oracle DB。
【讨论】:
不要出于明显的原因丢弃 PK 或 FK(另外,您可能会将 PK 与集群密钥混淆) 这取决于数据库中的目标表是空表还是已经包含数据。如果目标表已经包含日期,正确的做法是使用临时表。 在 SQL 数据库中插入数据不受集群键的影响。它受 Spark 执行器建立的连接数、该表使用的约束数和批处理大小的影响。 “在 SQL 数据库中插入数据不受集群键的影响” - 这是不正确的。 为什么“在SQL数据库中插入数据不受集群键影响”是错误的?以上是关于如何将大型 Spark DataFrame(1.2 GB 14M 行)写入 MSSQL Server tbl?我目前的解决方案大约需要 10 个小时的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Scala Spark 中的 Excel (xls,xlsx) 文件构造数据框?
pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()
Spark:如何将 DataFrame 更改为 LibSVM 并执行逻辑回归