mq从零开始实现 mq-07-负载均衡 load balance

Posted 老马啸西风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mq从零开始实现 mq-07-负载均衡 load balance相关的知识,希望对你有一定的参考价值。

前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载均衡 load balance

为什么需要负载均衡

大家好,我是老马。

这一节让我们看一下如何实现 MQ 的负载均衡。

为什么需要负载均衡呢?

作用

负载均衡最核心的作用:

(1)可以避免单点故障

(2)可以让请求均分的分散到每一个节点

实现思路

负载均衡实现的方式比较多,最简单的就是随机选择一个。

拓展阅读:

MQ 中用到负载均衡的地方

生产者发送

生产者发送消息时,可以发送给任一 broker。

broker 推送给消费者

broker 接收到消息以后,在推送给消费者时,也可以任一选择一个。

消费者的消费 ACK

消费者消费完,状态回执给 broker,可以选择任一一个。

消息黏连

有些消息比较特殊,比如需要保证消费的有序性,可以通过 shardingKey 的方式,在负载的时候固定到指定的片区。

代码实现

生产者发送

统一调整获取 channel 的方法。

@Override
public Channel getChannel(String key) 
    // 等待启动完成
    while (!statusManager.status()) 
        log.debug("等待初始化完成...");
        DateUtil.sleep(100);
    
    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance,
            channelFutureList, key);
    return rpcChannelFuture.getChannelFuture().channel();

工具类实现为核心实现:

/**
 * 负载均衡
 *
 * @param list 列表
 * @param key 分片键
 * @return 结果
 * @since 0.0.7
 */
public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance,
                                                final List<T> list, String key) 
    if(CollectionUtil.isEmpty(list)) 
        return null;
    

    if(StringUtil.isEmpty(key)) 
        LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance()
                .servers(list);
        return loadBalance.select(loadBalanceContext);
    

    // 获取 code
    int hashCode = Objects.hash(key);
    int index = hashCode % list.size();
    return list.get(index);

如果指定了 shardingKey,那么根据 shadringKey 进行 hash 判断。

如果没有,则进行默认的负载均衡策略。

Broker 消息推送给消费者

消费者订阅列表的获取:

@Override
public List<Channel> getSubscribeList(MqMessage mqMessage) 
    final String topicName = mqMessage.getTopic();
    Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
    if(CollectionUtil.isEmpty(set)) 
        return Collections.emptyList();
    

    //2. 获取匹配的 tag 列表
    final List<String> tagNameList = mqMessage.getTags();
    Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
    for(ConsumerSubscribeBo bo : set) 
        String tagRegex = bo.getTagRegex();
        if(hasMatch(tagNameList, tagRegex)) 
            //TODO: 这种设置模式,统一添加处理 haven
            String groupName = bo.getGroupName();
            List<ConsumerSubscribeBo> list = groupMap.get(groupName);
            if(list == null) 
                list = new ArrayList<>();
            
            list.add(bo);
            groupMap.put(groupName, list);
        
    

    //3. 按照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 选择
    final String shardingKey = mqMessage.getShardingKey();
    List<Channel> channelList = new ArrayList<>();
    for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) 
        List<ConsumerSubscribeBo> list = entry.getValue();
        ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey);
        final String channelId = bo.getChannelId();
        BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
        if(entryChannel == null) 
            log.warn("channelId:  对应的通道信息为空", channelId);
            continue;
        
        channelList.add(entryChannel.getChannel());
    
    return channelList;

核心逻辑:RandomUtils.loadBalance(loadBalance, list, shardingKey); 获取,其他的保持不变。

消费者 ACK

消费者也是类似的,获取 channel 的方式调整如下:

public Channel getChannel(String key) 
    // 等待启动完成
    while (!statusManager.status()) 
        log.debug("等待初始化完成...");
        DateUtil.sleep(100);
    

    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
            channelFutureList, key);
    return rpcChannelFuture.getChannelFuture().channel();

小结

负载均衡在分布式服务中,是必备的特性之一。实现的原理并不算复杂。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

以上是关于mq从零开始实现 mq-07-负载均衡 load balance的主要内容,如果未能解决你的问题,请参考以下文章

mq从零开始实现 mq-05-实现优雅停机

从零开始学Nginx——反向代理+负载均衡

mq从零开始实现 mq-13-注册鉴权 auth

RabbitMQ集群架构之使用Haproxy实现高可用负载均衡

Oracle RAC 客户端连接负载均衡(Load Balance)

负载均衡(load Balancing)学习笔记