ElastiSearch-采用OpenReplicator解析MySQL binlog
Posted 星河scorpion
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElastiSearch-采用OpenReplicator解析MySQL binlog相关的知识,希望对你有一定的参考价值。
欢迎跳转到本文的原文链接:https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/
Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。
Open Replicator项目地址:https://github.com/whitesock/open-replicator
binlog事件分析结构图
在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。
这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:
DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):
"eventId": 1,
"databaseName": "canal_test",
"tableName": "`company`",
"eventType": 2,
"timestamp": 1477033198000,
"timestampReceipt": 1477033248780,
"binlogName": "mysql-bin.000006",
"position": 353,
"nextPostion": 468,
"serverId": 2,
"before": null,
"after": null,
"isDdl": true,
"sql": "DROP TABLE `company` /* generated by server */"
DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):
"eventId": 0,
"databaseName": "canal_test",
"tableName": "person",
"eventType": 24,
"timestamp": 1477030734000,
"timestampReceipt": 1477032161988,
"binlogName": "mysql-bin.000006",
"position": 242,
"nextPostion": 326,
"serverId": 2,
"before":
"id": "3",
"sex": "f",
"address": "shanghai",
"age": "23",
"name": "zzh3"
,
"after":
"id": "3",
"sex": "m",
"address": "shanghai",
"age": "23",
"name": "zzh3"
,
"isDdl": false,
"sql": null
相关的类文件如下:
CDCEvent.java
package or;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
public class CDCEvent
private long eventId = 0;//事件唯一标识
private String databaseName = null;
private String tableName = null;
private int eventType = 0;//事件类型
private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]
private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]
private String binlogName = null;// binlog file name
private long position = 0;
private long nextPostion = 0;
private long serverId = 0;
private Map<String,String> before = null;
private Map<String,String> after = null;
private Boolean isDdl= null;
private String sql = null;
private static AtomicLong uuid = new AtomicLong(0);
public CDCEvent()
public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName)
this.init(are);
this.databaseName = databaseName;
this.tableName = tableName;
private void init(final BinlogEventV4 be)
this.eventId = uuid.getAndAdd(1);
BinlogEventV4Header header = be.getHeader();
this.timestamp = header.getTimestamp();
this.eventType = header.getEventType();
this.serverId = header.getServerId();
this.timestampReceipt = header.getTimestampOfReceipt();
this.position = header.getPosition();
this.nextPostion = header.getNextPosition();
this.binlogName = header.getBinlogFileName();
@Override
public String toString()
StringBuilder builder = new StringBuilder();
builder.append(" eventId:").append(eventId);
builder.append(",databaseName:").append(databaseName);
builder.append(",tableName:").append(tableName);
builder.append(",eventType:").append(eventType);
builder.append(",timestamp:").append(timestamp);
builder.append(",timestampReceipt:").append(timestampReceipt);
builder.append(",binlogName:").append(binlogName);
builder.append(",position:").append(position);
builder.append(",nextPostion:").append(nextPostion);
builder.append(",serverId:").append(serverId);
builder.append(",isDdl:").append(isDdl);
builder.append(",sql:").append(sql);
builder.append(",before:").append(before);
builder.append(",after:").append(after).append("");
return builder.toString();
// 省略Getter和Setter方法
open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面:
/**
* ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,
* 然后跟取回的List<Column>进行映射。
*
* @param cols
* @param databaseName
* @param tableName
* @return
*/
private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName)
Map<String,String> map = new HashMap<>();
if(cols == null || cols.size()==0)
return null;
String fullName = databaseName+"."+tableName;
List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
if(columnInfoList == null)
return null;
if(columnInfoList.size() != cols.size())
TableInfoKeeper.refreshColumnsMap();
if(columnInfoList.size() != cols.size())
logger.warn("columnInfoList.size is not equal to cols.");
return null;
for(int i=0;i<columnInfoList.size(); i++)
if(cols.get(i).getValue()==null)
map.put(columnInfoList.get(i).getName(),"");
else
map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
return map;
/**
* 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,
* 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中
*
* @param qe
* @return
*/
private TableInfo createTableInfo(QueryEvent qe)
String sql = qe.getSql().toString().toLowerCase();
TableInfo ti = new TableInfo();
String databaseName = qe.getDatabaseName().toString();
String tableName = null;
if(checkFlag(sql,"table"))
tableName = getTableName(sql,"table");
else if(checkFlag(sql,"truncate"))
tableName = getTableName(sql,"truncate");
else
logger.warn("can not find table name from sql:",sql);
return null;
ti.setDatabaseName(databaseName);
ti.setTableName(tableName);
ti.setFullName(databaseName+"."+tableName);
return ti;
private boolean checkFlag(String sql, String flag)
String[] ss = sql.split(" ");
for(String s:ss)
if(s.equals(flag))
return true;
return false;
private String getTableName(String sql, String flag)
String[] ss = sql.split("\\\\.");
String tName = null;
if (ss.length > 1)
String[] strs = ss[1].split(" ");
tName = strs[0];
else
String[] strs = sql.split(" ");
boolean start = false;
for (String s : strs)
if (s.indexOf(flag) >= 0)
start = true;
continue;
if (start && !s.isEmpty())
tName = s;
break;
tName.replaceAll("`", "").replaceAll(";", "");
//del "("[create table person(....]
int index = tName.indexOf('(');
if(index>0)
tName = tName.substring(0, index);
return tName;
上面所涉及到的TableInfo .java如下:
package or.model;
public class TableInfo
private String databaseName;
private String tableName;
private String fullName;
// 省略Getter和Setter
@Override
public boolean equals(Object o)
if(this == o)
return true;
if(o == null || this.getClass()!=o.getClass())
return false;
TableInfo tableInfo = (TableInfo)o;
if(!this.databaseName.equals(tableInfo.getDatabaseName()))
return false;
if(!this.tableName.equals(tableInfo.getTableName()))
return false;
if(!this.fullName.equals(tableInfo.getFullName()))
return false;
return true;
@Override
public int hashCode()
int result = this.tableName.hashCode();
result = 31*result+this.databaseName.hashCode();
result = 31*result+this.fullName.hashCode();
return result;
接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java
package or.keeper;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import or.MysqlConnection;
import or.model.ColumnInfo;
import or.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.binlog.impl.event.TableMapEvent;
public class TableInfoKeeper
private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);
private static Map<Long,TableInfo> tabledIdMap = new ConcurrentHashMap<>();
private static Map<String,List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();
static
columnsMap = MysqlConnection.getColumns();
public static void saveTableIdMap(TableMapEvent tme)
long tableId = tme.getTableId();
tabledIdMap.remove(tableId);
TableInfo table = new TableInfo();
table.setDatabaseName(tme.getDatabaseName().toString());
table.setTableName(tme.getTableName().toString());
table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());
tabledIdMap.put(tableId, table);
public static synchronized void refreshColumnsMap()
Map<String,List<ColumnInfo>> map = MysqlConnection.getColumns();
if(map.size()>0)
// logger.warn("refresh and clear cols.");
columnsMap = map;
// logger.warn("refresh and switch cols:",map);
else
logger.error("refresh columnsMap error.");
public static TableInfo getTableInfo(long tableId)
return tabledIdMap.get(tableId);
public static List<ColumnInfo> getColumns(String fullName)
return columnsMap.get(fullName);
正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:
package or;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import or.model.BinlogInfo;
import or.model.BinlogMasterStatus;
import or.model.ColumnInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MysqlConnection
private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
private static Connection conn;
private static String host;
private static int port;
private static String user;
private static String password;
public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg)
try
if(conn == null || conn.isClosed())
Class.forName("com.mysql.jdbc.Driver");
host = hostArg;
port = portArg;
user = userArg;
password = passwordArg;
conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);
logger.info("connected to mysql: : ",user,password);
catch (ClassNotFoundException e)
logger.error(e.getMessage(),e);
catch (SQLException e)
logger.error(e.getMessage(),e);
public static Connection getConnection()
try
if(conn == null || conn.isClosed())
setConnection(host,port,user,password);
catch (SQLException e)
logger.error(e.getMessage(),e);
return conn;
/**
* 获取Column信息
*
* @return
*/
public static Map<String,List<ColumnInfo>> getColumns()
Map<String,List<ColumnInfo>> cols = new HashMap<>();
Connection conn = getConnection();
try
DatabaseMetaData metaData = conn.getMetaData();
ResultSet r = metaData.getCatalogs();
String tableType[] = "TABLE";
while(r.next())
String databaseName = r.getString("TABLE_CAT");
ResultSet result = metaData.getTables(databaseName, null, null, tableType);
while(result.next())
String tableName = result.getString("TABLE_NAME");
// System.out.println(result.getInt("TABLE_ID"));
String key = databaseName +"."+tableName;
ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
cols.put(key, new ArrayList<ColumnInfo>());
while(colSet.next())
ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));
cols.get(key).add(columnInfo);
catch (SQLException e)
logger.error(e.getMessage(),e);
return cols;
/**
* 参考
* mysql> show binary logs
* +------------------+-----------+
* | Log_name | File_size |
* +------------------+-----------+
* | mysql-bin.000001 | 126 |
* | mysql-bin.000002 | 126 |
* | mysql-bin.000003 | 6819 |
* | mysql-bin.000004 | 1868 |
* +------------------+-----------+
*/
public static List<BinlogInfo> getBinlogInfo()
List<BinlogInfo> binlogList = new ArrayList<>();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show binary logs");
while(resultSet.next())
BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));
binlogList.add(binlogInfo);
catch (Exception e)
logger.error(e.getMessage(),e);
finally
try
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
catch (SQLException e)
logger.error(e.getMessage(),e);
return binlogList;
/**
* 参考:
* mysql> show master status;
* +------------------+----------+--------------+------------------+
* | File | Position | Binlog_Do_DB | Binlog_Ignore_DB |
* +------------------+----------+--------------+------------------+
* | mysql-bin.000004 | 1868 | | |
* +------------------+----------+--------------+------------------+
* @return
*/
public static BinlogMasterStatus getBinlogMasterStatus()
BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show master status");
while(resultSet.next())
binlogMasterStatus.setBinlogName(resultSet.getString("File"));
binlogMasterStatus.setPosition(resultSet.getLong("Position"));
catch (Exception e)
logger.error(e.getMessage(),e);
finally
try
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
catch (SQLException e)
logger.error(e.getMessage(),e);
return binlogMasterStatus;
/**
* 获取open-replicator所连接的mysql服务器的serverid信息
* @return
*/
public static int getServerId()
int serverId=6789;
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show variables like 'server_id'");
while(resultSet.next())
serverId = resultSet.getInt("Value");
catch (Exception e)
logger.error(e.getMessage(),e);
finally
try
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
catch (SQLException e)
logger.error(e.getMessage(),e);
return serverId;
上面代码设计的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)
package or.model;
public class BinlogInfo
private String binlogName;
private Long fileSize;
// 省略Getter和Setter
package or.model;
public class BinlogMasterStatus
private String binlogName;
private long position;
// 省略Getter和Setter
package or.model;
public class ColumnInfo
private String name;
private String type;
// 省略Getter和Setter
最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)
package or.manager;
import java.util.concurrent.ConcurrentLinkedDeque;
import or.CDCEvent;
public class CDCEventManager
public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
所有的准备工作都完成了,下面可以解析binlog日志了:
package or.test;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import or.CDCEvent;
import or.InstanceListener;
import or.MysqlConnection;
import or.OpenReplicatorPlus;
import or.manager.CDCEventManager;
import or.model.BinlogMasterStatus;
import com.google.code.or.OpenReplicator;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
public class OpenReplicatorTest
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
private static final String host = "xx.xx.xx.60";
private static final int port = 3306;
private static final String user = "****";
private static final String password = "****";
public static void main(String[] args)
OpenReplicator or = new OpenReplicator ();
or.setUser(user);
or.setPassword(password);
or.setHost(host);
or.setPort(port);
MysqlConnection.setConnection(host, port, user, password);
// or.setServerId(MysqlConnection.getServerId());
//配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId
BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
or.setBinlogFileName(bms.getBinlogName());
// or.setBinlogFileName("mysql-bin.000004");
or.setBinlogPosition(4);
or.setBinlogEventListener(new InstanceListener());
try
or.start();
catch (Exception e)
logger.error(e.getMessage(),e);
Thread thread = new Thread(new PrintCDCEvent());
thread.start();
public static class PrintCDCEvent implements Runnable
@Override
public void run()
while(true)
if(CDCEventManager.queue.isEmpty() == false)
CDCEvent ce = CDCEventManager.queue.pollFirst();
Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
String prettyStr1 = gson.toJson(ce);
System.out.println(prettyStr1);
else
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
时间运行旧了会遇到这样一个问题:
16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
java.io.EOFException: null
at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from xx.xx.xx.60:3306
初步解决方案(extends OpenReplicator然后添加重试机制):
package or;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.OpenReplicator;
public class OpenReplicatorPlus extends OpenReplicator
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);
private volatile boolean autoRestart = true;
@Override
public void stopQuietly(long timeout, TimeUnit unit)
super.stopQuietly(timeout, unit);
if(autoRestart)
try
TimeUnit.SECONDS.sleep(10);
logger.error("Restart OpenReplicator");
catch (InterruptedException e)
e.printStackTrace();
最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。
大功告成~~
以上是关于ElastiSearch-采用OpenReplicator解析MySQL binlog的主要内容,如果未能解决你的问题,请参考以下文章
ElastiSearch-采用OpenReplicator解析MySQL binlog
第131天学习打卡(ElastiSearch 集成SpringBoot)
Elastisearch不会通过带有动态类的嵌套调用返回数据
Elastisearch 简介 使用 Query DSL 映射 分词 Elasticsearch-Rest-Client