ElastiSearch-采用OpenReplicator解析MySQL binlog

Posted 星河scorpion

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElastiSearch-采用OpenReplicator解析MySQL binlog相关的知识,希望对你有一定的参考价值。

欢迎跳转到本文的原文链接:https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。
Open Replicator项目地址:https://github.com/whitesock/open-replicator

binlog事件分析结构图

在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{
  "eventId": 1,
  "databaseName": "canal_test",
  "tableName": "`company`",
  "eventType": 2,
  "timestamp": 1477033198000,
  "timestampReceipt": 1477033248780,
  "binlogName": "mysql-bin.000006",
  "position": 353,
  "nextPostion": 468,
  "serverId": 2,
  "before": null,
  "after": null,
  "isDdl": true,
  "sql": "DROP TABLE `company` /* generated by server */"
}



DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

{
  "eventId": 0,
  "databaseName": "canal_test",
  "tableName": "person",
  "eventType": 24,
  "timestamp": 1477030734000,
  "timestampReceipt": 1477032161988,
  "binlogName": "mysql-bin.000006",
  "position": 242,
  "nextPostion": 326,
  "serverId": 2,
  "before": {
    "id": "3",
    "sex": "f",
    "address": "shanghai",
    "age": "23",
    "name": "zzh3"
  },
  "after": {
    "id": "3",
    "sex": "m",
    "address": "shanghai",
    "age": "23",
    "name": "zzh3"
  },
  "isDdl": false,
  "sql": null
}


相关的类文件如下:

CDCEvent.java

package or;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;

public class CDCEvent {
    private long eventId = 0;//事件唯一标识
    private String databaseName = null;
    private String tableName = null;
    private int eventType = 0;//事件类型
    private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]
    private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]
    private String binlogName = null;// binlog file name
    private long position = 0;
    private long nextPostion = 0;
    private long serverId = 0;
    private Map<String,String> before = null;
    private Map<String,String> after = null;
    private Boolean isDdl= null;
    private String sql = null;
    
    private static AtomicLong uuid = new AtomicLong(0);
    public CDCEvent(){}
        
    public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){
        this.init(are);
        this.databaseName = databaseName;
        this.tableName = tableName;
    }
    
    private void init(final BinlogEventV4 be){
        this.eventId = uuid.getAndAdd(1);
        BinlogEventV4Header header = be.getHeader();
        
        this.timestamp = header.getTimestamp();
        this.eventType = header.getEventType();
        this.serverId = header.getServerId();
        this.timestampReceipt = header.getTimestampOfReceipt();
        this.position = header.getPosition();
        this.nextPostion = header.getNextPosition();
        this.binlogName = header.getBinlogFileName();
    }
    
    @Override
    public String toString(){
        StringBuilder builder = new StringBuilder();
        builder.append("{ eventId:").append(eventId);
        builder.append(",databaseName:").append(databaseName);
        builder.append(",tableName:").append(tableName);
        builder.append(",eventType:").append(eventType);
        builder.append(",timestamp:").append(timestamp);
        builder.append(",timestampReceipt:").append(timestampReceipt);
        builder.append(",binlogName:").append(binlogName);
        builder.append(",position:").append(position);
        builder.append(",nextPostion:").append(nextPostion);
        builder.append(",serverId:").append(serverId);
        builder.append(",isDdl:").append(isDdl);
        builder.append(",sql:").append(sql);
        builder.append(",before:").append(before);
        builder.append(",after:").append(after).append("}");
        
        return builder.toString();
    }
// 省略Getter和Setter方法    
}



