为初学者而来~手工最简MQBroker

Posted ESOO

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为初学者而来~手工最简MQBroker相关的知识,希望对你有一定的参考价值。

本文仅展示核心代码,全部代码,请移步:git-soomq

为初学者而来~手工最简MQ(一)设计篇
为初学者而来~手工最简MQ(二)Broker
为初学者而来~手工最简MQ(三)Client

1,服务端

服务端的设计就非常简单了,最核心的就是消息的存取,以及响应生产者和消费者的网络请求
分为2部分:

1.1 消息文件

消息的存储我们参考kafka,并简化其逻辑,因为是最简单的mq,我们只考虑单机的情况的就行,每个topic存储2个文件

topicname.index
topicname.data

.index 文件存储格式为:
消息顺序号:消息截止位置
.data 文件按照顺序存储具体的消息

文件操作:

package com.esoo.mq.server.message;

import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;

import java.io.RandomAccessFile;

/**
 * 为每个topic创建一个对象进行管理
 */
public class MessageFile 
    private String topic;
    private Long offset;
    //索引文件
    private RandomAccessFile indexFile = null ;
    //数据文件
    private RandomAccessFile dataFile = null ;

    //追加消息(生产者进行调用)
    public ProcessorCommand appendMsg(ProcessorCommand in)

        try 
            //加锁,避免竞争,文件乱码
            synchronized (in.getResult().getTopic()) 

                //读取index文件最后一行
                String lastLine = readLastLine(indexFile, null);
                int lastOffset = 1;
                //消息体追加到data文件中,并返回文件末尾位置,作为本条消息的offset
                long lastindex =  writeEndLine(dataFile, in.getResult().getBody());
                if (lastLine != null && !lastLine.equals("")) 
                    String index[] = lastLine.split(":");
                    lastOffset = Integer.valueOf(index[0]);
                    lastOffset = lastOffset + 1;
                
                //组装本条消息index 序列号:消息体末尾位置
                String insertMsgIndex = lastOffset + ":" + lastindex + "\\t\\n";
                writeEndLine(indexFile, insertMsgIndex.getBytes());
                in.setSuccess(true);
            
        catch (Exception e)
            e.printStackTrace();

            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        
        return in;

    

    //读取消息,消费者进行调用
    public ProcessorCommand readMsg(ProcessorCommand in)


        try 
            synchronized (in.getResult().getTopic()) 
                // 消息定位位置
                int seekIn = 0;
                // 消息体大小
                int bodySize = 0;
                //先定位到开始
                indexFile.seek(0);
                String indesMap=null;
                //遍历index文件,找到上一个消息 offset 与本消息offset 进行相减就是消息体大小
                while ((indesMap = indexFile.readLine())!=null)
                    String index[] = indesMap.split(":");
                    int inNum = Integer.valueOf(String.valueOf(index[0]).trim());
                    int off = Integer.valueOf(String.valueOf(index[1]).trim());
                    if (inNum == in.getResult().getOffset()) 
                        seekIn = off;
                    
                    if (inNum == (in.getResult().getOffset() + 1)) 
                        bodySize = off - seekIn;
                    
                
                if (bodySize == 0) 
                    in.setSuccess(false);
                    in.setExmsg("offset is end");
                    return in;
                
                //定位到具体位置
                dataFile.seek(seekIn);

                //进行消息读取
                byte[] b = new byte[bodySize];
                dataFile.read(b);
                in.getResult().setBody(b);

                in.setSuccess(true);
                System.out.println(" READ MSG IS: "+JSON.toJSONString(in));
            
        catch (Exception e)
            e.printStackTrace();
            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        
        return in;

    

    //写消息到最后一行
    public static long writeEndLine(RandomAccessFile file, byte[] msg)
            throws Exception 
        // 文件长度,字节数
        long fileLength = file.length();
        // 将写文件指针移到文件尾。
        file.seek(fileLength);
        file.write(msg);
        return file.getFilePointer();

    

    //读取最后一行的消息
    public static String readLastLine(RandomAccessFile file, String charset) throws Exception 

        long len = file.length();
        if (len == 0L) 
            return "";
         else 
            long pos = len - 1;
            while (pos > 0) 
                pos--;
                file.seek(pos);
                if (file.readByte() == '\\n') 
                    break;
                
            
            if (pos == 0) 
                file.seek(0);
            
            byte[] bytes = new byte[(int) (len - pos)];
            file.read(bytes);
            if (charset == null) 
                return new String(bytes);
             else 
                return new String(bytes, charset);
            
        

    

    public static String readByOffset(RandomAccessFile file, String charset) throws Exception 

        return null;
    



    public String getTopic() 
        return topic;
    

    public void setTopic(String topic) 
        this.topic = topic;
    

    public Long getOffset() 
        return offset;
    

    public void setOffset(Long offset) 
        this.offset = offset;
    

    public RandomAccessFile getIndexFile() 
        return indexFile;
    

    public void setIndexFile(RandomAccessFile indexFile) 
        this.indexFile = indexFile;
    

    public RandomAccessFile getDataFile() 
        return dataFile;
    

    public void setDataFile(RandomAccessFile dataFile) 
        this.dataFile = dataFile;
    


