flink cdc读取mysql导入hudi
Posted 真六
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink cdc读取mysql导入hudi相关的知识,希望对你有一定的参考价值。
首先编辑mysql的my.cnf 添加以下后重启 service mysqld restart
#开启binlog的配置
log_bin=mysql_bin
binlog-format=Row
server-id=1
expire_logs_days = 7
max_binlog_size = 500M
#需要同步的数据库名称
binlog-do-db=cdctest
#忽略数据库
binlog-ignore-db=mysql
flink 代码如下
def main(args: Array[String]): Unit =
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
env.setParallelism(1)
//开启 Checkpoint,每隔 5 秒钟做一次 CK
env.enableCheckpointing(5000L)
//检查点超时时间
env.getCheckpointConfig.setCheckpointTimeout(4000)
//指定 CK 的一致性语义
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
//最大同时存在的ck数 和设置的间隔时间有一个就行//最大同时存在的ck数 和设置的间隔时间有一个就行
checkpointConfig.setMaxConcurrentCheckpoints(1)
//超时时间
checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5))
//设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//指定从 CK 自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L))
//设置状态后端
//env.setStateBackend(new FsStateBackend("hdfs://hudi01.zxzh.com:8020/flinkCDC"))
env.setStateBackend(new FsStateBackend("file:///D://tmp"))
//设置访问 HDFS 的用户名
System.setProperty("HADOOP_USER_NAME", "root")
//使用Flink sql流读
tableEnv.executeSql(
"""
|CREATE TABLE user_info (
| id int PRIMARY KEY NOT ENFORCED ,
| name STRING,
| sex STRING
|) WITH (
| 'connector' = 'mysql-cdc',
| 'hostname' = '127.0.0.1',
| 'port' = '3306',
| 'username' = 'root',
| 'password' = '123456',
| 'server-time-zone' = 'Asia/Shanghai',
| 'database-name' = 'zht',
| 'table-name' = 'myuserinfo'
|)
|""".stripMargin)
val date = new Date()
val ts: Long = date.getTime
val partitionpath_format: SimpleDateFormat = new SimpleDateFormat("yyyy")
val years = partitionpath_format.format(date)
val usertable: Table = tableEnv.sqlQuery("select * from user_info");
val tstable: Table = usertable.addColumns(lit(ts),lit(years))
tableEnv.toChangelogStream(tstable).print()
//kerberos权限认证(没有的不需要配置)
val path = "D:\\\\soft\\\\IdeaWorkSpace\\\\FlinkCDC\\\\src\\\\main\\\\resources\\\\"
println("path:"+path)
val conf = new Configuration
System.setProperty("java.security.krb5.conf", path + "krb5.conf")
conf.addResource(new Path(path + "hdfs-site.xml"))
conf.addResource(new Path(path + "hive-site.xml"))
conf.set("hadoop.security.authentication", "Kerberos")
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
conf.set("dfs.client.use.datanode.hostname", "true")
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab("xxxx/xxx@xxx.COM", path + "xx.keytab")
println("login user: "+UserGroupInformation.getLoginUser())
tableEnv.executeSql(
"""
|CREATE TABLE hudi_user_infos(
| id int PRIMARY KEY NOT ENFORCED,
| name String,
| sex String,
| ts BIGINT,
| years String)
|PARTITIONED BY (years)
|WITH (
| 'connector' = 'hudi',
| 'path' = 'hdfs://hadoop102:8020/datas/hudi_user_info',
| 'table.type' = 'COPY_ON_WRITE',
| 'hooddatasource.write.recordkey.field' = 'id',
| 'hoodie.datasource.write.hive_style_partitioning' = 'true',
| 'hoodie.datasource.write.partitionpath.urlencode' = 'true',
| 'write.operation' ='upsert', //写入模式
| 'write.precombine' = 'true', //指定的字段来判断哪个更新,即 precombine 字段更大的 record 更新
| 'write.precombine.field' ='ts', //指定的字段来判断哪个更新,即 precombine 字段更大的 record 更新
| 'hive_sync.enable' = 'true', //是否写入hive
| 'hive_sync.mode' = 'hms', //hms或者dfs 两者区别是hms模式的 catalog 支持自动补全 hive 同步参数。
| 'hive_sync.use_jdbc' = 'false', //
| 'hive_sync.metastore.uris' = 'thrift://hadoop102:9083', //hive元数据的地址
| 'hive_sync.db' = 'hudi_ods',
| 'hive_sync.table' = 'hudi_user_infos',
| 'hive_sync.assume_date_partitioning' = 'false', //假设分区格式是yyyy/mm/dd 假定数据表是按日期分区的。这意味着,如果参数设置为True,则Hive Sync将在将数据写入到 Hive 表时自动创建日期分区。如果该参数设置为 False,则 Hive Sync 将不会自动创建分区,需要手动创建分区。
| 'hive_sync.partition_fields' = 'years', //HiveSync使用的分区字段。如果设置了该参数,则 Hive Sync 将按照指定的字段对数据进行分区,而不是默认的日期分区。在设置该参数时,可以指定多个字段,以逗号分隔。
| 'hive_sync.support_timestamp'= 'true', //是否支持时间戳类型 为true则 Hive Sync 将支持时间戳类型,并在将数据写入到 Hive 表时自动将时间戳字段转换为 Timestamp 类型
| 'write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator', //是 Apache Hudi 提供的一个具体的键生成器类,它可以生成复杂的 Avro 格式的键。该键生成器可以在数据写入 Hudi 数据存储时用于对数据记录进行标识,以便对其进行管理和查询。
| 'scan.startup.mode' = 'latest-offset' ,
| 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor' //是 Apache Hudi 提供的一个具体的分区提取器类,它可以提取数据中的分区信息,并按照 Hive 风格生成分区目录。通过使用该分区提取器,可以在将数据写入 Hive 表时,更好地管理数据的存储和查询。
| )
|""".stripMargin)
tableEnv.executeSql(" insert into hudi_user_infos select * from "+tstable+" ")
env.execute()
以上是关于flink cdc读取mysql导入hudi的主要内容,如果未能解决你的问题,请参考以下文章
flink cdc debezium 读取decimal为字符串问题
flink实时读取mongodb方案调研-实现mongodb cdc
flink实时读取mongodb方案调研-实现mongodb cdc