Elasticsearch-hadoop & Elasticsearch-spark sql - 语句跟踪扫描&滚动

Posted

技术标签:

【中文标题】Elasticsearch-hadoop & Elasticsearch-spark sql - 语句跟踪扫描&滚动【英文标题】:Elasticsearch-hadoop & Elasticsearch-spark sql - Tracing of statements scan&scroll 【发布时间】:2015-11-13 07:27:12 【问题描述】:

我们正在尝试将 ES(1.7.2,4 节点集群)与 Spark(1.5.1,使用 hive 编译和 hadoop 与 scala 2.11,4 节点集群)集成,hdfs 进入方程(hadoop 2.7,4节点)和thrift jdbc服务器和elasticsearch-hadoop-2.2.0-m1.jar

因此,在 ES 上执行语句有两种方式。

    使用 scala 激发 SQL

    val conf = new  SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g")
    conf.set("spark.logConf", "true")
    conf.set("spark.cores.max","20")
    conf.set("es.index.auto.create", "false")
    conf.set("es.batch.size.bytes", "100mb")
    conf.set("es.batch.size.entries", "10000")
    conf.set("es.scroll.size", "10000")
    conf.set("es.nodes", "node2:39200")
    conf.set("es.nodes.discovery","true")
    conf.set("pushdown", "true")
    
    sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
    sc.addJar("executorLib/scala-library-2.10.1.jar")
    
    sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')" )
    
    val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")
    .....
    

    Thrift 服务器(在 Spark 上执行的代码)

    ....
    polledDataSource = new ComboPooledDataSource()
    polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
    polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
    polledDataSource.setMaxPoolSize(5)
    dbConnection = polledDataSource.getConnection
    dbStatement = dbConnection.createStatement
    
    val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='\"query\":\"term\":\"transmittersID\":\"262021306841042\"','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")
    
    dbStatement.setFetchSize(50000)
    dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")
    .....
    

我有以下问题,由于它们是相互关联的,我决定将它们打包到堆栈上的一个问题中:

    似乎使用 Spark SQL 的方法支持下推 WHERE 后面的内容(无论是否指定 es.query),执行时间相同并且可以接受。但是解决方案 1 绝对不支持聚合函数的 pushdow,即呈现的 count(*) 不会在 ES 一侧执行,但只有在检索到所有数据之后 - ES 返回行并且 Spark SQL 对它们进行计数。请确认这是否是正确的行为

    第一个解决方案的行为很奇怪,无论下推是真还是假,时间都是相等的

    解决方案 2 似乎不支持下推,我尝试以什么方式指定子查询并不重要,无论是表定义的一部分还是语句的 WHERE 子句,似乎是只需获取所有巨大的索引,然后对其进行数学计算。是不是 Thrift-hive 无法对 ES 进行下推?

    我想在弹性搜索中跟踪查询,我做了以下设置:

    //logging.yml
    index.search.slowlog: TRACE, index_search_slow_log_file
    index.indexing.slowlog: TRACE, index_indexing_slow_log_file
    
    additivity:
      index.search.slowlog: true
      index.indexing.slowlog: true
    

所有 index.search.slowlog.threshold.query、index.search.slowlog.threshold.fetch 甚至 index.indexing.slowlog.threshold.index 都设置为 0ms。 而且我确实在慢日志文件中看到了从感觉执行的常见语句(所以它可以工作)。但我没有看到针对 ES 执行的 Spark SQL 或 thrift 语句。我想这些是 scan&scroll 语句,因为如果我从感觉执行 scan&scroll,这些也不会被记录。是否有可能以某种方式在 ES 一侧跟踪扫描和滚动?

【问题讨论】:

【参考方案1】:

    据我所知,这是一种预期行为。我所知道的所有来源的行为方式都完全相同,而且直觉上它是有道理的。 SparkSQL 专为分析查询而设计,在本地获取数据、缓存和处理更有意义。另见Does spark predicate pushdown work with JDBC?

    我认为conf.set("pushdown", "true") 根本没有任何作用。如果要配置特定于连接的设置,则应将其作为OPTION 映射传递,就像在第二种情况下一样。使用es 前缀应该也可以

    这确实很奇怪。 Martin Senne 使用 PostgreSQL 报告了 a similar issue,但我无法重现。

【讨论】:

【参考方案2】:

我在 elasticsearch 讨论组与 Costin Leau 讨论后,他指出了以下内容,我应该与您分享:

您的设置存在许多问题:

    您提到使用 Scala 2.11,但使用的是 Scala 2.10。请注意,如果您想选择您的 Scala 版本,应使用 elasticsearch-sparkelasticsearch-hadoop 仅提供 Scala 2.10 的二进制文件。

    下推功能只能通过 Spark 数据源使用。如果您不使用这种类型的声明,pushdown 不会传递给 ES(这就是 Spark 的工作方式)。因此声明pushdown 是不相关的。

    1234563 DS)

    使用临时表确实算作数据源,但是您需要在那里使用pushdown。如果你不这样做,它会默认被激活,因此你为什么看不到你的运行之间的区别;您没有更改任何相关参数。

    Spark 不会下推计数和其他聚合。根据 Databricks 团队的说法,未来可能会有一些东西,但目前没有任何东西。对于计数,您可以使用dataFrame.rdd.esCount 进行快速呼叫。但这是个特例。

    我不确定 Thrift 服务器是否真的算作数据源,因为它从 Hive 加载数据。您可以通过启用将 org.elasticsearch.hadoop.spark 包登录到 DEBUG 来仔细检查这一点。您应该查看 SQL 是否确实被转换为 DSL。

我希望这会有所帮助!

【讨论】:

我找不到任何关于 elasticsearch-spark 的信息。我认为elasticsearch-hadoop 包含 Spark 支持? 我不明白你的问题。 是否有像elasticsearch-hadoop 一样的名为elasticsearch-spark 的可下载项目?我在github上没有找到。

以上是关于Elasticsearch-hadoop & Elasticsearch-spark sql - 语句跟踪扫描&滚动的主要内容,如果未能解决你的问题,请参考以下文章

是否可以在 ElasticSearch 中使用 presto 或 Hive (ElasticSearch-Hadoop) 的任何 ES 连接器进行 JOIN 操作?

Elasticsearch安装篇

Spark和Elasticsearch交互

Elasticsearch 系列4 --- Windows下安装Kibana

Pyspark 将 rdd 转换为具有空值的数据帧

Spark + ElasticSearch 返回 RDD[(String, Map[String, Any])]。我怎样才能操纵任何?