PySpark - Spark 集群 EC2 - 无法保存到 S3

Posted

技术标签:

【中文标题】PySpark - Spark 集群 EC2 - 无法保存到 S3【英文标题】:PySpark - Spark clusters EC2 - unable to save to S3 【发布时间】:2016-12-06 02:25:16 【问题描述】:

我已经建立了一个包含一个主节点和两个从节点的 spark 集群(我正在使用 Spark Standalone)。该集群在一些示例中运行良好,但不适用于我的应用程序。我的应用程序工作流程是,它将读取 csv -> 提取 csv 中的每一行以及标题 -> 转换为 JSON -> 保存到 S3。这是我的代码:

def upload_func(row):
    f = row.toJSON()
    f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
    print(f)
    print(row.name)

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL data source example") \
        .getOrCreate()
    df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
    df.rdd.map(upload_func)

我还把AWS_Key_IDAWS_Secret_Key 导出到了ec2 环境中。但是,使用上面的代码,我的应用程序不起作用。以下是问题:

    JSON 文件未保存在 S3 中,我已尝试运行应用程序几次并重新加载 S3 页面但没有数据。应用程序完成,日志中没有任何错误。此外,print(f)print(row.name) 不会在日志中打印出来。我需要解决什么问题才能在 S3 上保存 JSON,并且无论如何我可以在日志上打印以进行调试?

    目前我需要将 csv 文件放在工作节点中,以便应用程序可以读取 csv 文件。如何将文件放在另一个地方,比如说主节点,当应用程序运行时,它将 csv 文件拆分到所有工作节点,以便他们可以作为分布式系统并行上传?

    李>

非常感谢您的帮助。提前感谢您的帮助。

更新

在调试 Logger 后,我发现映射函数 upload_func() 未被调用或应用程序无法进入该函数(函数调用前后记录器打印消息)的问题。如果您知道原因,请帮忙?

【问题讨论】:

【参考方案1】:

您需要强制对地图进行评估; spark 只会按需执行工作。

df.rdd.map(upload_func).count() 应该这样做

【讨论】:

以上是关于PySpark - Spark 集群 EC2 - 无法保存到 S3的主要内容,如果未能解决你的问题,请参考以下文章

在 ec2 上启动 pyspark Ipython notebook

无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧

无法访问 EC2 Spark 集群上的 Ganglia

在 google-dataproc 的 Spark 集群中的 pyspark 作业中使用外部库

spark 集群处理后转单机pyspark 或 pands 数据处理 的方法

入门 - Spark, IPython notebook with pyspark