Flink-transformation
Posted 陕西小楞娃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink-transformation相关的知识,希望对你有一定的参考价值。
ke01开启: nc -lk 8888
Map
package com.text.transformation

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates `map` on a socket text stream read from host "ke01", port 8888.
  *
  * `map` emits exactly one output element per input element, so it cannot drop
  * records: lines containing "a" are mapped to the unit value `()` instead of
  * being removed.
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ke01", 8888)

    // Lines without "a" pass through unchanged; every other line becomes `()`,
    // which is why the element type of the mapped stream widens to Any.
    val mapped = lines.map { line =>
      if (line.contains("a")) () else line
    }

    mapped.print()
    env.execute()
  }
}
[root@ke01 bigdata]# nc -lk 8888
b
c
b
a
a
结果: 11> b 12> c 1> b 2> () 3> ()(说明:map 是一对一转换,不能丢弃元素;输入包含 "a" 时 if 没有 else 分支,表达式的值为 Unit,所以打印出 ()。)
flatMap
package com.text.transformation

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates `flatMap`: each comma-separated line read from the socket
  * ("ke01", port 8888) is expanded into one output element per field.
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ke01", 8888)

    // One input line yields as many elements as it has comma-separated parts.
    val fields = lines.flatMap(_.split(","))

    fields.print()
    env.execute()
  }
}

// Sample input:
//   a,c
//   a,d,e
// Output:
//   3> a
//   3> c
//   4> a
//   4> d
//   4> e
使用flatMap代替filter
package com.text.transformation

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates using `flatMap` as a filter: a line read from the socket
  * ("ke01", port 8888) is emitted only when it does not contain "a".
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("ke01", 8888)

    // Returning an empty iterator drops the record; a single-element iterator
    // keeps it. No mutable buffer is needed for a zero-or-one result.
    val value = stream.flatMap { line =>
      if (line.contains("a")) Iterator.empty else Iterator.single(line)
    }

    value.print()
    env.execute()
  }
}

// Sample input:
//   abc
//   qwe
// Output:
//   4> qwe
keyBy 分流算子,根据用户指定的字段进行分组
package com.text.transformation

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates `keyBy` with a tuple position: words read from the socket
  * ("ke01", port 8888) are turned into `(word, 1)` pairs and partitioned by
  * the first tuple field.
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("ke01", 8888)

    // keyBy by position: 0 is the first tuple field, 1 is the second.
    // NOTE(review): position-based keyBy appears to be deprecated in newer
    // Flink releases in favour of a key-selector function — confirm against
    // the Flink version in use.
    stream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .print()

    env.execute()
  }
}

// Output:
//   8> (a,1)
//   8> (a,1)
//   3> (b,1)
//   3> (b,1)
//   6> (c,1)
//   8> (a,1)
keyBy(使用 KeySelector 指定分组键)
package com.text.transformation

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates `keyBy` with an explicit `KeySelector`: words read from the
  * socket ("ke01", port 8888) become `(word, 1)` pairs keyed by the word.
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ke01", 8888)

    // The key of each pair is its first component (the word itself).
    val byWord = new KeySelector[(String, Int), String] {
      override def getKey(value: (String, Int)): String = value._1
    }

    val pairs = lines.flatMap(_.split(" ")).map((_, 1))
    pairs.keyBy(byWord).print()

    env.execute()
  }
}

// Output:
//   8> (a,1)
//   3> (b,1)
//   8> (a,1)
//   3> (b,1)
reduce,一般结合keyBy使用
package com.text.transformation

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates `reduce` on a keyed stream: words read from the socket
  * ("ke01", port 8888) become `(word, 1)` pairs, are keyed by word, and the
  * counts of pairs sharing a key are summed as each new pair arrives.
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ke01", 8888)

    // The key of each pair is its first component (the word itself).
    val byWord = new KeySelector[(String, Int), String] {
      override def getKey(value: (String, Int)): String = value._1
    }

    val keyed = lines.flatMap(_.split(" ")).map((_, 1)).keyBy(byWord)

    // Keep the word from the accumulated pair and add the two counts.
    val counts = keyed.reduce((acc, next) => (acc._1, acc._2 + next._2))

    counts.print()
    env.execute()
  }
}

// Output:
//   8> (a,1)
//   8> (a,2)
//   8> (a,3)
union
package com.text.transformation

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates `union`: two in-memory streams with the same element type
  * are merged into a single stream and printed.
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val left = env.fromCollection(List("a" -> 1, "b" -> 2))
    val right = env.fromCollection(List("a" -> 3, "d" -> 4))

    // The merged stream carries every element of both inputs.
    left.union(right).print()

    env.execute()
  }
}

// Output:
//   11> (b,2)
//   8> (a,3)
//   10> (a,1)
//   9> (d,4)
split
package com.text.transformation

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/** Demonstrates `split` / `select`: the sequence 1..100 is tagged so that
  * even numbers go to the stream named "first" and odd numbers to the stream
  * named "second"; only the "first" stream is printed.
  *
  * NOTE(review): `split`/`select` appear to be deprecated and removed in
  * newer Flink releases in favour of side outputs — confirm against the
  * Flink version in use.
  */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Even numbers go to one stream ("first"), odd numbers to the other ("second").
    val stream = env.generateSequence(1, 100)
    val splitStream = stream.split { info =>
      // A total if/else rather than matching on 0 and 1 only: a negative odd
      // value has remainder -1 and would otherwise raise a MatchError.
      if (info % 2 == 0) List("first") else List("second")
    }

    // Pick the tagged stream to work with.
    splitStream.select("first").print()

    env.execute()
  }
}
结果:
10> 10
6> 6
12> 12
6> 18
8> 8
6> 30
4> 4
2> 2
4> 16
以上是关于Flink-transformation的主要内容,如果未能解决你的问题,请参考以下文章