5-2 客户端操作MQ集群
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了5-2 客户端操作MQ集群相关的知识,希望对你有一定的参考价值。
参考技术A spring.rabbitmq.addresses=192.168.5.155:5677,192.168.5.155:5678spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
-p 5678:5678 Haproxy的端口
-p 8101:8101 Haproxy管理界面端口
-p 8001:8001 Haproxy代理Mq端口
ERROR [-,,,] 1999 --- [.168.5.155:5677] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error
测试成功:可发送消息可消费。但是有这个日志
Google 找了很多资料,也没有找到解决方案,无意间找到一篇文章:RabbitMQ and HAProxy: a timeout issue
文章说,如果使用 HAProxy 配置 RabbitMQ 高可用集群的话,则会遇到客户端连接超时问题。
为什么会出现此问题呢?因为 HAProxy 配置了客户端连接超时参数(timeout client ms),如果客户端连接超过配置的此参数,那么 HAProxy 将会删除这个客户端连接。
RabbitMQ 客户端使用永久连接到代理,从不超时,那为什么还会出现问题?因为如果 RabbitMQ 在一段时间内处于非活动状态,那么 HAProxy 将自动关闭连接(有点坑呀😂)。
那如何解决这个问题呢?我们看到 HAProxy 提供了一个clitcpka参数配置,它可以从客户端发送TCP keepalive数据包。
我们就使用它,但发现配置了之后,还是出现了上面的问题。
为什么呢?
[…]the exact behaviour of tcp keep-alive is determined by the underlying OS/Kernel configuration[…]
什么意思?意思就是TCP keepalive数据包的发送,取决于操作系统/内核配置。
我们可以使用命令查看(HAProxy 所在服务器中的tcp_keepalive_time配置):
[root@manager1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time
7200
tcp_keepalive_time默认配置时间 2 个小时,表示发送TCP keepalive数据包的间隔时间是 2 个小时,或者说每隔 2 个小时发送TCP keepalive数据包。
这么说就清楚了吧,虽然我们在 HAProxy 中,配置了clitcpka参数,但因为系统发送TCP keepalive数据包的间隔时间过长,远远超过 HAProxy 中的 timeout client超时时间(默认好像是 2 秒),所以客户端连接每隔 2 秒,就被 HAProxy 无情的给删除掉,然后不断的被重建。
说了那么多,我们该怎么解决此问题呢?
两种方案:
修改系统的tcp_keepalive_time配置,间隔时间低于 HAProxy 配置的timeout client超时时间(因为有可能影响其他系统服务,不推荐)。
修改 HAProxy 中的timeout client超时时间,配置大于系统的tcp_keepalive_time间隔时间(推荐)
因为系统tcp_keepalive_time发送TCP keepalive数据包间隔时间是 2 个小时,所以,我们将 HAProxy 中的timeout client超时时间,设置为 3 个小时:
timeout client 3h
timeout server 3h
修改的配置:
重新运行 HAProxy即可
Apache Pulsar MQ 学习笔记
一、Pulsar简介
Apache Pulsar是一个企业级分布式消息系统,最初由雅虎在2016年开源。
Pulsar的关键特性:
1)Pulsar的单个实例原生支持多个集群,可跨机房再集群间无缝地完成消息复制
2)极低的发布延迟和端到端延迟
3)可无缝扩展到超过100万个topic
4)客户端简单,支持java, go, python, c++
5)支持多种topic订阅模式(独占订阅、共享订阅、故障转移订阅)
6)通过Apache BookKeeper提供的持久化消息存储机制,保证消息船体。
- 由轻量级serverless计算框架Pulsar Functions实现流原生的数据处理
- 基于Pulsar Functions的serverless connector框架Pulsar IO使得数据更易移入、移出Apache pulsar
- 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3)中
二、Pulsar比Kafka好在哪
Pulsar旨在取代Kafka多年的主宰地位。Pulsar在很多情况下,提供了比Kafka更快的吞吐和更低的延迟,并为开发人员提供了一组兼容的API,让他们可以很轻松地从Kafka切换到Pulsar。Pulsar最大优点在于他提供了比Kafka更简单明了、更健壮的一些列操作功能,特别是在解决可观察性、地域复制和多租户访问。在运行大型Kafka集群方面感觉有困难的企业可以考虑转向使用Pulsar。
2.1、Pulsar vs Kafka
2.1.1、模型概念
Kafka:producer - topic - consumer group - consumer
Pulsar:producer - topic -subscription - consumer
2.1.2、消费模式:Pulsar没强哪去
Kafka:主要在stream流模式,对单个partition是独占消费,没有共享queue的消费模式
Pulsar:提供了统一的消息模型和API。流(stream)模式:独占和故障切换订阅模式;队列(queue)模型:共享订阅模式。
2.1.3、消息确认(ack)
Kafka : offset commit
Pulsar:使用专门的cursor管理。累计确认和Kafka效果一样;提供单条或选择性确认。
2.1.4、消息保留
Kafka:根据设置的保留时间来删除消息,有可能消息没有被消费就过期被删除了。不支持TTL
Pulsar:消息只有被所有订阅中消费后才会删除,不会丢失数据。也允许设置保留期,保留被消费的数据。支持TTL。
三、系统架构和设计理念
3.1、Pulsar的分层架构
Pulsar和其他消息系统最根本的不同是采用了分层架构。Pulsar集群由两层组成:
1)无状态服务层:由一组接收和传递消息的broker组成
2)有状态的持久层:由一组明文bookies的Apache BookKeeper存储节点组成,可持久化地存储消息。
下图为Pulsar的部署架构:
在Pulsar客户端中,提供生产者和消费者接口,应用程序使用Pulsar客户端来连接broker来生产和消费消息。
Pulsar客户端不直接与存储层Apache BookKeeper交互。客户端也没有直接的BookKeeper访问权限。这种隔离,为Pulsar实现安全的多租户统一身份验证模型提供了基础。
Apache Pursar客户端支持的语言为:Java, C++, Python, Go, Websocket
Apache Pursar还提供了一组兼容Kafka的API,用户可以通过简单地更新依赖关系并将客户端指向Pulsar集群来迁移现有的Kafka应用程序,这样现有的Kafka应用程序可以立即与Apache Pulsar一起使用,无需更改任何代码。
3.1.1、Broker层:无状态服务层
Broker集群在Pulsar中形成了无状态层。即服务层是“无状态的”,因为Broker实际上并不在本地存储任何数据消息。消息都存到啦分布式日志存储系统(Apache BookKeeper)中。
每个主题分区(Topic Partition)由Pulsar分配给某个broker,该broker称为该topic的所有者。Pulsar生产者和消费者连接到主题分区的所有broker,以向所有者代理发送消息并消费消息。
如果一个broker故障,Pulsar会自动将其拥有的主题分区移动到集群中剩余的某一个可用的broker中。这里要强调的是,由于broker是无状态的,当topic发送迁移时,Pulsar只是将所有权从一个broker转移到另一个broker,在这个过程中,数据不需要做任何迁移。
下图显示了一个拥有4个Broker的Pulsar集群,其中4个主题分区分布在4个Broker中。每个Broker拥有并为一个主题分区提供消息服务。
3.1.2、BookKeeper层:持久化存储层
Apache BookKeeper是Apache Pulsar的持久化存储层。Pulsar中的每个主题分区,本质上都是存储在Apache BookKeeper中的分布式日志。
每个分布式日志,又被分成多个Segment段,每个Segment段作为Apache BookKeeper中的一个Ledger,均匀分布并存储在BookKeeper集群中的多个Bookie(Apache BookKeeper的存储节点)中。-- 这点同Kafka类似。
Segment的创建时机包括:
1)基于配置Segment的大小
2)基于配置的滚动时间
3)当Segment的所有者被切换
通过Segment分段的方式,主题分区中的消息可以均匀和平衡地分布在集群中的所有bookie中。这意味着主题分区的大小不仅受一个节点容量的限制;相反,他可以扩展到整个BookKeeper集群的总容量。
下图说明了一个分为x个Segment段的主题分区。每个Segment段存储三个副本。所有Segment都分布并存储在4个Bookie中。
3.1.2.1、Segment为中心的存储
存储服务的分层架构和以Segment为中心的存储,是Apache Pulsar(使用Apache BookKeeper)的两个关键设计理念。这样的设计为Pulsar提供了如下好处:
1)无限制的主题分区存储-- 这个Kafka也支持
2)即时扩展,无需数据迁移 – 这个Kafka Broker是有状态,所以不如Pulsar。
- 无缝Broker故障恢复
- 无缝集群扩展
- 无缝的存储(Bookie)故障恢复
3)独立的可扩展性 – 这点和2有什么区别么?
1、无限制的主题分区存储 – 这点Kafka也具备
由于主题分区被分成多个segment,而segment在BookKeeper集群中的每个节点上分布式存储,所以主题分区不受单一节点容量的限制,相反,主题分区可以扩容到整个BookKeeper集群的总容量,只需要添加Bookie节点即可扩展集群数量。
这就是为什么Pulsar可以支持存储无限大小的流数据,并能以高效、分布式方式处理数据的关键。
2、即时扩展,无需数据迁移
Pulsar不同于Kafka的关键点,就是把消息服务和消息存储分两层,所以当主题从一个broker移动到另一个broker,只需要修改broker和topic partition的映射关系,而不需要数据迁移。-- 这点相比kafka broker故障来说,无需数据迁移。
这一特性对高可用的许多方面至关重要。例如集群扩展,对Broker和Bookie失败的快速应对,下面将距离说明。
3、无缝broker故障恢复
下图说明了Pulsar如何处理broker失败的示例。
假设broker2由于某种原因(如停电)而不可用。Pulsar检测到Broker2已关闭,并立即将topic1-part2的所有权转移到broker3。在Pulsar中数据存储和数据服务分离,所以当Broker3接管topic1-part2的所有权时,他不需要复制partition的数据(如果是kafka就需要)。如果有新数据到来,他立即附加并存储为topic1-part2中的segment x +1。segment x + 1被分发并存储在Bookie1,2,4上。因为他不需要重新复制数据,所以所有权转移立即发生,而不会牺牲主题分区的可用性。
4、无缝集群容量扩展:存储中扩展时,无需数据迁移
下图说明了Pulsar如何处理集群的容量扩展。当Broker 2将消息写入topic1-part2的segmetn x时,将bookie x和bookie y添加到集群中。broker2 立即发现新加入的bookie x和 bookie y。然后Broker将尝试将segment x + 1和X + 2的消息存储到新添加的bookie中。新添加的bookie立刻被使用起来,流量立即增加,而不会重新复制任何数据。除了机架感知和区域感知策略之外,Apache BookKeeper还提供资源感知的放置策略,以确保流量在集群中的所有存储之间保持平衡。
5、无缝的存储(Bookie)故障恢复
下图说明了Pulsar(通过Apache BookKeeper)如何处理bookie的磁盘故障。
Apache BookKeeper中副本修复是Segment(甚至是Entry)级别的多对多快速修复,这比重新复制整个topic partition要精细,只会复制必须的数据。这意味着Apache BookKeeper可以从bookie3和bookie4读取Segment4中的消息,并在bookie1处修复segment4。所有的副本修复都在后台进行,对broker和应用透明。
即使有bookie节点故障发生,通过添加新的可用的bookie来替换故障的bookie,所有broker都可以继续接受接入,而不会牺牲主题分区的可用性。
6、独立的可扩展性
由于消息服务层和持久层存储是分开的,所以Pulsar可独立地扩展存储层和服务层。这种独立的扩展,更具有成本效益:
1)当你需要支持更多的消费者或生产者时,可以简单地添加更多的broker。主题分区将立即在brokers中做平衡迁移,一些主题分区的所有权立即转移到了新的broker。
2)当你需要更多存储空间来将消息保存更长时间时,只需要添加更多的bookie。通过智能资源感知和数据放置,流量讲自动切换到新的bookie中。Pulsar中不会涉及到不必要的数据迁移,不会讲旧数据从现有的存储节点重新复制到新的存储节点。
四、Pulsar vs Kafka
Kafka和Pulsar都有相似的消息概念:客户端通过topic和消息系统交互,每个topic都有多个Partition。
但是,Pulsar和Kafka的根本区别在于:
1)Kafka是以Partition为存储中心
2)Pulsar是以Segment为存储中心
上图显示了以分区为中心和以Segment为中心的差异。
在Kafka中,partition只能存储哎单个节点上,并复制到其他节点,其单个partition容量受最小节点容量的限制。这意味着kafka要添加broker节点时,需要对partition做重新的rebalance, 这样就需要做某些整个partition数据的迁移(例如broker有3个节点,原来有6个partition,那么再扩容3个节点,原来的4~6号partition需要复制到新的3个节点上去)。
数据迁移非常昂贵,且可能出错,会耗费网络IO,维护人员在执行此操作时必须非常消息,以避免破坏生产系统。
Kafka中分区数据的重新copy,不仅发送在以分区为中心的集群扩容节点时,其他Case也可能会做数据迁移,例如你将partition配置为3个副本,这时,如果丢失了一个副本,则必须重新复制完整的整个分区后,分区才可以再次可用。
在用户遇到故障之前,通常会忽略这种缺陷,因为许多情况下,在短时间内仅是对内存中缓存数据的读取。当数据被保存到磁盘后,用户将越来越多地不可避免地遇到数据丢失,故障恢复等问题,特别是在需要将数据长时间保存的场合。
接下来看Pulsar的优势:
在Pulsar中,同样是以分区为逻辑单元,但以Segment为物理存储单元,分区随时间的推移会进行分段,并在整个集群中均衡分布,旨在有效地迅速扩张。
Pulsar是以Segment为中心的,因此在扩容时,不需要数据重新平衡和copy,旧数据不会被重新复制,这要归功于BookKeeper中使用可扩展的以Segment为中心的分布式日志存储系统。
通过利用分布式日志存储,Pulsar可最大化Segmetn放置选项,实现高写入和高读取可用性。例如,使用BookKeeper,副本设置等于2,只要任何2个Bookie启动,就可以对主题分区进行写入。对于读取可用性,只要主题分区的副本集中有1个处于活动状态,用户就可以读取他,而不会出现任何不一致。
总之,Pulsar这种独特的基于分布式日志存储的以Segment为中心的发布/定于消息系统提供了很多优势:例如可靠的流式系统,包括无限制的日志存储,无需分区重新平衡的即时扩展,快速复制修复以及最大化数据放置实现高可入和读取可用性选项。
https://blog.csdn.net/cpongo4/article/details/89118556
以上是关于5-2 客户端操作MQ集群的主要内容,如果未能解决你的问题,请参考以下文章
rocketMQ安装配置+与java交互API操作+集群搭建+高级特性
rocketMQ安装配置+与java交互API操作+集群搭建+高级特性