open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面:

    /**
     * ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,
     * 然后跟取回的List<Column>进行映射。
     * 
     * @param cols
     * @param databaseName
     * @param tableName
     * @return
     */
    private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
        Map<String,String> map = new HashMap<>();
        if(cols == null || cols.size()==0){
            return null;
        }
        
        String fullName = databaseName+"."+tableName;
        List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
        if(columnInfoList == null)
            return null;
        if(columnInfoList.size() != cols.size()){
            TableInfoKeeper.refreshColumnsMap();
            if(columnInfoList.size() != cols.size())
            {
                logger.warn("columnInfoList.size is not equal to cols.");
                return null;
            }
        }
        
        for(int i=0;i<columnInfoList.size(); i++){
            if(cols.get(i).getValue()==null)
                map.put(columnInfoList.get(i).getName(),"");
            else
                map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
        }
        
        return map;
    }

    /**
     * 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,
     * 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中
     * 
     * @param qe
     * @return
     */    
    private TableInfo createTableInfo(QueryEvent qe){
        String sql = qe.getSql().toString().toLowerCase();
        
        TableInfo ti = new TableInfo();
        String databaseName = qe.getDatabaseName().toString();
        String tableName = null;
        if(checkFlag(sql,"table")){
            tableName = getTableName(sql,"table");
        } else if(checkFlag(sql,"truncate")){
            tableName = getTableName(sql,"truncate");
        } else{
            logger.warn("can not find table name from sql:{}",sql);
            return null;
        }
        ti.setDatabaseName(databaseName);
        ti.setTableName(tableName);
        ti.setFullName(databaseName+"."+tableName);
        
        return ti;
    }

    private boolean checkFlag(String sql, String flag){
        String[] ss = sql.split(" ");
        for(String s:ss){
            if(s.equals(flag)){
                return true;
            }
        }
        return false;
    }
    
    private String getTableName(String sql, String flag){
        String[] ss = sql.split("\\\\.");
        String tName = null;
        if (ss.length > 1) {
            String[] strs = ss[1].split(" ");
            tName = strs[0];
        } else {
            String[] strs = sql.split(" ");
            boolean start = false;
            for (String s : strs) {
                if (s.indexOf(flag) >= 0) {
                    start = true;
                    continue;
                }
                if (start && !s.isEmpty()) {
                    tName = s;
                    break;
                }
            }
        }
        tName.replaceAll("`", "").replaceAll(";", "");
        
        //del "("[create table person(....]
        int index = tName.indexOf('(');
        if(index>0){
            tName = tName.substring(0, index);
        }
        
        return tName;
    }
}


上面所涉及到的TableInfo .java如下:

package or.model;

public class TableInfo {

    private String databaseName;
    private String tableName;
    private String fullName;
    // 省略Getter和Setter
    
    @Override
    public boolean equals(Object o){
        if(this == o)
            return true;
        if(o == null || this.getClass()!=o.getClass())
            return false;
        TableInfo tableInfo = (TableInfo)o;
        if(!this.databaseName.equals(tableInfo.getDatabaseName()))
            return false;
        if(!this.tableName.equals(tableInfo.getTableName()))
            return false;
        if(!this.fullName.equals(tableInfo.getFullName()))
            return false;
        return true;
    }
    
    @Override
    public int hashCode(){
        int result = this.tableName.hashCode();
        result = 31*result+this.databaseName.hashCode();
        result = 31*result+this.fullName.hashCode();
        return result;
    }
}


接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java

package or.keeper;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import or.MysqlConnection;
import or.model.ColumnInfo;
import or.model.TableInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.impl.event.TableMapEvent;

public class TableInfoKeeper {

    private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);
    
    private static Map<Long,TableInfo> tabledIdMap = new ConcurrentHashMap<>();
    private static Map<String,List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();
    
    static{
        columnsMap = MysqlConnection.getColumns();
    }
    
    public static void saveTableIdMap(TableMapEvent tme){
        long tableId = tme.getTableId();
        tabledIdMap.remove(tableId);
        
        TableInfo table = new TableInfo();
        table.setDatabaseName(tme.getDatabaseName().toString());
        table.setTableName(tme.getTableName().toString());
        table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());
        
        tabledIdMap.put(tableId, table);
    }
    
    public static synchronized void refreshColumnsMap(){
        Map<String,List<ColumnInfo>> map = MysqlConnection.getColumns();
        if(map.size()>0){
//            logger.warn("refresh and clear cols.");
            columnsMap = map;
//            logger.warn("refresh and switch cols:{}",map);
        }
        else
        {
            logger.error("refresh columnsMap error.");
        }
    }
    
    public static TableInfo getTableInfo(long tableId){
        return tabledIdMap.get(tableId);
    }
    
    public static List<ColumnInfo> getColumns(String fullName){
        return columnsMap.get(fullName);
    }
}