1.2 网络编程

利用netty 开放端口,响应生产者与消费者,每个消息包装成一个commod,commod类型

  • 消息类型(消费/生产)
  • 消息topic
  • 消息体(生产时用)
  • 消息顺序号(消费时用)
  • 处理结果(成功/失败)
  • 处理消息(失败时添加原因)

网络启动

package com.esoo.mq.server;

import com.esoo.mq.server.netty.handler.NettySooMqServerHandler;
import com.esoo.mq.server.netty.handler.NettySooMqServerOutHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class SooMQServer 
    private static Integer serverPort=9870;
    ServerBootstrap b = new ServerBootstrap();

    public void start()
        //创建reactor 线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try 
            //1 设置reactor 线程组
            b.group(bossLoopGroup, workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>() 
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception 
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.cacheDisabled(null)));
                    ch.pipeline().addLast(new NettySooMqServerOutHandler());
                    ch.pipeline().addLast(new NettySooMqServerHandler());
                
            );
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            System.out.println(" 服务器启动成功,监听端口: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
         catch (Exception e) 
            e.printStackTrace();
         finally 
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        
    



网络逻辑分发

注意:回写给客户端的消息体类型必须与入参保持一致,否则netty无法解析

package com.esoo.mq.server.netty.handler;


import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.processor.Processor;
import com.esoo.mq.server.processor.ProcessorFactory;
import io.netty.channel.*;

@ChannelHandler.Sharable
public class NettySooMqServerHandler extends ChannelInboundHandlerAdapter 

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 

        try 
            ProcessorCommand command = (ProcessorCommand) msg;
            System.out.println("["+ctx.channel().remoteAddress()+"] msg:"+JSON.toJSONString(msg));
            Processor processor = ProcessorFactory.getProcessorInstantiate(command.getType());
            msg = processor.handle(command);
            ChannelFuture f = ctx.writeAndFlush(msg);
            f.addListener(new ChannelFutureListener() 
                @Override
                public void operationComplete(ChannelFuture future) throws Exception 
                    System.out.println("msg ctx send");
                
            );
        catch (Exception e)
            e.printStackTrace();
        
    

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 
        super.channelInactive(ctx);
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        System.out.println(cause.getMessage());
        ctx.close();
    





生产者

package com.esoo.mq.server.processor;

import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;

public class SendMessageProcessor implements Processor<Message,Message> 

    @Override
    public ProcessorCommand handle(ProcessorCommand task) 
        MessageFile file = MessageFileFactory.getTopicFile(task.getResult().getTopic());
        task = file.appendMsg(task);
        return task;
    




消费者

package com.esoo.mq.server.processor;

import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;

public class ReadMessageProcessor implements Processor<Message,Message> 

    @Override
    public ProcessorCommand handle(ProcessorCommand task) 
        Message msg = task.getResult();
        MessageFile file = MessageFileFactory.getTopicFile(msg.getTopic());
        task = file.readMsg(task);
        return task;
    




下一篇
为初学者而来~手工最简MQ(三)Client

以上是关于为初学者而来~手工最简MQBroker的主要内容,如果未能解决你的问题,请参考以下文章

为初学者而来~手工最简MQBroker

为初学者而来~手工最简MQClient

为初学者而来~手工最简MQ设计篇

为初学者而来~手工最简MQClient

为初学者而来~手工最简MQ设计篇

为初学者而来~手工最简MQ设计篇