canal 源码解析系列-CanalServerWithEmbedded解读
Posted 犀牛饲养员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-CanalServerWithEmbedded解读相关的知识,希望对你有一定的参考价值。
前面的文章简单说过这个类。canal server模块的核心接口为CanalServer,有两个实现:
- CanalServerWithEmbedded
- CanalServerWithNetty
这两个实现代表了canal的两种应用模式,CanalServerWithNetty在canal独立部署场景发挥作用,开发者只需要实现cient,不同的应用通过canal client与canal server进行通信,canal client的请求统一由CanalServerWithNetty接受进行处理。
而通过CanalServerWithEmbeded,可以不需要独立部署canal,而是把canal嵌入到我们自己的服务里。但是这种对开发者的要求就比较高。
下面的图表示二者的关系,
从图上可以看出,CanalServerWithNetty是client通过http访问的canal的门户,而它往下其实也是调用CanalServerWithEmbedded
来完成具体的工作。所以我们就来具体说说这个CanalServerWithEmbedded
。
从上图还可以看出,CanalServerWithEmbedded
内部管理所有的CanalInstance
,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理。关于instance后面会有专门的文章分析,这里暂且不表。
下面涉及到源码的地方,我都经过了处理,删减了一些不重要的代码(比如参数校验),便于理解
canal处理client请求的核心逻辑都在SessionHandler
这个类中,这个handler在接收到客户端请求是调用CanalServerWithEmbeded
对应的方法进行处理,如下所示:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
logger.info("message receives in session handler...");
long start = System.nanoTime();
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
ClientIdentity clientIdentity = null;
try {
switch (packet.getType()) {
case SUBSCRIPTION://订阅请求
...
embeddedServer.subscribe(clientIdentity);
...
} else {
byte[] errorBytes = NettyUtils.errorPacket(401,
MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage());
NettyUtils.write(ctx.getChannel(),
errorBytes,
new ChannelFutureAggregator(sub.getDestination(),
sub,
packet.getType(),
errorBytes.length,
System.nanoTime() - start,
(short) 401));
}
break;
case UNSUBSCRIPTION://取消订阅
...
embeddedServer.unsubscribe(clientIdentity);
stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
...
break;
case GET://获取binlog
Get get = CanalPacket.Get.parseFrom(packet.getBody());
...
if (get.getTimeout() == -1) {// 是否是初始值
message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
} else {
TimeUnit unit = convertTimeUnit(get.getUnit());
message = embeddedServer.getWithoutAck(clientIdentity,
get.getFetchSize(),
get.getTimeout(),
unit);
}
// }
...
break;
case CLIENTACK://客户端消费成功ack
ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
...
clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
embeddedServer.ack(clientIdentity, ack.getBatchId());
...
break;
case CLIENTROLLBACK://客户端消费失败回滚请求
...
if (rollback.getBatchId() == 0L) {
embeddedServer.rollback(clientIdentity);// 回滚所有批次
} else {
embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
}
...
break;
default:
byte[] errorBytes = NettyUtils.errorPacket(400,
MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel()
.getRemoteAddress()
.toString(), null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400));
break;
}
来看下CanalServerWithEmbedded
类,
public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
private static final Logger logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
private Map<String, CanalInstance> canalInstances;//destination --> CanalInstance
...
实现了一个抽象类,两个接口。里面的一个map存放的是destination和CanalInstance的对应关系,这个我们前面的文章说过。
AbstractCanalLifeCycle
和CanalServer
主要是继承的start和stop方法,很简单,这里不表。来看看实现的CanalService
的接口。
public interface CanalService {
//订阅
void subscribe(ClientIdentity clientIdentity) throws CanalServerException;
//取消订阅
void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;
//获取数据,自动ack
Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
//获取数据,自动ack,可以指定超时时间
Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;
//获取数据,不ack
Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
//获取数据,不ack,超时时间
Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
throws CanalServerException;
//ack某个批次的数据
void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;
//回滚没有ack的批次的数据
void rollback(ClientIdentity clientIdentity) throws CanalServerException;
//回滚某个批次的数据
void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
}
先来看看订阅方法,
/**
* 客户端订阅,重复订阅时会更新对应的filter信息
*/
@Override
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
checkStart(clientIdentity.getDestination());//检查 destination对应的instance已经启动
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
//instance持有的metaManager没启动的话就重新启动
if (!canalInstance.getMetaManager().isStart()) {
canalInstance.getMetaManager().start();
}
//通过CanalInstance的CanalMetaManager组件进行元数据管理,记录一下当前这个CanalInstance有客户端在订阅
canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅
//客户端当前订阅的binlog位置
Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
if (position == null) {
//如果是第一次订阅,尝试从CanalEventStore中获取第一个binlog的位置,作为客户端订阅开始的位置。
position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条
if (position != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
}
logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
} else {
logger.info("subscribe successfully, {} use last cursor position:{} ", clientIdentity, position);
}
// 通知下订阅关系变化
canalInstance.subscribeChange(clientIdentity);
}
解释下。首先可以看到是通过canalInstance下的MetaManager
管理订阅,包括binlog的位置这些都在MetaManager下保存。CanalMetaManager接口有几个实现类:
FileMixedMetaManager
MemoryMetaManager
MixedMetaManager
PeriodMixedMetaManager
ZooKeeperMetaManager
这些实现类之间有些会持有其它实现的引用来装饰自己的功能。具体服务中使用哪个实现是我们可以设置的,通过指定对应的枚举设置,下面是个例子:
public static enum MetaMode {
/** 内存存储模式 */
MEMORY,
/** 文件存储模式 */
ZOOKEEPER,
/** 混合模式,内存+文件 */
MIXED,
/** 本地文件存储模式 */
LOCAL_FILE;
...
CanalParameter parameter = new CanalParameter();
parameter.setZkClusters(Arrays.asList(cluster1));
parameter.setMetaMode(MetaMode.MEMORY);
...
然后是取消订阅方法,
/**
* 取消订阅
*/
@Override
public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅
logger.info("unsubscribe successfully, {}", clientIdentity);
}
这个比较简单,不做过多解释。
看下get
方法,
/**
* 获取数据
*
* <pre>
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
* </pre>
*/
@Override
public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
return get(clientIdentity, batchSize, null, null);
}
最终调用的是下面这个,
/**
* 获取数据,可以指定超时时间.
*
* <pre>
* 几种case:
* a. 如果timeout为null,则采用tryGet方式,即时获取
* b. 如果timeout不为null
* 1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
* 2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
*
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
* </pre>
*/
@Override
public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
throws CanalServerException {
...
synchronized (canalInstance) {//会读写流数据,这里加锁防止并发
// 获取到流式数据中的最后一批获取的位置
//从CanalMetaManager中获取最后一个没有ack的binlog批次的位置信息
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
if (positionRanges != null) {
throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data",
clientIdentity.getClientId(),
positionRanges));
}
Events<Event> events = null;
//从当前store中的第一条开始获取
Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
if (CollectionUtils.isEmpty(events.getEvents())) {
logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
clientIdentity.getClientId(),
batchSize);
//如果获取到的binlog消息为空,构造一个空的Message对象,将batchId设置为-1返回给客户端
return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能
} else {
// 记录到流式信息
//如果获取到了binlog消息,将这个批次的binlog消息记录到CanalMetaMaager中,并生成一个唯一的batchId
Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
boolean raw = isRaw(canalInstance.getEventStore());
List entrys = null;
if (raw) {
entrys = Lists.transform(events.getEvents(), Event::getRawEntry);
} else {
entrys = Lists.transform(events.getEvents(), Event::getEntry);
}
if (logger.isInfoEnabled()) {
logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
clientIdentity.getClientId(),
batchSize,
entrys.size(),
batchId,
events.getPositionRange());
}
// 直接提交ack
ack(clientIdentity, batchId);
//根据获取到的信息,构造message返回
return new Message(batchId, raw, entrys);
}
}
}
getWithoutAck
就是不带ack的get,所以这里就不多说了。
ack
方法如下,
/**
* 进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
*
* <pre>
* 注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
* </pre>
*/
@Override
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
...
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
PositionRange<LogPosition> positionRanges = null;
positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
if (positionRanges == null) { // 说明是重复的ack/rollback
throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// 更新cursor
if (positionRanges.getAck() != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
if (logger.isInfoEnabled()) {
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
// 可定时清理数据
//从CanalEventStore中,将这个批次的binlog内容移除
canalInstance.getEventStore().ack(positionRanges.getEnd(), positionRanges.getEndSeq());
}
ack方法是客户端用户确认某个批次的binlog消费成功。确认之后,小于等于此 batchId 的 Message 都会被确认(其实就是更新消费成功的位置)。
rollback
是用来回滚的。
/**
* 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
*/
@Override
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
// 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
if (!hasSubscribe) {
return;
}
synchronized (canalInstance) {
// 清除batch信息,所有批次
canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
// rollback eventStore中的状态信息
canalInstance.getEventStore().rollback();
logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
}
}
eventstore的rollback方法也很简单,主要是更新下标,
public void rollback() throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
getSequence.set(ackSequence.get());//更新最后一次get的位置为ack的位置
getMemSize.set(ackMemSize.get());//更新get内存大小为ack内存大小
} finally {
lock.unlock();
}
}
参考:
- http://www.tianshouzhi.com/api/tutorials/canal/382
以上是关于canal 源码解析系列-CanalServerWithEmbedded解读的主要内容,如果未能解决你的问题,请参考以下文章