MQ

Posted lwx_R

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ相关的知识,希望对你有一定的参考价值。

1.流量消峰
举个例子,如果订单系统最多能处理一万次订单, 这个处理能力应付正常时段的 下单时绰绰有余,正
常时段我们下单-秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限
制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把-秒内下的订单分
散成一-段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体
验要好。|
2.应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合
调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于
消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在
这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流
系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
3.异步处理
有些服务间调用是异步的,例如A调用B, B需要花费很长时间执行,但是A需要知道B什么时候可
以执行完,以前一般有两种方式, A过一段时间去调用B的查询api查询。或者A提供一个callback api,
B执行完之后调用api通知A服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,
A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给 MQ, MQ会将此
消息转发给A服务。这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不
用做这些操作。A服务还能及时的得到异步处理成功的消息。
1.ActiveMQ
优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较
低的概率丢失数据
缺点:官方社区现在对ActiveMQ 5.x维护越来越少,高吞吐量场景较少使用。
尚硅谷官网视频: http://www.gulixueyuan.com/course/322
2. Kafka
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,
以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥
着举足轻重的作用。目前已经被LinkedIn, Uber, Twitter, Netflix 等大公司所采纳。
优点:性能卓越,单机写入TPS约在百万条秒,最大的优点,就是吞吐量高。时效性ms级可用性非
常高,kafka 是分布式的,-个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采
用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费-次;有优秀的第三方
Kafka Web管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:
功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点: Kafka 单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load 越高.发送消
息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顶序,
但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
3.RocketMQ
RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了Kafka, 并做出了自己的一
些改进。被阿里巴巴f泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binglog 分发等场
景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分
布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是java我们可以自己阅
读源码,定制自己公司的MQ
缺点:支持的客户端语言不多,目前是java及C++,其中C++不成熟;社区活跃度一般,污 有王MQ
核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
4. RabbitMQ
2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最
主流的消息中间件之-。
优点:由于erlang语言的高井发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易
用、跨平台、支持多种语言如: Python. Ruby. .NET、 Java. JMS、 C. PHP、 ActionSaript. XMPP、 STOMP
等,支持AJAX文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
https://ww.rabbitmg.com/news.html .
缺点:商业版需要收费,学习成本较高
1.1.4.
MQ的选择
1.Kafka
Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,-开始的目的就是用于日志收集
和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,
肯定是首选kafka了。尚硅谷官网kafka视频连接ht:/www.gulixueyuan.com/course/330/tasks
2. RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削
峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务
场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ.
3.RabbitMQ
结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分
方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ。

1.2.1.
Rabbi tMQ的概念
RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一一个快递站点,当你要发送一个包
裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是
-个快递站,一个快递员帮你传递快件。RabitMQ. 与快递站的主要区别在于,它不处理快件而是接收,
存储和转发消息数据。
1.2.2.
四大核心概念
产生数据发送消息的程序是生产者
交换机
交换机是RabbitMQ非常重要的一个部件,一-方面它接收来自生产者的消息, 另一方面它将消息
推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推
送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存
储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。 许多生产者可
以将消息发送到一个队列,许多消费者可以尝试从一一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者「消费
解释老和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费名。

1.2.3.
Rabb itMQ核心部分
1 "Hello World!"
2 Work queues
3 Publish/Subscribe
4 Routing
5 Topics
6 Publisher Confirms

Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似
于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出
多个vhost,每个用户在自己的vhost创建exchange/ queue等
Connep:tion: publisher / consumer和broker之间的TCP连接
Channel信道:如果每一次访问RabbitMQ 都建立一个Connection,在消息量大的时候建立TCP
Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程
序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客
户端和message broker识别channel, 所以channel之间是完全隔离的。Channel 作为轻量级的
Connection极大减少了操作系统建立TCP connection的开销
Exchange: message 到达broker的第一站, 根据分发规则,匹配查询表中的routing key,分发
消息到queue中去。常用的类型有: direct (point to point), topic (publish-subscibe) and fanout
(multicast)

mq从零开始实现 mq-07-负载均衡 load balance

前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载均衡 load balance

为什么需要负载均衡

