rocketmq的broker如何同步信息的?

Posted notlate

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq的broker如何同步信息的?相关的知识,希望对你有一定的参考价值。

 

public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}


一个haservice下面有accpet和haclient分别对应客户端和服务端,grouptranserservie用来控制消息是否获取到,下面具体讲。

 

拿haclient举例子,在主线程做的事情

public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {

                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        this.selector.select(1000);

                        boolean ok = this.processReadEvent();
                       
                    } else {
                        this.waitForRunning(1000 * 5);
                    }

也就是在rocketmq里面,一个具体的任务就是单独分配一个线程,从而发挥多线程优势,在主线程上面休眠等待唤醒或者超时唤醒然后执行io动作。

一个典型的基于bytebuffer的写操作,通过positon、limit来判断是否数据写完:

private boolean reportSlaveMaxOffset(final long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            this.reportOffset.putLong(maxOffset);
            this.reportOffset.position(0);
            this.reportOffset.limit(8);

            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }

            return !this.reportOffset.hasRemaining();
        }

  

haservice里面所有的io没有走netty,全部使用原始select做异步io,然后直接使用nio的bytebuff做read和write操作

 

另外rocketmq里面的每个线程实现都有一个特别的标志位:

public abstract class ServiceThread implements Runnable {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    private static final long JOIN_TIME = 90 * 1000;

    protected final Thread thread;
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    protected volatile boolean stopped = false;

  

这个hasnotified和countdownlatch是配合一起使用的。如果一个线程被countdown过、唤醒过,那么hasnotified就通过cas被设置成true,下一个循环进入wait的时候,不用等待超时也不用等待下一次唤醒,直接通过hasnotified这个标志位可以直接唤醒,相当于第一次唤醒我的时候 我当时没有在阻塞,那么第一次唤醒我的时候 先设置一个标示hasnotified,下次进入阻塞的时候可以直接走唤醒流程,不用等待。

 

 

 

下面具体讲下每个模块

HAclient:

干了两个事情:

1 备broker去nameserv注册的时候,可以从nameserv拿到master-broker的ha-address,拿到这个地址以后,通过haclient去连接master-broker。定期给主机broker上报自己的currentReportedOffset,也就是备机broker自己当前的commit-log在什么地方了

2 在channel上面尝试读取数据,这个就是主机broker发过来的具体数据提交到自己的commit-log里面。

也就是对于一个备机broker而言,发布自己的ack-offset和接收主机broker的实际数据都在ha-client一个线程完成的:

3 ha-client用到了双缓冲reallocateByteBuffer,因为主机broker发过来的数据有可能备机broker的bytebuffer已经存不下了,只能存一半,这时候需要把已经落盘的数据从bytebuffer清理掉,然后写了一半的bytebuffer从后半部分移动到前半部分,那么需要有一个第三者tmp做swap,bytebufferbackup就是这个tmp,大小跟bytebufferread一样,防止极端情况:

 private void reallocateByteBuffer() {
            int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
            if (remain > 0) {
                this.byteBufferRead.position(this.dispatchPostion);

                this.byteBufferBackup.position(0);
                this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
                this.byteBufferBackup.put(this.byteBufferRead);
            }

            this.swapByteBuffer();

            this.byteBufferRead.position(remain);
            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
            this.dispatchPostion = 0;
        }

