就够了—JDBC保姆级教程

Posted 爱笑的钮布尼茨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了就够了—JDBC保姆级教程相关的知识,希望对你有一定的参考价值。

目录

一、JDBC概述

基本介绍

JDBC相关API

二、连接mysql数据库

准备工作

JDBC程序编写步骤

五种连接数据库的方式 

三、ResultSet(结果集)

基本介绍

四、Statement 和 PreparedStatement

Statement

PreparedStatement 

五、事务

基本介绍

六、批处理

基本介绍

七、数据库连接池

传统获取Connection问题分析

数据库连接池种类

Druid数据库连接池

八、Apache-DBUtils类库

基本介绍

使用DBUtils类库进行查询操作

使用DBUtils类库进行增删改操作

九、最后的话


一、JDBC概述

基本介绍

        1. JDBC为访问不同的数据库提供了统一的接口,为使用者屏蔽了细节问题。

        2. Java程序员使用JDBC,可以连接任何提供了JDBC驱动程序的数据库系统,从而完成对数据库的各种操作。 

        3. JDBC基本原理示意图

        4. JDBC是Java提供一套用于数据库操作的接口API,Java程序员只需要面向这套接口编程即可。不同的数据库厂商,需要针对这套接口,提供不同实现。

JDBC相关API


二、连接mysql数据库

准备工作

        1. 创建一个 lib 目录

        2. 将 mysql-connector-java .jar  驱动复制进去(点击蓝色字体即可下载)

        3. 右键添加到 Library

JDBC程序编写步骤

        1. 注册驱动 - 加载 Driver 类

        2. 获取连接 - 得到 Connection

        3. 执行增删改查 - 发送 SQL 给mysql执行

        4. 释放资源 - 关闭相关连接

五种连接数据库的方式 

        方式一:

package JDBC.Linkedways;

import com.mysql.jdbc.Driver;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class linkedways1 
    public static void main(String[] args) throws SQLException 
        //前提准备:导入mysql驱动

        //1.注册驱动
        Driver driver = new Driver();

        //2. 获取mysql连接
        // mysql连接地址  jdbc:mysql://IP:端口/数据库
        //此处我连接的时本地数据库,可以指定 ip 地址连接
        String url = "jdbc:mysql://localhost:3306/test";
        // Properties 文件存储用户名和密码
        Properties properties = new Properties();
        // user - 用户名   password — 密码  按此要求存入用户名和密码
        properties.setProperty("user", "root");
        properties.setProperty("password", "123456");
        // 按照指定的 url 和 Properties 获取连接
        Connection connect = driver.connect(url, properties);

        //3.执行增删改查
        //mysql语句
        String sql = "insert into jdbc values (1,'Mike',19)";

        Statement statement = connect.createStatement();
        //返回该指令影响的行数,为0则代表未执行成功
        int i = statement.executeUpdate(sql);
        System.out.println(i > 0 ? "yes" : "no");
        
        // 4. 释放资源
        connect.close();
        statement.close();

    

        方式二:

package JDBC.Linkedways;

import com.mysql.jdbc.Driver;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class linkedways2 
    public static void main(String[] args) throws ClassNotFoundException, InstantiationException, IllegalAccessException, SQLException 
        //1. 加载 Driver 类 ,此处用类加载
        Class<?> aClass = Class.forName("com.mysql.jdbc.Driver");
        Driver driver = (Driver) aClass.newInstance();

        //2. 获取mysql连接
        String url = "jdbc:mysql://localhost:3306/test";
        Properties properties = new Properties();
        properties.setProperty("user", "root");
        properties.setProperty("password", "123456");
        Connection connect = driver.connect(url, properties);

        //3. 执行mysql语句
        Statement statement = connect.createStatement();
        String sql = "update jdbc set age=age+1 where id = 1";
        int i = statement.executeUpdate(sql);
        System.out.println(i);

        //4. 关闭资源
        connect.close();
        statement.close();
    

        方式三:

package JDBC.Linkedways;

import com.mysql.jdbc.Driver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;


public class linkedways3 
    public static void main(String[] args) throws ClassNotFoundException, InstantiationException, IllegalAccessException, SQLException 
        //1. 加载 Driver 类
        Class<?> aClass = Class.forName("com.mysql.jdbc.Driver");
        Driver driver = (Driver) aClass.newInstance();

        //2. 连接mysql数据库
        String url = "jdbc:mysql://localhost:3306/test";
        //利用 Properties 文件获取用户名和密码
//        Properties properties = new Properties();
//        properties.setProperty("user", "root");
//        properties.setProperty("password", "123456");
//        Connection connection = DriverManager.getConnection(url, properties);

        //直接利用变量获取用户名和密码
        String user = "root";
        String password = "123456";
        DriverManager.registerDriver(driver);//可省略
//        com.mysql.jdbc.Driver //追进 Driver 类 可以发现在类加载的时候会自动注册
        /*
                static 
                        try 
                            DriverManager.registerDriver(new Driver());
                         catch (SQLException var1) 
                            throw new RuntimeException("Can't register driver!");
                        
                    
         */
        //利用 DriverManager 类的 getConnection() 方法
        Connection connection = DriverManager.getConnection(url, user, password);

        //3. 执行mysql命令
        String sql = "insert into jdbc values (2,'Milan',20)";

        Statement statement = connection.createStatement();

        statement.executeUpdate(sql);

        //4. 关闭资源
        statement.close();
        connection.close();

    

        方式四:

package JDBC.Linkedways;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class linkedways4 
    public static void main(String[] args) throws ClassNotFoundException, SQLException 
        // 1. 加载 Driver 类
        Class.forName("com.mysql.jdbc.Driver");
//        com.mysql.jdbc.Driver //追进Driver类 可以发现在类加载的时候会自动注册
        /*
                static 
                    try 
                        DriverManager.registerDriver(new Driver());
                     catch (SQLException var1) 
                        throw new RuntimeException("Can't register driver!");
                    
                
         */
        // 2. 连接mysql
        String url = "jdbc:mysql://localhost:3306/test";
        String user = "root";
        String password = "123456";
        Connection connection = DriverManager.getConnection(url, user, password);

        // 3. 执行mysql语句
        String sql = "delete from jdbc where id = 4";
        Statement statement = connection.createStatement();
        statement.executeUpdate(sql);

        // 4. 关闭资源
        statement.close();
        connection.close();

    

        方式五:

package JDBC.Linkedways;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class linkedways5 
    public static void main(String[] args) throws IOException, ClassNotFoundException, SQLException 
        //在4的基础上进行改进,通过读取外部配置文件来读取需要的数据,而不是直接在代码里写
        //这样我们可以在配置文件修改信息而程序不需要重新编译
        //准备工作
        //将外部配置文件读取进内存 FileInputStream
        Properties properties = new Properties();
        properties.load(new FileInputStream(new File("src/mysql.properties")));
        String url = properties.getProperty("url");
        String user = properties.getProperty("user");
        String password = properties.getProperty("password");
        String driver = properties.getProperty("driver");
        //1. 加载Driver
        Class.forName(driver);//可不写
        /**
         * 1. mysql驱动 5.1.6 可以无需Class.forName(driver);
         * 2. 从JDK1.5以后使用的jdbc4,不再需要显示调用 Class.forName(driver) 注册驱动
         *    而是自动调用驱动jar包下的META-INF\\java.sql.Driver 文本中的类名称去注册
         * 3. 建议还是写上,这样会更加明确。
         */

        //2. 连接mysql
        Connection connection = DriverManager.getConnection(url, user, password);

        //3. 执行mysql语句
        String sql = "insert into jdbc values(4,'ise',20)";
        Statement statement = connection.createStatement();
        int i = statement.executeUpdate(sql);

        //4. 关闭资源
        statement.close();
        connection.close();
    

#mysql.properties配置文件
#建议放在src文件夹下
#url=jdbc:mysql://ip:端口/数据库?rewriteBatchedStatements=true
url=jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true
#user=用户名
user=root
#password=密码
password=123456
#driver=对应数据库对应的Driver类
driver=com.mysql.jdbc.Driver

