客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类相关的知识,希望对你有一定的参考价值。

定义解析kafka数据的Bean对象类

一、定义消费kafka字符串的Bean对象基类

根据数据来源不同可以分为OGG数据Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类

实现步骤:

  • 公共模块java目录下的 parser 包下创建 MessageBean 抽象类
  • 编写代码
    • 继承自 Serializable 接口
    • 创建 serialVersionUID 属性
    • 定义 table 属性,实现 setter/getter 方法

参考代码:

package cn.it.logistics.common.beans.parser;

import java.io.Serializable;

/**
 * 根据数据源定义抽象类,数据源:
 * 1)ogg
 * 2)canal
 * 两者有共同的table属性
 */
public abstract class MessageBean implements Serializable 
 
    private static final long serialVersionUID = -8216415778785426469L;

    private String table;

    public String getTable() 
        return table;
    

    public void setTable(String table) 
        this.table = table;
    

    @Override
    public String toString() 
        return table;
    

为什么创建serialVersionUID

serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。

使用idea生成serialVersionUID

操作步骤

说明

1

设置自动生成 serialVersionUID

 

 

2

选中对应的类名,然后按 alt+enter 快捷键

 

 

3

结果显示

 

 

二、​​​​​​​定义消费OGG字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 OggMessageBean
  • 继承自 MessageBean 抽象类

参考代码:

package cn.it.logistics.common.beans.parser;

import javax.print.DocFlavor;
import java.util.Map;

/**
 * 定义消费出来的ogg的数据的javaBean对象
 * 
 *     "table": "IT.tbl_route",            //表名:库名.表名
 *     "op_type": "U",                         //操作类型:U表示修改
 *     "op_ts": "2020-10-08 09:10:54.000774",
 *     "current_ts": "2020-10-08T09:11:01.925000",
 *     "pos": "00000000200006645758",
 *     "before":                             //操作前的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "蚌埠中转部",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *        ,
 *     "after":                          //操作后的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "TBD",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *    
 * 
 */
public class OggMessageBean extends MessageBean 
    //定义操作类型
    private String op_type;

    @Override
    public void setTable(String table) 
        //如果表名不为空
        if (table != null && !table.equals("")) 
            table = table.replaceAll("[A-Z]+\\\\.", "");
        
        super.setTable(table);
    

    public String getOp_type() 
        return op_type;
    

    public void setOp_type(String op_type) 
        this.op_type = op_type;
    

    public String getOp_ts() 
        return op_ts;
    

    public void setOp_ts(String op_ts) 
        this.op_ts = op_ts;
    

    public String getCurrent_ts() 
        return current_ts;
    

    public void setCurrent_ts(String current_ts) 
        this.current_ts = current_ts;
    

    public String getPos() 
        return pos;
    

    public void setPos(String pos) 
        this.pos = pos;
    

    public Map<String, Object> getBefore() 
        return before;
    

    public void setBefore(Map<String, Object> before) 
        this.before = before;
    

    public Map<String, Object> getAfter() 
        return after;
    

    public void setAfter(Map<String, Object> after) 
        this.after = after;
    

    //操作时间
    private String op_ts;

    @Override
    public String toString() 
        return "OggMessageBean" +
                "table='" + super.getTable() + '\\'' +
                ", op_type='" + op_type + '\\'' +
                ", op_ts='" + op_ts + '\\'' +
                ", current_ts='" + current_ts + '\\'' +
                ", pos='" + pos + '\\'' +
                ", before=" + before +
                ", after=" + after +
                '';
    

    /**
     * 返回需要处理的列的集合
     * @return
     */
    public Map<String, Object> getValue() 
        //如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
        if (after == null) 
            return before;
         else 
            return after;
        
    

    //同步时间
    private String current_ts;
    //偏移量
    private String pos;
    //操作之前的数据
    private Map<String, Object> before;
    //操作之后的数据
    private Map<String, Object> after;

三、​​​​​​​定义消费Canal字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 CanalMessageBean
  • 继承自 MessageBean 抽象类

参考代码:

