使用 HDFS 存储的 Spark 作业

Posted

技术标签:

【中文标题】使用 HDFS 存储的 Spark 作业【英文标题】:Spark job using HDFS storage 【发布时间】:2019-10-01 11:53:42 【问题描述】:

我在 Google Cloud Dataproc 上运行了一个长期运行的 Spark Structured Streaming Job,它使用 Kafka 作为源和接收器。我还将检查点保存在 Google Cloud Storage 中。

运行一周后,我注意到它正在稳步消耗所有 100 GB 磁盘存储,将文件保存到 /hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/...

我的理解是我的 Spark 作业不应该对本地磁盘存储有任何依赖。

我在这里完全误解了吗?

我这样提交了我的工作:

(cd  app/src/packages/ &&  zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
    --cluster cluster-26aa \
    --region us-central1 \
    --properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
    --py-files build/mypkg.zip \
    --max-failures-per-hour 10 \
    --verbosity info \
    app/src/explode_rmq.py

这些是我工作的相关部分:

来源:

 spark = SparkSession \
        .builder \
        .appName("MyApp") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    spark.sparkContext.addPyFile('mypkg.zip')

    df = spark \
        .readStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("subscribe", "lsport-rmq-12") \
        .option("startingOffsets", "earliest") \
        .load() \
        .select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))

水槽:

    sink_kafka_q = sink_df \
        .writeStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("topic", "my_topic") \
        .option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
        .start()

【问题讨论】:

看起来您正在检查将 CP 和 WAL 信息存储在 hdfs 中的检查点:/hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/... @mazaneicha,这是readStream 检查点吗?那是一回事吗?因为我已经将接收器检查点存储在云中 请您按照this 的说明确定占用磁盘空间的内容。有了这些信息,就会更容易理解如何解决这个问题。 @Igor,我使用 find 命令找到了最大的文件,这就是我找到那些 Hadoop 文件的方法。 你可以对HDFS做同样的事情来识别哪些HDFS文件对应于这个块文件吗? 【参考方案1】:

如果内存不够,Spark 会将信息持久化到本地磁盘。您可以像这样禁用磁盘上的持久性:

df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

或者你可以尝试像这样将信息序列化以占用更少的内存

df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

读取序列化数据会占用更多 CPU。

每个数据帧都有其独特的序列化级别。

欲了解更多信息:https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

【讨论】:

/hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/... -- 这是 hdfs 块池,而不是 Spark 用于溢出的本地磁盘。 但我正在做一个简单的无状态转换。为什么每天消耗 5 GB 存储空间?我唯一的状态是没有检查站?【参考方案2】:

能否通过 SSH 连接到主节点并运行以下命令来找出谁在消耗 HDFS 空间?

hdfs df -du -h /

我用一个简单的 Spark Pi 作业进行了测试,

在运行作业之前:

$ hdfs dfs -du /
34       /hadoop
0        /tmp
2107947  /user

工作完成后:

$ hdfs dfs -du /user/
0        /user/hbase
0        /user/hdfs
0        /user/hive
0        /user/mapred
0        /user/pig
0        /user/root
2107947  /user/spark
0        /user/yarn
0        /user/zookeeper

$ hdfs dfs -du /user/spark/
2107947  /user/spark/eventlog

似乎它已被 Spark 事件日志消耗,请参阅 spark.eventLog.dir。您可以考虑使用spark.eventLog.compress=true 压缩事件日志或使用spark.eventLog.enabled=false 禁用它

【讨论】:

以上是关于使用 HDFS 存储的 Spark 作业的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark 和 HDFS 作为文件存储系统和 YA​​RN 作为资源管理器的优势是啥?

如何设置 HDFS 文件系统以使用 HDFS 运行 Spark 作业?

如何清洗存储在hadoop(HDFS)中的原始数据

分布式文件存储(HDFS/Cassandra/S3 等)是不是必须让 spark 在集群模式下运行?如果是,为啥?

如何使用 Pig 将数据存储在 HDFS 上的多个分区文件中

将 Spark Streaming 输出写入 HDFS 时跳过数据