温馨提示:推荐用最后一种,灵活度比较高而且操作简单。


三、ResultSet(结果集)

基本介绍

        1. 表示数据库结果集的数据表,通常通过执行查询数据库的语句生成。

        2. ResultSet对象保持一个光标指向其当前数据行。最初,光标位于第一行之前。

        3. next 方法将光标移动到下一行,并且由于在ResultSet 对象中没有更多行时返回false,因此可以在while循环中使用循环来遍历结果集。

代码演示:

package JDBC.ResultSet;

import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;

public class ResultSet01 
    public static void main(String[] args) throws Exception 
        //准备工作
        Properties properties = new Properties();
        properties.load(new FileInputStream(new File("src//mysql.properties")));
        String url = properties.getProperty("url");
        String driver = properties.getProperty("driver");
        String user = properties.getProperty("user");
        String password = properties.getProperty("password");
        //1. loaded driver
        Class.forName(driver);

        //2. linked mysql
        Connection connection = DriverManager.getConnection(url, user, password);

        //3. do mysql_order
        PreparedStatement preparedStatement = connection.prepareStatement("select * from jdbc ");
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()) 
            System.out.println(resultSet.getInt(1) + "    " +
                    resultSet.getString(2) + "    " +
                    resultSet.getInt(3));
        
        //4. close
        connection.close();
        preparedStatement.close();
        resultSet.close();

    


四、Statement 和 PreparedStatement

Statement

        1. Statement 对象用于执行静态 SQL 语句并返回其生成的结果的对象。

        2. Statement 对象执行 SQL 语句,存在 SQL 注入风险。

        3. SQL注入是利用某些系统没有对用户输入的数据进行充分的检查,而在用户输入数据中注入非法的 SQL 语句段或命令,恶意攻击数据库。

        4. 要防范 SQL 注入,只要用 PreparedStatement 取代 Statement 就可以了。

代码演示:

package JDBC.Statement;

import java.io.FileInputStream;
import java.io.IOException;
import java.sql.*;
import java.util.Properties;

public class Statement01 
    public static void main(String[] args) throws SQLException, IOException, ClassNotFoundException 
        //准备工作
        Properties properties = new Properties();
        properties.load(new FileInputStream("src//mysql.properties"));
        String url = properties.getProperty("url");
        String user = properties.getProperty("user");
        String password = properties.getProperty("password");
        String driver = properties.getProperty("driver");

        String name = "1' or"; //
        String pwd = "or '1'= '1"; // 万能密码

        //加载Driver
        Class.forName(driver); // 可省略

        //获取mysql连接
        Connection connection = DriverManager.getConnection(url, user, password);

        //执行mysql命令
        Statement statement = connection.createStatement();

        //SQL注入 通过插入一些非法语句使得验证条件被屏蔽
        String sql = "select * from users where name ='"
                + user + "' and password = '" + pwd + "'";

        ResultSet resultSet = statement.executeQuery(sql);
        while (resultSet.next()) 
            System.out.print(resultSet.getInt(1) + "\\t");
            System.out.print(resultSet.getString(2) + "\\t\\t");
            System.out.println(resultSet.getString(3));
        

        //关闭资源
        resultSet.close();
        statement.close();
        connection.close();
    

PreparedStatement 

        1. PreparedStatement 执行的 SQL 语句中的参数用问号表示,调用PreparedStatement 对象的 setXxx() 方法来设置这些参数。setXxx() 方法有两个参数,第一个参数是要设置的 SQL 语句中的参数的索引(从1 开始),第二个是设置的 SQL 语句中参数的值。

        2. 调用 executeQuery() ,执行查询操作,返回ResultSet对象

        3. 调用 executeUpdate() ,执行增,删,改操作。

        4. 使用预处理可以不再使用 + 拼接sql语句,减少语法错误,而且可以有效的解决了SQL注入问题,还能大大减少编译次数,效率较高。通常情况下,我们推荐使用PreparedStatement。

代码演示:

package JDBC.PreparedStatement;

import java.io.FileInputStream;
import java.io.IOException;
import java.sql.*;
import java.util.Properties;

public class PStatement_02 
    public static void main(String[] args) throws SQLException, IOException, ClassNotFoundException 
        //准备工作
        Properties properties = new Properties();
        properties.load(new FileInputStream("src//mysql.properties"));
        String url = properties.getProperty("url");
        String user = properties.getProperty("user");
        String password = properties.getProperty("password");
        String driver = properties.getProperty("driver");

        String name = "1' or"; //
        String pwd = "or '1'= '1"; // 万能密码

        //加载Driver
        Class.forName(driver); // 可省略

        //获取mysql连接
        Connection connection = DriverManager.getConnection(url, user, password);

        //执行mysql命令
        String sql = "select * from users where name = ? and password = ?";

        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        //在preparedStatement 调用setXxx()时会检查插入的语句,以此避免SQL注入问题
        preparedStatement.setString(1, name);
        preparedStatement.setString(2, pwd);

        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()) 
            System.out.print(resultSet.getInt(1) + "\\t");
            System.out.print(resultSet.getString(2) + "\\t\\t");
            System.out.println(resultSet.getString(3));
        //此时无输出

        //关闭资源
        resultSet.close();
        preparedStatement.close();
        connection.close();
    


五、事务

基本介绍

        1. JDBC程序中当一个Connection 对象创建时,默认情况下是自动提交事务:每次执行一个SQL语句时,如果执行成功,就会向数据库自动提交,而不能回滚。

        2. JDBC程序中为了让多个SQL语句作为一个整体执行,需要使用事务。

        3. 调用Connection的setAutoCommit(false)可以取消自动提交事务。

        4. 在所有的SQL语句都成功执行后,调用Connecttion的commit() 方法,提交事务。

        5. 在其中某个操作失败或者出现异常时,调用Connection的rollback() 方法,回滚事务,默认回滚到事务开始时,也可以自己设置保存点。

 代码演示:

package JDBC.Transaction;

import JDBC.JDBCUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class transaction_01 
    public static void main(String[] args) throws SQLException 
        //由前面的学习过程我们能发现,在JDBC操作数据库时,
        //获得Connection连接和释放资源的操作是相同的
        //因此我们可以写一个工具类,专门用来做连接操作和关闭资源操作
 
        //调用 JDBCUtils 工具类获得连接
        Connection con = JDBCUtils.con();
        String create_Table = "create table account (id int,`name` varchar(33),money double)";
        String insert1 = "insert into account values(1,'mike',1000)";
        String insert2 = "insert into account values(2,'Alice',20000)";
        String money_sub = "update account set money = money - 100 where id = 1";
        String money_add = "update account set money = money + 100 where id = 2";
        PreparedStatement preparedStatement = null;

        try 
            preparedStatement = con.prepareStatement(create_Table);
            preparedStatement.execute();
            preparedStatement = con.prepareStatement(insert1);
            preparedStatement.executeUpdate();
            preparedStatement = con.prepareStatement(insert2);
            preparedStatement.executeUpdate();
            con.setAutoCommit(false);//开始事务
            preparedStatement = con.prepareStatement(money_sub);
            preparedStatement.executeUpdate();
//            int a = 1 / 0;
            preparedStatement = con.prepareStatement(money_add);
            preparedStatement.executeUpdate();
            con.commit();//所有操作完成,提交事务
         catch (SQLException e) 
            try 
                con.rollback();//若有异常事务回滚到开始时
             catch (SQLException ex) 
                throw new RuntimeException(ex);
            
            throw new RuntimeException(e);
         finally 
            //调用 JDBCUtils 工具类释放资源
            JDBCUtils.close(null, con, preparedStatement);
        
    

JDBCUtils工具类

        将获取连接和释放资源的过程封装成方法。

package JDBC;

import java.io.FileInputStream;
import java.io.IOException;
import java.sql.*;
import java.util.Properties;

