Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行

Posted

技术标签:

【中文标题】Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行【英文标题】:Spark SQL + Window + Streaming Issue - Spark SQL query is taking long to execute when running with spark streaming 【发布时间】:2015-09-08 09:26:33 【问题描述】:

我们期待使用 Spark Streaming(带有 Flume)和带有窗口的 Spark SQL 实现一个用例,允许我们对一组数据执行 CEP 计算。(有关如何捕获和使用数据,请参见下文)。这个想法是使用 SQL 来执行一些符合某些条件的操作。 .基于每个传入事件批次执行查询似乎非常慢(随着它的进展)。

这里慢的意思是说我已经配置了 600 秒的窗口大小和 20 秒的批处理间隔。 (以每 2 秒 1 个输入的速度泵送数据)所以说在 10 分钟后传入输入将保持不变的时间,它应该需要相同的时间来执行 SQL 查询。

但是在这里,经过一段时间后,它开始花费更多时间并逐渐增加,因此对于大约 300 条记录,select count(*) 查询最初需要 1 秒,然后在 15 分钟后开始需要 2 到 3 秒并逐渐增加。

如果有人能提出更好的方法来实现这个用例,我们将不胜感激。下面给出了我们为实现这一目标而执行的步骤 -

    //Creating spark and streaming context
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, 20);
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream; = FlumeUtils.createStream(ssc, "localhost", 55555);

    //Adding the events on window
    JavaDStream<SparkFlumeEvent> windowDStream =
        flumeStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);

    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    windowDStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>()
    

        public Void call(JavaRDD<SparkFlumeEvent> eventsData)
        throws Exception
        
            long t2 = System.currentTimeMillis();
            lTempTime = System.currentTimeMillis();

            JavaRDD<AVEventPInt> inputRDD1 = eventsData.map(new Function<SparkFlumeEvent, AVEventPInt>()
            
                @Override
                public AVEventPInt call(SparkFlumeEvent eventsData) throws Exception
                
                ...
                    return avevent;
                
            );
            DataFrame schemaevents = sqlContext.createDataFrame(inputRDD1, AVEventPInt.class);
            schemaevents.registerTempTable("avevents" + lTempTime);
            sqlContext.cacheTable("avevents" + lTempTime);

            // here the time taken by query is increasing gradually
            long t4 = System.currentTimeMillis();
            Long lTotalEvent = sqlContext.sql("SELECT count(*) FROM avevents" + lTempTime).first().getLong(0);
            System.out.println("time for total event count: " + (System.currentTimeMillis() - t4) / 1000L + " seconds \n");

            sqlContext.dropTempTable("avevents"  + lTempTime);
            sqlContext.clearCache();

            return null;

        
    );

【问题讨论】:

【参考方案1】:

例如,假设我们想通过日志级别来确定跨时间的事件计数。在 SQL 中,我们会发出如下形式的查询:

SELECT level, COUNT(1) from ambari GROUP BY level

但使用 Scala 数据帧 API,您可以发出以下查询:

ambari.groupBy("level").count()

此时,可以使用非常接近原生 SQL 的内容进行查询,例如:

sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")

这将返回与 DataFrame API 中返回的相同数据结构。返回的数据结构本身就是一个数据框。

此时,还没有执行:数据帧上的操作被映射到 RDD 上的适当操作(在这种情况下

RDD.groupBy(...).aggregateByKey(...))

我们可以通过对结果执行 say collect() 来强制执行,以将执行结果带入驱动程序内存。

【讨论】:

以上是关于Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 2.4 上的 pyspark.sql.functions.max().over(window) 上使用 .where() 会引发 Java 异常

spark2.3 SQL内置函数——Date window functions

Spark SQL 滑动窗口差分计算

Spark Structured Streaming Window() 函数 - GeneratedIterator 超过 64 KB

Spark SQL 窗口平均值问题

spark sql 条件最大值