大家好,我是老马。

这一节让我们看一下如何实现 MQ 的负载均衡。

为什么需要负载均衡呢?

作用

负载均衡最核心的作用:

(1)可以避免单点故障

(2)可以让请求均分的分散到每一个节点

实现思路

负载均衡实现的方式比较多,最简单的就是随机选择一个。

拓展阅读:

MQ 中用到负载均衡的地方

生产者发送

生产者发送消息时,可以发送给任一 broker。

broker 推送给消费者

broker 接收到消息以后,在推送给消费者时,也可以任一选择一个。

消费者的消费 ACK

消费者消费完,状态回执给 broker,可以选择任一一个。

消息黏连

有些消息比较特殊,比如需要保证消费的有序性,可以通过 shardingKey 的方式,在负载的时候固定到指定的片区。

代码实现

生产者发送

统一调整获取 channel 的方法。

@Override
public Channel getChannel(String key) 
    // 等待启动完成
    while (!statusManager.status()) 
        log.debug("等待初始化完成...");
        DateUtil.sleep(100);
    
    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance,
            channelFutureList, key);
    return rpcChannelFuture.getChannelFuture().channel();

工具类实现为核心实现:

/**
 * 负载均衡
 *
 * @param list 列表
 * @param key 分片键
 * @return 结果
 * @since 0.0.7
 */
public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance,
                                                final List<T> list, String key) 
    if(CollectionUtil.isEmpty(list)) 
        return null;
    

    if(StringUtil.isEmpty(key)) 
        LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance()
                .servers(list);
        return loadBalance.select(loadBalanceContext);
    

    // 获取 code
    int hashCode = Objects.hash(key);
    int index = hashCode % list.size();
    return list.get(index);

如果指定了 shardingKey,那么根据 shadringKey 进行 hash 判断。

如果没有,则进行默认的负载均衡策略。

Broker 消息推送给消费者

消费者订阅列表的获取:

@Override
public List<Channel> getSubscribeList(MqMessage mqMessage) 
    final String topicName = mqMessage.getTopic();
    Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
    if(CollectionUtil.isEmpty(set)) 
        return Collections.emptyList();
    

    //2. 获取匹配的 tag 列表
    final List<String> tagNameList = mqMessage.getTags();
    Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
    for(ConsumerSubscribeBo bo : set) 
        String tagRegex = bo.getTagRegex();
        if(hasMatch(tagNameList, tagRegex)) 
            //TODO: 这种设置模式,统一添加处理 haven
            String groupName = bo.getGroupName();
            List<ConsumerSubscribeBo> list = groupMap.get(groupName);
            if(list == null) 
                list = new ArrayList<>();
            
            list.add(bo);
            groupMap.put(groupName, list);
        
    

    //3. 按照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 选择
    final String shardingKey = mqMessage.getShardingKey();
    List<Channel> channelList = new ArrayList<>();
    for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) 
        List<ConsumerSubscribeBo> list = entry.getValue();
        ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey);
        final String channelId = bo.getChannelId();
        BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
        if(entryChannel == null) 
            log.warn("channelId:  对应的通道信息为空", channelId);
            continue;
        
        channelList.add(entryChannel.getChannel());
    
    return channelList;

核心逻辑:RandomUtils.loadBalance(loadBalance, list, shardingKey); 获取,其他的保持不变。

消费者 ACK

消费者也是类似的,获取 channel 的方式调整如下:

public Channel getChannel(String key) 
    // 等待启动完成
    while (!statusManager.status()) 
        log.debug("等待初始化完成...");
        DateUtil.sleep(100);
    

    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
            channelFutureList, key);
    return rpcChannelFuture.getChannelFuture().channel();

小结

负载均衡在分布式服务中,是必备的特性之一。实现的原理并不算复杂。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

以上是关于MQ的主要内容,如果未能解决你的问题,请参考以下文章

软件-MQ-MQ:IBM MQ

mq从零开始实现 mq-05-实现优雅停机

MQ定义/优缺点/几种MQ对比

MQ系列——MQ简介

消息中间件MQ知识概括

MQ解决消息重发--做到幂等性