public class JDBCUtils 
    public static String url;
    public static String user;
    public static String password;
    public static String driver;
    
    //类加载时自动调用
    static 
        Properties properties = new Properties();
        try 
            properties.load(new FileInputStream("src//mysql.properties"));
            url = properties.getProperty("url");
            user = properties.getProperty("user");
            password = properties.getProperty("password");
            driver = properties.getProperty("driver");
         catch (IOException e) 
            throw new RuntimeException(e);
        
    
    
    //获得一个连接
    public static Connection con() 
        try 
            return DriverManager.getConnection(url, user, password);
         catch (SQLException e) 
            throw new RuntimeException(e);
        
    
    
    //关闭传入的资源,资源不存在则传入null
    public static void close(ResultSet resultSet, Connection connection, Statement statement) 

        try 
            if (resultSet != null) 
                resultSet.close();
            
            if (connection != null) 
                connection.close();
            
            if (statement != null) 
                statement.close();
            


         catch (SQLException e) 
            throw new RuntimeException(e);
        
    



六、批处理

基本介绍

        1. 当需要成批插入或者更新时,可以采用Java的批量更新机制,这一机制允许多条语句一次性提交给数据库批量处理,通常情况下要比单独提交处理更有效率。

        2. JDBC的批量处理语句包括下面方法:

                addBatch():添加需要批量处理的SQL语句或参数

                executeBatch():执行批量处理语句

                clearBatch():清空批处理包的语句

        3. JDBC 连接MySQL时,如果要使用批处理功能,需要在url中添加参数?rewriteBatchedStatements=true(切记切记)

        4. 批处理往往和PreparedStatement一起搭配使用,既能减少编译次数,又减少运行次数,效率大大提高。

代码演示:

package JDBC.Batch_;

import JDBC.JDBCUtils;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Batch_01 

    @Test
    // 传统方法执行5000句SQL指令
    public void m1() throws SQLException 
        Connection con = JDBCUtils.con();
        String sql = "insert into account values (?,?,?)";
        PreparedStatement preparedStatement = con.prepareStatement(sql);
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 5000; i++) 
            preparedStatement.setInt(1, i);
            preparedStatement.setString(2, "jack" + i);
            preparedStatement.setInt(3, 22);
            preparedStatement.executeUpdate();
        
        long end = System.currentTimeMillis();
        System.out.println(end - begin);//耗时2281ms
        JDBCUtils.close(null, con, preparedStatement);
    

    @Test
    //批处理执行5000句SQL语句
    public void m2() throws SQLException 
        Connection con = JDBCUtils.con();
        String sql = "insert into account values (?,?,?)";
        PreparedStatement preparedStatement = con.prepareStatement(sql);
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 5000; i++) //执行5000次sql
            preparedStatement.setInt(1, i);
            preparedStatement.setString(2, "jack" + i);
            preparedStatement.setInt(3, 20);
            preparedStatement.addBatch();//往批处理包添加SQL语句
            if ((i + 1) % 1000 == 0) //批处理包添加1000条SQL语句后
                preparedStatement.executeBatch();//执行该批处理包
                preparedStatement.clearBatch();//清空该批处理包
            
        
        long end = System.currentTimeMillis();
        System.out.println(end - begin);//耗时51ms

        JDBCUtils.close(null, con, preparedStatement);
    




七、数据库连接池

传统获取Connection问题分析

        1. 传统的JDBC数据库连接使用DriverManager来获取,每次向数据库建立连接的时候都要将Connection加载到内存中,再验证IP地址,用户名和密码(0.05s - 1s 时间)。需要数据库连接时,就向数据库要求一个,频繁的进行数据库连接操作将占用很多的系统资源,容易造成服务器崩溃。

        2. 每一次数据库连接,使用完后都得断开,如果程序出现异常而未能关闭,将会导致数据库内存泄漏,最终将导致数据库重启。

        3. 传统获取连接方式,不能控制创建的连接数量,如连接过多,也可能导致内存泄漏,数据库崩溃。

        4. 为了解决传统开发中的数据库连接问题,可以采用数据库连接池技术。

数据库连接池种类

        JDBC的数据库连接池使用javax.sql.DataSource来表示,DataSource只是一个接口,该接口通常由第三方提供实现。

        常见的数据库连接池有下面几种。

        1. C3P0 数据库连接池,速度相对较慢,稳定性不错。

        2. DBCP 数据库连接池,速度相对C3P0较快,但不稳定。

        3. Proxool 数据库连接池,有监控连接池状态的功能,稳定性较C3P0差一点。

        4. BoneCP 数据库连接池,速度快。

        5. Druid(德鲁伊)数据库连接池,由阿里提供,集DBCP,C3P0,Proxool 优点于一身的数据库连接池。(后面主要介绍德鲁伊数据库连接池的使用)

Druid数据库连接池

准备工作:1.将 druid.jar 文件添加到lib目录(点击蓝色字体即可下载)

                  2. 右键选择 Add as Library

配置文件:1. 将下面配置信息存入 druid.properties 中

                  2. 将 druid.properties 存到 src 目录下

#配置文件,根据自己的具体需求设置
#driverClassName=对应数据库的驱动路径
driverClassName=com.mysql.jdbc.Driver
#url=jdbc:mysql://ip:端口/数据库?rewriteBatchedStatements=true
url=jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true
#username=你的用户名  password=对应的密码  
username=root
password=123456
#初始化连接数量
initialSize=20
#最小连接数量
minIdle=20
#最大连接数量
maxActive=50
#最大等待时间
maxWait=5000

代码演示:

package JDBC.Druid;

import JDBC.JDBCUtils;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import org.junit.jupiter.api.Test;

import javax.sql.DataSource;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

//传统连接数据库和通过数据池连接对比

public class Druid01 
    @Test
    //连续连接数据库5000次,每次连接完不关闭
    //出现错误提示:Too many connections
    public void m1() 
        for (int i = 0; i < 5000; i++) 
            //错误信息
            // Data source rejected establishment of connection,
            // message from server: "Too many connections"
            Connection con = JDBCUtils.con();
        
    

    //连接5000次数据库,每次连接完都关闭
    @Test
    public void m2() throws SQLException 
        long l = System.currentTimeMillis();
        for (int i = 0; i < 5000; i++) 
            Connection con = JDBCUtils.con();
            con.close();
        
        long l1 = System.currentTimeMillis();
        System.out.println(l1 - l);//耗时4385ms
    

    //使用druid连接池连接数据库
    //获取5000次连接
    @Test
    public void m3() throws Exception 
        Properties properties = new Properties();
        properties.load(new FileInputStream("src//druid.properties"));

        DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
        long l = System.currentTimeMillis();
        for (int i = 0; i < 5000; i++) 
            Connection connection = dataSource.getConnection();
            connection.close();
        
        long l1 = System.currentTimeMillis();
        System.out.println(l1 - l);//耗时311ms
    


package JDBC.Druid;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;

public class Druid011 
    public static void main(String[] args) throws Exception 
        Properties properties = new Properties();
        properties.load(new FileInputStream("src//druid.properties"));

        DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);

        Connection connection = dataSource.getConnection();
        String sql = "select * from account";
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()) 
            System.out.print(resultSet.getInt(1) + "\\t");
            System.out.print(resultSet.getString(2) + "\\t");
            System.out.println(resultSet.getInt(3));
        

        connection.close();
        resultSet.close();
        preparedStatement.close();
    

JDBCUtilsByDruid工具类

        在JDBCUtils工具类的基础上,将基于Druid数据库连接池的获取连接和释放资源操作封装成方法。

