在 Apache Spark 中将批处理 RDD 的结果与流式 RDD 相结合

Posted

技术标签:

【中文标题】在 Apache Spark 中将批处理 RDD 的结果与流式 RDD 相结合【英文标题】:Combine results from batch RDD with streaming RDD in Apache Spark 【发布时间】:2014-12-18 16:17:22 【问题描述】:

背景: 我正在使用 Apache Spark 从日志中汇总不同事件类型的运行计数。日志存储在 Cassandra 中用于历史分析目的和 Kafka 中用于实时分析目的。每个日志都有一个日期和事件类型。为简单起见,假设我想跟踪每天单一类型的日志数量。

我们有两个 RDD,一个来自 Cassandra 的批处理数据 RDD,另一个来自 Kafka 的流式 RDD。 伪代码:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() 
    @Override
    public Tuple2<String, Integer> call(CassandraRow row) 
        return new Tuple2<String, Integer>(row.getString("date"), 1);
    
).reduceByKey(new Function2<Integer, Integer, Integer>() 
    @Override
    public Integer call(Integer count1, Integer count2) 
        return count1 + count2;
    
);

save(batchRDD) // Assume this saves the batch RDD somewhere

...

// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream =  KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() 
    @Override
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) 
        String jsonString = data._2;
        JSON jsonObj = JSON.parse(jsonString);
        Date eventDate = ... // get date from json object
        // Assume startTime is broadcast variable that is set to the time when the job started.
        if (eventDate.after(startTime.value()))  
            ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
            pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
            return pairs;
         else 
            return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
        
    
).reduceByKey(new Function2<Integer, Integer, Integer>() 
    @Override
    public Integer call(Integer count1, Integer count2) 
        return count1 + count2;
    
).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() 
    @Override
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) 
        Integer previousValue = state.or(0l);
        Integer currentValue = ... // Sum of counts
        return Optional.of(previousValue + currentValue);
    
);
save(streamRDD); // Assume this saves the stream RDD somewhere

sc.start();
sc.awaitTermination();

问题: 如何将 streamRDD 的结果与 batchRDD 结合起来? 假设batchRDD 具有以下数据,并且该作业于 2014 年 10 月 16 日运行:

("2014-10-15", 1000000)
("2014-10-16", 2000000)

由于 Cassandra 查询只包含到批量查询开始时间的所有数据,因此我们必须在查询完成时从 Kafka 读取,只考虑作业开始时间之后的日志。我们假设查询需要很长时间。这意味着我需要将历史结果与流结果结合起来。

为了说明:

    |------------------------|-------------|--------------|--------->
tBatchStart             tStreamStart   streamBatch1  streamBatch2

然后假设在第一个流批次中我们得到了这个数据:

("2014-10-19", 1000)

然后我想将批处理 RDD 与这个流 RDD 结合起来,这样流 RDD 现在就有了值:

("2014-10-19", 2001000)

然后假设在第二批流中我们得到了这个数据:

("2014-10-19", 4000)

然后应该更新流 RDD 以具有值:

("2014-10-19", 2005000)

等等……

可以使用streamRDD.transformToPair(...) 将streamRDD 数据与使用join 的batchRDD 数据结合起来,但如果我们对每个流块执行此操作,那么我们将为每个流块制作添加来自batchRDD 的计数状态值“双重计数”,它应该只添加到第一个流块中。

【问题讨论】:

【参考方案1】:

为了解决这种情况,我会将基本 rdd 与聚合的结果合并 StateDStream 以保留流数据的总数。这有效地为在每个流式传输间隔上报告的数据提供了基线,而无需计算所述基线 x 次。

我使用示例 WordCount 尝试了这个想法,它确实有效。将其放在 REPL 上以获取实时示例:

(在单独的 shell 上使用 nc -lk 9876socketTextStream 提供输入)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds, StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7 )
val defaultRdd = sc.parallelize(defaults)

@transient val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/spark")

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
val historicCount = wordCount.updateStateByKey[Int](newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0))

val runningTotal = historicCount.transform rdd => rdd.union(defaultRdd).reduceByKey( _+_ )

wordCount.print()
historicCount.print()
runningTotal.print()
ssc.start()

【讨论】:

谢谢。我只想补充一点,而不是在转换中使用rdd.union(defaultRdd),我最终使用rdd.leftOuterJoin(defaultRdd),这样runningTotal 不包括未更改的对。然后我只需要保存它们的值发生变化的对。【参考方案2】:

你可以试试updateStateByKey

def main(args: Array[String]) 

    val updateFunc = (values: Seq[Int], state: Option[Int]) => 
        val currentCount = values.foldLeft(0)(_ + _)
        val previousCount = state.getOrElse(0)
        Some(currentCount + previousCount)
    

    // stream
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
    ssc.checkpoint(".")
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc)
    stateWordCounts.print()
    ssc.start()
    ssc.awaitTermination()

【讨论】:

我已经在使用它了。问题是,如果可选状态值为 null,那么我必须默认为一个值。理想情况下,这将是从批处理 RDD 计算的值。问题是updateStateByKey() 没有传入密钥,所以我无法查找从批处理 RDD 计算的值。

以上是关于在 Apache Spark 中将批处理 RDD 的结果与流式 RDD 相结合的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中将两个 RDD[string] 合并在一起?

Apache Spark - 处理临时 RDD 上的滑动窗口

使用 Apache Spark 将 RDD 写入文本文件

转换CassandraTableScanRDD org.apache.spark.rdd.RDD

如何在spark(Python)中将两个rdd组合成on rdd

如何在spark中将rdd数据一分为二?