Flink1.14.4 与 Flinkcdc 2.2.1 遇keng

Posted qq_31528769

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.14.4 与 Flinkcdc 2.2.1 遇keng相关的知识,希望对你有一定的参考价值。

<properties>
        <flink.version>1.14.4</flink.version>
        <flinkCDC.version>2.2.1</flinkCDC.version>
</properties>

上面是版本,使用这两个结合,程序运行成功但是不报错,数据也不打印,有人遇到这个问题吗??可以留言一下,解决方法。

换成Flink  1.13.5  , 数据就可以正常监控打印

下面是代码:

Main.class

public class Main 
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties prop = new Properties();
        prop.setProperty("autoReconnect","true");


        mysqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("flink") // set captured database
                .tableList("flink.t_ranking_goods") // set captured table
                .username("root")
                .password("123456")
                .startupOptions(StartupOptions.initial())
                .deserializer(new CustomerDeserialization()) // converts SourceRecord to JString
                //jdbc连接参数配置
                .jdbcProperties(prop)
                .build();


        // enable checkpoint
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL ");
    
CustomerDeserialization.class
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> 

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception 

        HashMap<String, Object> hashMap = new HashMap<>();

        String topic = sourceRecord.topic();
        String[] split = topic.split("[.]");
        String database = split[1];
        String table = split[2];
        hashMap.put("database",database);
        hashMap.put("table",table);

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        //获取数据本身
        Struct struct = (Struct)sourceRecord.value();
        Struct after = struct.getStruct("after");
        Struct before = struct.getStruct("before");
        /*
         	 1,同时存在 beforeStruct 跟 afterStruct数据的话,就代表是update的数据
             2,只存在 beforeStruct 就是delete数据
             3,只存在 afterStruct数据 就是insert数据
        */
        if (after != null) 
            //insert
            Schema schema = after.schema();
            HashMap<String, Object> hm = new HashMap<>();
            for (Field field : schema.fields()) 
                hm.put(field.name(), after.get(field.name()) == null?"":after.get(field.name()) );
            

            hashMap.put("data",hm);
        else if (before !=null)
            //delete
            Schema schema = before.schema();
            HashMap<String, Object> hm = new HashMap<>();
            for (Field field : schema.fields()) 
                hm.put(field.name(), before.get(field.name()) == null?"":before.get(field.name()));
            
            hashMap.put("data",hm);
        else if(before !=null && after !=null)
            //update
            Schema schema = after.schema();
            HashMap<String, Object> hm = new HashMap<>();
            for (Field field : schema.fields()) 
                hm.put(field.name(), after.get(field.name()) == null?"":after.get(field.name()));
            
            hashMap.put("data",hm);
        

        String type = operation.toString().toLowerCase();
        if ("create".equals(type)||"read".equals(type)) 
            type = "insert";
        else if("delete".equals(type)) 
            type = "delete";
        else if("update".equals(type)) 
            type = "update";
        

        hashMap.put("type",type);

        Gson gson = new Gson();
//        System.out.println(gson.toJson(hashMap));
        collector.collect(gson.toJson(hashMap));

    

    @Override
    public TypeInformation<String> getProducedType() 
        return TypeInformation.of(String.class);
    

以上是关于Flink1.14.4 与 Flinkcdc 2.2.1 遇keng的主要内容,如果未能解决你的问题,请参考以下文章

flink cdc MySQL2Doris 案例分享 解决分库多表同步

flink cdc MySQL2Doris 案例分享 解决分库多表同步

FLinkSQL+FlinkCDC

FlinkCDC部署

FlinkCDC部署

FlinkCDC部署