package JDBC;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class JDBCUtilsByDruid 
    public static DataSource dataSource = null;
    public static Properties properties = null;


    static 
        try 
            properties = new Properties();
            properties.load(new FileInputStream("src//druid.properties"));
            dataSource = DruidDataSourceFactory.createDataSource(properties);
         catch (Exception e) 
            throw new RuntimeException(e);
        
    

    public static Connection getConnection() 
        try 
            return dataSource.getConnection();
         catch (SQLException e) 
            throw new RuntimeException(e);
        
    

    public static void close(ResultSet resultSet, Connection connection, Statement statement) 
        try 
            if (resultSet != null)
                resultSet.close();
            if (connection != null)
                connection.close();
            if (statement != null)
                statement.close();
         catch (SQLException e) 
            throw new RuntimeException(e);
        
    


八、Apache-DBUtils类库

基本介绍

        1. commons - dbutils 是Apache 组织提供的一个开源JDBC工具类库,它是对JDBC的封装,使用dbutils能极大简化jdbc编码的工作量。

        2. QueryRunner类:该类封装了SQL的执行,是线程安全的。可以实现增、删、改、查、批处理。

        3. ResultSetHandler接口:该接口用于处理 java.sql.ResultSet,将数据按要求转换为另一种形式。其部分实现类如下:

  

使用DBUtils类库进行查询操作

        

 准备工作:

        1. 将 commons-dbutils.jar 文件添加到lib目录中(点击蓝色字体即可下载)

        2. 右键选择 Add as Library

package JDBC.Druid;

import JDBC.JDBCUtils;
import JDBC.JDBCUtilsByDruid;
import JDBC.student;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.apache.commons.dbutils.handlers.ScalarHandler;
import org.junit.jupiter.api.Test;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class Druid02 


    @Test
    //传统思路
    public void test1() throws SQLException 

        Connection con = JDBCUtils.con();

        PreparedStatement preparedStatement = con.prepareStatement("select * from jdbc");

        ResultSet resultSet = preparedStatement.executeQuery();
        ArrayList<student> students = new ArrayList<>();

        while (resultSet.next()) 
            int id = resultSet.getInt(1);
            String name = resultSet.getString(2);
            int age = resultSet.getInt(3);
            students.add(new student(id, name, age));
        
        JDBCUtils.close(resultSet, con, preparedStatement);
        for (student student : students) 
            System.out.println(student);
        


    
    //使用DBUtils类库进行查询操作
    // 查询多行多列数据
    @Test
    public void m1() throws SQLException 
        Connection connection = JDBCUtilsByDruid.getConnection();
        QueryRunner queryRunner = new QueryRunner();
        List<student> query = queryRunner.query(connection, "select * from jdbc",
                new BeanListHandler<>(student.class));
        /*
            注意:student类必须有无参构造器和 set 函数
            若无参构造器不存在,则无法实例化对象
            若set 不存在,则无法赋值,所有属性都会被赋一个默认值
        */
        System.out.println(query);

        JDBCUtilsByDruid.close(null, connection, null);
    

    @Test
    //查询单行数据
    public void m2() throws SQLException 
        Connection connection = JDBCUtilsByDruid.getConnection();
        QueryRunner queryRunner = new QueryRunner();
        student query = queryRunner.query(connection, "select * from jdbc where id = 1",
                new BeanHandler<>(student.class));
        System.out.println(query);
        JDBCUtilsByDruid.close(null, connection, null);
    

    @Test
    //返回单行单列数据
    public void m3() throws SQLException 
        Connection connection = JDBCUtilsByDruid.getConnection();
        QueryRunner queryRunner = new QueryRunner();
        Object query = queryRunner.query(connection, "select name from jdbc where id = 2", new ScalarHandler());
        System.out.println(query);
        JDBCUtilsByDruid.close(null, connection, null);
    

使用DBUtils类库进行增删改操作

package JDBC.Druid;

import JDBC.JDBCUtilsByDruid;
import org.apache.commons.dbutils.QueryRunner;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.SQLException;

public class Druid03 
    //测试插入数据
    @Test
    public void m1() throws SQLException 
        Connection connection = JDBCUtilsByDruid.getConnection();

        QueryRunner queryRunner = new QueryRunner();
        int update = queryRunner.update(connection, "insert into jdbc values(4,'erson',45)");
        System.out.println(update > 0 ? "yes" : "no");
        JDBCUtilsByDruid.close(null, connection, null);
    

    @Test
    //测试更改数据
    public void m2() throws SQLException 
        Connection connection = JDBCUtilsByDruid.getConnection();

        QueryRunner queryRunner = new QueryRunner();

        int update = queryRunner.update(connection, "update jdbc set name = 'Lihua' where id = 1 ");
        System.out.println(update > 0 ? "yes" : "no");
        JDBCUtilsByDruid.close(null, connection, null);
    

    @Test
    //测试删除数据
    public void m3() throws SQLException 
        Connection connection = JDBCUtilsByDruid.getConnection();
        QueryRunner queryRunner = new QueryRunner();
        int update = queryRunner.update(connection, "delete from jdbc where id =4");
        System.out.println(update > 0 ? "yes" : "no");
        JDBCUtilsByDruid.close(null, connection, null);
    


九、最后的话

✨  原创不易,还希望各位大佬支持一下


👍  点赞,你的认可是我创作的动力!


⭐️  收藏,你的青睐是我努力的方向!


✏️  评论,你的意见是我进步的财富!

Flink保姆级教程,超全五万字,学习与面试收藏这一篇就够了

本文目录:

一、Flink简介
二、Flink 部署及启动
三、Flink 运行架构
四、Flink 算子大全
五、流处理中的 Time 与 Window
六、Flink 状态管理
七、Flink 容错
八、Flink SQL
九、Flink CEP
十、Flink CDC
十一、基于 Flink 构建全场景实时数仓
十二、Flink 大厂面试题

Flink 涉及的知识点如下图所示,本文将逐一讲解:

本文档参考了 Flink 的官网及其他众多资料整理而成,为了整洁的排版及舒适的阅读,对于模糊不清晰的图片及黑白图片进行重新绘制成了高清彩图

本文超长,获取本文完整PDF文档,带目录超全总结,请扫码关注公众号【五分钟学大数据】,后台发送:flink pdf,即可下载带目录的完整版flink文档:

正文开始:

一、Flink 简介

1. Flink 发展

这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有 Hadoop、Storm,以及后来的 Spark,他们都有着各自专注的应用场景。Spark 掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的飞速发展。Spark 的火热或多或少的掩盖了其他分布式计算的系统身影。就像 Flink,也就在这个时候默默的发展着。

在国外一些社区,有很多人将大数据的计算引擎分成了 4 代,当然,也有很多人不会认同。我们先姑且这么认为和讨论。

首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。这里大家应该都不会对 MapReduce 陌生,它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。

由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。

接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。

随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然 Flink 也可以支持 Batch 的任务,以及 DAG 的运算。

总结

第 1 代:Hadoop MapReduc 批处理 Mapper、Reducer 2;

第 2 代:DAG 框架(Oozie 、Tez),Tez + MapReduce 批处理 1 个 Tez = MR(1) + MR(2) + ... + MR(n) 相比 MR 效率有所提升;

第 3 代:Spark 批处理、流处理、SQL 高层 API 支持 自带 DAG 内存迭代计算、性能较之前大幅提;

第 4 代:Flink 批处理、流处理、SQL 高层 API 支持 自带 DAG 流式计算性能更高、可靠性更高。

2. 什么是 Flink

Flink 起源于 Stratosphere 项目,Stratosphere 是在 2010~2014 年由 3 所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014 年 4 月 Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,参加这个孵化项目的初始成员是 Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的顶级项目。

在德语中,Flink 一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为 logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而 Flink 的松鼠 logo 拥有可爱的尾巴,尾巴的颜色与 Apache 软件基金会的 logo 颜色相呼应,也就是说,这是一只 Apache 风格的松鼠。

Flink 主页在其顶部展示了该项目的理念:“Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

