为初学者而来~手工最简MQClient

Posted ESOO

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为初学者而来~手工最简MQClient相关的知识,希望对你有一定的参考价值。

本文仅展示核心代码,全部代码,请移步:git-soomq

为初学者而来~手工最简MQ(一)设计篇
为初学者而来~手工最简MQ(二)Broker
为初学者而来~手工最简MQ(三)Client

2,client

2.1 连接管理

通过netty与mq服务器进行连接,并相应生产者与消费者的请求,通过netty自带的序列化工具,将消息序列化未byte字节进行传输

2.1.1 服务启动,连接broker

package com.esoo.mq.client.connection;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NiosocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

public class ConnectionManager 
    private static HashMap<String,Channel> channelMap=new HashMap<>();

    public static Channel get(String ip,Integer port)
        Channel channel=null;
        String url = ip+":"+port;
        synchronized (url) 
            if (!channelMap.containsKey(url)) 
                channel=createChannel(ip,port);
                channelMap.put(url,channel);
            else
                channel= channelMap.get(url);
            
        
        return channel;
    

    private static Channel createChannel(String ip,Integer port)
        Bootstrap b = new Bootstrap();
        //创建reactor 线程组
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
        Channel channel=null;
        try 

            //1 设置reactor 线程组
            b.group(workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioSocketChannel.class);
            //3 设置监听端口
            b.remoteAddress(ip, port);
            //4 设置通道的参数
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.handler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() 
                //有连接到达时会创建一个channel
                @Override
                protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception 
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.cacheDisabled(null)));
                    ch.pipeline().addLast(new SooMqClientOutHandler());
                    ch.pipeline().addLast(new SooMqClientHandler());
                
            );
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            
                if (futureListener.isSuccess()) 
                    System.out.println("Client客户端连接成功!");

                 else 
                    System.out.println("Client客户端连接失败!");
                

            );

            // 阻塞,直到连接完成
            f.sync();
            channel = f.channel();
        catch (Exception ex)
            ex.printStackTrace();
            channel=null;
        
        return channel;
    

    public static void shutdown()
        for(Map.Entry<String, Channel> entry : channelMap.entrySet())
            try 
                Channel channel = entry.getValue();
                if (channel != null && channel.isOpen()) 
                    channel.close();
                
            catch (Exception e)
                e.printStackTrace();
            
        
    


2.1.1 接收broker消息,处理逻辑

package com.esoo.mq.client.connection;

import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.common.ProcessorType;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * create by 尼恩 @ 疯狂创客圈
 **/
@ChannelHandler.Sharable
public class SooMqClientHandler extends ChannelInboundHandlerAdapter 

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        super.channelActive(ctx);
    


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        System.out.println("client re:"+JSON.toJSONString(msg));
        ProcessorCommand cm = (ProcessorCommand)msg;
        if(((ProcessorCommand) msg).getType().equals(ProcessorType.ReadMessage.getType()))
            String msgBody = new String(((ProcessorCommand) msg).getResult().getBody());
            System.out.println("msg body is : "+msgBody);
        

    

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 
        super.channelInactive(ctx);
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        System.out.println(cause.getMessage());
        ctx.close();
    




2.2 生产者

获取服务连接后,将消息发送给消息队列服务器
参数:

  • 消息类型:生产
  • 消息topic
  • 消息体
package com.esoo.mq.client.producer;

import com.esoo.mq.client.connection.ConnectionManager;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.common.ProcessorType;
import io.netty.channel.Channel;

import java.util.concurrent.*;

public class TopicProducer 
    private  String serverIp;
    private  Integer serverPort;
    private  String topic;
    private  int threadNum=1;
    private ExecutorService es ;
    public  TopicProducer(String serverIp,Integer serverPort,String topic,int threadNum)
        this.serverIp=serverIp;
        this.serverPort=serverPort;
        this.topic=topic;
        if(threadNum<=0||threadNum>=10)
            threadNum=1;
        
        this.threadNum=threadNum;
        es = new ThreadPoolExecutor(0,threadNum,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(200));
    

    public void sengMsg(Message msg)
        es.execute(new Runnable() 
            @Override
            public void run() 
                try 
                    Channel channel = ConnectionManager.get(serverIp, serverPort);
                    ProcessorCommand command = new ProcessorCommand();
                    command.setResult(msg);
                    command.setType(ProcessorType.SendMessage.getType());
                    channel.writeAndFlush(command);
                catch (Exception ex)

                
            
        );
    


2.3 消费者

获取服务连接后,从服务端获取消息
参数:

  • 消息类型:消费
  • 消息topic
  • 消息顺序号
package com.esoo.mq.client.consumer;


import com.esoo.mq.client.connection.ConnectionManager;
import com.esoo.mq.client.server.Server;
import com.esoo.mq.client.server.ServerMap;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.common.ProcessorType;
import com.esoo.mq.common.exception.SooMQException;
import io.netty.channel.Channel;

public class Consumer 
    public static void readMsg(Message msg)
        try 
            Server server = ServerMap.TopicServer.get(msg.getTopic());
            if (server == null) 
                throw new SooMQException("this topic[" + msg.getTopic() + "] have no server");
            
            Channel channel = ConnectionManager.get(server.getIp(), server.getPort());
            ProcessorCommand command = new ProcessorCommand();
            command.setResult(msg);
            command.setType(ProcessorType.ReadMessage.getType());
            channel.writeAndFlush(command);
        catch (Exception e)
            e.printStackTrace();
        

    



以上是关于为初学者而来~手工最简MQClient的主要内容,如果未能解决你的问题,请参考以下文章

为初学者而来~手工最简MQBroker

为初学者而来~手工最简MQBroker

为初学者而来~手工最简MQBroker

为初学者而来~手工最简MQ设计篇

为初学者而来~手工最简MQ设计篇

为初学者而来~手工最简MQ设计篇