如何创建 Pyspark 应用程序

Posted

技术标签:

【中文标题】如何创建 Pyspark 应用程序【英文标题】:How to create Pyspark application 【发布时间】:2019-11-30 05:30:21 【问题描述】:

我的要求是使用 pyspark 从 HDFS 读取数据,仅过滤所需的列,删除 NULL 值,然后将处理后的数据写回 HDFS。完成这些步骤后,我们需要从 HDFS 中删除 RAW Dirty 数据。这是我的每个操作的脚本。

导入库和依赖项

#Spark Version = > version 2.4.0-cdh6.3.1 

from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate() 
import pyspark.sql.functions as F

从 HDFS 读取数据

df_load_1 = sparkSession.read.csv('hdfs:///cdrs/file_path/*.csv', sep = ";") 

只选择所需的列

col = [ '_c0',  '_c1',  '_c2',  '_c3',  '_c5',  '_c7',  '_c8',  '_c9', '_c10', '_C11', '_c12', '_c13', '_c22', '_C32', '_c34', '_c38', '_c40',
   '_c43', '_c46', '_c47', '_c50', '_c52', '_c53', '_c54', '_c56', '_c57', '_c59', '_c62', '_c63','_c77', '_c81','_c83'] 

df1=df_load_1.select(*[col]) 

检查 NULL 值,我们有任何删除它们

df_agg_1 = df1.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df1.columns])

df_agg_1.show()

df1 = df1.na.drop()

将预处理后的数据写入HDFS,同一个集群,不同目录

df1.write.csv("hdfs://nm/pyspark_cleaned_data/py_in_gateway.csv")

从 HDFS 中删除原始原始数据

def delete_path(spark , path):
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

通过传递HDFS绝对路径在下面执行

delete_path(spark , '/cdrs//cdrs/file_path/')

pyspark and HDFS commands

我可以在pyspark提示符下成功完成所有操作。

现在我想开发应用程序并使用 spark-submit 提交作业

例如

spark-submit --master yarn --deploy-mode client project.py for local 

spark-submit --master yarn --deploy-mode cluster project.py for cluster

此时我被卡住了,我不确定我应该在 spark-submit 中传递什么参数。我不确定简单地复制和粘贴上述所有命令并制作.py 文件是否会有所帮助。我对这项技术非常陌生。

【问题讨论】:

你想在哪里运行你的代码? 由于我是全新的,我不确定是否应该在 clinet /cluster /local 模式下运行。我们有集群配置[1个namenode,2个数据节点]并且数据在namenode上,我还需要了解哪种模式适合这个要求 【参考方案1】:

基本上,您的 Spark 作业将在集群上运行。 Spark 2.4.4 支持 yarn、kubernetes、mesos 和 spark-standalone 集群 doc。

--master yarn 指定您将 Spark 作业提交到纱线集群。

--deploy-mode 指定是将驱动程序部署在工作节点(集群)还是本地作为外部客户端(客户端)(默认:客户端)

spark-submit --master yarn --deploy-mode client project.py for client mode 

spark-submit --master yarn --deploy-mode cluster project.py for cluster mode

spark-submit --master local project.py for local mode

您可以在提交 Spark 作业时提供其他参数,例如 --driver-memory--executor-memory--num-executors 等检查 here。

【讨论】:

谢谢 wypul,但我正在努力编写 project.py 脚本,我需要帮助,如果你能帮助我编写 python.py 脚本,那将非常有帮助 @Maverick 只需复制粘贴您上面提到的所有步骤。它会起作用的。例如,看看这个示例脚本github.com/apache/spark/blob/master/examples/src/main/python/…

以上是关于如何创建 Pyspark 应用程序的主要内容,如果未能解决你的问题,请参考以下文章

如何将pyspark数据帧写入不同的hadoop集群

如何使用 pyspark 在数据块中循环数据框列

如何使用 Pyspark 创建列表 json?

如何使用其模式从 Pyspark 数据框创建配置单元表?

如何使用 pyspark 从文本日志文件的特定部分创建数据框

PySpark如何读取具有多种编码的字符串的文件