flink cdc读取mysql导入hudi

Posted 真六

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink cdc读取mysql导入hudi相关的知识,希望对你有一定的参考价值。

首先编辑mysql的my.cnf  添加以下后重启  service mysqld restart

#开启binlog的配置
log_bin=mysql_bin
binlog-format=Row
server-id=1
expire_logs_days = 7
max_binlog_size = 500M
#需要同步的数据库名称
binlog-do-db=cdctest
#忽略数据库
binlog-ignore-db=mysql

flink 代码如下 

 def main(args: Array[String]): Unit =  

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.setParallelism(1)

    //开启 Checkpoint,每隔 5 秒钟做一次 CK
    env.enableCheckpointing(5000L)
    //检查点超时时间
    env.getCheckpointConfig.setCheckpointTimeout(4000)
    //指定 CK 的一致性语义
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    //最大同时存在的ck数 和设置的间隔时间有一个就行//最大同时存在的ck数 和设置的间隔时间有一个就行
    checkpointConfig.setMaxConcurrentCheckpoints(1)
    //超时时间
    checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5))

    //设置任务关闭的时候保留最后一次 CK 数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //指定从 CK 自动重启策略
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L))

    //设置状态后端
    //env.setStateBackend(new FsStateBackend("hdfs://hudi01.zxzh.com:8020/flinkCDC"))
    env.setStateBackend(new FsStateBackend("file:///D://tmp"))
    //设置访问 HDFS 的用户名
    System.setProperty("HADOOP_USER_NAME", "root")

    //使用Flink sql流读
    tableEnv.executeSql(
      """
        |CREATE TABLE user_info (
        | id int PRIMARY KEY NOT ENFORCED ,
        | name STRING,
        | sex STRING
        |) WITH (
        |      'connector' = 'mysql-cdc',
        |      'hostname' = '127.0.0.1',
        |      'port' = '3306',
        |      'username' = 'root',
        |      'password' = '123456',
        |      'server-time-zone' = 'Asia/Shanghai',
        |      'database-name' = 'zht',
        |      'table-name' = 'myuserinfo'
        |)
        |""".stripMargin)

    val date = new Date()
    val ts: Long = date.getTime
    val partitionpath_format: SimpleDateFormat = new SimpleDateFormat("yyyy")
    val years = partitionpath_format.format(date)

     val usertable: Table = tableEnv.sqlQuery("select * from user_info");
     val tstable: Table = usertable.addColumns(lit(ts),lit(years))
     tableEnv.toChangelogStream(tstable).print()


    //kerberos权限认证(没有的不需要配置)
    val path = "D:\\\\soft\\\\IdeaWorkSpace\\\\FlinkCDC\\\\src\\\\main\\\\resources\\\\"
    println("path:"+path)
    val conf = new Configuration
    System.setProperty("java.security.krb5.conf", path + "krb5.conf")
    conf.addResource(new Path(path + "hdfs-site.xml"))
    conf.addResource(new Path(path + "hive-site.xml"))
    conf.set("hadoop.security.authentication", "Kerberos")
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    conf.set("dfs.client.use.datanode.hostname", "true")
    UserGroupInformation.setConfiguration(conf)
    UserGroupInformation.loginUserFromKeytab("xxxx/xxx@xxx.COM", path + "xx.keytab")
    println("login user: "+UserGroupInformation.getLoginUser())


  tableEnv.executeSql(
      """
        |CREATE TABLE hudi_user_infos(
        | id      int PRIMARY KEY NOT ENFORCED,
        | name    String,
        | sex     String,
        | ts       BIGINT,
        | years     String)
        |PARTITIONED BY (years)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = 'hdfs://hadoop102:8020/datas/hudi_user_info',
        |  'table.type' = 'COPY_ON_WRITE',
        |  'hooddatasource.write.recordkey.field' = 'id',
        |  'hoodie.datasource.write.hive_style_partitioning' = 'true',
        |  'hoodie.datasource.write.partitionpath.urlencode' = 'true',
        |  'write.operation' ='upsert',  //写入模式
        |  'write.precombine' = 'true',  //指定的字段来判断哪个更新,即 precombine 字段更大的 record 更新
        |  'write.precombine.field' ='ts', //指定的字段来判断哪个更新,即 precombine 字段更大的 record 更新
        |  'hive_sync.enable' = 'true', //是否写入hive
        |  'hive_sync.mode' = 'hms',  //hms或者dfs  两者区别是hms模式的 catalog 支持自动补全 hive 同步参数。
        |  'hive_sync.use_jdbc' = 'false', //
        |  'hive_sync.metastore.uris' = 'thrift://hadoop102:9083', //hive元数据的地址
        |  'hive_sync.db' = 'hudi_ods',
        |  'hive_sync.table' = 'hudi_user_infos',
        |  'hive_sync.assume_date_partitioning' = 'false',  //假设分区格式是yyyy/mm/dd 假定数据表是按日期分区的。这意味着,如果参数设置为True,则Hive Sync将在将数据写入到 Hive 表时自动创建日期分区。如果该参数设置为 False,则 Hive Sync 将不会自动创建分区,需要手动创建分区。
        |  'hive_sync.partition_fields' = 'years', //HiveSync使用的分区字段。如果设置了该参数,则 Hive Sync 将按照指定的字段对数据进行分区,而不是默认的日期分区。在设置该参数时,可以指定多个字段,以逗号分隔。
        |  'hive_sync.support_timestamp'= 'true', //是否支持时间戳类型 为true则 Hive Sync 将支持时间戳类型,并在将数据写入到 Hive 表时自动将时间戳字段转换为 Timestamp 类型
        |  'write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator', //是 Apache Hudi 提供的一个具体的键生成器类,它可以生成复杂的 Avro 格式的键。该键生成器可以在数据写入 Hudi 数据存储时用于对数据记录进行标识,以便对其进行管理和查询。
        |  'scan.startup.mode' = 'latest-offset' ,
        |  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor' //是 Apache Hudi 提供的一个具体的分区提取器类,它可以提取数据中的分区信息,并按照 Hive 风格生成分区目录。通过使用该分区提取器,可以在将数据写入 Hive 表时,更好地管理数据的存储和查询。
        |  )
        |""".stripMargin)

      tableEnv.executeSql("  insert into  hudi_user_infos select * from  "+tstable+" ")

   env.execute()
  




以上是关于flink cdc读取mysql导入hudi的主要内容,如果未能解决你的问题,请参考以下文章

flink cdc debezium 读取decimal为字符串问题

flink实时读取mongodb方案调研-实现mongodb cdc

flink实时读取mongodb方案调研-实现mongodb cdc

flink实时读取mongodb方案调研-实现mongodb cdc

Flink MySQL CDC 使用总结

Flink 使用之 MySQL CDC