在 PySpark SQL 中并行执行读写 API 调用

Posted

技术标签:

【中文标题】在 PySpark SQL 中并行执行读写 API 调用【英文标题】:Parallel execution of read and write API calls in PySpark SQL 【发布时间】:2020-08-15 16:10:55 【问题描述】:

我需要将 mysql 中一组表的增量记录以 Parquet 格式加载到 Amazon S3。这些表在 AWS MySQL 托管实例中的多个数据库/模式中很常见。代码应该从每个模式(具有一组公共表)中并行复制数据。

我正在使用读取 API PySpark SQL 连接到 MySQL 实例并读取模式的每个表的数据,并使用写入 API 作为 Parquet 文件将结果数据帧写入 S3。我正在为数据库中的每个表循环运行它,如下面的代码所示:

def load_data_to_s3(databases_df):
    db_query_properties = config['mysql-query']
    auto_id_values = config['mysql-auto-id-values']
    for row in databases_df.collect():
        for table in db_query_properties.keys():
            last_recorded_id_value = auto_id_values[table]
            select_sql = "select * from . where id>".format(row.database_name, table, last_recorded_id_value)
            df = spark.read.format("jdbc") \
                    .option("driver", mysql_db_properties['driver']) \
                    .option("url", row.database_connection_url) \
                    .option("dbtable", select_sql) \
                    .option("user", username) \
                    .option("password", password) \
                    .load()
            s3_path = 's3a:////'.format(s3_bucket, database_dir, table)
            df.write.parquet(s3_path, mode="append") 

我想知道如何将此代码扩展到在 EMR 集群中并行运行的多个数据库。请建议我一个合适的方法。如果需要更多详细信息,请告诉我。

【问题讨论】:

为什么不一次向 EMR 提交多个作业(每个 db 一个作业)? 每个数据库服务器有 50 多个数据库,并且有 30 多个这样的服务器。因此,我认为如果我们为每个数据库单独启动它们,检查所有单个作业将是一项繁重的任务。随时就此提供建议。谢谢 嗯..为什么不直接将日志写入 S3如果 发生任何故障?如果您想增加并行度,最简单的方法是拆分作业并为每个作业分配自己的内存和处理器。 我相信您正在寻找的替代方法是使用线程来启动作业的多个实例。但这需要您更改代码。请查看我的答案以获取示例实现。 【参考方案1】:

我可以提出两种解决方案:

1.简单的方法

一次向您的 EMR 提交多个作业(每个数据库一个作业)。如果监控是问题,只需将失败日志写入 S3 或 HDFS。

2。需要更改一些代码

您可以尝试使用线程来并行化从每个数据库中提取的数据。我可以展示如何操作的示例,但您可能需要进行更多更改以适应您的用例。

示例实现:

import threading

def load_data_to_s3(databases_df):
    db_query_properties = config['mysql-query']
    auto_id_values = config['mysql-auto-id-values']
    for row in databases_df.collect():
        for table in db_query_properties.keys():
            last_recorded_id_value = auto_id_values[table]
            select_sql = "select * from . where id>".format(row.database_name, table, last_recorded_id_value)
            df = spark.read.format("jdbc") \
                    .option("driver", mysql_db_properties['driver']) \
                    .option("url", row.database_connection_url) \
                    .option("dbtable", select_sql) \
                    .option("user", username) \
                    .option("password", password) \
                    .load()
            s3_path = 's3a:////'.format(s3_bucket, database_dir, table)
            df.write.parquet(s3_path, mode="append") 

threads = [threading.Thread(target=load_data_to_s3, args=(db) for db in databases_df]
    
for t in threads:
    t.start()

for t in threads:
    t.join() 

另外,请确保使用 set('spark.scheduler.mode', 'FAIR') 属性将调度程序更改为 FAIR。这将为您的每个数据库创建一个线程。如果要控制并行运行的线程数,请相应修改for循环。

此外,如果您想在程序中创建新作业,请将您的 SparkSession 与参数一起传递。

【讨论】:

感谢@Jacob 提供带有示例代码的解决方案。我对此有几个问题。据我所知,线程共享相同的内存空间,其中一个将在任何时间点处于活动状态。将这个概念与 Spark 一起应用是否可以,它扩展了线程的概念,通过为它们提供单独的资源使其真正并行?你认为多处理在这里更有意义吗?请分享您的想法,如果可能的话,让我知道在生产中这样做是否还有其他挑战。 @Raghu 线程将共享相同的内存 - 是的,你是对的。其中之一将在任何时间点处于活动状态 - 不是真的。将创建不同的池来并行运行它们。多处理基本上是我的第一个选择,多处理作为线程的扩展是传递 SparkSession 会给你的。上面提到的任何多处理选项都不需要新资源。但是,如果您将所有内容分配给一个作业(参考动态分配)并且可以花足够的时间很好地调整您的作业(基本上是线程数),线程化也很好。 从生产场景来看,我认为你最好有我在选项1中提到的工作。因为否则,如果你的工作在某个地方中断,它会影响所有表的负载,这是一个非常糟糕的情况! 好的。谢谢@Jacob 的宝贵建议。这有助于我决定采用哪种方法。如果有任何结果或问题,我会通知您。 如果您需要有关 PySpark 功能、作业优化、EMR 大小等方面的更多信息,请随时参考this post。【参考方案2】:

您的list_of_databases 未并行化。要进行并行处理,您应该并行化列表并使用 foreach 或 spark 提供的东西来执行并行工作。


在 EMR 中打开并发选项并为每个表发送 EMR 步骤,或者您可以使用 Spark 的公平调度程序,它可以在内部并行执行作业,只需对您的代码进行少量修改。

【讨论】:

感谢@Lamanus 的回复。我想从代码中做而不是从 EMR(它将启用自动可扩展性功能)。我已将调度程序属性设置为 FAIR ("spark.scheduler.mode": "FAIR")。但是由于基于循环的方法,它仍然按顺序运行。为了更好地理解我当前的代码,我已将其添加到问题中。请检查它并建议我如何修改它以支持并行运行多个数据库。 我尝试使用foreach 传递我的函数。但它抛出了一个错误:PicklingError:无法序列化对象:异常:您似乎正在尝试从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它在工作程序上运行的代码中。 有关详细信息,请参阅 SPARK-5063。这是因为我在函数中使用了 spark session 对象,它是一个驱动程序对象,因此不能在 worker 上运行以进行并行处理。有什么解决方法吗? 那么你应该为每个作业创建一个新的 spark Session。很难说该怎么做,但我之前在我的 scala spark 中完成了。我什至不知道如何在 pyspark 中实现它。 对于较少的数据库听起来不错。但在我的用例中,我需要从数百个数据库中并行提取数据。这个问题必须有更好的解决方案。无论如何,我很感激你的及时回复。谢谢

以上是关于在 PySpark SQL 中并行执行读写 API 调用的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 并行执行上下文中使用 JAR 依赖项

如何使用 PySpark 在桌面本地文件夹的目录中执行文件(pdf、docs、txt、xls)的并行处理?

在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)

具有多个接收器的 pyspark 并行处理

《Spark Python API 官方文档中文版》 之 pyspark.sql

如何在pyspark sql中保存表?