flink学习day02::datasourcetransforma和sink

Posted 黑马程序员官方

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink学习day02::datasourcetransforma和sink相关的知识,希望对你有一定的参考价值。

datasource

批处理中常见是两类source

基于集合

1.使用env.fromElements()支持Tuple,自定义对象等复合形式。

2.使用env.fromCollection()支持多种Collection的具体类型

3.使用env.generateSequence()支持创建基于Sequence的DataSet

参考代码:

package cn.itcast.batch.source

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink中dataset的常见datasource
 */
object DataSourceDemo 
  def main(args: Array[String]): Unit = 
    /*
    dataset api中datasource主要有两类
    1.基于集合
    2.基于文件
     */
    //1 获取executionenviroment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2 source操作
    // 2.1     1.基于集合
    /*
    1.使用env.fromElements()支持Tuple,自定义对象等复合形式。
    2.使用env.fromCollection()支持多种Collection的具体类型
    3.使用env.generateSequence()支持创建基于Sequence的DataSet
     */
    // 使用env.fromElements()
    val eleDs: DataSet[String] = env.fromElements("spark", "hadoop", "flink")

    // 使用env.fromCollection()
    val collDs: DataSet[String] = env.fromCollection(Array("spark", "hadoop", "flink"))
    //使用env.generateSequence()
    val seqDs: DataSet[Long] = env.generateSequence(1, 9)

    // 3 转换 可以没有转换
    //4 sink 输出
    eleDs.print()
    collDs.print()
    seqDs.print()

    // 5 启动  在批处理中: 如果sink操作是'count()', 'collect()', or 'print()',最后不需要执行execute操作,否则会报错
    //    env.execute()
  



完整版本:

package cn.itcast.batch.source

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer, ListBuffer

/*
基于集合创建dataset 完整版
 */
object DataSourceDemo2 
  def main(args: Array[String]): Unit = 
    //获取env
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    //0.用element创建DataSet(fromElements)
    val ds0: DataSet[String] = env.fromElements("spark", "flink")
    ds0.print()

    //1.用Tuple创建DataSet(fromElements)
    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    ds1.print()

    //2.用Array创建DataSet
    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    ds2.print()

    //3.用ArrayBuffer创建DataSet
    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()

    //4.用List创建DataSet
    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
    ds4.print()

    //5.用ListBuffer创建DataSet
    val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()

    //6.用Vector创建DataSet
    val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
    ds6.print()

    //7.用Queue创建DataSet
    val ds7: DataSet[String] = env.fromCollection(mutable.Queue("spark", "flink"))
    ds7.print()

    //8.用Stack创建DataSet
    val ds8: DataSet[String] = env.fromCollection(mutable.Stack("spark", "flink"))
    ds8.print()

    //9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)
    val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
    ds9.print()

    //10.用Seq创建DataSet
    val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
    ds10.print()

    //11.用Set创建DataSet
    val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
    ds11.print()

    //12.用Iterable创建DataSet
    val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    ds12.print()

    //13.用ArraySeq创建DataSet
    val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()

    //14.用ArrayStack创建DataSet
    val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()

    //15.用Map创建DataSet
    val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    ds15.print()

    //16.用Range创建DataSet
    val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
    ds16.print()

    //17.用fromElements创建DataSet
    val ds17: DataSet[Long] = env.generateSequence(1, 9)
    ds17.print()
  



基于文件

  1. 读取本地文件数据 readTextFile

  2. 读取HDFS文件数据

  3. 读取CSV文件数据

  4. 读取压缩文件

  5. 遍历目录

参考代码:

package cn.itcast.batch.source

import org.apache.flink.api.scala.DataSet, ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/*
演示flink 批处理基于文件创建dataset
 */
object DataSourceDemo3 
  def main(args: Array[String]): Unit = 
    //1 获取env
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2 基于文件来创建dataset
    //2.1读取本地文本文件
    val wordsDs: DataSet[String] = env.readTextFile("E:/data/words.txt")
    // 2.2读取hdfs文件
    val hdfsDs: DataSet[String] = env.readTextFile("hdfs://node1:8020/wordcount/input/words.txt")
    // 2.3读取csv文件
    //读取csv文件需要准备一个case class
    case class Subject(id: Int, name: String)
    val subjectDs: DataSet[Subject] = env.readCsvFile[Subject]("E:/data/subject.csv")
    // 2.4 读取压缩文件
    val compressDs: DataSet[String] = env.readTextFile("E:/data/wordcount.txt.gz")
    // 2.5 遍历读取文件夹数据
    val conf = new Configuration()
    conf.setBoolean("recursive.file.enumeration",true)
    val folderDs: DataSet[String] = env.readTextFile("E:/data/wc/").withParameters(conf)
    //打印输出结果