正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:

package or;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import or.model.BinlogInfo;
import or.model.BinlogMasterStatus;
import or.model.ColumnInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MysqlConnection {
    private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
    
    private static Connection conn;
    
    private static String host;
    private static int port;
    private static String user;
    private static String password;
    
    public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){
        try {
            if(conn == null || conn.isClosed()){
                Class.forName("com.mysql.jdbc.Driver");
                
                host = hostArg;
                port = portArg;
                user = userArg;
                password = passwordArg;
                
                conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);
                logger.info("connected to mysql:{} : {}",user,password);
            }
        } catch (ClassNotFoundException e) {
            logger.error(e.getMessage(),e);
        } catch (SQLException e) {
            logger.error(e.getMessage(),e);
        }
    }
    
    public static Connection getConnection(){
        try {
            if(conn == null || conn.isClosed()){
                setConnection(host,port,user,password);
            }
        } catch (SQLException e) {
            logger.error(e.getMessage(),e);
        }
        return conn;
    }

    /**
     * 获取Column信息
     * 
     * @return
     */
    public static Map<String,List<ColumnInfo>> getColumns(){
        Map<String,List<ColumnInfo>> cols = new HashMap<>();
        Connection conn = getConnection();
        
        try {
            DatabaseMetaData metaData = conn.getMetaData();
            ResultSet r = metaData.getCatalogs();
            String tableType[] = {"TABLE"};
            while(r.next()){
                String databaseName = r.getString("TABLE_CAT");
                ResultSet result = metaData.getTables(databaseName, null, null, tableType);
                while(result.next()){
                    String tableName = result.getString("TABLE_NAME");
//                    System.out.println(result.getInt("TABLE_ID"));
                    String key = databaseName +"."+tableName;
                    ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
                    cols.put(key, new ArrayList<ColumnInfo>());
                    while(colSet.next()){
                        ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));
                        cols.get(key).add(columnInfo);
                    }
                    
                }
            }
        } catch (SQLException e) { 
            logger.error(e.getMessage(),e);
        }    
        return cols;
    }
    
    /**
     * 参考
     * mysql> show binary logs
     *  +------------------+-----------+
     *    | Log_name         | File_size |
     *    +------------------+-----------+
     *    | mysql-bin.000001 |       126 |
     *    | mysql-bin.000002 |       126 |
     *    | mysql-bin.000003 |      6819 |
     *    | mysql-bin.000004 |      1868 |
     *    +------------------+-----------+
     */
    public static List<BinlogInfo> getBinlogInfo(){
        List<BinlogInfo> binlogList = new ArrayList<>();
        
        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;
        
        try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery("show binary logs");
            while(resultSet.next()){
                BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));
                binlogList.add(binlogInfo);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        } finally{
            try {
                if(resultSet != null)
                    resultSet.close();
                if(statement != null)
                    statement.close();
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                logger.error(e.getMessage(),e);
            }
        }
        
        return binlogList;
    }

    /**
     * 参考:    
     * mysql> show master status;
     *     +------------------+----------+--------------+------------------+
     *     | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
     *     +------------------+----------+--------------+------------------+
     *     | mysql-bin.000004 |     1868 |              |                  |
     *     +------------------+----------+--------------+------------------+
     * @return
     */
    public static BinlogMasterStatus getBinlogMasterStatus(){
        BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();
        
        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;
        
        try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery("show master status");
            while(resultSet.next()){
                binlogMasterStatus.setBinlogName(resultSet.getString("File"));
                binlogMasterStatus.setPosition(resultSet.getLong("Position"));
            }
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        } finally{
            try {
                if(resultSet != null)
                    resultSet.close();
                if(statement != null)
                    statement.close();
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                logger.error(e.getMessage(),e);
            }
        }
        
        return binlogMasterStatus;
    }

    /**
     * 获取open-replicator所连接的mysql服务器的serverid信息
     * @return
     */
    public static int getServerId(){
        int serverId=6789;
        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;
        
        try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery("show variables like 'server_id'");
            while(resultSet.next()){
                serverId = resultSet.getInt("Value");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        } finally{
            try {
                if(resultSet != null)
                    resultSet.close();
                if(statement != null)
                    statement.close();
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                logger.error(e.getMessage(),e);
            }
        }
        
        return serverId;
    }
}



