为初学者而来~手工最简MQClient
Posted ESOO
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为初学者而来~手工最简MQClient相关的知识,希望对你有一定的参考价值。
本文仅展示核心代码,全部代码,请移步:git-soomq
为初学者而来~手工最简MQ(一)设计篇
为初学者而来~手工最简MQ(二)Broker
为初学者而来~手工最简MQ(三)Client
2,client
2.1 连接管理
通过netty与mq服务器进行连接,并响应生产者与消费者的请求,通过netty自带的序列化工具,将消息序列化为byte字节进行传输
2.1.1 服务启动,连接broker
package com.esoo.mq.client.connection;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConnectionManager
private static HashMap<String,Channel> channelMap=new HashMap<>();
public static Channel get(String ip,Integer port)
Channel channel=null;
String url = ip+":"+port;
synchronized (url)
if (!channelMap.containsKey(url))
channel=createChannel(ip,port);
channelMap.put(url,channel);
else
channel= channelMap.get(url);
return channel;
private static Channel createChannel(String ip,Integer port)
Bootstrap b = new Bootstrap();
//创建reactor 线程组
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
Channel channel=null;
try
//1 设置reactor 线程组
b.group(workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioSocketChannel.class);
//3 设置监听端口
b.remoteAddress(ip, port);
//4 设置通道的参数
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.handler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>()
//有连接到达时会创建一个channel
@Override
protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new SooMqClientOutHandler());
ch.pipeline().addLast(new SooMqClientHandler());
);
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
if (futureListener.isSuccess())
System.out.println("Client客户端连接成功!");
else
System.out.println("Client客户端连接失败!");
);
// 阻塞,直到连接完成
f.sync();
channel = f.channel();
catch (Exception ex)
ex.printStackTrace();
channel=null;
return channel;
public static void shutdown()
for(Map.Entry<String, Channel> entry : channelMap.entrySet())
try
Channel channel = entry.getValue();
if (channel != null && channel.isOpen())
channel.close();
catch (Exception e)
e.printStackTrace();
2.1.2 接收broker消息,处理逻辑
package com.esoo.mq.client.connection;
import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.common.ProcessorType;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* create by 尼恩 @ 疯狂创客圈
**/
@ChannelHandler.Sharable
public class SooMqClientHandler extends ChannelInboundHandlerAdapter
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
super.channelActive(ctx);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("client re:"+JSON.toJSONString(msg));
ProcessorCommand cm = (ProcessorCommand)msg;
if(((ProcessorCommand) msg).getType().equals(ProcessorType.ReadMessage.getType()))
String msgBody = new String(((ProcessorCommand) msg).getResult().getBody());
System.out.println("msg body is : "+msgBody);
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
super.channelInactive(ctx);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
System.out.println(cause.getMessage());
ctx.close();
2.2 生产者
获取服务连接后,将消息发送给消息队列服务器
参数:
- 消息类型:生产
- 消息topic
- 消息体
package com.esoo.mq.client.producer;
import com.esoo.mq.client.connection.ConnectionManager;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.common.ProcessorType;
import io.netty.channel.Channel;
import java.util.concurrent.*;
public class TopicProducer
private String serverIp;
private Integer serverPort;
private String topic;
private int threadNum=1;
private ExecutorService es ;
public TopicProducer(String serverIp,Integer serverPort,String topic,int threadNum)
this.serverIp=serverIp;
this.serverPort=serverPort;
this.topic=topic;
if(threadNum<=0||threadNum>=10)
threadNum=1;
this.threadNum=threadNum;
es = new ThreadPoolExecutor(0,threadNum,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(200));
public void sengMsg(Message msg)
es.execute(new Runnable()
@Override
public void run()
try
Channel channel = ConnectionManager.get(serverIp, serverPort);
ProcessorCommand command = new ProcessorCommand();
command.setResult(msg);
command.setType(ProcessorType.SendMessage.getType());
channel.writeAndFlush(command);
catch (Exception ex)
);
2.3 消费者
获取服务连接后,从服务端获取消息
参数:
- 消息类型:消费
- 消息topic
- 消息顺序号
package com.esoo.mq.client.consumer;
import com.esoo.mq.client.connection.ConnectionManager;
import com.esoo.mq.client.server.Server;
import com.esoo.mq.client.server.ServerMap;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.common.ProcessorType;
import com.esoo.mq.common.exception.SooMQException;
import io.netty.channel.Channel;
public class Consumer
public static void readMsg(Message msg)
try
Server server = ServerMap.TopicServer.get(msg.getTopic());
if (server == null)
throw new SooMQException("this topic[" + msg.getTopic() + "] have no server");
Channel channel = ConnectionManager.get(server.getIp(), server.getPort());
ProcessorCommand command = new ProcessorCommand();
command.setResult(msg);
command.setType(ProcessorType.ReadMessage.getType());
channel.writeAndFlush(command);
catch (Exception e)
e.printStackTrace();
以上是关于为初学者而来~手工最简MQClient的主要内容,如果未能解决你的问题,请参考以下文章