PySpark foreachPartition 并行写入数据库
Posted
技术标签:
【中文标题】PySpark foreachPartition 并行写入数据库【英文标题】:PySpark foreachPartition write to Database in Parallel 【发布时间】:2018-05-12 21:17:16 【问题描述】:我正在将数百个 XML 文件读入 Spark Dataframe,其中每一行都包含特定事件的元数据和时间序列数据。这些行中的每一行都被转换为一个 rdd,以转换为具有特定键/值结构的一批文档,然后写入数据库。 XML 数据需要分成
def build_documents(data):
# Make dataframe out of data tags
data = pd.DataFrame([i.split(',') for i in list(chain(*(data)))])
# Helper function to Get Batches
for batch in get_batches(data):
x = batch.T.to_dict()
yield x
def process_partition(partition):
client = document_client.DocumentClient(HOST, 'masterKey': MASTER_KEY )
for element in partition:
generator = build_documents(element)
for batch in generator:
client.CreateDocument(collection_link + 'data', batch)
# Write to Database
df.rdd.coalesce(20).foreachPartition(process_partition)
仍在调整分区的数量,但有什么想法可以改进吗?性能真的很慢,正如到目前为止实现的代码所预期的那样。该集群包含 32 个内核,两个驱动程序的 128.0 GB 内存,并且可以扩展到 8 个执行程序。如下所示,只有两个工人在运行,这在进一步扩大规模时显然不是最优的。想法?
【问题讨论】:
虽然您似乎在数据分发方面遇到了一些问题,但它看起来更像是您的 Python 代码和/或您使用的服务存在问题。 73 条记录 / 38MB 的 21 分钟看起来长得不切实际。除非您提供更多详细信息,否则可能很难为您提供帮助。 【参考方案1】:df.rdd.coalesce(20).foreachPartition(process_partition)
会将顺序条目写入数据库。而且,您对函数 process_partition 的逻辑也将是顺序的。
您需要对 def process_partition
的逻辑进行多线程处理。这将加快进程。也可以使用df.rdd.coalesce(20).foreachPartitionAsync(process_partition)
【讨论】:
这在不止一个方面是不正确的。 1. foreachPartition 可以同时在不同的worker上运行不同的partition。 2.您应该尝试将分区中的行批量写入批量写入,以节省时间,为每个分区创建一个到数据库的连接并在分区末尾关闭它。以上是关于PySpark foreachPartition 并行写入数据库的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?
PySpark foreachPartition 并行写入数据库
Spark(21)——foreachPartition foreach
Apache Spark MySQL JavaRDD.foreachPartition - 为啥我得到 ClassNotFoundException