FlinkCDC Custom Deserializer
Posted by 真离谱
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.bw.func.customerDeserialization;
import com.bw.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinckCDC {
    public static void main(String[] args) throws Exception {
        //TODO 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2. Read MySQL change data via Flink CDC
        DebeziumSourceFunction<String> mySqlSource = MySQLSource.<String>builder()
                // hostname
                .hostname("hadoop-single")
                // port
                .port(3306)
                // database(s) to capture
                .databaseList("gmall2023")
                // username
                .username("root")
                // password
                .password("root")
                // custom deserializer: converts each SourceRecord into exactly the JSON string we want
                .deserializer(new customerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(mySqlSource);

        //TODO 3. Print the stream and write it to Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));

        env.execute();
    }
}
StartupOptions:
initial:
    On the first run, read the table's existing historical data as a snapshot (these records arrive with operation type READ), then keep consuming the binlog and storing checkpoints.
    On a restart you must point the job at a concrete checkpoint so it can resume from the saved offset; even if Flink crashed, it continues from the last offset rather than from latest.
    Checkpoints only become practical after the job is packaged and deployed, because only then can you specify the checkpoint path when submitting.
earliest:
    Read from the first binlog entry. Enable the binlog before the database is created, otherwise the early history is simply not in the binlog to be read.
latest:
    Read only the changes made after the Flink job starts.
timestamp:
    Start reading the binlog from a given point in time.
specificOffset:
    Specify the binlog file and the offset to start reading from.
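As a rough illustration, any of these options can be swapped into the builder call of the job above. This is a hedged sketch: the factory-method signatures (especially the arguments of timestamp and specificOffset) follow the com.alibaba.ververica flink-cdc-connectors 1.x API and should be double-checked against the version you actually use; the timestamp and file/offset values are taken from the example record later in this post, purely for illustration.

// Sketch: pick exactly one startupOptions(...) line.
DebeziumSourceFunction<String> source = MySQLSource.<String>builder()
        .hostname("hadoop-single")
        .port(3306)
        .databaseList("gmall2023")
        .username("root")
        .password("root")
        .deserializer(new customerDeserialization())
        .startupOptions(StartupOptions.initial())                                     // snapshot + binlog
        //.startupOptions(StartupOptions.earliest())                                  // from the first binlog entry
        //.startupOptions(StartupOptions.latest())                                    // only new changes
        //.startupOptions(StartupOptions.timestamp(1679964913000L))                   // from a timestamp (ms)
        //.startupOptions(StartupOptions.specificOffset("mysql-bin.000276", 1410))    // binlog file + offset
        .build();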
Notes:
In Flink CDC, an insert arrives with operation type "create" (the custom deserializer below rewrites it to "insert").
You must also add every database you want to capture to the MySQL configuration file (on my VM: vim /etc/my.cnf):
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
log-bin=mysql-bin        # enable binlog
binlog-format=ROW        # use ROW mode
server_id=1
# databases to capture
binlog-do-db=gmall2023
binlog-do-db=gmall2023_realtime
# server_id is required for MySQL replication; do not reuse canal's slaveId
#binlog-do-db=test

# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M

datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
skip-grant-tables
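After restarting mysqld, it is worth checking that the binlog really is enabled before starting the Flink job. The sketch below does that check from Java over JDBC; the hostname and credentials are reused from the job above and a mysql-connector-java dependency on the classpath is assumed, none of which comes from the original post.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch: verify that the binlog settings above took effect.
public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://hadoop-single:3306", "root", "root");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW VARIABLES LIKE 'log_bin'")) {
            while (rs.next()) {
                // Expect: log_bin = ON once log-bin is enabled and mysqld has been restarted
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}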
When testing:
    Use latest(): while the Flink job is running, change data in the database so the changes hit the binlog and flow through.
    initial() also works, but a locally run job cannot be pointed at a checkpoint on restart, so every restart re-reads the whole table, and those records all arrive with operation type READ.
When packaging and deploying:
    Use initial(), because a deployed Flink job can be restarted from a specified checkpoint.
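For that deployed case, the job itself needs checkpointing switched on so there is something to restore from. A minimal sketch of that setup follows; the interval, timeout, state backend, and HDFS path are assumptions for illustration, not values taken from the original job, and the statements belong inside main() before env.execute().

// Needs: import org.apache.flink.runtime.state.filesystem.FsStateBackend;
env.enableCheckpointing(5000L);                                   // checkpoint every 5 s (assumed interval)
env.getCheckpointConfig().setCheckpointTimeout(60000L);           // give each checkpoint up to 60 s
env.setStateBackend(new FsStateBackend("hdfs://hadoop-single:8020/flink/checkpoints"));  // assumed path
// On restart, submit with the stored checkpoint/savepoint path, e.g.
//   flink run -s hdfs://hadoop-single:8020/flink/checkpoints/<job-id>/chk-<n> xxx.jar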
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
 * Format of the wrapped output record:
 * {
 *   "database": "",
 *   "tableName": "",
 *   "before": {"id": "", "tm_name": "", ...},
 *   "after": {"id": "", "tm_name": "", ...},
 *   "type": "c u d",
 *   //"ts": 156456135615
 * }
 */
public class customerDeserialization implements DebeziumDeserializationSchema<String> {

    // Raw record before wrapping, e.g.:
    // SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1679964913, file=mysql-bin.000276, pos=1410, row=1, server_id=1, event=2}}
    // ConnectRecord{topic='mysql_binlog_source.gmall2023.base_trademark',
    //   kafkaPartition=null, key=Struct{id=15},
    //   keySchema=Schema{mysql_binlog_source.gmall2023.base_trademark.Key:STRUCT},
    //   value=Struct{before=Struct{id=15,tm_name=李丽},
    //     after=Struct{id=15,tm_name=李丽,logo_url=13},
    //     source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1679964913000,
    //       db=gmall2023,table=base_trademark,server_id=1,file=mysql-bin.000276,pos=1555,row=0,thread=9},op=u,ts_ms=1679964913045},
    //   valueSchema=Schema{mysql_binlog_source.gmall2023.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1. Create the JSON object that will hold the final record
        JSONObject result = new JSONObject();

        //2. Extract database and table name from the topic ("server.database.table")
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        String database = split[1];
        String tableName = split[2];

        Struct value = (Struct) sourceRecord.value();

        //3. Extract the "before" data
        Struct before = value.getStruct("before");
        // JSON object holding the before values (fastjson's JSONObject, all-caps JSON, not JsonObject)
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field beforeField : beforeFields) {
                Object o = before.get(beforeField);
                beforeJson.put(beforeField.name(), o);
            }
        }

        //4. Extract the "after" data
        Struct after = value.getStruct("after");
        // JSON object holding the after values
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field afterField : afterFields) {
                Object afterValue = after.get(afterField);
                afterJson.put(afterField.name(), afterValue);
            }
        }

        //5. Extract the operation type and rename "create" to "insert"
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }

        // Example of the resulting record:
        // {"database":"gmall2023",
        //  "before":{"tm_name":"23","logo_url":"32325","id":12},
        //  "after":{"tm_name":"23","logo_url":"32","id":12},
        //  "type":"update",
        //  "tableName":"base_trademark"}

        //6. Put the fields into the result object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7. Emit the record
        collector.collect(result.toJSONString());
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
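Downstream (for example in the consumer of the ods_base_db topic) the emitted string can be parsed back with fastjson and routed by table name. The sketch below is a hypothetical consumer-side fragment; the class name and the routing rule are invented for illustration and are not part of the original project.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Sketch: parse one record produced by customerDeserialization and route it by table.
public class OdsRecordRouter {
    public static void route(String record) {
        JSONObject json = JSON.parseObject(record);
        String table = json.getString("tableName");
        String type = json.getString("type");
        JSONObject after = json.getJSONObject("after");
        // Hypothetical rule: only forward inserts/updates of base_trademark.
        if ("base_trademark".equals(table) && !"delete".equals(type)) {
            System.out.println("dim table change: " + after.toJSONString());
        }
    }
}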
Custom deserialization with Jackson: extend default deserializer
Posted: 2018-07-15 03:00:54
Question: I would like to build my own deserializer by extending the default one and then setting some more values on the result.
Simplified code:

public class Dto {
    public String originalJsonString;
}

public class MyFooDto extends Dto {
    public String myField;
}

@Bean
public ObjectMapper deserializingObjectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    JavaTimeModule javaTimeModule = new JavaTimeModule();
    javaTimeModule.addDeserializer(MyFooDto.class, new JsonDtoDeserializer<>());
    objectMapper.registerModule(javaTimeModule);
    return objectMapper;
}
// or maybe, instead of the Bean, just @JsonDeserialize(using = JsonDtoDeserializer.class) on MyFooDto?

public class JsonDtoDeserializer<T extends Dto> extends StdDeserializer<T> {
    // or maybe extends JsonDeserializer? or UntypedObjectDeserializer? or UntypedObjectDeserializer.Vanilla?

    public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
        // here I would like:
        T item = super.deserialize"AsUsual"(p, ctxt);   // pseudocode: the "usual" deserialization
        // the difficulty is to avoid the loop of death, where the deserializer would call itself for eternity...

        // And then set other Dto fields depending on the original JSON input, for example:
        item.originalJsonString = p.readValueAsTree().toString();
        return item;
    }
}
As you can see, I would also like to reuse this Dto parent class for other DTOs.
I haven't found any example of this. Am I really the first one in the world to try it? What should deserialize"AsUsual"(p, ctxt) actually be? Which class should I extend: JsonDeserializer, StdDeserializer, or UntypedObjectDeserializer? Will the deserializer know which concrete class T it has to instantiate? Thanks, community!
Comments:
The correct term for "mother class" is super class.
Regarding deserialize "AsUsual", see ***.com/questions/18313323/…
Possible duplicate of "How do I call the default deserializer from a custom deserializer in Jackson".
Answer 1:
As Sharon said (based on "How do I call the default deserializer from a custom deserializer in Jackson"):
@Bean
public ObjectMapper serializingObjectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    SimpleModule simpleModule = new SimpleModule();
    simpleModule.setDeserializerModifier(new BeanDeserializerModifier() {
        @Override
        public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer<?> deserializer) {
            if (Dto.class.isAssignableFrom(beanDesc.getBeanClass())) {
                return new JsonDtoDeserializer<>(deserializer, beanDesc.getBeanClass());
            }
            return deserializer;
        }
    });
    objectMapper.registerModule(simpleModule);
    return objectMapper;
}

public class JsonDtoDeserializer<T extends Dto> extends StdDeserializer<T> implements ResolvableDeserializer {

    private final JsonDeserializer<?> defaultDeserializer;

    public JsonDtoDeserializer(JsonDeserializer<?> defaultDeserializer, Class<?> clazz) {
        super(clazz);
        this.defaultDeserializer = defaultDeserializer;
    }

    @Override
    public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
        @SuppressWarnings("unchecked")
        T itemObj = (T) defaultDeserializer.deserialize(p, ctxt);
        return itemObj;
    }

    // For some reason you have to implement ResolvableDeserializer when modifying BeanDeserializer,
    // otherwise deserializing throws JsonMappingException
    @Override
    public void resolve(DeserializationContext ctxt) throws JsonMappingException {
        ((ResolvableDeserializer) defaultDeserializer).resolve(ctxt);
    }
}
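The answer above only delegates to the default deserializer; to actually populate originalJsonString as the question asks, one possible variant (a sketch, not from the original answer, and untested) buffers the subtree first so the raw JSON can be kept and still fed to the delegate:

@Override
public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
    // Buffer the whole subtree once, so it can be kept as raw JSON and re-parsed.
    JsonNode tree = p.readValueAsTree();

    // Re-parse the buffered tree with the default (Bean) deserializer.
    JsonParser treeParser = tree.traverse(p.getCodec());
    treeParser.nextToken(); // position on START_OBJECT before delegating
    @SuppressWarnings("unchecked")
    T itemObj = (T) defaultDeserializer.deserialize(treeParser, ctxt);

    // Now set the extra Dto field from the original input.
    itemObj.originalJsonString = tree.toString();
    return itemObj;
}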