PySpark foreachPartition 并行写入数据库

Posted

技术标签:

【中文标题】PySpark foreachPartition 并行写入数据库【英文标题】:PySpark foreachPartition write to Database in Parallel 【发布时间】:2018-05-12 21:17:16 【问题描述】:

我正在将数百个 XML 文件读入 Spark Dataframe,其中每一行都包含特定事件的元数据和时间序列数据。这些行中的每一行都被转换为一个 rdd,以转换为具有特定键/值结构的一批文档,然后写入数据库。 XML 数据需要分成

def build_documents(data):

    # Make dataframe out of data tags
    data = pd.DataFrame([i.split(',') for i in list(chain(*(data)))])

    # Helper function to Get Batches
    for batch in get_batches(data): 
          x = batch.T.to_dict()
          yield x

def process_partition(partition):
   client = document_client.DocumentClient(HOST, 'masterKey': MASTER_KEY )
   for element in partition:
        generator = build_documents(element)
        for batch in generator:
            client.CreateDocument(collection_link + 'data', batch)


# Write to Database
df.rdd.coalesce(20).foreachPartition(process_partition)

仍在调整分区的数量,但有什么想法可以改进吗?性能真的很慢,正如到目前为止实现的代码所预期的那样。该集群包含 32 个内核,两个驱动程序的 128.0 GB 内存,并且可以扩展到 8 个执行程序。如下所示,只有两个工人在运行,这在进一步扩大规模时显然不是最优的。想法?

【问题讨论】:

虽然您似乎在数据分发方面遇到了一些问题,但它看起来更像是您的 Python 代码和/或您使用的服务存在问题。 73 条记录 / 38MB 的 21 分钟看起来长得不切实际。除非您提供更多详细信息,否则可能很难为您提供帮助。 【参考方案1】:

df.rdd.coalesce(20).foreachPartition(process_partition) 会将顺序条目写入数据库。而且,您对函数 process_partition 的逻辑也将是顺序的。

您需要对 def process_partition 的逻辑进行多线程处理。这将加快进程。也可以使用df.rdd.coalesce(20).foreachPartitionAsync(process_partition)

【讨论】:

这在不止一个方面是不正确的。 1. foreachPartition 可以同时在不同的worker上运行不同的partition。 2.您应该尝试将分区中的行批量写入批量写入,以节省时间,为每个分区创建一个到数据库的连接并在分区末尾关闭它。

以上是关于PySpark foreachPartition 并行写入数据库的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?

PySpark foreachPartition 并行写入数据库

石墨或grafana可以用来监控pyspark指标吗?

Spark(21)——foreachPartition foreach

Apache Spark MySQL JavaRDD.foreachPartition - 为啥我得到 ClassNotFoundException

foreach和foreachPartition