3. Flink 流处理特性

  1. 支持高吞吐、低延迟、高性能的流处理

  2. 支持带有事件时间的窗口(Window)操作

  3. 支持有状态计算的 Exactly-once 语义

  4. 支持高度灵活的窗口(Window)操作,支持基于 time、count、session,以及 data-driven 的窗口操作

  5. 支持具有 Backpressure 功能的持续流模型

  6. 支持基于轻量级分布式快照(Snapshot)实现的容错

  7. 一个运行时同时支持 Batch on Streaming 处理和 Streaming 处理

  8. Flink 在 JVM 内部实现了自己的内存管理

  9. 支持迭代计算

  10. 支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果有必要进行缓存

4. Flink 基石

Flink 之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window

首先是 Checkpoint 机制,这是 Flink 最重要的一个特性。Flink 基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。Chandy-Lamport 算法实际上在 1985 年的时候已经被提出来,但并没有被很广泛的应用,而 Flink 则把这个算法发扬光大了。

Spark 最近在实现 Continue streaming,Continue streaming 的目的是为了降低它处理的延时,其也需要提供这种一致性的语义,最终采用 Chandy-Lamport 这个算法,说明 Chandy-Lamport 算法在业界得到了一定的肯定。

提供了一致性的语义之后,Flink 为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的 State API,包括里面的有 ValueState、ListState、MapState,近期添加了 BroadcastState,使用 State API 能够自动享受到这种一致性的语义。

除此之外,Flink 还实现了 Watermark 的机制,能够支持基于事件的时间的处理,或者说基于系统时间的处理,能够容忍数据的延时、容忍数据的迟到、容忍乱序的数据。

另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink 提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

5. 批处理与流处理

批处理的特点是有界、持久、大量,批处理非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。流处理的特点是无界、实时,流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在 Spark 生态体系中,对于批处理和流处理采用了不同的技术框架,批处理由 SparkSQL 实现,流处理由 Spark Streaming 实现,这也是大部分框架采用的策略,使用独立的处理器实现批处理和流处理,而 Flink 可以同时实现批处理和流处理。

Flink 是如何同时实现批处理与流处理的呢?答案是,Flink 将批处理(即处理有限的静态数据)视作一种特殊的流处理。

Flink 的核心计算架构是下图中的 Flink Runtime 执行引擎,它是一个分布式系统,能够接受数据流程序并在一台或多台机器上以容错方式执行。

Flink Runtime 执行引擎可以作为 YARN(Yet Another Resource Negotiator)的应用程序在集群上运行,也可以在 Mesos 集群上运行,还可以在单机上运行(这对于调试 Flink 应用程序来说非常有用)。

上图为 Flink 技术栈的核心组成部分,值得一提的是,Flink 分别提供了面向流式处理的接口(DataStream API)和面向批处理的接口(DataSet API)。因此,Flink 既可以完成流处理,也可以完成批处理。Flink 支持的拓展库涉及机器学习(FlinkML)、复杂事件处理(CEP)、以及图计算(Gelly),还有分别针对流处理和批处理的 Table API。

能被 Flink Runtime 执行引擎接受的程序很强大,但是这样的程序有着冗长的代码,编写起来也很费力,基于这个原因,Flink 提供了封装在 Runtime 执行引擎之上的 API,以帮助用户方便地生成流式计算程序。Flink 提供了用于流处理的 DataStream API 和用于批处理的 DataSet API。值得注意的是,尽管 Flink Runtime 执行引擎是基于流处理的,但是 DataSet API 先于 DataStream API 被开发出来,这是因为工业界对无限流处理的需求在 Flink 诞生之初并不大。

DataStream API 可以流畅地分析无限数据流,并且可以用 Java 或者 Scala 等来实现。开发人员需要基于一个叫 DataStream 的数据结构来开发,这个数据结构用于表示永不停止的分布式数据流。

Flink 的分布式特点体现在它能够在成百上千台机器上运行,它将大型的计算任务分成许多小的部分,每个机器执行一部分。Flink 能够自动地确保发生机器故障或者其他错误时计算能够持续进行,或者在修复 bug 或进行版本升级后有计划地再执行一次。这种能力使得开发人员不需要担心运行失败。Flink 本质上使用容错性数据流,这使得开发人员可以分析持续生成且永远不结束的数据(即流处理)。

二、Flink 部署及启动

Flink 支持多种安装模式:

  1. local(本地)——单机模式,一般不使用;

  2. standalone——独立模式,Flink 自带集群,开发测试环境使用;

  3. yarn——计算资源统一由 Hadoop YARN 管理,生产环境使用。

Flink 集群的安装不属于本文档的范畴,如安装 Flink,可自行搜索资料进行安装。

本节重点在 Flink 的 Yarn 部署模式。

在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload,可以使用 YARN 来管理所有计算资源。

1. Flink 在 Yarn 上的部署架构

从图中可以看出,Yarn 的客户端需要获取 hadoop 的配置信息,连接 Yarn 的 ResourceManager。所以要设置 YARN_CONF_DIR 或者 HADOOP_CONF_DIR 或者 HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择 hadoop_home 的环境变量,会尝试加载$HADOOP_HOME/etc/hadoop 的配置文件。

  1. 当启动一个 Flink Yarn 会话时,客户端首先会检查本次请求的资源(存储、计算)是否足够。资源足够将会上传包含 HDFS 及 Flink 的配置信息和 Flink 的 jar 包到 HDFS;

  2. 客户端向 RM 发起请求;

  3. RM 向 NM 发请求指令,创建 container,并从 HDFS 中下载 jar 以及配置文件;

  4. 启动 ApplicationMaster 和 jobmanager,将 jobmanager 的地址信息写到配置文件中,再发到 hdfs 上;

  5. 同时,AM 向 RM 发送心跳注册自己,申请资源(cpu、内存);

  6. 创建 TaskManager 容器,从 HDFS 中下载 jar 包及配置文件并启动;

  7. 各 task 任务通过 jobmanager 汇报自己的状态和进度,AM 和 jobmanager 在一个容器上,AM 就能掌握各任务的运行状态,从而可以在任务失败时,重新启动任务;

  8. 任务完成后,AM 向 RM 注销并关闭自己;

2. 启动集群

  1. 修改 hadoop 的配置参数:vim etc/hadoop/yarn-site.xml

添加:

<property>
         <name>yarn.nodemanager.vmem-check-enabled</name>
         <value>false</value>
</property>

修改 Hadoop 的 yarn-site.xml,添加该配置表示内存超过分配值,是否将任务杀掉。

默认为 true。运行 Flink 程序,很容易内存超标,这个时候 yarn 会自动杀掉 job。

  1. 修改全局变量 /etc/profile

添加:export HADOOP_CONF_DIR=/export/servers/hadoop/etc/Hadoop

YARN_CONF_DIR 或者 HADOOP_CONF_DIR 必须将环境变量设置为读取 YARN 和 HDFS 配置

  1. 启动 HDFS、zookeeper(如果是外置 zookeeper)、YARN 集群;

  2. 使用 yarn-session 的模式提交作业。

Yarn Session 模式提交作业有两种方式:yarn-session 和 yarn-cluster

3. 模式一: yarn-session

特点:

  1. 使用 Flink 中的 yarn-session(yarn 客户端),会启动两个必要服务 JobManager 和 TaskManagers;

  2. 客户端通过 yarn-session 提交作业;

  3. yarn-session 会一直启动,不停地接收客户端提交的任务;

  4. 如果拥有有大量的小作业,适合使用这种方式。

在 flink 目录启动 yarn-session:

bin/yarn-session.sh -n 2 -tm 800 -jm 800 -s 1 -d

-n 表示申请 2 个容器
-s 表示每个容器启动多少个 slot 离模式,表示以后台程
-tm 表示每个 TaskManager 申请 800M 内存
-d 分序方式运行

使用 flink 提交任务:

bin/flink run examples/batch/WordCount.jar

如果程序运行完了,可以使用 yarn application -kill application_id 杀掉任务:

yarn application -kill application_1554377097889_0002

bin/yarn-session.sh -n 2 -tm 800 -s 1 -d 意思是:

