流处理 —— Spark Streaming中的操作函数

Posted yyy-blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了流处理 —— Spark Streaming中的操作函数相关的知识,希望对你有一定的参考价值。

1.1 map(fun) 操作

map操作需要传入一个函数当做参数, 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新的元素,得到的DStream对象b中包含这些新的元素。 

    val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
// input:java output:(java,1) ssc.textFileStream(
"file:\D:\workspace\idea\silent\src\main\resources\stream").map((_,1))

1.2 flatMap(func)

类似map + flatten 即先对集合中的每个元素进行map,再对map后的每个元素中的每个元素进行flatten。

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
    
// input:java scala    
ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream").flatMap((_,1))
output: (java,1) (scala,1)

1.3 filter(func)

过滤filter传入一个func函数。对DStream a中的每一个元素,应用func方法进行计算,如果func函数返回结果为true,则保留该元素,否则丢弃该元素,返回一个新的DStream b

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
    
// input:java scala    

ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream").flatMap((_,1)).filter(_.equalsIgnoreCase("java"))

output:
(java,1)

1.4 union(otherStream)

这个操作将两个DStream进行合并,生成一个包含着两个DStream中所有元素的新DStream对象。 

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))

//input:java
val ds = ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")

val ds1 = ds.map(p => s"${p}_one")
val ds2 = ds.map(p => s"${p}_two")

ds1.union(ds2).print()
// output:
java_one
java_two

1.5 count()

统计DStream中每个RDD包含的元素的个数,得到一个新的DStream,这个DStream中只包含一个元素,这个元素是对应语句单词统计数值。 

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))

//input:java scala java hadoop
val ds = ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream").flatMap(_.split(""))

//output:
4

1.6  reduce(func)

返回一个包含一个元素的DStream,传入的func方法会作用在调用者的每一个元素上,将其中的元素顺次的两两进行计算。 

// input: java scala spark    
ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
.flatMap(_.split(" ")) .reduce(_ + "_" + _) //optput: java_scala_spark

1.7  countByValue()

某个DStream中的元素类型为K,调用这个方法后,返回的DStream的元素为(K, Long)对,后面这个Long值是原DStream中每个RDD元素key出现的频率。 

 // input: java scala java   

ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
.flatMap(_.split(" ")) .map((1,_)) .countByValue() // 统计 .foreachRDD(f => { f.foreachPartition(p => { p.foreach(println) }) }) // output: ((1,java),2) ((1,scala),1)

1.8  reduceByKey(func, [numTasks])

调用这个操作的DStream是以(K, V)的形式出现,返回一个新的元素格式为(K, V)的DStream。返回结果中,K为原来的K,V是由K经过传入func计算得到的。还可以传入一个并行计算的参数,在local模式下,默认为2。在其他模式下,默认值由参数spark.default.parallelism确定。 

// input: java scala java java
    ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
          .flatMap(_.split(" "))
          .map((_,1))
          .reduceByKey(_ + _) // 统计
          .print()

//output:
java,3
scala,1

1.9 join(otherStream, [numTasks])

由一个DStream对象调用该方法,元素内容为(k, V),传入另一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是(k, (V, W))。这个方法也可以传入一个并行计算的参数,该参数与reduceByKey中是相同的。

// input: java scala

    val ds = ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
          .flatMap(_.split(" "))

    val ds1 = ds.map(p => (p,s"${p}_one"))
    val ds2 = ds.map(p=> (p,s"${p}_two"))

    ds1.join(ds2).print()

// output:
(java,(java_one,java_two))
(scala,(scala_one,scala_two))

1.10 cogroup(otherStream, [numTasks])

由一个DStream对象调用该方法,元素内容为(k, V),传入另一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是(k, (Seq[V], Seq[W]))。这个方法也可以传入一个并行计算的参数,该参数与reduceByKey中是相同的。

// input:java scala java    
val ds = ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
          .flatMap(_.split(" "))

    val ds1 = ds.map(p => (p,s"${p}_one"))
    val ds2 = ds.map(p=> (p,s"${p}_two"))

    ds1.cogroup(ds2).print()

// output:
(scala,(CompactBuffer(scala_one),CompactBuffer(scala_two)))
(java,(CompactBuffer(java_one, java_one),CompactBuffer(java_two, java_two)))

1.11 transform(func)

DStream的transform操作极大的丰富了DStream上能够进行的操作内容。使用transform操作后,除了可以使用DStream提供的一些转换方法之外(例如RDD操作),还能够直接调用任意的调用RDD上的操作函数。

注意: transform传入的方法是被每一个batch调用的。这样可以支持在RDD上做一些时变的操作,即RDD,分区数以及广播变量可以在不同的batch之间发生变化。

val ssc = ContextUtils.getStreamingContext(this.getClass.getSimpleName, 5)
val lines = ssc.socketTextStream("localhost", 9999)

// 构建黑名单
val blacks = new ListBuffer[(String, Boolean)]()
blacks.append(("huhu", true))
val blacksRDD = ssc.sparkContext.parallelize(blacks)

// 从流中获取访问日志,并对黑名单中的数据进行过滤
lines.map(x => {(x.split(",")(0), x)})
.transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(_._2._2.getOrElse(false) != true)
.map(x => x._2._1)
}).print()

1.12  updateStateByKey(func)

 

以上是关于流处理 —— Spark Streaming中的操作函数的主要内容,如果未能解决你的问题,请参考以下文章

入门大数据---Spark_Streaming与流处理

Spark Streaming中空RDD处理及流处理程序优雅的停止

Spark Streaming实时流处理项目实战Spark Streaming整合Flume实战二

Spark Streaming实时流处理项目实战Spark Streaming整合Flume实战二

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一