PySpark - Spark 集群 EC2 - 无法保存到 S3
Posted
技术标签:
【中文标题】PySpark - Spark 集群 EC2 - 无法保存到 S3【英文标题】:PySpark - Spark clusters EC2 - unable to save to S3 【发布时间】:2016-12-06 02:25:16 【问题描述】:我已经建立了一个包含一个主节点和两个从节点的 spark 集群(我正在使用 Spark Standalone)。该集群在一些示例中运行良好,但不适用于我的应用程序。我的应用程序工作流程是,它将读取 csv -> 提取 csv 中的每一行以及标题 -> 转换为 JSON -> 保存到 S3。这是我的代码:
def upload_func(row):
f = row.toJSON()
f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
print(f)
print(row.name)
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.getOrCreate()
df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
df.rdd.map(upload_func)
我还把AWS_Key_ID
和AWS_Secret_Key
导出到了ec2 环境中。但是,使用上面的代码,我的应用程序不起作用。以下是问题:
JSON 文件未保存在 S3 中,我已尝试运行应用程序几次并重新加载 S3 页面但没有数据。应用程序完成,日志中没有任何错误。此外,print(f)
和 print(row.name)
不会在日志中打印出来。我需要解决什么问题才能在 S3 上保存 JSON,并且无论如何我可以在日志上打印以进行调试?
目前我需要将 csv 文件放在工作节点中,以便应用程序可以读取 csv 文件。如何将文件放在另一个地方,比如说主节点,当应用程序运行时,它将 csv 文件拆分到所有工作节点,以便他们可以作为分布式系统并行上传?
李>非常感谢您的帮助。提前感谢您的帮助。
更新
在调试 Logger 后,我发现映射函数 upload_func()
未被调用或应用程序无法进入该函数(函数调用前后记录器打印消息)的问题。如果您知道原因,请帮忙?
【问题讨论】:
【参考方案1】:您需要强制对地图进行评估; spark 只会按需执行工作。
df.rdd.map(upload_func).count()
应该这样做
【讨论】:
以上是关于PySpark - Spark 集群 EC2 - 无法保存到 S3的主要内容,如果未能解决你的问题,请参考以下文章
在 ec2 上启动 pyspark Ipython notebook
无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧
在 google-dataproc 的 Spark 集群中的 pyspark 作业中使用外部库