        private void swapByteBuffer() {
            ByteBuffer tmp = this.byteBufferRead;
            this.byteBufferRead = this.byteBufferBackup;
            this.byteBufferBackup = tmp;
        }

  

 

 

 


AcceptSocketService:

干一件事:绑定端口以后,作为accpet,在主循环的select里面,监听accpet事件,如果有客户端连接进来,那么生成一个haconnection。

所以看得出来,只有主机broker才有这个accpetsocketservice和haconnection。下面具体说下HaConnection

 

public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setSoLinger(false, -1);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
        this.socketChannel.socket().setSendBufferSize(1024 * 64);
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        this.haService.getConnectionCount().incrementAndGet();
    }

 socketchannel是服务端accept后拿到的、跟客户端通信的channel。

   SO_LINGER选项,使用默认的

   最主要的是有两个线程,一个是writesocketService一个是readSocketService

 

 

   writesocketservice:当主机写入commit-log以后offset肯定会长,但是备机传过来的ack-offset没有增长。通过这种方式主机知道此时需要把什么数据传给备机。

   这个线程没有双缓冲,也没有swap-bytebuffer,全部数据通过网络io写出去即可,不涉及磁盘io。这个线程平时不需要工作,只有在有新数据的时候才需要工作,啥时候被唤醒的呢?

   是在service.getWaitNotifyObject().wakeupAll()业务线程进行唤醒的

 

   readSocketService: 专门接收备机发过来的ack-offset的,收到新的ack以后,通过HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset)唤醒GroupTransferService,后者专门处理消息是否真的已经被接收。

 

  GroupTransferService:

  双缓冲逻辑,在主循环的waitforEnding结束后的onWaitEnd中,执行swapRequests,把requestsWrite和requestsRead互换,因为这个线程在处理的时候需要用synchronnized锁整个requestsRead,别人无法put了,所以弄一个requestsWrite出来,其他线程可以在这个里面put,跟自己的线程锁住的requestsRead不冲突。

  

  这个模块本质上就是一个thread,干两个事情:

  1 在waitfoRunning中等待

  2 等待超时或者被唤醒的话,那么针对requestRead里面所有request,push2SlaveMaxOffset(这个就是备机的ack-offset)大于request的offset的话,那么说明备机当前已经有这个数据了,那么wakeupCustomer把在request上的CountdownLatch去掉,并且把GroupCommitRequest的flushOK=ture。

如果备机的ack-offset比GroupCommitRequest小的话,那么循环5次等待,阻塞在notifyTransferObject,尝试等待5次看看备机的ack-offset 也就是push2SlaveMaxOffset能不能追上来,从而也让这个GroupCommitRequest的flushOK=true。

private void doWaitTransfer() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        for (int i = 0; !transferOK && i < 5; i++) {
                            this.notifyTransferObject.waitForRunning(1000);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }

                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }

                        req.wakeupCustomer(transferOK);
                    }

                    this.requestsRead.clear();
                }
            }
        }

  

 

 

 

那么谁在request上的countdownlatch等待呢?flushok啥含义

如果是同步master的话,两个地方:handleHA和handleDiskFlush

以前者举例子,handleHA里面:

 

 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

  

主机在sendmessage以后,执行service.putRequest

1 会在GroupTransferService的requestWrite里面放入新的request,让GroupTransferService去检查这个request是否已经被备机同步了

2 对GroupTransferService做waitpoint.countDown,让GroupTransferService干活必须先要唤醒他。

public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

  

GroupTransferService每次被唤醒的时候,首先把requestWrite放入到requestRead里面,然后检查request的offset和备机ackoffset是否ok。

service.getWaitNotifyObject().wakeupAll(); 这个是唤醒writeSockeService,新的数据来了,那么写线程需要工作了。

4 在每个request上面做waitforFlush,也就是在request上面countdown等待,然后检查flushok。

 

所以主机上面新数据来了以后,业务线程唤醒writeSocketService去发数据给备机broker(writeSocketService通过检查commit-log的offset感知主机数据offset增长了),然后唤醒grouptransferservice去检查每个request是否已经ok。

 

grouptransferservice的工作没有放在writeSocketService,而是单独一个线程来做,还是利用多核并发处理。

grouptransferservice即使被业务线程在putRequest中waitPoint.countDown();被唤醒还会被this.notifyTransferObject.waitForRunning(1000)阻塞,因为被业务线程唤醒也不能表示立马可以更新request和ack-offset的关系,比较备机新的ack-offset还没来,所以还需要readSocketService在拿到新的ackoffset以后,通过this.groupTransferService.notifyTransferSome();进一步唤醒groupTransferService,此时才能真正更新request是状态flushok状态

private void doWaitTransfer() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        for (int i = 0; !transferOK && i < 5; i++) {
                            this.notifyTransferObject.waitForRunning(1000);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }

                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }

                        req.wakeupCustomer(transferOK);
                    }

                    this.requestsRead.clear();
                }
            }
        }

 










以上是关于rocketmq的broker如何同步信息的?的主要内容,如果未能解决你的问题,请参考以下文章

rocketmq发送消息的期间的broker选择

5 SpringBoot整合RocketMQ发送同步消息

RocketMQ Producer发送消息过程

RocketMQ Producer发送消息过程

我理解的RocketMQ:主从复制HA(High Availability)原理

7RocketMQ 源码解析之 Broker 启动(下)