我如何在Spark Streaming中处理Tuple?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了我如何在Spark Streaming中处理Tuple?相关的知识,希望对你有一定的参考价值。

我有Spark Scala的问题,我想在Spark流中加倍元组元素,我从kafka到dstream获取数据,我的RDD数据是这样的,

(2,[2,3,4,6,5])
(4,[2,3,4,6,5])
(7,[2,3,4,6,5])
(9,[2,3,4,6,5])

我想用这样的乘法操作,

 (2,[2*2,3*2,4*2,6*2,5*2])
 (4,[2*4,3*4,4*4,6*4,5*4])
 (7,[2*7,3*7,4*7,6*7,5*7])
 (9,[2*9,3*9,4*9,6*9,5*9])

然后,我得到这样的rdd,

 (2,[4,6,8,12,10])
 (4,[8,12,16,24,20])
 (7,[14,21,28,42,35])
 (9,[18,27,36,54,45])

最后,我让Tuple成为像这样的最小元素,

 (2,4)
 (4,8)
 (7,14)
 (9,18)

如何使用dstream中的scala执行此操作?我使用火花版1.6

答案

给你一个scala演示

// val conf = new SparkConf().setAppName("ttt").setMaster("local")
//val  sc = new SparkContext(conf)
// val data =Array("2,2,3,4,6,5","4,2,3,4,6,5","7,2,3,4,6,5","9,2,3,4,6,5")
//val  lines  = sc.parallelize(data)
//change to your data  (each RDD in streaming)
    lines.map(x => (x.split(",")(0).toInt,List(x.split(",")(1).toInt,x.split(",")(2).toInt,x.split(",")(3).toInt,x.split(",")(4).toInt,x.split(",")(5).toInt) ))
      .map(x =>(x._1 ,x._2.min)).map(x => (x._1,x._2* x._1)).foreach(x => println(x))

这是结果

(2,4)
(4,8)
(7,14)
(9,18)

DStream中的每个RDD都包含特定时间间隔的数据,您可以根据需要操作每个RDD

另一答案

比方说,你在变量输入中得到元组rdd:

import scala.collection.mutable.ListBuffer    

val result = input
.map(x => {                           // for each element
   var l = new ListBuffer[Int]()      // create a new list for storing the multiplication result
   for(i <- x._1){                    // for each element in the array
      l += x._0 * i                   // append the multiplied result to the new list
   }
   (x._0, l.toList)                  // return the new tuple
})
.map(x => {
   (x._0, x._1.min)                  // return the new tuple with the minimum element in it from the list
})

result.foreach(println)应该导致:

(2,4)
(4,8)
(7,14)
(9,18)

以上是关于我如何在Spark Streaming中处理Tuple?的主要内容,如果未能解决你的问题,请参考以下文章

仅在处理完我的 RDD 中的所有分区后,如何在 Spark Streaming 中接收输入?

如何在 Spark Streaming 中自动重启故障节点?

Spark Streaming:如何在流上加载管道?

Spark2.x(六十):在Structured Streaming流处理中是如何查找kafka的DataSourceProvider?

如何在 Spark Structured Streaming 中控制输出文件的大小

在 Spark Structured Streaming 中处理二进制数据