Parsing the MySQL binlog in Spring Boot with mysql-binlog-connector-java (real-time + offline)

Posted by lgq2016


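Many open-source tools already implement change data capture for MySQL, for example canal, Debezium, and Maxwell. alibaba/canal implements the MySQL protocol itself, while Debezium and Maxwell rely on the open-source mysql-binlog-connector-java library to connect to a MySQL server and read its binlog. This article adds mysql-binlog-connector-java as a dependency and shows two ways to obtain and parse the binlog: online (connect to the server and read events in real time) and offline (read binlog files from disk). In both modes the library's events are used to decode the row-level DML on a table, that is, inserts, updates, and deletes.

Without further ado, here is the code.

The BinLogConstants class holds the connection details of the target database and the switch that selects the parsing mode (online or offline).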

package com.binlog.parser;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Listener configuration
 *
 * @author lgq
 * @since 2022/7/27
 **/
@Data
@Component
public class BinLogConstants {
    @Value("${binlog.datasource.host}")
    private String host;

    @Value("${binlog.datasource.port}")
    private int port;

    @Value("${binlog.datasource.username}")
    private String username;

    @Value("${binlog.datasource.passwd}")
    private String passwd;

    @Value("${binlog.db}")
    private String db;

    @Value("${binlog.table}")
    private String table;

    // true: read from the server in real time; false: parse binlog files from disk
    @Value("${binlog.isOnline}")
    private boolean isOnline;

    public static final int consumerThreads = 4;

    public static final int fileScanThreads = 4;

    // pause between queue polls, in milliseconds
    public static final long queueSleep = 1000;
}

The BinLogListener class registers consumer listeners, provides the event-parsing method, and starts the consumer thread.
package com.binlog.parser;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

import static com.github.shyiko.mysql.binlog.event.EventType.QUERY;
import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isUpdate;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;

/**
 * Binlog listener
 *
 * @author lgq
 * @since 2022/7/21
 **/
public abstract class BinLogListener {
    private final int consumerThreads = BinLogConstants.consumerThreads;
    private final BlockingQueue<BinLogItem> binLogItemQueue;
    private final ExecutorService consumer;
    // listeners registered for each table
    private final Multimap<String, BinLogConsumerListener> listeners;
    private final MysqlConnConf mysqlConnConf;
    private final Map<String, Map<String, ColumnInfo>> dbTableCols;
    // table named by the most recent TABLE_MAP event
    private String dbTable;

    /**
     * Initializes the listener
     *
     * @param conf
     */
    public BinLogListener(MysqlConnConf conf) {
        this.binLogItemQueue = new ArrayBlockingQueue<>(1024);
        this.mysqlConnConf = conf;
        this.listeners = ArrayListMultimap.create();
        this.dbTableCols = new ConcurrentHashMap<>();
        this.consumer = Executors.newFixedThreadPool(consumerThreads);
    }

    /**
     * Registers a consumer listener
     *
     * @param db       database
     * @param table    table to watch
     * @param listener listener
     * @throws Exception
     */
    public void regConsumerListener(String db, String table, BinLogConsumerListener listener) throws Exception {
        String dbTable = BinLogUtils.getDBTable(db, table);
        // load the column metadata
        Map<String, ColumnInfo> cols = BinLogUtils.getColMap(mysqlConnConf, db, table);
        // cache the column metadata
        dbTableCols.put(dbTable, cols);
        // store the listener being registered
        listeners.put(dbTable, listener);
    }

    /**
     * Takes parsed items off the queue and hands them to the registered listeners
     *
     */
    public void consume() {
        // The lambda never completes normally, so it is submitted as a Callable,
        // which is why Thread.sleep may throw InterruptedException outside the try block.
        consumer.submit(() -> {
            while (true) {
                if (binLogItemQueue.size() > 0) {
                    try {
                        BinLogItem item = binLogItemQueue.take();
                        String dbTable = item.getDbTable();
                        listeners.get(dbTable).forEach(binLogListener -> binLogListener.onEvent(item));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // runs on every pass, so at most one item is handled per queueSleep interval
                Thread.sleep(BinLogConstants.queueSleep);
            }
        });
    }

    public void parseEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        if (eventType == QUERY) {
            QueryEventData data = event.getData();
            System.out.println(data.getSql());
        }

        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            String db = tableData.getDatabase();
            String table = tableData.getTable();
            dbTable = BinLogUtils.getDBTable(db, table);
        }

        // only insert, update, and delete events are handled
        if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
            if (isWrite(eventType)) {
                WriteRowsEventData data = event.getData();
                for (Serializable[] row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable),
                                eventType, event.getHeader().getTimestamp());
                        item.setDbTable(dbTable);
                        // add() throws IllegalStateException when the queue is full
                        binLogItemQueue.add(item);
                    }
                }
            }
            if (isUpdate(eventType)) {
                UpdateRowsEventData data = event.getData();
                for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        BinLogItem item = BinLogItem.itemFromUpdate(row, dbTableCols.get(dbTable),
                                eventType, event.getHeader().getTimestamp());
                        item.setDbTable(dbTable);
                        binLogItemQueue.add(item);
                    }
                }

            }
            if (isDelete(eventType)) {
                DeleteRowsEventData data = event.getData();
                for (Serializable[] row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable),
                                eventType, event.getHeader().getTimestamp());
                        item.setDbTable(dbTable);
                        binLogItemQueue.add(item);
                    }
                }
            }
        }
    }

    public abstract void startParseBinLog();
}

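The consume() method above submits a single polling loop and sleeps for queueSleep milliseconds on every pass, so the other pool threads stay idle. As a rough sketch of an alternative that uses only the JDK, each worker can block on take() instead of polling. The names BlockingConsumerSketch, publish, start, stop, and demo are hypothetical and do not appear in the article's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper, not part of the article's code. */
final class BlockingConsumerSketch<T> {
    private final BlockingQueue<T> queue;
    private final ExecutorService workers;
    private final int workerCount;

    BlockingConsumerSketch(int capacity, int workerCount) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.workers = Executors.newFixedThreadPool(workerCount);
        this.workerCount = workerCount;
    }

    /** Waits for free space when the queue is full instead of throwing like add(). */
    void publish(T item) throws InterruptedException {
        queue.put(item);
    }

    /** Starts one blocking loop per worker thread. */
    void start(Consumer<T> handler) {
        for (int i = 0; i < workerCount; i++) {
            workers.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // take() parks the thread until an item arrives, so no sleep is needed
                        handler.accept(queue.take());
                    }
                } catch (InterruptedException e) {
                    // restore the interrupt flag and let the worker exit
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    /** Interrupts the workers so that their loops end. */
    void stop() {
        workers.shutdownNow();
    }

    /** Publishes five numbers and returns how many of them the workers handled. */
    static long demo() {
        BlockingConsumerSketch<Integer> sketch = new BlockingConsumerSketch<>(16, 2);
        CountDownLatch pending = new CountDownLatch(5);
        sketch.start(item -> pending.countDown());
        try {
            for (int i = 0; i < 5; i++) {
                sketch.publish(i);
            }
            pending.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sketch.stop();
        }
        return 5 - pending.getCount();
    }
}
```

One trade-off to keep in mind: with more than one worker, items can be handled out of order, which matters if downstream consumers need changes to a row in commit order.
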

The OnlineBinLogListener class opens a connection to the remote server through BinaryLogClient and pulls binlog events in real time.
package com.binlog.parser;

import java.io.IOException;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import lombok.extern.slf4j.Slf4j;

/**
 * Database listener
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Slf4j
public class OnlineBinLogListener extends BinLogListener implements BinaryLogClient.EventListener {
    private final BinaryLogClient parseClient;

    /**
     * Initializes the listener
     *
     * @param conf
     */
    public OnlineBinLogListener(MysqlConnConf conf) {
        super(conf);
        BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPasswd());
        EventDeserializer eventDeserializer = new EventDeserializer();
        /*
        eventDeserializer.setCompatibilityMode( // deserialization options
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
         */
        client.setEventDeserializer(eventDeserializer);
        this.parseClient = client;
    }

    /**
     * Handles an incoming event
     *
     * @param event
     */
    @Override
    public void onEvent(Event event) {
        parseEvent(event);
    }

    @Override
    public void startParseBinLog() {
        // start the consumer thread first
        consume();
        parseClient.registerEventListener(this);
        // connect and start pulling the binlog (no database or table filtering at this level);
        // connect() blocks the calling thread while the connection is open
        try {
            parseClient.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

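Because the client does no filtering, parseEvent sees events for every table and decides which one a row event belongs to from the most recent TABLE_MAP event. A statement that touches several tables can write more than one TABLE_MAP event before its row events, so tracking tables by id is a more defensive option. The sketch below shows the idea with plain JDK types. TableIdRegistry and its methods are hypothetical; in the real library the ids would come from TableMapEventData.getTableId() and from getTableId() on the row event data classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: remembers which table each binlog table id refers to. */
final class TableIdRegistry {
    private final Map<Long, String> tableById = new HashMap<>();

    /** Call for every TABLE_MAP event. */
    void onTableMap(long tableId, String db, String table) {
        // same "db-table" key format as BinLogUtils.getDBTable in the article
        tableById.put(tableId, db + "-" + table);
    }

    /** Call for every row event; returns null when the id has not been mapped yet. */
    String resolve(long tableId) {
        return tableById.get(tableId);
    }

    /** Two tables are mapped before any row event is resolved. */
    static String demo() {
        TableIdRegistry registry = new TableIdRegistry();
        registry.onTableMap(101L, "db1", "orders");
        registry.onTableMap(102L, "db1", "order_items");
        // A row event carrying id 101 still resolves to orders,
        // even though order_items was mapped most recently.
        return registry.resolve(101L);
    }
}
```

The map is only touched from the thread that delivers events, so a plain HashMap is enough in this sketch.
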

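OfflineBinLogListener picks up new, completely written binlog files from the configured directory (about 500 MB each in this setup; the rollover size is governed by MySQL's max_binlog_size) and parses them offline.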
package com.binlog.parser;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

import javax.annotation.Resource;

import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.binlog.config.FileProperties;
import com.binlog.util.SpringUtil;

import lombok.extern.slf4j.Slf4j;

/**
 * Database listener
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Slf4j
public class OfflineBinLogListener extends BinLogListener {
    private MysqlBinLogReader mysqlBinLogReader;

    private FileProperties fileProperties;

    /**
     * Initializes the listener
     *
     * @param conf
     */
    public OfflineBinLogListener(MysqlConnConf conf) {
       super(conf);
       // this class is not a Spring bean, so its collaborators are looked up from the context
       this.mysqlBinLogReader = (MysqlBinLogReader)SpringUtil.getApplicationContext().getBean("mysqlBinLogReader");
       this.fileProperties = (FileProperties)SpringUtil.getApplicationContext().getBean("fileProperties");
    }

    public OfflineBinLogListener(MysqlConnConf conf, MysqlBinLogReader mysqlBinLogReader,
                                 FileProperties fileProperties) {
        super(conf);
        this.mysqlBinLogReader = mysqlBinLogReader;
        this.fileProperties = fileProperties;
    }

    @Override
    public void startParseBinLog() {
        // start the consumer thread first
        consume();
        BlockingQueue<String> fileNameQueue = mysqlBinLogReader.getFileNameQueue();
        ExecutorService offlineFileProcess = mysqlBinLogReader.getOfflineFileProcess();
        offlineFileProcess.submit(() -> {
            Thread.currentThread().setName("scanBinLogFileConsumer");
            while (true) {
                try {
                    // take() blocks until a file name is available, so no isEmpty() check is needed
                    String fileName = fileNameQueue.take();
                    File binlogFile = new File(fileProperties.getLocalBackupDir(), fileName);
                    EventDeserializer eventDeserializer = new EventDeserializer();

                    // try-with-resources closes the reader once the file has been parsed
                    try (BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer)) {
                        for (Event event; (event = reader.readEvent()) != null; ) {
                            parseEvent(event);
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}


BinLogListenerStarter registers listeners for the configured database and tables, then starts the binlog parsing and consumer threads.
package com.binlog.parser;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;


/**
 * Test listener
 * Runs the business threads after Spring Boot has started successfully,
 * using CommandLineRunner to do so.
 * When several runners exist, the @Order annotation sets their start order (smaller values run first).
 * Without @Order, the relative order of multiple CommandLineRunner beans is not defined.
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Slf4j
@Component
@Order(value = 1)
public class BinLogListenerStarter implements CommandLineRunner {

    @Resource
    private BinLogConstants binLogConstants;

    @Override
    public void run(String... args) throws Exception {
        log.info("Initial configuration: " + binLogConstants.toString());

        // build the connection settings
        MysqlConnConf conf = new MysqlConnConf(binLogConstants.getHost(), binLogConstants.getPort(),
                binLogConstants.getUsername(), binLogConstants.getPasswd());

        // create the listener for the selected mode
        BinLogListener binLogListener;
        if (binLogConstants.isOnline()) {
            binLogListener = new OnlineBinLogListener(conf);
        } else {
            binLogListener = new OfflineBinLogListener(conf);
        }

        // read the list of tables
        List<String> tableList = BinLogUtils.getListByStr(binLogConstants.getTable());
        if (Objects.isNull(tableList) || tableList.size() == 0) {
            return;
        }
        // register a listener per table
        tableList.forEach(table -> {
            log.info("Registering listener, DB: {}, table: {}", binLogConstants.getDb(), table);
            try {
                binLogListener.regConsumerListener(binLogConstants.getDb(), table, item -> {
                    log.info("Listener logic: print the data change: {}", item.toString());
                });
            } catch (Exception e) {
                log.error("BinLog listener error:", e);
            }
        });
        // start the threads that fetch and parse the binlog
        binLogListener.startParseBinLog();
    }
}

ColumnInfo stores the metadata of a table column.
package com.binlog.parser;

import lombok.Data;

/**
 * Column attributes
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Data
public class ColumnInfo {
    public int inx; // zero-based position of the column in a row
    public String colName; // column name
    public String dataType; // data type
    public String schema; // database
    public String table; // table
    public boolean isPKC; // whether the column is part of the primary key

    public ColumnInfo(String schema, String table, int idx, String colName, String dataType, boolean isPKC) {
        this.schema = schema;
        this.table = table;
        this.colName = colName;
        this.dataType = dataType;
        this.inx = idx;
        this.isPKC = isPKC;
    }
}

BinLogItem stores the structured data obtained from parsing the log.
package com.binlog.parser;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import com.github.shyiko.mysql.binlog.event.EventType;
import com.google.common.collect.Maps;

import lombok.Data;

import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;

/**
 * Binlog item
 *
 * @author lgq
 * @since 2021/7/26
 **/
@Data
public class BinLogItem implements Serializable {
    private static final long serialVersionUID = 1L;

    private String dbTable;
    private EventType eventType;
    private Long timestamp = null;
    private Long serverId = null;
    // column name -> value before and after the change
    private Map<String, Serializable> before = null;
    private Map<String, Serializable> after = null;
    // column name -> column metadata
    private Map<String, ColumnInfo> columnInfoMap = null;
    // the data change in a JSON-like structure
    private Map<String, Object> messageJson = new HashMap<>();

    /**
     * Builds an item for an insert or a delete
     */
    public static BinLogItem itemFromInsertOrDeleted(Serializable[] row, Map<String, ColumnInfo> columnInfoMap,
                                                     EventType eventType, Long timestamp) {
        if (null == row || null == columnInfoMap) {
            return null;
        }
       /* if (row.length != columnInfoMap.size()) {
            return null;
        }*/
        // initialize the item
        BinLogItem item = new BinLogItem();
        item.eventType = eventType;
        item.columnInfoMap = columnInfoMap;
        item.before = Maps.newHashMap();
        item.after = Maps.newHashMap();
        item.timestamp = timestamp;

        // placeholder for the primary key column
        item.messageJson.put("pkc", null);

        Map<String, Serializable> beOrAf = Maps.newHashMap();
        columnInfoMap.entrySet().forEach(entry -> {
            String key = entry.getKey();
            ColumnInfo columnInfo = entry.getValue();
            beOrAf.put(key, row[columnInfo.inx]);
            if (columnInfo.isPKC) {
                // record the primary key column
                item.messageJson.put("pkc", key);
            }
        });

        // inserted values go into after, deleted values into before
        if (isWrite(eventType)) {
            item.after = beOrAf;
        }
        if (isDelete(eventType)) {
            item.before = beOrAf;
        }

        // build the JSON-like change record; Operator is a project class that the article does not show
        item.messageJson.put("op", Operator.valueOf(eventType));
        item.messageJson.put("ts", item.timestamp);
        item.messageJson.put("before", item.before);
        item.messageJson.put("after", item.after);

        return item;
    }

    /**
     * Builds an item for an update
     */
    public static BinLogItem itemFromUpdate(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, ColumnInfo> columnInfoMap,
                                            EventType eventType, Long timestamp) {
        if (null == mapEntry || null == columnInfoMap) {
            return null;
        }
        // initialize the item
        BinLogItem item = new BinLogItem();
        item.eventType = eventType;
        item.columnInfoMap = columnInfoMap;
        item.before = Maps.newHashMap();
        item.after = Maps.newHashMap();
        item.timestamp = timestamp;
        // placeholder for the primary key column
        item.messageJson.put("pkc", null);

        Map<String, Serializable> be = Maps.newHashMap();
        Map<String, Serializable> af = Maps.newHashMap();

        columnInfoMap.entrySet().forEach(entry -> {
            String key = entry.getKey();
            ColumnInfo columnInfo = entry.getValue();
            // the entry key is the row before the update, the entry value the row after it
            be.put(key, mapEntry.getKey()[columnInfo.inx]);
            af.put(key, mapEntry.getValue()[columnInfo.inx]);
            if (columnInfo.isPKC) {
                // record the primary key column
                item.messageJson.put("pkc", key);
            }
        });

        item.before = be;
        item.after = af;

        /*
        * build the JSON-like change record
        */
        item.messageJson.put("op", Operator.valueOf(eventType));
        item.messageJson.put("ts", item.timestamp);
        item.messageJson.put("before", item.before);
        item.messageJson.put("after", item.after);

        return item;
    }

    @Override
    public String toString() {
        return messageJson.toString();
    }
}


BinLogUtils provides helper methods used during parsing.
package com.binlog.parser;

import com.github.shyiko.mysql.binlog.event.EventType;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.io.Serializable;
import java.sql.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isUpdate;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;

/**
 * Listener utilities
 *
 * @author lgq
 * @since 2021/7/27
 **/
@Slf4j
@Component
public class BinLogUtils {

    private static BinLogUtils binLogUtils;

    @PostConstruct
    public void init() {
        binLogUtils = this;
    }

    /**
     * Builds the dbTable key
     */
    public static String getDBTable(String db, String table) {
        return db + "-" + table;
    }

    /**
     * Loads the column metadata of a table
     */
    public static Map<String, ColumnInfo> getColMap(MysqlConnConf conf, String db, String table) throws ClassNotFoundException {
        Class.forName("com.mysql.jdbc.Driver");
        // query the column metadata of the table being registered
        String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +
                "DATA_TYPE, ORDINAL_POSITION, case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +
                " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
        // try-with-resources closes the statement and the connection, which were previously leaked
        try (Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.getHost() + ":"
                     + conf.getPort(), conf.getUsername(), conf.getPasswd());
             PreparedStatement ps = connection.prepareStatement(preSql)) {
            ps.setString(1, db);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                Map<String, ColumnInfo> map = new HashMap<>();
                while (rs.next()) {
                    String schema = rs.getString("TABLE_SCHEMA");
                    String tableName = rs.getString("TABLE_NAME");
                    String column = rs.getString("COLUMN_NAME");
                    int idx = rs.getInt("ORDINAL_POSITION");
                    String dataType = rs.getString("DATA_TYPE");
                    String isPKC = rs.getString("IS_PKC");
                    if (column != null && idx >= 1) {
                        map.put(column, new ColumnInfo(schema, tableName, idx - 1, column, dataType,
                                "Y".equals(isPKC))); // SQL positions start at 1
                    }
                }
                return map;
            }
        } catch (SQLException e) {
            log.error("load db conf error, db_table={}:{} ", db, table, e);
        }
        return null;
    }

    /**
     * Extracts the table name from a dbTable key
     *
     * @param dbTable
     * @return java.lang.String
     */
    public static String getTable(String dbTable) {
        if (Objects.isNull(dbTable)) {
            return "";
        }
        // returns "" when the database or table name itself contains a hyphen
        String[] split = dbTable.split("-");
        if (split.length == 2) {
            return split[1];
        }
        return "";
    }

    /**
     * Converts a comma-separated string to a List
     *
     * @param str
     * @return
     */
    public static List<String> getListByStr(String str) {
        if (Objects.isNull(str)) {
            return Lists.newArrayList();
        }

        return Arrays.asList(str.split(","));
    }

    /**
     * Returns the value map that matches the operation type
     *
     * @param binLogItem
     * @return
     */
    public static Map<String, Serializable> getOptMap(BinLogItem binLogItem) {
        // determine the operation type
        EventType eventType = binLogItem.getEventType();
        if (isWrite(eventType) || isUpdate(eventType)) {
            return binLogItem.getAfter();
        }
        if (isDelete(eventType)) {
            return binLogItem.getBefore();
        }
        return null;
    }

    /**
     * Returns the operation type: 1 insert, 2 update, 3 delete
     *
     * @param binLogItem
     * @return
     */
    public static Integer getOptType(BinLogItem binLogItem) {
        // determine the operation type
        EventType eventType = binLogItem.getEventType();
        if (isWrite(eventType)) {
            return 1;
        }
        if (isUpdate(eventType)) {
            return 2;
        }
        if (isDelete(eventType)) {
            return 3;
        }
        return null;
    }
}

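getDBTable joins the two names with a hyphen and getTable splits on it, so a database or table whose name contains a hyphen cannot be split back reliably. One possible way around this, shown as a sketch only, is a small value class used as the map key in place of the string. TableKey is a hypothetical name and is not part of the article's code.

```java
import java.util.Objects;

/** Hypothetical value type that could replace the "db-table" string key. */
final class TableKey {
    private final String db;
    private final String table;

    TableKey(String db, String table) {
        this.db = Objects.requireNonNull(db, "db");
        this.table = Objects.requireNonNull(table, "table");
    }

    String db() {
        return db;
    }

    String table() {
        return table;
    }

    // equals and hashCode make the class usable as a HashMap or Multimap key
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TableKey)) {
            return false;
        }
        TableKey other = (TableKey) o;
        return db.equals(other.db) && table.equals(other.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(db, table);
    }

    @Override
    public String toString() {
        return db + "." + table;
    }
}
```

With such a key, new TableKey("my-db", "order-items").table() returns the table name as it was given, with no string parsing involved.
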

MysqlBinLogReader creates and starts the thread that scans the configured directory for binlog files.
package com.binlog.parser;

import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.Resource;

import com.binlog.config.FileProperties;
import com.binlog.util.FileInfoUtil;

import lombok.Data;
import org.springframework.stereotype.Component;

/**
 * @author lgq
 * @since 2022/7/21
 */
@Component
@Data
public class MysqlBinLogReader {
    private final int fileScanThreads = 4;

    private final ExecutorService offlineFileProcess;

    private final BlockingQueue<String> fileNameQueue;

    @Resource
    private FileProperties fileProperties;

    public MysqlBinLogReader() {
        this.offlineFileProcess = Executors.newFixedThreadPool(fileScanThreads);
        this.fileNameQueue = new ArrayBlockingQueue<>(1024);
    }

    public MysqlBinLogReader(FileProperties fileProperties) {
        this.offlineFileProcess = Executors.newFixedThreadPool(fileScanThreads);
        this.fileNameQueue = new ArrayBlockingQueue<>(1024);
        this.fileProperties = fileProperties;
    }

    public void scanBinLogFile() {
        offlineFileProcess.submit(() -> {
            Thread.currentThread().setName("scanBinLogFileProducer");
            while (true) {
                TreeMap<Long, String> newBinLogFilesPath = FileInfoUtil.getNewBinLogFilesPath(
                        fileProperties.getLocalBackupDir(), fileProperties.getBackupLogName(),
                        fileProperties.getLastFileName());
                // enqueue the new files, oldest first
                newBinLogFilesPath.forEach((createTime, fileName) -> {
                    fileNameQueue.add(fileName);
                });
                if (newBinLogFilesPath.size() > 0) {
                    // remember the newest file so that the next scan only returns later ones
                    fileProperties.setLastFileName(newBinLogFilesPath.lastEntry().getValue());
                }
                // pause between scans instead of rescanning the directory in a tight loop
                Thread.sleep(BinLogConstants.queueSleep);
            }
        });
    }
}

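FileInfoUtil provides file-scanning helpers, such as finding the binlog files newly added to a directory. Such a file is assumed to be completely written, because MySQL writes later binlog entries to a new file.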
package com.binlog.util;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.TreeMap;

public class FileInfoUtil {
    public static TreeMap<Long, String> getNewBinLogFilesPath(String localBackupDir,
                                                          String backupLogName,
                                                          String lastLogPath) throws IOException {
        // TreeMap orders entries by ascending key, so files with earlier creation times come first
        TreeMap<Long, String> newBinLogFileMap = new TreeMap<>();
        File dir = new File(localBackupDir);
        long lastFileCreateTime = 0L;
        // ObjectsUtil is a project helper that the article does not show
        if (!ObjectsUtil.isEmpty(lastLogPath)) {
            // resolve() keeps an absolute path as is and looks up a bare file name inside the backup directory
            lastFileCreateTime = Files.readAttributes(dir.toPath().resolve(lastLogPath), BasicFileAttributes.class)
                    .creationTime().toMillis();
        }
        String[] files = dir.list();
        if (files == null) {
            // the directory does not exist or cannot be read
            return newBinLogFileMap;
        }
        for (int i = 0; i < files.length; i++) {
            // skip the backup log file
            if (files[i].equals(backupLogName)) {
                continue;
            }
            File file = new File(dir, files[i]);
            // skip directories
            if (file.isDirectory()) {
                continue;
            }
            long fileCreateTime = Files.readAttributes(file.toPath(), BasicFileAttributes.class).creationTime().toMillis();
            if (fileCreateTime > lastFileCreateTime) {
                newBinLogFileMap.put(fileCreateTime, files[i]);
            }
        }
        return newBinLogFileMap;
    }
}

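The code above selects files by creation time and does not itself check that a file has been fully written. A minimal sketch of one way to approximate that check follows. It assumes the binlog files share a prefix and a zero-padded numeric suffix, so that sorting by name matches write order, and that only the newest file can still be growing. CompletedBinlogFiles, list, and demo are hypothetical names, not from the article.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper, not part of the article's code. */
final class CompletedBinlogFiles {
    /**
     * Returns the binlog file names that sort after lastProcessed, oldest first,
     * leaving out the newest file because it may still be growing.
     */
    static List<String> list(Path dir, String prefix, String lastProcessed) {
        List<String> names;
        try (Stream<Path> entries = Files.list(dir)) {
            names = entries
                    .filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .filter(n -> n.startsWith(prefix))
                    .sorted()
                    .collect(Collectors.toCollection(ArrayList::new));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (names.isEmpty()) {
            return names;
        }
        // Assumption: the writer only appends to the newest file, so drop it.
        names.remove(names.size() - 1);
        if (lastProcessed == null || lastProcessed.isEmpty()) {
            return names;
        }
        return names.stream()
                .filter(n -> n.compareTo(lastProcessed) > 0)
                .collect(Collectors.toList());
    }

    /** Creates three empty binlog files plus a log file in a temp directory and lists them. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("binlog-sketch");
            for (String n : new String[] {"binlog.000001", "binlog.000002", "binlog.000003", "backuplog.txt"}) {
                Files.createFile(dir.resolve(n));
            }
            // binlog.000001 was already processed and binlog.000003 is the newest
            return list(dir, "binlog.", "binlog.000001");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Sorting by name also sidesteps a portability issue: on file systems that do not record a creation time, BasicFileAttributes.creationTime() returns an implementation-specific fallback, typically the last-modified time.
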
FilesScanStarter starts the file-scanning thread that picks up newly added log files.
package com.binlog.parser;

import javax.annotation.Resource;

import com.binlog.config.FileProperties;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * @author lgq
 * @since 2022/7/21
 */
@Slf4j
@Order(value = 0)
@Component
public class FilesScanStarter implements CommandLineRunner {
    @Resource
    private MysqlBinLogReader mysqlBinLogReader;

    @Override
    public void run(String... args) {
        // returns immediately; the scan itself runs on the reader's thread pool
        mysqlBinLogReader.scanBinLogFile();
    }
}

The remaining configuration is as follows.

package com.binlog.parser;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Database connection settings
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Data
@AllArgsConstructor
public class MysqlConnConf {
    private String host;
    private int port;
    private String username;
    private String passwd;
}


// application.yml
/*
server:
  port: 8080

binlog:
  localBackupDir: E:\\backup\\binlog
  backupLog: E:\\backup\\binlog\\backuplog.txt
  backupLogName: backuplog.txt
  lastFileName:
  table: abc
  db: db1
  isOnline: false

  datasource:
    host: 1.1.1.1
    port: 3306
    username: abc
    passwd: 123456
*/




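There are many ways to get binlog files into the directory used by offline mode: copy them with commands such as scp or ftp, or pull them continuously with the mysqlbinlog command that ships with MySQL. The script below uses mysqlbinlog to pull binlog files from a remote server in real time.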

#!/bin/sh
BACKUP_BIN=/usr/bin/mysqlbinlog
LOCAL_BACKUP_DIR=/backup/binlog/
BACKUP_LOG=/backup/binlog/backuplog
REMOTE_HOST=1.1.1.1
REMOTE_PORT=3306
REMOTE_USER=abc
REMOTE_PASS=123456
FIRST_BINLOG=binlog.000001  
#time to wait before reconnecting after failure
SLEEP_SECONDS=10
##create local_backup_dir if necessary
mkdir -p $LOCAL_BACKUP_DIR
cd $LOCAL_BACKUP_DIR
## Loop forever: when the connection drops, wait the configured time and reconnect
while :
do
 if [ `ls -A "$LOCAL_BACKUP_DIR" |wc -l` -eq 0 ];then
 LAST_FILE=$FIRST_BINLOG
 else
 LAST_FILE=`ls -l "$LOCAL_BACKUP_DIR" | grep -v backuplog |tail -n 1 |awk '{print $9}'`
 fi
 $BACKUP_BIN --raw --read-from-remote-server --stop-never --host=$REMOTE_HOST --port=$REMOTE_PORT --user=$REMOTE_USER --password=$REMOTE_PASS $LAST_FILE
 # capture the exit code right away, before any other command replaces $?
 RC=$?
 echo "`date +"%Y/%m/%d %H:%M:%S"` mysqlbinlog stopped, exit code: $RC" | tee -a $BACKUP_LOG
 echo "Reconnecting and resuming the backup in $SLEEP_SECONDS seconds" | tee -a $BACKUP_LOG 
 sleep $SLEEP_SECONDS
done

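The parsing approach described here, the offline mode in particular, has one known problem: when the table schema changes frequently (DDL), parsing can go wrong. Offline data lags behind, so the table structure recorded in the log may differ from the table's current structure, which leads to parse failures or wrongly decoded values. For ways to handle this, see how open-source tools such as Maxwell deal with it in their source code.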
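A cheap first line of defense, in the spirit of the row-length check that is commented out in BinLogItem, is to compare the row width with the cached column count before indexing into the row. The following is a minimal sketch under that assumption. RowMapper and its map method are hypothetical names, and the column metadata is reduced to a name-to-position map for brevity.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper, not part of the article's code. */
final class RowMapper {
    /**
     * Maps a binlog row to column names by zero-based position.
     * Returns empty when the row width differs from the cached column count
     * or a position is out of range, which suggests that the table was
     * altered after the metadata was loaded.
     */
    static Optional<Map<String, Serializable>> map(Serializable[] row, Map<String, Integer> positionByColumn) {
        if (row == null || row.length != positionByColumn.size()) {
            return Optional.empty();
        }
        Map<String, Serializable> values = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : positionByColumn.entrySet()) {
            int position = e.getValue();
            if (position < 0 || position >= row.length) {
                return Optional.empty();
            }
            values.put(e.getKey(), row[position]);
        }
        return Optional.of(values);
    }
}
```

An empty result can be treated as a signal to reload the column metadata or to set the event aside. The check does not catch changes that keep the column count the same, such as a rename or a type change, so it narrows the problem without solving it.
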

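Steps to enable the MySQL binlog


Step 1: Locate the configuration file of the MySQL primary (note: it must not be a replica). On Linux the path is usually /etc/my.cnf.

Step 2: Edit the configuration file (back it up first) to enable the binlog. Add the following three lines:

log-bin=mysql-bin     # add this line first
binlog-format=ROW     # then select ROW format; be sure to get this right
server_id=1     # set server_id=1

Step 3: Create a new account
CREATE USER canal IDENTIFIED BY 'canalserviceforyunnan#2018';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

Step 4: Restart the MySQL server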
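
After the restart it is worth confirming that the settings took effect. The sketch below reads the three variables over an existing JDBC connection and reports anything that looks wrong. BinlogSettingsCheck, load, and problems are hypothetical names, and obtaining the Connection is left to the caller.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of the article's code. */
final class BinlogSettingsCheck {
    /** Reads log_bin, binlog_format, and server_id from the server. */
    static Map<String, String> load(Connection connection) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')";
        try (Statement st = connection.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                // column 1 is the variable name, column 2 its value
                vars.put(rs.getString(1), rs.getString(2));
            }
        }
        return vars;
    }

    /** Returns readable problems; an empty list means the settings look usable. */
    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            out.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            out.add("binlog_format is not ROW");
        }
        String serverId = vars.get("server_id");
        if (serverId == null || "0".equals(serverId.trim())) {
            out.add("server_id is unset or 0");
        }
        return out;
    }
}
```

A caller would pass the result of load(connection) to problems(...) and log or fail on a non-empty list before starting either listener.
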
