04-flink-1.10.1-流处理WordCount代码里控制并行度

Posted 逃跑的沙丁鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了04-flink-1.10.1-流处理WordCount代码里控制并行度相关的知识,希望对你有一定的参考价值。

接续03-flink-1.10.1-流处理WordCount

1 为什么同一个单词总是在同一个线程里打印

输入单词

 输出

 

 可见hello这个单词多次输入总是在线程好3里输出这是为什么呢?

这是因为flink实际后台是对单词分别做了hash处理,通过观察应该是先取单词的hash值然后用对处理数据的线程数(并行度)取模/取余的到的索引hello总是会进入线程3打印

2 设置全局并行度参数

 /**创建flink流式处理环境*/
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

3 flink支持对每一个算子设置并行度

 .map((_,1))//DataStream[(String, Int)]
      .setParallelism(4)
      .keyBy(0)//KeyedStream[(String, Int), Tuple]
      .sum(1)//DataStream[(String, Int)]
      .setParallelism(2)

    /**输出结果,打印到控制台*/
    res.print().setParallelism(1)

可见map算子设置的并行度是4,sum算子设置的并行度是2,最后打印设置的并行度是1

不过一般不会这么去设置,但是对于输出部分可能会做特殊设置,比如输出到文件,如果并行写入就出现乱序了。

4 flink从外部命令获取参数

① 引入依赖

import org.apache.flink.api.java.utils.ParameterTool

② 传入args

val paramTool = ParameterTool.fromArgs(args)

③ 提取变量

val ip = paramTool.get(ip)
val port = paramTool.getInt(port)

④ 使用变量

val sockerInput: DataStream[String] = env.socketTextStream(ip, port)

⑤ 传入外部参数

--ip 192.168.109.151 --port 9999

5 完整代码

package com.study.liucf.unbounded

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/8/16
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    /**创建flink流式处理环境*/
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(2)
    /**flink 从外部命令获取参数*/
    val paramTool = ParameterTool.fromArgs(args)
    val ip = paramTool.get("ip")
    val port = paramTool.getInt("port")
    /**监听一个流式输入端口*/
//    val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
    val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
    /**数据流转换*/
    val res : DataStream[(String, Int)] = sockerInput.flatMap(_.split(" ")) //: DataStream[String]
      .filter(_.nonEmpty)//: DataStream[String]
      //.map(x=>(x,1))
      .map((_,1))//DataStream[(String, Int)]
//      .setParallelism(4)
      .keyBy(0)//KeyedStream[(String, Int), Tuple]
      .sum(1)//DataStream[(String, Int)]
//      .setParallelism(2)

    /**输出结果,打印到控制台*/
    res.print()
//      .setParallelism(1)

    /**启动流式处理*/
    env.execute(" liucf wordcoun")
  }

}

 补充:flink是分布流式处理引擎,那么势必会遇到计算乱序的问题,比如单词hello word 第一个单词先输入但是确是最后一个输出的,那么如何解决分布式计算乱序的问题呢?情到时间语义部分关注这个问题。

 

 

以上是关于04-flink-1.10.1-流处理WordCount代码里控制并行度的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop的word co-occurrence实现

spark实现wordcount

IO流06_处理流

流处理器和CUDA

12.4 处理流的用法

Java 输入/输出——流体系(处理流)