Flink 1.14.4 + Flink CDC 2.2.1: Hitting a Pitfall
Posted by qq_31528769
<properties>
<flink.version>1.14.4</flink.version>
<flinkCDC.version>2.2.1</flinkCDC.version>
</properties>
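For context, these two properties normally feed a dependency block like the one below. The post does not show the full pom, so the artifact list is an assumption (Flink 1.14 artifacts still carry a Scala suffix; Gson is needed by the deserializer shown later):

<!-- Assumed dependency set; the original post only shows the <properties> block. -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flinkCDC.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.9</version>
    </dependency>
</dependencies>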
Those are the versions. With this combination, the job starts and runs successfully, reports no error, yet prints no data at all. Has anyone run into this problem? If so, please leave a comment with the fix.
Switching to Flink 1.13.5, the change data is captured and printed normally.
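A commonly reported cause of exactly this symptom (job runs cleanly but never prints) is a flink-connector-base mismatch: the Flink CDC 2.2.x jars were compiled against Flink 1.13, and under Flink 1.14 an older transitive flink-connector-base can break the new Source API without raising an error. This is an assumption, not something verified against this exact setup, but explicitly pinning flink-connector-base to the running Flink version is a cheap thing to try:

<!-- Hedged fix attempt: pin flink-connector-base to the Flink runtime version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>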
The code is below:
Main.java
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties prop = new Properties();
        prop.setProperty("autoReconnect", "true");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("flink")                       // set captured database
                .tableList("flink.t_ranking_goods")          // set captured table
                .username("root")
                .password("123456")
                .startupOptions(StartupOptions.initial())    // snapshot first, then read the binlog
                .deserializer(new CustomerDeserialization()) // converts SourceRecord to a JSON String
                .jdbcProperties(prop)                        // JDBC connection properties
                .build();

        // enable checkpointing; the MySQL CDC source depends on it
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)  // 4 parallel source tasks
                .print()
                .setParallelism(1); // parallelism 1 for the sink keeps message ordering

        env.execute("Print MySQL");
    }
}
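As a quick way to rule the custom deserializer in or out, Flink CDC also ships a stock JsonDebeziumDeserializationSchema that emits each change record as raw Debezium JSON. A minimal swap in the builder above (a sketch, not verified against this job):

// import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
.deserializer(new JsonDebeziumDeserializationSchema()) // raw Debezium change record as JSON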
CustomerDeserialization.java
import java.util.HashMap;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import com.google.gson.Gson;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import io.debezium.data.Envelope;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        HashMap<String, Object> hashMap = new HashMap<>();

        // topic looks like "<server-name>.<database>.<table>"
        String topic = sourceRecord.topic();
        String[] split = topic.split("[.]");
        String database = split[1];
        String table = split[2];
        hashMap.put("database", database);
        hashMap.put("table", table);

        // get the operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

        // get the record payload
        Struct struct = (Struct) sourceRecord.value();
        Struct after = struct.getStruct("after");
        Struct before = struct.getStruct("before");

        /*
         * 1. both before and after present -> update
         * 2. only after present            -> insert
         * 3. only before present           -> delete
         * The "both present" case must be tested first, otherwise
         * the update branch can never be reached.
         */
        if (before != null && after != null) {
            // update: emit the new image of the row
            hashMap.put("data", structToMap(after));
        } else if (after != null) {
            // insert
            hashMap.put("data", structToMap(after));
        } else if (before != null) {
            // delete: emit the old image of the row
            hashMap.put("data", structToMap(before));
        }

        String type = operation.toString().toLowerCase();
        // the initial snapshot emits "read" and new rows emit "create"; treat both as insert
        if ("create".equals(type) || "read".equals(type)) {
            type = "insert";
        }
        hashMap.put("type", type);

        collector.collect(new Gson().toJson(hashMap));
    }

    // copy a Struct's fields into a map, replacing nulls with empty strings
    private static HashMap<String, Object> structToMap(Struct struct) {
        HashMap<String, Object> hm = new HashMap<>();
        Schema schema = struct.schema();
        for (Field field : schema.fields()) {
            Object value = struct.get(field.name());
            hm.put(field.name(), value == null ? "" : value);
        }
        return hm;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
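For reference, a record emitted by this schema looks roughly like the following (the columns id and goods_name are made up for illustration; the real fields depend on the t_ranking_goods table):

{"database":"flink","table":"t_ranking_goods","data":{"id":1,"goods_name":"example"},"type":"insert"}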