Flink自定义Sink将数据存到MySQL

Posted 我不是秃头sheep

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink自定义Sink将数据存到MySQL相关的知识,希望对你有一定的参考价值。

如有更佳的保存mysql方法 欢迎私信或留言分享 相互学习~

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// 自定义Sink
dataStream.addSink( new JDBCSink() )

// 继承RichSinkFunction
class JDBCSink extends RichSinkFunction[输入的数据类型]

  // 定义sql连接、插入预编译器、更新预编译器
  var conn: Connection = _
  var insertStatement: PreparedStatement = _
  var updateStatement: PreparedStatement = _

  // 重写open函数 在此函数初始化,创建连接和预编译语句
  override def open(parameters: Configuration): Unit = 
	// 初始化连接
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/数据库", "账号", "密码")
    // 初始化插入预编译器
    insertStatement = conn.prepareStatement("INSERT INTO 表名 VALUES (?, ?, ?(占位符));")
    // 初始化更新与编译器
    updateStatement = conn.prepareStatement("UPDATE 表名 SET 字段 = ? WHERE 字段 = ?;")
  
  
  // 重写close函数 关闭与编译器和sql连接
  override def close(): Unit = 
    insertStatement.close()
    updateStatement.close()
    conn.close()
  
  // 重写invoke函数 
  override def invoke(value: 输入的数据类型, context: SinkFunction.Context[_]): Unit = 
    // 先执行更新操作 给跟更新预编译器的占位符赋值
    updateStatement.setInt(1, value.count.toInt)
    updateStatement.setString(2, value.url)
    updateStatement.setDouble(3, value.windowEnd)
    // 执行更新
    updateStatement.execute()
    // 判断如果更新的行数为0 则执行插入
    if(updateStatement.getUpdateCount == 0)
      // 给插入预编译器的占位符赋值
      insertStatement.setDouble(1, value.windowEnd)
      insertStatement.setString(2, value.url)
      insertStatement.setInt(3, value.count.toInt)
      // 执行插入
      insertStatement.execute()
    
  

以上是关于Flink自定义Sink将数据存到MySQL的主要内容,如果未能解决你的问题,请参考以下文章

flink02------1.自定义source

9.FLINK SinkAPI自定义sink

9.FLINK SinkAPI自定义sink

Flink的sink实战之四:自定义

FLINK 基于1.15.2的Java开发-自定义Redis Sink用于连接 Redis Sentinel模式

Flink-1.11开始提供了JDBC Sink