Flink-transformation

Posted 陕西小楞娃

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink-transformation相关的知识,希望对你有一定的参考价值。

 

先在 ke01 上启动 socket 服务:nc -lk 8888(下文读取 socket 的示例都连接 ke01:8888)

Map

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ke01", 8888)
    // map is strictly one-in/one-out, so it cannot drop records: a line that
    // contains "a" is turned into Unit `()` rather than being discarded, and the
    // element type of the resulting stream widens to Any.
    // To really discard such lines use filter (or flatMap, shown further below).
    val mapped = lines.map(line => if (line.contains("a")) () else line)
    mapped.print()
    env.execute()
  }
}

[root@ke01 bigdata]# nc -lk 8888
b
c
b
a
a


结果:
11> b
12> c
1> b
2> ()
3> ()

 

flatMap

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ke01", 8888)
    // Each comma-separated input line fans out into one record per token,
    // e.g. "a,d,e" becomes the three records "a", "d" and "e".
    val tokens = lines.flatMap { line => line.split(",") }
    tokens.print()
    env.execute()
  }
}

a,c
a,d,e

结果:
3> a
3> c
4> a
4> d
4> e

 

使用flatMap代替filter

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("ke01", 8888)
    // flatMap may emit zero or more records per input, so it can act as a filter:
    // a line containing "a" yields an empty collection (dropped), and any other line
    // is emitted unchanged. An immutable Nil / single-element List is enough here;
    // no per-record mutable buffer is needed.
    val value = stream.flatMap(x => if (x.contains("a")) Nil else List(x))
    value.print()
    env.execute()
  }
}

abc
qwe
结果:
4> qwe

 

keyBy:分区(分组)算子,根据用户指定的字段(key)对数据分组,相同 key 的数据会被发送到同一个分区
package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("ke01", 8888)
    // keyBy(0) keys the stream by tuple position: 0 is the first field, 1 the second.
    // Here the key is the word itself, so identical words go to the same subtask.
    // NOTE: positional keys only work on tuple types and are deprecated in newer
    // Flink releases; the key-selector form keyBy(_._1) is the replacement.
    stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).print()
    env.execute()
  }
}

结果:
8> (a,1)
8> (a,1)
3> (b,1)
3> (b,1)
6> (c,1)
8> (a,1)

 

 

keyBy
package com.text.transformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("ke01", 8888)

    // Explicit KeySelector: the key of a (word, 1) pair is the word (first field).
    val byWord = new KeySelector[(String, Int), String] {
      override def getKey(pair: (String, Int)): String = pair._1
    }

    val pairs = lines.flatMap(_.split(" ")).map((_, 1))
    pairs.keyBy(byWord).print()
    env.execute()
  }
}


结果:
8> (a,1)
3> (b,1)
8> (a,1)
3> (b,1)

 

 

reduce:必须在 keyBy 之后使用(它是 KeyedStream 上的算子),对相同 key 的数据做滚动聚合

package com.text.transformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words = env.socketTextStream("ke01", 8888).flatMap(_.split(" "))

    val keyed = words
      .map((_, 1))
      .keyBy(new KeySelector[(String, Int), String] {
        override def getKey(pair: (String, Int)): String = pair._1
      })

    // Rolling reduce per key: every incoming record emits the updated running
    // count for its word, e.g. (a,1), (a,2), (a,3), ...
    keyed.reduce { (acc, next) => (acc._1, acc._2 + next._2) }.print()
    env.execute()
  }
}

结果:
8> (a,1)
8> (a,2)
8> (a,3)

 

union

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val left = env.fromCollection(List(("a", 1), ("b", 2)))
    val right = env.fromCollection(List(("a", 3), ("d", 4)))
    // union merges streams of the same element type into one stream carrying the
    // records of both inputs; the relative order of those records is not fixed.
    left.union(right).print()
    env.execute()
  }
}

结果:
11> (b,2)
8> (a,3)
10> (a,1)
9> (d,4)

 

split(注意:split/select 已在 Flink 1.12 中移除,新版本请改用侧输出 Side Output)

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Route even numbers to the "first" output and odd numbers to "second".
    val stream = env.generateSequence(1, 100)
    // NOTE: split/select was removed in Flink 1.12; on newer versions the
    // equivalent is side outputs (OutputTag + ProcessFunction).
    val splitStream = stream.split(info => {
      // Test parity against 0 instead of matching on the literals 0 and 1:
      // for a negative value `info % 2` is -1, which a two-case match on 0/1
      // does not cover and would fail with a MatchError.
      if (info % 2 == 0) List("first") else List("second")
    })
    // Select and print only the "first" (even) stream.
    splitStream.select("first").print()
    env.execute()
  }
}

结果:

10> 10
6> 6
12> 12
6> 18
8> 8
6> 30
4> 4
2> 2
4> 16

 

以上是关于Flink-transformation的主要内容,如果未能解决你的问题,请参考以下文章

VSCode自定义代码片段——CSS选择器

谷歌浏览器调试jsp 引入代码片段,如何调试代码片段中的js

片段和活动之间的核心区别是啥?哪些代码可以写成片段?

VSCode自定义代码片段——.vue文件的模板

VSCode自定义代码片段6——CSS选择器

VSCode自定义代码片段——声明函数