//    wordsDs.print()
//    print("------------------------------------------")
//    hdfsDs.print()
//    println("=====================")
//    subjectDs.print()
//    println("=====================")
//    compressDs.print()
//    println("===============")
    folderDs.print()
  


注意:

1 读取压缩文件,对于某些压缩文件flink可以直接读取,不能并行读取

2 如果读取的是文件夹,想要遍历读取需要设置属性;recursive.file.enumeration进行递归读取

transforma

map&mappartition

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink map与mappartition的operator
 */
/*
需求:
示例
使用map操作,将以下数据转换为一个scala的样例类。
"1,张三", "2,李四", "3,王五", "4,赵六"
 */
object MapAndMapPartitionDemo 
  def main(args: Array[String]): Unit = 

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val sourceDs: DataSet[String] = env.fromCollection(List("1,张三", "2,李四", "3,王五", "4,赵六"))

    // 3 转换操作
    // 3.1 定义case class
    case class Student(id: Int, name: String)
    // 3.2 map操作
    val stuDs: DataSet[Student] = sourceDs.map(
      line => 
        val arr: Array[String] = line.split(",")
        Student(arr(0).toInt, arr(1))
      
    )
    // 3.3 mappartition操作
    val stuDs2: DataSet[Student] = sourceDs.mapPartition(
      
      iter =>  //迭代器
        //todo 做一些昂贵的动作,比如开启连接
        //遍历迭代器数据转为case class类型然后返回
        iter.map(
          it => 
            val arr: Array[String] = it.split(",")
            Student(arr(0).toInt, arr(1))
          
        )
        //todo 做一些昂贵的动作,关闭连接
      
    )

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    stuDs.print()
    println("====================")
    stuDs2.print()
    // 5 执行
  


总结:

map与mappartition最终效果实际是一样的,但是对于mappartition可以让我们有机会对整个分区的数据看做一个整体进行处理,此外还给我们创建了针对当前分区只需做一次的昂贵动作的机会。

flatMap

针对需求如果是数据变多的时候就要考虑是不是要使用flatmap进行操作,注意flatMap返回值类型要是一个可迭代类型;

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink flatMap的operator
 */
/*
需求:
示例
分别将以下数据,转换成国家、省份、城市三个维度的数据。
将以下数据
    张三,中国,江西省,南昌市
    李四,中国,河北省,石家庄市
转换为
    (张三,中国)
    (张三,中国,江西省)
    (张三,中国,江西省,南昌市)
    (李四,中国)
    (李四,中国,河北省)
    (李四,中国,河北省,石家庄市)
 */
object FlatMapDemo 
  def main(args: Array[String]): Unit = 

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val sourceDs: DataSet[String] = env.fromCollection(List(
      "张三,中国,江西省,南昌市",
      "李四,中国,河北省,石家庄市"
    ))

    // 3 转换操作 使用flatMap:分为map和flat,先执行map操作针对每个数据切分,切分后按照要求组成所谓元组数据(3)装入list中,最后执行flat操作去掉
    //list的外壳
    val faltMapDs: DataSet[Product with Serializable] = sourceDs.flatMap(
      line => 
        // 3.1对数据进行切分操作
        val arr: Array[String] = line.split(",")
        // 3.2 组装数据装入list中
        List(
          //(张三,中国)
          (arr(0), arr(1)),
          // (张三,中国,江西省)
          (arr(0), arr(1), arr(2)),
          // (张三,中国,江西省,南昌市)
          (arr(0), arr(1), arr(2), arr(3))
        )
      
    )


    // 4 输出 (如果直接打印无需进行启动,执行execute)
    faltMapDs.print()
    // 5 执行
  


filter

对数据进行过滤操作,保留下来结果为true的数据

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment, _

/*
演示flink filter的operator
 */
/*
需求:
示例:
过滤出来以下以长度>4的单词。
"hadoop", "hive", "spark", "flink"
 */
object FilterDemo 
  def main(args: Array[String]): Unit = 

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val wordsDs: DataSet[String] = env.fromCollection(List("hadoop", "hive", "spark", "flink"))

    // 3 转换操作 使用filter过滤单词长度大于4的单词
    val moreThan4Words: DataSet[String] = wordsDs.filter(_.length > 4)

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    moreThan4Words.print()
    // 5 执行
  


