如何创建 Pyspark 应用程序
Posted
技术标签:
【中文标题】如何创建 Pyspark 应用程序【英文标题】:How to create Pyspark application 【发布时间】:2019-11-30 05:30:21 【问题描述】:我的要求是使用 pyspark 从 HDFS 读取数据,仅过滤所需的列,删除 NULL 值,然后将处理后的数据写回 HDFS。完成这些步骤后,我们需要从 HDFS 中删除 RAW Dirty 数据。这是我的每个操作的脚本。
导入库和依赖项
#Spark Version = > version 2.4.0-cdh6.3.1
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()
import pyspark.sql.functions as F
从 HDFS 读取数据
df_load_1 = sparkSession.read.csv('hdfs:///cdrs/file_path/*.csv', sep = ";")
只选择所需的列
col = [ '_c0', '_c1', '_c2', '_c3', '_c5', '_c7', '_c8', '_c9', '_c10', '_C11', '_c12', '_c13', '_c22', '_C32', '_c34', '_c38', '_c40',
'_c43', '_c46', '_c47', '_c50', '_c52', '_c53', '_c54', '_c56', '_c57', '_c59', '_c62', '_c63','_c77', '_c81','_c83']
df1=df_load_1.select(*[col])
检查 NULL 值,我们有任何删除它们
df_agg_1 = df1.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df1.columns])
df_agg_1.show()
df1 = df1.na.drop()
将预处理后的数据写入HDFS,同一个集群,不同目录
df1.write.csv("hdfs://nm/pyspark_cleaned_data/py_in_gateway.csv")
从 HDFS 中删除原始原始数据
def delete_path(spark , path):
sc = spark.sparkContext
fs = (sc._jvm.org
.apache.hadoop
.fs.FileSystem
.get(sc._jsc.hadoopConfiguration())
)
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)
通过传递HDFS绝对路径在下面执行
delete_path(spark , '/cdrs//cdrs/file_path/')
pyspark and HDFS commands
我可以在pyspark
提示符下成功完成所有操作。
现在我想开发应用程序并使用 spark-submit 提交作业
例如
spark-submit --master yarn --deploy-mode client project.py for local
spark-submit --master yarn --deploy-mode cluster project.py for cluster
此时我被卡住了,我不确定我应该在 spark-submit 中传递什么参数。我不确定简单地复制和粘贴上述所有命令并制作.py
文件是否会有所帮助。我对这项技术非常陌生。
【问题讨论】:
你想在哪里运行你的代码? 由于我是全新的,我不确定是否应该在 clinet /cluster /local 模式下运行。我们有集群配置[1个namenode,2个数据节点]并且数据在namenode上,我还需要了解哪种模式适合这个要求 【参考方案1】:基本上,您的 Spark 作业将在集群上运行。 Spark 2.4.4 支持 yarn、kubernetes、mesos 和 spark-standalone 集群 doc。
--master yarn
指定您将 Spark 作业提交到纱线集群。
--deploy-mode
指定是将驱动程序部署在工作节点(集群)还是本地作为外部客户端(客户端)(默认:客户端)
spark-submit --master yarn --deploy-mode client project.py for client mode
spark-submit --master yarn --deploy-mode cluster project.py for cluster mode
spark-submit --master local project.py for local mode
您可以在提交 Spark 作业时提供其他参数,例如 --driver-memory
、--executor-memory
、--num-executors
等检查 here。
【讨论】:
谢谢 wypul,但我正在努力编写 project.py 脚本,我需要帮助,如果你能帮助我编写 python.py 脚本,那将非常有帮助 @Maverick 只需复制粘贴您上面提到的所有步骤。它会起作用的。例如,看看这个示例脚本github.com/apache/spark/blob/master/examples/src/main/python/…以上是关于如何创建 Pyspark 应用程序的主要内容,如果未能解决你的问题,请参考以下文章