上面代码设计的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)

package or.model;
public class BinlogInfo {
    private String binlogName;
    private Long fileSize;
    // 省略Getter和Setter
}

package or.model;
public class BinlogMasterStatus {
    private String binlogName;
    private long position;
 // 省略Getter和Setter
}

package or.model;
public class ColumnInfo {
    private String name;
    private String type;
 // 省略Getter和Setter
}


最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)

package or.manager;
import java.util.concurrent.ConcurrentLinkedDeque;
import or.CDCEvent;
public class CDCEventManager {
    public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
}


所有的准备工作都完成了,下面可以解析binlog日志了:

package or.test;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import or.CDCEvent;
import or.InstanceListener;
import or.MysqlConnection;
import or.OpenReplicatorPlus;
import or.manager.CDCEventManager;
import or.model.BinlogMasterStatus;

import com.google.code.or.OpenReplicator;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

public class OpenReplicatorTest {
    private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
    private static final String host = "xx.xx.xx.60";
    private static final int port = 3306;
    private static final String user = "****";
    private static final String password = "****";
    
    public static void main(String[] args){
        OpenReplicator or = new OpenReplicator ();
        or.setUser(user);
        or.setPassword(password);
        or.setHost(host);
        or.setPort(port);
        MysqlConnection.setConnection(host, port, user, password);
        
//        or.setServerId(MysqlConnection.getServerId());
        //配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId
        
        BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
        or.setBinlogFileName(bms.getBinlogName());
//        or.setBinlogFileName("mysql-bin.000004");
        or.setBinlogPosition(4);
        or.setBinlogEventListener(new InstanceListener());
        try {
            or.start();
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        }
        
        Thread thread = new Thread(new PrintCDCEvent());
        thread.start();
    }

    public static class PrintCDCEvent implements Runnable{
        @Override
        public void run() {
            while(true){
                if(CDCEventManager.queue.isEmpty() == false)
                {
                    CDCEvent ce = CDCEventManager.queue.pollFirst();
                    Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
                    String prettyStr1 = gson.toJson(ce);
                    System.out.println(prettyStr1);    
                }
                else{
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }        
    }
}


时间运行旧了会遇到这样一个问题:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
java.io.EOFException: null
    at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from xx.xx.xx.60:3306
 

初步解决方案(extends OpenReplicator然后添加重试机制): 

package or;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.OpenReplicator;

public class OpenReplicatorPlus extends OpenReplicator{
    private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);
    private volatile boolean autoRestart = true;
    @Override
    public void stopQuietly(long timeout, TimeUnit unit){
        super.stopQuietly(timeout, unit);
        if(autoRestart){
            try {
                TimeUnit.SECONDS.sleep(10);
                logger.error("Restart OpenReplicator");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。

大功告成~~
 

以上是关于ElastiSearch-采用OpenReplicator解析MySQL binlog的主要内容,如果未能解决你的问题,请参考以下文章

ElastiSearch-采用OpenReplicator解析MySQL binlog

第131天学习打卡(ElastiSearch 集成SpringBoot)

Elastisearch不会通过带有动态类的嵌套调用返回数据

ElastiSearch默认分词器

Elastisearch 简介 使用 Query DSL 映射 分词 Elasticsearch-Rest-Client

SpringBoot整合ElasticSearch实现多版本的兼容