如何跨 EMR 集群中的节点运行 python 代码

Posted

技术标签:

【中文标题】如何跨 EMR 集群中的节点运行 python 代码【英文标题】:How to run python code across nodes in an EMR cluster 【发布时间】:2018-09-09 17:27:38 【问题描述】:

我有一个 Amazon EMR 集群 - 30 个节点 我的 Python 代码如下所示 -

spark = SparkSession \
        .builder \
        .appName("App") \
        .config(conf=sparkConf) \
        .getOrCreate()

def fetchCatData(cat, tableName):
    df_gl = spark.sql("select * from  where category = ".format(tableName, cat))
    df_pandas = df_gl.select("*").toPandas()
    df_pandas.to_csv("/tmp/split/_.csv".format(tableName, cat))

catList = [14, 15, 63, 65, 74, 21, 23, 60, 79, 86, 107, 147, 196, 199, 200, 201, 229, 263, 265, 267, 328, 421, 468, 469,504]
tableList = ["Table1","Table2"
             ,"Table3",
             "Table4", "Table5", "Table6",
             "Table7"
             ]

def main(args):
    log4jLogger = spark._jvm.org.apache.log4j
    LOGGER = log4jLogger.LogManager.getLogger(__name__)

    for table in tableList:
        LOGGER.info("Starting Split for ".format(table))
        dataLocation = "s3://test/APP/".format( table)
        df = spark.read.parquet(dataLocation)
        df = df.repartition("CATEGORY").cache()
        df.createOrReplaceTempView(table)
        for cat in catList:
            fetchGLData(cat, table)

我想解决以下问题-

    基本上我想读取我的镶木地板数据,将其按类别划分并将其作为 pandas 数据框存储在 csv 中。 目前我正在按顺序运行,我想在 EMR 中的一个节点上运行每个类别并行运行它 我尝试使用多处理,但对结果不满意。

在最短的时间内解决这个问题的最佳方法是什么?

【问题讨论】:

【参考方案1】:

不确定为什么要转换为 pandas 数据帧,但使用从 spark sql 创建的 spark 数据帧,您可以直接写入 csv。

但是,如果您希望将 csv 作为一个文件,则需要重新分区为 1,这不会使用所有节点。如果您不关心它生成多少文件,那么您可以重新分区数据帧以包含更多分区。然后每个分区将被节点处理并输出,直到所有分区都完成。

单个文件不使用所有节点(注意 .csv 将是包含实际 csv 的文件夹)

df_gl = spark.sql("select * from where category = ".format(tableName, cat)) df_gl.repartition(1).write.mode("overwrite").csv("/tmp/split/_.csv".format(tableName, cat))

使用多个节点进行并行处理并输出为多个拆分文件(注意 .csv 将是包含实际 csv 的文件夹)

df_gl = spark.sql("select * from where category = ".format(tableName, cat)).repartition(10) df_gl.write.mode("overwrite").csv("/tmp/split/_.csv".format(tableName, cat))

【讨论】:

我写回 csv 是因为,我需要将所有这些作为单独的 pandas 数据帧读取并对其进行一些处理。如果我写入 csv,我无法在其上构建 pandas 数据框。

以上是关于如何跨 EMR 集群中的节点运行 python 代码的主要内容,如果未能解决你的问题,请参考以下文章

如何跨 Slurm 集群上的多个节点运行 MPI Python 脚本?错误:警告:无法在 2 个节点上运行 1 个进程,将 nnodes 设置为 1

EMR [使用 MRJob] 的输入数据如何跨节点分布?

寻找有关如何使用 python 启动 AWS EMR 集群以运行 pyspark 步骤的示例

如何在多个子网上运行 AWS EMR 集群?

如何在 Amazon EMR 集群上远程提交 hadoop MR 作业

如何在 Amazon EMR 上将连接器添加到 presto