同时向 Yarn 申请 3 个 container(即便只申请了两个,因为 ApplicationMaster 和 Job Manager 有一个额外的容器。一旦将 Flink 部署到 YARN 群集中,它就会显示 Job Manager 的连接详细信息),其中 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 1),并且向每个 TaskManager 的 Container 申请 800M 的内存,以及一个 ApplicationMaster(Job Manager)。

4. 模式二: yarn-cluster

特点:

  1. 直接提交任务给 YARN;

  2. 大作业,适合使用这种方式;

  3. 会自动关闭 session。

使用 flink 直接提交任务:

bin/flink run -m yarn-cluster -yn 2 -yjm 800 -ytm 800 /export/servers/flink-1.6.0/examples/batch/WordCount.jar

-yn 表示 TaskManager 的个数

注意:

  1. 在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改 conf/flink-conf.yaml;

可以通过:-D <arg> Dynamic properties来覆盖原有的配置信息:比如:

-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

  1. 如果使用的是 flink on yarn 方式,想切换回 standalone 模式的话,需要删除:/tmp/.yarn-properties-root,因为默认查找当前 yarn 集群中已有的 yarn-session 信息中的 jobmanager。

三、Flink 运行架构

1. Flink 程序结构

Flink 程序的基本构建块是流和转换(请注意,Flink 的 DataSet API 中使用的 DataSet 也是内部流 )。从概念上讲,流是(可能永无止境的)数据记录流,而转换是将一个或多个流作为一个或多个流的操作。输入,并产生一个或多个输出流。

Flink 应用程序结构就是如上图所示:

Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等,当然你也可以定义自己的 source。

Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select等,操作很多,可以将数据转换计算成你想要的数据。

Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

2. Flink 并行数据流

Flink 程序在执行的时候,会被映射成一个 Streaming Dataflow,一个 Streaming Dataflow 是由一组 Stream 和 Transformation Operator 组成的。在启动时从一个或多个 Source Operator 开始,结束于一个或多个 Sink Operator。

Flink 程序本质上是并行的和分布式的,在执行过程中,一个流(stream)包含一个或多个流分区,而每一个 operator 包含一个或多个 operator 子任务。操作子任务间彼此独立,在不同的线程中执行,甚至是在不同的机器或不同的容器上。operator 子任务的数量是这一特定 operator 的并行度。相同程序中的不同 operator 有不同级别的并行度。

一个 Stream 可以被分成多个 Stream 的分区,也就是 Stream Partition。一个 Operator 也可以被分为多个 Operator Subtask。如上图中,Source 被分成 Source1 和 Source2,它们分别为 Source 的 Operator Subtask。每一个 Operator Subtask 都是在不同的线程当中独立执行的。一个 Operator 的并行度,就等于 Operator Subtask 的个数。上图 Source 的并行度为 2。而一个 Stream 的并行度就等于它生成的 Operator 的并行度。

数据在两个 operator 之间传递的时候有两种模式:

One to One 模式:两个 operator 用此模式传递的时候,会保持数据的分区数和数据的排序;如上图中的 Source1 到 Map1,它就保留的 Source 的分区特性,以及分区元素处理的有序性。

Redistributing (重新分配)模式:这种模式会改变数据的分区数;每个一个 operator subtask 会根据选择 transformation 把数据发送到不同的目标 subtasks,比如 keyBy()会通过 hashcode 重新分区,broadcast()和 rebalance()方法会随机重新分区;

3. Task 和 Operator chain

Flink的所有操作都称之为Operator,客户端在提交任务的时候会对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后的Operator称为Operator chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

4. 任务调度与执行

  1. 当Flink执行executor会自动根据程序代码生成DAG数据流图;

  2. ActorSystem创建Actor将数据流图发送给JobManager中的Actor;

  3. JobManager会不断接收TaskManager的心跳消息,从而可以获取到有效的TaskManager;

  4. JobManager通过调度器在TaskManager中调度执行Task(在Flink中,最小的调度单元就是task,对应就是一个线程);

  5. 在程序运行过程中,task与task之间是可以进行数据传输的。

Job Client

  1. 主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回;

  2. Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点;

  3. Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。执行完成后,Job Client 将结果返回给用户。

JobManager

  1. 主要职责是调度工作并协调任务做检查点;

  2. 集群中至少要有一个 master,master 负责调度 task,协调checkpoints 和容错;

  3. 高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是standby;

  4. Job Manager 包含 Actor System、Scheduler、CheckPoint三个重要的组件;

  5. JobManager从客户端接收到任务以后, 首先生成优化过的执行计划, 再调度到TaskManager中执行。

TaskManager

  1. 主要职责是从JobManager处接收任务, 并部署和启动任务, 接收上游的数据并处理;

  2. Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点;

  3. TaskManager在创建之初就设置好了Slot, 每个Slot可以执行一个任务。

5. 任务槽和槽共享

每个TaskManager是一个JVM的进程, 可以在不同的线程中执行一个或多个子任务。为了控制一个worker能接收多少个task。worker通过task slot来进行控制(一个worker至少有一个task slot)。

1) 任务槽

每个task slot表示TaskManager拥有资源的一个固定大小的子集。

flink将进程的内存进行了划分到多个slot中。

图中有2个TaskManager,每个TaskManager有3个slot的,每个slot占有1/3的内存。

内存被划分到不同的slot之后可以获得如下好处:

  • TaskManager最多能同时并发执行的任务是可以控制的,那就是3个,因为不能超过slot的数量。

  • slot有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响。

2) 槽共享

默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可以保存作业的整个管道。允许插槽共享有两个主要好处:

  • 只需计算Job中最高并行度(parallelism)的task slot,只要这个满足,其他的job也都能满足。

  • 资源分配更加公平,如果有比较空闲的slot可以将更多的任务分配给它。图中若没有任务槽共享,负载不高的Source/Map等subtask将会占据许多资源,而负载较高的窗口subtask则会缺乏资源。

  • 有了任务槽共享,可以将基本并行度(base parallelism)从2提升到6.提高了分槽资源的利用率。同时它还可以保障TaskManager给subtask的分配的slot方案更加公平。

四、Flink 算子大全

Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。

所以下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。

DataSet 批处理算子

一、Source算子

1. fromCollection

fromCollection:从本地集合读取数据

例:

val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("1,张三", "2,李四", "3,王五", "4,赵六")
)

2. readTextFile

readTextFile:从文件中读取

val textDataSet: DataSet[String]  = env.readTextFile("/data/a.txt")

3. readTextFile:遍历目录

readTextFile可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式

val parameters = new Configuration
// recursive.file.enumeration 开启递归
parameters.setBoolean("recursive.file.enumeration", true)
val file = env.readTextFile("/data").withParameters(parameters)

4. readTextFile:读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩方法文件扩展名是否可并行读取
DEFLATE.deflateno
GZip.gz .gzipno
Bzip2.bz2no
XZ.xzno
val file = env.readTextFile("/data/file.gz")

二、Transform转换算子

因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此:

val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("张三,1", "李四,2", "王五,3", "张三,4")
)

1. map

将DataSet中的每一个元素转换为另外一个元素

// 使用map将List转换为一个Scala的样例类

case class User(name: String, id: String)

val userDataSet: DataSet[User] = textDataSet.map {
  text =>
    val fieldArr = text.split(",")
    User(fieldArr(0), fieldArr(1))
}
userDataSet.print()

2. flatMap

将DataSet中的每一个元素转换为0...n个元素。

// 使用flatMap操作,将集合中的数据:
// 根据第一个元素,进行分组
// 根据第二个元素,进行聚合求值 

val result = textDataSet.flatMap(line => line)
      .groupBy(0) // 根据第一个元素,进行分组
      .sum(1) // 根据第二个元素,进行聚合求值
      
result.print()

3. mapPartition

将一个分区中的元素转换为另一个元素

// 使用mapPartition操作,将List转换为一个scala的样例类

case class User(name: String, id: String)

val result: DataSet[User] = textDataSet.mapPartition(line => {
      line.map(index => User(index._1, index._2))
    })
    
result.print()

