Flink的sink实战之四:自定义

Posted 程序员欣宸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的sink实战之四:自定义相关的知识,希望对你有一定的参考价值。

欢迎访问我的GitHub

  • Flink官方提供的sink服务可能满足不了我们的需要,此时可以开发自定义的sink,文本就来一起实战;

    继承关系

    1. 在正式编码前,要先弄清楚对sink能力是如何实现的,前面我们实战过的print、kafka、cassandra等sink操作,核心类的继承关系如下图所示:
    2. 可见实现sink能力的关键,是实现==RichFunction和SinkFunction==接口,前者用于资源控制(如open、close等操作),后者负责sink的具体操作,来看看最简单的PrintSinkFunction类是如何实现SinkFunction接口的invoke方法:
      @Override
      public void invoke(IN record) 
      writer.write(record);
      
    3. 现在对sink的基本逻辑已经清楚了,可以开始编码实战了;

      内容和版本

  • 本次实战很简单:自定义sink,用于将数据写入mysql,涉及的版本信息如下:
    1. jdk:1.8.0_191
    2. flink:1.9.2
    3. maven:3.6.0
    4. flink所在操作系统:CentOS Linux release 7.7.1908
    5. MySQL:5.7.29
    6. IDEA:2018.3.5 (Ultimate Edition)

      源码下载

  • 如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本章的应用在==flinksinkdemo==文件夹下,如下图红框所示:

    数据库准备

  • 请您将MySQL准备好,并执行以下sql,用于创建数据库flinkdemo和表student:
    create database if not exists flinkdemo;
    USE flinkdemo;
    DROP TABLE IF EXISTS `student`;
    CREATE TABLE `student` (
    `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
    `age` int(10) DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

    编码

    1. 使用《Flink的sink实战之二:kafka》中创建的flinksinkdemo工程;
    2. 在pom.xml中增加mysql的依赖:
      <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.11</version>
      </dependency>
  1. 创建和数据库的student表对应的实体类Student.java:
    
    package com.bolingcavalry.customize;

public class Student
private int id;
private String name;
private int age;

public int getId() 
    return id;


public void setId(int id) 
    this.id = id;


public String getName() 
    return name;


public void setName(String name) 
    this.name = name;


public int getAge() 
    return age;


public void setAge(int age) 
    this.age = age;


public Student(String name, int age) 
    this.name = name;
    this.age = age;


4. 创建自定义sink类MySQLSinkFunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:
```java
package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction<Student> 

    PreparedStatement preparedStatement;

    private Connection connection;

    private ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void open(Configuration parameters) throws Exception 
        super.open(parameters);

        //准备数据库相关实例
        buildPreparedStatement();
    

    @Override
    public void close() throws Exception 
        super.close();

        try
            if(null!=preparedStatement) 
                preparedStatement.close();
                preparedStatement = null;
            
         catch(Exception e) 
            e.printStackTrace();
        

        try
            if(null!=connection) 
                connection.close();
                connection = null;
            
         catch(Exception e) 
            e.printStackTrace();
        
    

    @Override
    public void invoke(Student value, Context context) throws Exception 
        preparedStatement.setString(1, value.getName());
        preparedStatement.setInt(2, value.getAge());
        preparedStatement.executeUpdate();
    

    /**
     * 准备好connection和preparedStatement
     * 获取mysql连接实例,考虑多线程同步,
     * 不用synchronize是因为获取数据库连接是远程操作,耗时不确定
     * @return
     */
    private void buildPreparedStatement() 
        if(null==connection) 
            boolean hasLock = false;
            try 
                hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);

                if(hasLock) 
                    Class.forName("com.mysql.cj.jdbc.Driver");
                    connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
                

                if(null!=connection) 
                    preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
                
             catch (Exception e) 
                //生产环境慎用
                e.printStackTrace();
             finally 
                if(hasLock) 
                    reentrantLock.unlock();
                
            
        
    
  1. 上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本mysql驱动对应的driver和uri的写法与以前5.x版本的区别;
  2. 创建任务类StudentSink.java,用来创建一个flink任务,里面通过ArrayList创建了一个数据集,然后直接addSink,为了看清DAG,调用disableChaining方法取消了operator chain:
    
    package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class StudentSink
public static void main(String[] args) throws Exception
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //并行度为1
    env.setParallelism(1);

    List<Student> list = new ArrayList<>();
    list.add(new Student("aaa", 11));
    list.add(new Student("bbb", 12));
    list.add(new Student("ccc", 13));
    list.add(new Student("ddd", 14));
    list.add(new Student("eee", 15));
    list.add(new Student("fff", 16));

    env.fromCollection(list)
        .addSink(new MySQLSinkFunction())
        .disableChaining();

    env.execute("sink demo : customize mysql obj");



7. 在flink web页面提交任务,并设置任务类:
![在这里插入图片描述](https://s4.51cto.com/images/blog/202203/25192956_623da7b4b49e834506.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)
8. 任务完成后,DAG图显示任务和记录数都符合预期:
![在这里插入图片描述](https://s4.51cto.com/images/blog/202203/25192956_623da7b48ff7459432.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)
9. 去检查数据库,发现数据已写入:
![在这里插入图片描述](https://s4.51cto.com/images/blog/202203/25192956_623da7b45c4b374515.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=) 

- 至此,自定义sink的实战已经完成,希望本文能给您一些参考;

### 欢迎关注51CTO博客:程序员欣宸
> [学习路上,你不孤单,欣宸原创一路相伴...](https://blog.51cto.com/u_13674465)

以上是关于Flink的sink实战之四:自定义的主要内容,如果未能解决你的问题,请参考以下文章

Flink的sink实战之一:初探

Flink流式计算从入门到实战 三

Flink处理函数实战之四:窗口处理

flink sql 知其所以然| sourcesink 原理

1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

Flink自定义Sink将数据存到MySQL