reduce操作

reduce聚合操作的算子,

注意针对groupby之后的数据流我们可以使用reduce进行聚合,不用考虑按照那种方式分组,reduce都可以实现;

但是如果想要使用sum()进行聚合前面分组指定key必须是按照索引才可以。

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment, _

/*
演示flink filter的operator
 */
/*
需求:
示例1
请将以下元组数据,使用reduce操作聚合成一个最终结果
 ("java" , 1) , ("java", 1) ,("java" , 1)
将上传元素数据转换为("java",3)
示例2
请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object ReduceDemo 
  def main(args: Array[String]): Unit = 

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    //    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("java", 1)))
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("scala", 1)))
    //    // 3 转换操作 使用reduce计算单词次数
    //    val resultDs: DataSet[(String, Int)] = wordsDs.reduce(
    //      (w1, w2) =>  // w1是一个初始值,第一次的时候是:(单词,0),之后都是之前的累加结果
    //        //3.1 进行次数累加
    //        (w1._1, w1._2 + w2._2)
    //      
    //    )

    //    // 3 转换操作,需要对单词进行分组,分组之后再进行次数的累加
    val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(_._1)
    // 3.2 进行次数累加
    val resultDs: DataSet[(String, Int)] = groupDs.reduce(
      (w1, w2) =>  // w1是一个初始值,第一次的时候是:(单词,0),之后都是之前的累加结果
        //3.1 进行次数累加
        (w1._1, w1._2 + w2._2)
      
    )
    // 3 转换操作,按照索引分组,使用sum操作
    //    wordsDs.groupBy(0).reduce(
    //      (w1, w2) =>  // w1是一个初始值,第一次的时候是:(单词,0),之后都是之前的累加结果
    //                //3.1 进行次数累加
    //                (w1._1, w1._2 + w2._2)
    //              
    //    ).print()
    // 这种方式不允许,会报错:ava.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.
    //    wordsDs.groupBy(_._1).sum(1).print()
    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  


reduceGroup

原理分析:

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment, _

/*
演示flink filter的operator
 */
/*
需求:

示例
请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object ReduceGroupDemo 
  def main(args: Array[String]): Unit = 

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    //    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("java", 1)))
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("scala", 1)))

    // 3 转换 使用reducegroup实现单词计数
    val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(_._1)
    val resultDs: DataSet[(String, Int)] = groupDs.reduceGroup(
      iter => 
        //参数是一个迭代器
        iter.reduce(//再对迭代器进行reduce聚合操作
          (w1, w2) => (w1._1, w1._2 + w2._2)
        )
      
    )

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  


aggregate操作

aggregate只能作用于元组类型的数据,并且对分组方式有要求只能是按照索引或者字段名称方式分组的聚合计算。

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment, _

/*
演示flink filter的operator
 */
/*
需求:

示例
请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object AggregateDemo 
  def main(args: Array[String]): Unit = 

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    //    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("java", 1)))
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("scala", 1)))

    // 3 转换 使用aggregate实现单词计数
//   val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(_._1) //aggregation聚合方式不支持这种分组方式
    val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(0)
    val resultDs: AggregateDataSet[(String, Int)] = groupDs.aggregate(Aggregations.SUM,1)

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  


distinct

对数据进行去重操作,可以指定对某个字段进行去重

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment, _

/*
演示flink distinct的operator
 */
/*
需求:

示例
请将以下元组数据,对数据进行去重操作
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object DistinctDemo 
  def main(args: Array[String]): Unit = 

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 2), ("scala", 1)))

    // 3 转换 使用distinct实现去重
//    wordsDs.distinct().print() //是对整个元组进行去重
    wordsDs.distinct(0).print()  //指定按照某个字段进行去重操作

    // 4 输出 (如果直接打印无需进行启动,执行execute)
//    resultDs.print()
    // 5 执行
  


join操作

类似于sql中的inner join,只展示join成功的数据。

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment, _

/*
演示flink join的operator
 */
/*
需求:
示例
有两个csv文件,有一个为score.csv,一个为subject.csv,分别保存了成绩数据以及学科数据。
需要将这两个数据连接到一起,然后打印出来。
 */
flink学习day03:flink datastream 开发

flink学习day05:checkpoint 原理与实践

flink02------1.自定义source

大数据Flink学习系列文章(快学)---02 Flink基本概念及架构

Flink学习笔记02:三种运行模式

day04_Flink高级API