4. filter

过滤出来一些符合条件的元素,返回boolean值为true的元素

val source: DataSet[String] = env.fromElements("java", "scala", "java")
val filter:DataSet[String] = source.filter(line => line.contains("java"))//过滤出带java的数据
filter.print()

5. reduce

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素

// 使用 fromElements 构建数据源
val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// 使用map转换成DataSet元组
val mapData: DataSet[(String, Int)] = source.map(line => line)
// 根据首个元素分组
val groupData = mapData.groupBy(_._1)
// 使用reduce聚合
val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2))
// 打印测试
reduceData.print()

6. reduceGroup

将一个dataset或者一个group聚合成一个或多个元素
reduceGroup是reduce的一种优化方案;
它会先分组reduce,然后在做整体的reduce;这样做的好处就是可以减少网络IO

// 使用 fromElements 构建数据源
val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// 根据首个元素分组
val groupData = source.groupBy(_._1)
// 使用reduceGroup聚合
val result: DataSet[(String, Int)] = groupData.reduceGroup {
      (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
        val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
        out.collect(tuple)
    }
// 打印测试
result.print()

7. minBy和maxBy

选择具有最小值或最大值的元素

// 使用minBy操作,求List中每个人的最小值
// List("张三,1", "李四,2", "王五,3", "张三,4")

case class User(name: String, id: String)
// 将List转换为一个scala的样例类
val text: DataSet[User] = textDataSet.mapPartition(line => {
      line.map(index => User(index._1, index._2))
    })
    
val result = text
          .groupBy(0) // 按照姓名分组
          .minBy(1)   // 每个人的最小值

8. Aggregate

在数据集上进行聚合求最值(最大值、最小值)

val data = new mutable.MutableList[(Int, String, Double)]
    data.+=((1, "yuwen", 89.0))
    data.+=((2, "shuxue", 92.2))
    data.+=((3, "yuwen", 89.99))
// 使用 fromElements 构建数据源
val input: DataSet[(Int, String, Double)] = env.fromCollection(data)
// 使用group执行分组操作
val value = input.groupBy(1)
            // 使用aggregate求最大值元素
            .aggregate(Aggregations.MAX, 2) 
// 打印测试
value.print()       

Aggregate只能作用于元组上

注意:
要使用aggregate,只能使用字段索引名或索引名称来进行分组 groupBy(0) ,否则会报一下错误:
Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.

9. distinct

去除重复的数据

// 数据源使用上一题的
// 使用distinct操作,根据科目去除集合中重复的元组数据

val value: DataSet[(Int, String, Double)] = input.distinct(1)
value.print()

10. first

取前N个数

input.first(2) // 取前两个数

11. join

将两个DataSet按照一定条件连接到一起,形成新的DataSet

// s1 和 s2 数据集格式如下:
// DataSet[(Int, String,String, Double)]

 val joinData = s1.join(s2)  // s1数据集 join s2数据集
             .where(0).equalTo(0) {     // join的条件
      (s1, s2) => (s1._1, s1._2, s2._2, s1._3)
    }

12. leftOuterJoin

左外连接,左边的Dataset中的每一个元素,去连接右边的元素

此外还有:

rightOuterJoin:右外连接,左边的Dataset中的每一个元素,去连接左边的元素

fullOuterJoin:全外连接,左右两边的元素,全部连接

下面以 leftOuterJoin 进行示例:

 val data1 = ListBuffer[Tuple2[Int,String]]()
    data1.append((1,"zhangsan"))
    data1.append((2,"lisi"))
    data1.append((3,"wangwu"))
    data1.append((4,"zhaoliu"))

val data2 = ListBuffer[Tuple2[Int,String]]()
    data2.append((1,"beijing"))
    data2.append((2,"shanghai"))
    data2.append((4,"guangzhou"))

val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)

text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
      if(second==null){
        (first._1,first._2,"null")
      }else{
        (first._1,first._2,second._2)
      }
    }).print()

13. cross

交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集

和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作

val cross = input1.cross(input2){
      (input1 , input2) => (input1._1,input1._2,input1._3,input2._2)
    }

cross.print()

14. union

联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重

val unionData: DataSet[String] = elements1.union(elements2).union(elements3)
// 去除重复数据
val value = unionData.distinct(line => line)

15. rebalance

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况:

这个时候本来总体数据量只需要10分钟解决的问题,出现了数据倾斜,机器1上的任务需要4个小时才能完成,那么其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;所以在实际的工作中,出现这种情况比较好的解决方案就是接下来要介绍的—rebalance(内部使用round robin方法将数据均匀打散。这对于数据倾斜时是很好的选择。)

// 使用rebalance操作,避免数据倾斜
val rebalance = filterData.rebalance()

16. partitionByHash

按照指定的key进行hash分区

val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))

val collection = env.fromCollection(data)
val unique = collection.partitionByHash(1).mapPartition{
  line =>
    line.map(x => (x._1 , x._2 , x._3))
}

unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)
env.execute()

17. partitionByRange

根据指定的key对数据集进行范围分区

val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))

val collection = env.fromCollection(data)
val unique = collection.partitionByRange(x => x._1).mapPartition(line => line.map{
  x=>
    (x._1 , x._2 , x._3)
})
unique.writeAsText("rangePartition", WriteMode.OVERWRITE)
env.execute()

18. sortPartition

根据指定的字段值进行分区的排序

val data = new mutable.MutableList[(Int, Long, String)]
    data.+=((1, 1L, "Hi"))
    data.+=((2, 2L, "Hello"))
    data.+=((3, 2L, "Hello world"))
    data.+=((4, 3L, "Hello world, how are you?"))

val ds = env.fromCollection(data)
    val result = ds
      .map { x => x }.setParallelism(2)
      .sortPartition(1, Order.DESCENDING)//第一个参数代表按照哪个字段进行分区
      .mapPartition(line => line)
      .collect()

println(result)

三、Sink算子

1. collect

将数据输出到本地集合

result.collect()

2. writeAsText

将数据输出到文件

Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等

Flink支持多种文件的存储格式,包括text文件,CSV文件等

// 将数据写入本地文件
result.writeAsText("/data/a", WriteMode.OVERWRITE)

// 将数据写入HDFS
result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)

DataStream流处理算子

和DataSet一样,DataStream也包括一系列的Transformation操作

一、Source算子

Flink可以使用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然我们也可以通过实现 SourceFunction 来自定义非并行的source或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink在流处理上的source和在批处理上的source基本一致。大致有4大类:

  • 基于本地集合的source(Collection-based-source)

  • 基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回

  • 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。

  • 自定义的source(Custom-source)

下面使用addSource将Kafka数据写入Flink为例:

如果需要外部数据源对接,可使用addSource,如将Kafka数据写入Flink, 先引入依赖:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

将Kafka数据写入Flink:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

基于网络套接字的:

val source = env.socketTextStream("IP", PORT)

二、Transform转换算子

1. map

将DataSet中的每一个元素转换为另外一个元素

dataStream.map { x => x * 2 }

2. FlatMap

采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数

dataStream.flatMap { str => str.split(" ") }

3. Filter

计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器

dataStream.filter { _ != 0 }

4. KeyBy

逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。

此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream。

dataStream.keyBy(0) 

5. Reduce

被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值

keyedStream.reduce { _ + _ }  

6. Fold

具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值

val result: DataStream[String] =  keyedStream.fold("start")((str, i) => { str + "-" + i }) 

// 解释:当上述代码应用于序列(1,2,3,4,5)时,输出结果“start-1”,“start-1-2”,“start-1-2-3”,...

7. Aggregations

在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。

keyedStream.sum(0);

keyedStream.min(0);

keyedStream.max(0);

keyedStream.minBy(0);

keyedStream.maxBy(0);

8. Window

可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。这里不再对窗口进行详解,有关窗口的完整说明,请查看这篇文章:Flink 中极其重要的 Time 与 Window 详细解析

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); 

9. WindowAll

Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。

