Spark Streaming 应用程序太慢

Posted

技术标签:

【中文标题】Spark Streaming 应用程序太慢【英文标题】:SparkStreaming application too slow 【发布时间】:2017-05-02 13:13:54 【问题描述】:

在开发 SparkStreaming 应用程序 (python) 时,我不完全确定我是否理解它的工作原理。 我只需要读取一个 json 文件流(弹出一个目录)并对每个 json 对象和一个引用执行连接操作,然后将其写回文本文件。这是我的代码:

config = configparser.ConfigParser()
config.read("config.conf")

def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
    globals()["sparkSessionSingletonInstance"] = SparkSession \
        .builder \
        .config(conf=sparkConf) \
        .getOrCreate()
return globals()["sparkSessionSingletonInstance"]

# Création du contexte
sc = SparkContext()
ssc = StreamingContext(sc, int(config["Variables"]["batch_period_spark"]))
sqlCtxt = getSparkSessionInstance(sc.getConf())
df_ref = sqlCtxt.read.json("file://" + config["Paths"]["path_ref"])
df_ref.createOrReplaceTempView("REF")
df_ref.cache()
output = config["Paths"]["path_DATAs_enri"]


# Fonction de traitement des DATAs
def process(rdd):
        if rdd.count() > 0:
                #print(rdd.toDebugString)
                df_DATAs = sqlCtxt.read.json(rdd)
                df_DATAs.createOrReplaceTempView("DATAs")
                df_enri=sqlCtxt.sql("SELECT DATAs.*, REF.Name, REF.Mail FROM DATAs, REF WHERE DATAs.ID = REF.ID")
                df_enri.createOrReplaceTempView("DATAs_enri")
                df_enri.write.mode('append').json("file://" + output)
                if(df_enri.count() < df_DATAs.count()):
                        df_fail = sqlCtxt.sql("SELECT * FROM DATAs WHERE DATAs.ID NOT IN (SELECT ID FROM DATAs_enri)")
                        df_fail.show()


# Configuration du stream et lancement
files = ssc.textFileStream("file://" + config["Paths"]["path_stream_DATAs"])
files.foreachRDD(process)
print("[GO]")
ssc.start()
ssc.awaitTermination()

这是我的火花配置:

spark.master                    local[*]
spark.executor.memory           3g
spark.driver.memory             3g
spark.python.worker.memory      3g
spark.memory.fraction           0.9
spark.driver.maxResultSize      3g
spark.memory.storageFraction    0.9
spark.eventLog.enabled          true

嗯,它正在工作,但我有一个问题:流程很慢,流程延迟正在增加。我在local[*]工作,怕是没有并行...在监控UI中,一次只能看到一个executor和一个job。有没有更简单的方法来做到这一点?就像 DStream 上的 transform 函数一样?是否缺少我的配置变量?

【问题讨论】:

【参考方案1】:

你的代码慢有几个原因。

关于工人,正如我所见,我没有看到您设置工人数量的任何地方。因此,它将从默认的工作人员数量开始,这意味着可能是 1。另一方面,您正在从一个可能不是那么大的文件中读取数据,并且 spark 没有进行并行处理。

另一方面,您需要取消代码的几个步骤:

    您有很多计数:if rdd.count() &gt; 0:; if(df_enri.count() &lt; df_DATAs.count()):,计数很昂贵,这是流数据中的减少阶段,而您的计数是计数的 3 倍。 联接也很昂贵,在流式处理过程中进行联接并不是那么好,df_ref.cache() 做得对,但是联接确实会随机播放,而且成本很高。

我建议你,不要做那个失败的步骤,从你的代码中删除它。它没有用,只是不要保存数据。其他的,设置更多的工人或更多的核心来执行:spark.executor.cores=2,你可以看到here。

【讨论】:

嗯,非常感谢这些建议!我还有一个问题,我的 if 中的第一个 count 是为了防止 Spark 在 rdd 到来之前对其进行处理。即使还没有要处理的内容,SparkStreaming 在流上启动操作是否正常?因为如果我不这样做,它会告诉我我正在处理空 RDD... 第一个count()建议你使用isEmpty()spark.apache.org/docs/2.1.0/api/python/…这个函数,这样可以更快地检查你是否为空。这不会产生随机播放。

以上是关于Spark Streaming 应用程序太慢的主要内容,如果未能解决你的问题,请参考以下文章

大数据入门:Spark Streaming实际应用

Spark Streaming应用启动过程分析

如何在 Windows 10 上运行 Spark Streaming 应用程序?

spark streaming checkpoint

Spark Streaming 应用程序运行 24 小时后出现 OOM

spark streaming的应用