客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类相关的知识,希望对你有一定的参考价值。
定义解析kafka数据的Bean对象类
一、定义消费kafka字符串的Bean对象基类
根据数据来源不同可以分为OGG数据和Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类
实现步骤:
- 在公共模块的java目录下的 parser 包下创建 MessageBean 抽象类
- 编写代码
- 继承自 Serializable 接口
- 创建 serialVersionUID 属性
- 定义 table 属性,实现 setter/getter 方法
参考代码:
package cn.it.logistics.common.beans.parser;
import java.io.Serializable;
/**
* 根据数据源定义抽象类,数据源:
* 1)ogg
* 2)canal
* 两者有共同的table属性
*/
public abstract class MessageBean implements Serializable
private static final long serialVersionUID = -8216415778785426469L;
private String table;
public String getTable()
return table;
public void setTable(String table)
this.table = table;
@Override
public String toString()
return table;
为什么创建serialVersionUID:
serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。
使用idea生成serialVersionUID:
操作步骤 | 说明 |
1 | 设置自动生成 serialVersionUID |
| |
2 | 选中对应的类名,然后按 alt+enter 快捷键 |
| |
3 | 结果显示 |
|
二、定义消费OGG字符串的Bean对象
实现步骤:
- 在公共模块的 parser 包下创建 OggMessageBean 类
- 继承自 MessageBean 抽象类
参考代码:
package cn.it.logistics.common.beans.parser;
import javax.print.DocFlavor;
import java.util.Map;
/**
* 定义消费出来的ogg的数据的javaBean对象
*
* "table": "IT.tbl_route", //表名:库名.表名
* "op_type": "U", //操作类型:U表示修改
* "op_ts": "2020-10-08 09:10:54.000774",
* "current_ts": "2020-10-08T09:11:01.925000",
* "pos": "00000000200006645758",
* "before": //操作前的字段集合
* "id": 104,
* "start_station": "东莞中心",
* "start_station_area_id": 441900,
* "start_warehouse_id": 1,
* "end_station": "蚌埠中转部",
* "end_station_area_id": 340300,
* "end_warehouse_id": 107,
* "mileage_m": 1369046,
* "time_consumer_minute": 56172,
* "state": 1,
* "cdt": "2020-02-02 18:51:39",
* "udt": "2020-02-02 18:51:39",
* "remark": null
* ,
* "after": //操作后的字段集合
* "id": 104,
* "start_station": "东莞中心",
* "start_station_area_id": 441900,
* "start_warehouse_id": 1,
* "end_station": "TBD",
* "end_station_area_id": 340300,
* "end_warehouse_id": 107,
* "mileage_m": 1369046,
* "time_consumer_minute": 56172,
* "state": 1,
* "cdt": "2020-02-02 18:51:39",
* "udt": "2020-02-02 18:51:39",
* "remark": null
*
*
*/
public class OggMessageBean extends MessageBean
//定义操作类型
private String op_type;
@Override
public void setTable(String table)
//如果表名不为空
if (table != null && !table.equals(""))
table = table.replaceAll("[A-Z]+\\\\.", "");
super.setTable(table);
public String getOp_type()
return op_type;
public void setOp_type(String op_type)
this.op_type = op_type;
public String getOp_ts()
return op_ts;
public void setOp_ts(String op_ts)
this.op_ts = op_ts;
public String getCurrent_ts()
return current_ts;
public void setCurrent_ts(String current_ts)
this.current_ts = current_ts;
public String getPos()
return pos;
public void setPos(String pos)
this.pos = pos;
public Map<String, Object> getBefore()
return before;
public void setBefore(Map<String, Object> before)
this.before = before;
public Map<String, Object> getAfter()
return after;
public void setAfter(Map<String, Object> after)
this.after = after;
//操作时间
private String op_ts;
@Override
public String toString()
return "OggMessageBean" +
"table='" + super.getTable() + '\\'' +
", op_type='" + op_type + '\\'' +
", op_ts='" + op_ts + '\\'' +
", current_ts='" + current_ts + '\\'' +
", pos='" + pos + '\\'' +
", before=" + before +
", after=" + after +
'';
/**
* 返回需要处理的列的集合
* @return
*/
public Map<String, Object> getValue()
//如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
if (after == null)
return before;
else
return after;
//同步时间
private String current_ts;
//偏移量
private String pos;
//操作之前的数据
private Map<String, Object> before;
//操作之后的数据
private Map<String, Object> after;
三、定义消费Canal字符串的Bean对象
实现步骤:
- 在公共模块的 parser 包下创建 CanalMessageBean 类
- 继承自 MessageBean 抽象类
参考代码:
package cn.it.logistics.common.beans.parser;
import java.util.List;
import java.util.Map;
/**
* 定义消费出来的canal的数据对应的javaBean对象
*
* "data": [
* "id": "1",
* "name": "北京",
* "tel": "222",
* "mobile": "1111",
* "detail_addr": "北京",
* "area_id": "1",
* "gis_addr": "1",
* "cdt": "2020-10-08 17:20:12",
* "udt": "2020-11-05 17:20:16",
* "remark": null
* ],
* "database": "crm",
* "es": 1602148867000,
* "id": 15,
* "isDdl": false,
* "mysqlType":
* "id": "bigint(20)",
* "name": "varchar(50)",
* "tel": "varchar(20)",
* "mobile": "varchar(20)",
* "detail_addr": "varchar(100)",
* "area_id": "bigint(20)",
* "gis_addr": "varchar(20)",
* "cdt": "datetime",
* "udt": "datetime",
* "remark": "varchar(100)"
* ,
* "old": [
* "tel": "111"
* ],
* "sql": "",
* "sqlType":
* "id": -5,
* "name": 12,
* "tel": 12,
* "mobile": 12,
* "detail_addr": 12,
* "area_id": -5,
* "gis_addr": 12,
* "cdt": 93,
* "udt": 93,
* "remark": 12
* ,
* "table": "crm_address",
* "ts": 1602148867311,
* "type": "UPDATE" //修改数据
*
*/
public class CanalMessageBean extends MessageBean
//操作的数据集合
private List<Map<String, Object>> data;
public List<Map<String, Object>> getData()
return data;
public void setData(List<Map<String, Object>> data)
this.data = data;
public String getDatabase()
return database;
public void setDatabase(String database)
this.database = database;
public Long getEs()
return es;
public void setEs(Long es)
this.es = es;
public Long getId()
return id;
public void setId(Long id)
this.id = id;
public boolean isDdl()
return isDdl;
public void setDdl(boolean ddl)
isDdl = ddl;
public Map<String, Object> getMysqlType()
return mysqlType;
public void setMysqlType(Map<String, Object> mysqlType)
this.mysqlType = mysqlType;
public String getOld()
return old;
public void setOld(String old)
this.old = old;
public String getSql()
return sql;
public void setSql(String sql)
this.sql = sql;
public Map<String, Object> getSqlType()
return sqlType;
public void setSqlType(Map<String, Object> sqlType)
this.sqlType = sqlType;
public Long getTs()
return ts;
public void setTs(Long ts)
this.ts = ts;
public String getType()
return type;
public void setType(String type)
this.type = type;
//数据库名称
private String database;
private Long es;
private Long id;
private boolean isDdl;
private Map<String, Object> mysqlType;
private String old;
private String sql;
private Map<String, Object> sqlType;
private Long ts;
private String type;
/**
* 重写父类的settable方法,将表名修改成统一的前缀
* @param table
*/
@Override
public void setTable(String table)
if(table!=null && !table.equals(""))
if(table.startsWith("crm_"))
table = table.replace("crm_", "tbl_");
super.setTable(table);
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
以上是关于客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类的主要内容,如果未能解决你的问题,请参考以下文章
客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中