注意:在许多情况下,这是非并行转换。所有记录将收集在windowAll 算子的一个任务中。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

10. Window Apply

将一般函数应用于整个窗口。

注意:如果您正在使用windowAll转换,则需要使用AllWindowFunction。

下面是一个手动求和窗口数据元的函数

windowedStream.apply { WindowFunction }

allWindowedStream.apply { AllWindowFunction }

11. Window Reduce

将函数缩减函数应用于窗口并返回缩小的值

windowedStream.reduce { _ + _ }

12. Window Fold

将函数折叠函数应用于窗口并返回折叠值

val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i }) 

// 上述代码应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”

13. Union

两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元

dataStream.union(otherStream1, otherStream2, ...)

14. Window Join

在给定Keys和公共窗口上连接两个数据流

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

15. Interval Join

在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

am.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) 
    .upperBoundExclusive(true) 
    .lowerBoundExclusive(true) 
    .process(new IntervalJoinFunction() {...})

16. Window CoGroup

在给定Keys和公共窗口上对两个数据流进行Cogroup

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...})

17. Connect

“连接”两个保存其类型的数据流。连接允许两个流之间的共享状态

DataStream<Integer> someStream = ... DataStream<String> otherStream = ... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream)

// ... 代表省略中间操作

18. CoMap,CoFlatMap

类似于连接数据流上的map和flatMap

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false)connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false)

19. Split

根据某些标准将流拆分为两个或更多个流

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    })      

20. Select

从拆分流中选择一个或多个流

SplitStream<Integer> split;DataStream<Integer> even = split.select("even");DataStream<Integer> odd = split.select("odd");DataStream<Integer> all = split.select("even","odd")

三、Sink算子

支持将数据输出到:

  • 本地文件(参考批处理)

  • 本地集合(参考批处理)

  • HDFS(参考批处理)

除此之外,还支持:

  • sink到kafka

  • sink到mysql

  • sink到redis

下面以sink到kafka为例:

val sinkTopic = "test"

//样例类
case class Student(id: Int, name: String, addr: String, sex: String)
val mapper: ObjectMapper = new ObjectMapper()

//将对象转换成字符串
def toJsonString(T: Object): String = {
    mapper.registerModule(DefaultScalaModule)
    mapper.writeValueAsString(T)
}

def main(args: Array[String]): Unit = {
    //1.创建流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.准备数据
    val dataStream: DataStream[Student] = env.fromElements(
      Student(8, "xiaoming", "beijing biejing", "female")
    )
    //将student转换成字符串
    val studentStream: DataStream[String] = dataStream.map(student =>
      toJsonString(student) // 这里需要显示SerializerFeature中的某一个,否则会报同时匹配两个方法的错误
    )
    //studentStream.print()
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")

    val myProducer = new FlinkKafkaProducer011[String](sinkTopic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop)
    studentStream.addSink(myProducer)
    studentStream.print()
    env.execute("Flink add sink")
}

本文档首发于公众号【五分钟学大数据】,更多大数据技术文档可下方扫码关注获取:

五、流处理中的Time与Window

Flink 是流式的、实时的 计算引擎。

上面一句话就有两个概念,一个是流式,一个是实时。

流式:就是数据源源不断的流进来,也就是数据没有边界,但是我们计算的时候必须在一个有边界的范围内进行,所以这里面就有一个问题,边界怎么确定?无非就两种方式,根据时间段或者数据量进行确定,根据时间段就是每隔多长时间就划分一个边界,根据数据量就是每来多少条数据划分一个边界,Flink 中就是这么划分边界的,本文会详细讲解。

实时:就是数据发送过来之后立马就进行相关的计算,然后将结果输出。这里的计算有两种:

  • 一种是只有边界内的数据进行计算,这种好理解,比如统计每个用户最近五分钟内浏览的新闻数量,就可以取最近五分钟内的所有数据,然后根据每个用户分组,统计新闻的总数。

  • 另一种是边界内数据与外部数据进行关联计算,比如:统计最近五分钟内浏览新闻的用户都是来自哪些地区,这种就需要将五分钟内浏览新闻的用户信息与 hive 中的地区维表进行关联,然后在进行相关计算。

本节所讲的 Flink 内容就是围绕以上概念进行详细剖析的!

1. Time

在Flink中,如果以时间段划分边界的话,那么时间就是一个极其重要的字段。

Flink中的时间有三种类型,如下图所示:

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

  • Ingestion Time:是数据进入Flink的时间。

  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

例如,一条日志进入Flink的时间为2021-01-22 10:00:00.123,到达Window的系统时间为2021-01-22 10:00:01.234,日志的内容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。

2. Window

Window,即窗口,我们前面一直提到的边界就是这里的Window(窗口)。

官方解释:流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段

所以Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作

Window类型

本文刚开始提到,划分窗口就两种方式:

  1. 根据时间进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。

  2. 根据数据进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。

窗口类型

对于TimeWindow(根据时间划分窗口), 可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

  1. 滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。

特点:时间对齐,窗口长度固定,没有重叠

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。

例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

滚动窗口

适用场景:适合做BI统计等(做每个时间段的聚合计算)。

  1. 滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,有重叠

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:

滑动窗口

适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

  1. 会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐

session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

会话窗口

3. Window API

1) TimeWindow

TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算(就是本文开头说的对一个边界内的数据进行计算)。

我们以 红绿灯路口通过的汽车数量 为例子:

红绿灯路口会有汽车通过,一共会有多少汽车通过,无法计算。因为车流源源不断,计算没有边界。

所以我们统计每15秒钟通过红路灯的汽车数量,如第一个15秒为2辆,第二个15秒为3辆,第三个15秒为1辆 ...

  • tumbling-time-window (无重叠数据)

我们使用 Linux 中的 nc 命令模拟数据的发送方

1.开启发送端口,端口号为9999
nc -lk 9999

2.发送内容(key 代表不同的路口,value 代表每次通过的车辆)
一次发送一行,发送的时间间隔代表汽车经过的时间间隔
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

Flink 进行采集数据并计算:

object Window {
  def main(args: Array[String]): Unit = {
    //TODO time-window
    //1.创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.定义数据流来源
    val text = env.socketTextStream("localhost", 9999)

    //3.转换数据格式,text->CarWc
    case class CarWc(sensorId: Int, carCnt: Int)
    val ds1: DataStream[CarWc] = text.map {
      line => {
        val tokens = line.split(",")
        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
      }
    }

    //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒
    //也就是说,每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。
    val ds2: DataStream[CarWc] = ds1
      .keyBy("sensorId")
      .timeWindow(Time.seconds(5))
      .sum("carCnt")

    //5.显示统计结果
    ds2.print()

    //6.触发流计算
    env.execute(this.getClass.getName)

  }
}

我们发送的数据并没有指定时间字段,所以Flink使用的是默认的 Processing Time,也就是Flink系统处理数据时的时间。

  • sliding-time-window (有重叠数据)

//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)

//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)

val ds1: DataStream[CarWc] = text.map {
  line => {
    val tokens = line.split(",")
    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
  }
}
//4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒
//也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .timeWindow(Time.seconds(10), Time.seconds(5))
  .sum("carCnt")

//5.显示统计结果
ds2.print()

//6.触发流计算
env.execute(this.getClass.getName)

2) CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。

注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数

  • tumbling-count-window (无重叠数据)

//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)

//

以上是关于就够了—JDBC保姆级教程的主要内容,如果未能解决你的问题,请参考以下文章

VSCode安装配置使用教程(最新版超详细保姆级含插件)一文就够了

2022版超详细Python+Pycharm安装保姆级教程,Python环境配置和使用指南,看完这一篇就够了

2022版超详细Python+Pycharm安装保姆级教程,Python环境配置和使用指南,看完这一篇就够了

Java弄清方法重写,看这一篇就够了|由浅入深,保姆级讲解

Git 工 具, 看这篇保姆式的教程就够了

Git 工 具, 看这篇保姆式的教程就够了