Flink04: Flink核心API之DataSet

Posted 小猫不会去楼兰捉虫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink04: Flink核心API之DataSet相关的知识,希望对你有一定的参考价值。

DataSet API主要可以分为3块来分析:DataSource、Transformation、Sink。

  • DataSource是程序的数据源输入。
  • Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap、filter等操作。
  • DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

 一、DataSet API之DataSource

         针对DataSet批处理而言,其实最多的就是读取HDFS中的文件数据,所以在这里我们主要介绍两个DataSource组件。

  • 基于集合。fromCollection(Collection),主要是为了方便测试使用。它的用法和DataStreamAPI中的用法一样,我们已经用过很多次了。
  • 基于文件。readTextFile(path),读取hdfs中的数据文件。这个前面我们也使用过了。

二、DataSet API之Transformation

 

1. mapPartition

mapPartition算子和spark中的用法一样,mapPartition就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用mapPartition,这样可以一批数据获取一次连接,提高性能。

scala代码:

package com.imooc.scala.batch

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

object BatchMapPartitionScala 
  def main(args: Array[String]): Unit = 
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //生成数据源数据
    val text: DataSet[String] = env.fromCollection(Array("hello you", "hello me"))

    //每次处理一个分区数据
    text.mapPartition(it => 
      val res: ListBuffer[String] = ListBuffer[String]()

      it.foreach(line => 
        val words: Array[String] = line.split(" ")
        for(word <- words)
          res.append(word)
        
      )
      res
      //关闭数据库连接
    ).print()
    //注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
    //env.execute("BatchMapPartitionScala")
  

Java代码:

package com.imooc.java.batch;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Iterator;

public class BatchMapPartitionJava 
    public static void main(String[] args) throws Exception 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 生成数据源数据
        DataSource<String> text = env.fromCollection(Arrays.asList("hello you", "hello me"));
        //每次处理一个分区的数据
        text.mapPartition(new MapPartitionFunction<String, String>() 
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception 
                //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
                Iterator<String> it = iterable.iterator();
                while(it.hasNext())
                    String line = it.next();
                    String[] words = line.split(" ");
                    for (String word: words)
                        collector.collect(word);
                    
                
            
        ).print();
        
    

 2. join : 内连接,可以连接两份数据集

 scala代码

package com.imooc.java.batch

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchJoinScala 
  def main(args: Array[String]): Unit = 
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //初始化第一份数据 Tuple2<用户id,用户姓名>
    val text1 = env.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mick")))

    //初始化第二份数据 Tuple2<用户id,用户所在城市>
    val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))
    //对两份数据进行join操作
    //注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果
    //where:指定左边数据集中参与比较的元素角标,equalTo指定右边数据集中参与比较的元素角标
    text1.join(text2).where(0).equalTo(0) 
      (first, second) => (first._1, first._2, second._2)
    .print()
  

3. cross : 获取两个数据集的笛卡尔积

scala代码

package com.imooc.scala.batch

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchCrossScala 
  def main(args: Array[String]): Unit = 
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //初始化第一份数据
    val text1 = env.fromCollection(Array(1, 2))
    //初始化第二份数据
    val text2 = env.fromCollection(Array(3, 4))

    //执行cross操作
    text1.cross(text2).print()
  

4. union:返回两个数据集的总和,数据类型需要一致

和DataStreamAPI中的union操作功能一样

5. first-n :获取集合中的前N个元素

6. groupBy :分组

7. sortGroup:分组内排序

8. 实例:获取分组排序后每组的前N个元素

 scala代码

package com.imooc.scala.batch

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

object BatchFirstNScala 
  def main(args: Array[String]): Unit = 
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    val data: ListBuffer[(Int, String)] = ListBuffer[Tuple2[Int, String]]()

    data.append((2,"zs"))
    data.append((4,"ls"))
    data.append((3,"ww"))
    data.append((1,"aw"))
    data.append((1,"xw"))
    data.append((1,"mw"))

    import org.apache.flink.api.scala._
    val text: DataSet[(Int, String)] = env.fromCollection(data)

    //获取前三条数据
//    text.first(3).print()

    //根据数据中的第一列进行分组,获取每组的前2个元素
//    text.groupBy(0).first(2).print()

    //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
    //分组排序取TopN
    text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  

 Java代码

package com.imooc.java.batch;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

public class BatchFirstNJava 
    public static void main(String[] args) throws Exception 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<Integer,String>(2,"zs"));
        data.add(new Tuple2<Integer,String>(4,"ls"));
        data.add(new Tuple2<Integer,String>(3,"ww"));
        data.add(new Tuple2<Integer,String>(1,"aw"));
        data.add(new Tuple2<Integer,String>(1,"xw"));
        data.add(new Tuple2<Integer,String>(1,"mw"));
        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        //获取前3条数据,按照数据插入的顺序
//        text.first(3).print();

//        text.groupBy(0).first(2).print();
        //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
        text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    

三、DataSet API之DataSink

Flink针对DataSet提供了一些已经实现好的数据目的地
其中最常见的是向HDFS中写入数据

(1)writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
(2)writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法
(3)还有一个是print:打印每个元素的toString()方法的值,这个print是测试的时候使用的。

以上是关于Flink04: Flink核心API之DataSet的主要内容,如果未能解决你的问题,请参考以下文章

Flink之CEP-API分析

学习笔记Flink—— Flink基础API及核心数据结构

学习笔记Flink—— Flink基础API及核心数据结构

流式计算-Flink Stream API 篇二

Flink 1.13,面向流批一体的运行时与 DataStream API 优化

day04_Flink高级API