从spark写入elasticsearch非常慢

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从spark写入elasticsearch非常慢相关的知识,希望对你有一定的参考价值。

我正在处理一个文本文件,并将转换后的行从Spark应用程序写入弹性搜索

input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.resource", "{date}/" + dir).save()

这运行速度非常慢,大约需要8分钟才能写入287.9 MB / 1513789条记录。 enter image description here

如果网络延迟始终存在,我如何调整spark和elasticsearch设置以使其更快。

我在本地模式下使用spark,有16个内核和64GB RAM。我的elasticsearch集群有一个主节点和3个数据节点,每个节点有16个核心和64GB。

我正在阅读如下文本文件

 val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "	",
  "comment" -> "#",
  "mode" -> "PERMISSIVE")

....

val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
答案

首先,让我们从您的应用程序中发生的事情开始。 Apache Spark正在读取压缩的1(不是那么大)csv文件。因此,第一个spark将花费时间解压缩数据并在将其写入elasticsearch之前对其进行扫描。

这将创建一个Dataset / DataFrame与一个分区(由评论中提到的df.rdd.getNumPartitions的结果确认)。

一个直接的解决方案是repartition你的数据读取和缓存它,然后写入elasticsearch。现在我不确定您的数据是什么样的,因此决定分区数量是您的基准。

val input = sqlContext.read.options(readOptions)
                      .csv(inputFile.getAbsolutePath)
                      .repartition(100) // 100 is just an example
                      .cache

我不确定您的应用程序会带来多大好处,因为我相信可能存在其他瓶颈(网络IO,ES的磁盘类型)。

PS:在构建ETL之前,我应该将csv转换为镶木地板文件。这里有真正的性能提升。 (个人意见和基准)

另一种可能的优化是调整elasticsearch-spark连接器的es.batch.size.entries设置。默认值为1000

设置此参数时需要小心,因为您可能会使elasticsearch超载。我强烈建议你看看可用的配置here

我希望这有帮助 !

以上是关于从spark写入elasticsearch非常慢的主要内容,如果未能解决你的问题,请参考以下文章

Spark 结构化流式 Elasticsearch 集成问题。数据源es不支持流式写入

Spark SQL大数据处理并写入Elasticsearch

项目实战——参数配置化Spark将Hive表的数据写入需要用户名密码认证的ElasticSearch(Java版本)

项目实战——参数配置化Spark将Hive表的数据写入需要用户名密码认证的ElasticSearch(Java版本)

如何从Apache Flink写入Elasticsearch

来自EMR / Spark的S3写入速度非常慢