package cn.it.logistics.common.beans.parser;

import java.util.List;
import java.util.Map;

/**
 * 定义消费出来的canal的数据对应的javaBean对象
 * 
 *     "data": [
 *        "id": "1",
 *        "name": "北京",
 *        "tel": "222",
 *        "mobile": "1111",
 *        "detail_addr": "北京",
 *        "area_id": "1",
 *        "gis_addr": "1",
 *        "cdt": "2020-10-08 17:20:12",
 *        "udt": "2020-11-05 17:20:16",
 *        "remark": null
 *        ],
 *     "database": "crm",
 *     "es": 1602148867000,
 *     "id": 15,
 *     "isDdl": false,
 *     "mysqlType": 
 *        "id": "bigint(20)",
 *        "name": "varchar(50)",
 *        "tel": "varchar(20)",
 *        "mobile": "varchar(20)",
 *        "detail_addr": "varchar(100)",
 *        "area_id": "bigint(20)",
 *        "gis_addr": "varchar(20)",
 *        "cdt": "datetime",
 *        "udt": "datetime",
 *        "remark": "varchar(100)"
 *    ,
 *     "old": [
 *        "tel": "111"
 *    ],
 *     "sql": "",
 *     "sqlType": 
 *        "id": -5,
 *        "name": 12,
 *        "tel": 12,
 *        "mobile": 12,
 *        "detail_addr": 12,
 *        "area_id": -5,
 *        "gis_addr": 12,
 *        "cdt": 93,
 *        "udt": 93,
 *        "remark": 12
 *    ,
 *     "table": "crm_address",
 *     "ts": 1602148867311,
 *     "type": "UPDATE"               //修改数据
 * 
 */
public class CanalMessageBean extends MessageBean 
    //操作的数据集合
    private List<Map<String, Object>> data;

    public List<Map<String, Object>> getData() 
        return data;
    

    public void setData(List<Map<String, Object>> data) 
        this.data = data;
    

    public String getDatabase() 
        return database;
    

    public void setDatabase(String database) 
        this.database = database;
    

    public Long getEs() 
        return es;
    

    public void setEs(Long es) 
        this.es = es;
    

    public Long getId() 
        return id;
    

    public void setId(Long id) 
        this.id = id;
    

    public boolean isDdl() 
        return isDdl;
    

    public void setDdl(boolean ddl) 
        isDdl = ddl;
    

    public Map<String, Object> getMysqlType() 
        return mysqlType;
    

    public void setMysqlType(Map<String, Object> mysqlType) 
        this.mysqlType = mysqlType;
    

    public String getOld() 
        return old;
    

    public void setOld(String old) 
        this.old = old;
    

    public String getSql() 
        return sql;
    

    public void setSql(String sql) 
        this.sql = sql;
    

    public Map<String, Object> getSqlType() 
        return sqlType;
    

    public void setSqlType(Map<String, Object> sqlType) 
        this.sqlType = sqlType;
    


    public Long getTs() 
        return ts;
    

    public void setTs(Long ts) 
        this.ts = ts;
    

    public String getType() 
        return type;
    

    public void setType(String type) 
        this.type = type;
    

    //数据库名称
    private String database;
    private Long es;
    private Long id;
    private boolean isDdl;
    private Map<String, Object> mysqlType;
    private String old;
    private String sql;
    private Map<String, Object> sqlType;
    private Long ts;
    private String type;

    /**
     * 重写父类的settable方法,将表名修改成统一的前缀
     * @param table
     */
    @Override
    public void setTable(String table) 
        if(table!=null && !table.equals(""))
            if(table.startsWith("crm_")) 
                table = table.replace("crm_", "tbl_");
            
        
        super.setTable(table);
    

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

 

 

 

 

 

以上是关于客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类的主要内容,如果未能解决你的问题,请参考以下文章

客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中

客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)

客快物流大数据项目(五十四):初始化Spark流式计算程序

客快物流大数据项目(十九):项目环境准备

客快物流大数据项目(五十):项目框架初始化

客快物流大数据项目(五十一):数据库表分析