分布式发布订阅消息系统 Kafka 架构设计
Posted IT哈哈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式发布订阅消息系统 Kafka 架构设计相关的知识,希望对你有一定的参考价值。
端到端的批量压缩
多数情况下系统的瓶颈是网络而不是CPU。 这一点对于需要将消息在个数据中心间进行传输的数据管道来说,尤其如此。当然,无需来自Kafka的支持,用户总是可以自行将消息压缩后进行传输,但这么做的压缩率会非常低,因为不同的消息里都有很多重复性的内容(比如JSON里的字段名、web日志中的用户代理或者常用的字符串)。高效压缩需要将多条消息一起进行压缩而不是分别压缩每条消息。理想情况下,以端到端的方式这么做是行得通的 —— 也即,数据在消息生产者发送之前先压缩一下,然后在服务器上一直保存压缩状态,只有到最终的消息使用者那里才需要将其解压缩。
通过运行递归消息集,Kafka对这种压缩方式提供了支持。 一批消息可以打包到一起进行压缩,然后以这种形式发送给服务器。这批消息都会被发送给同一个消息使用者,并会在到达使用者那里之前一直保持为被压缩的形式。
Kafka支持GZIP和Snappy压缩协议。
客户状态
追踪(客户)消费了什么是一个消息系统必须提供的一个关键功能之一。它并不直观,但是记录这个状态是该系统的关键性能之一。状态追踪要求(不断)更新一个有持久性的实体的和一些潜在会发生的随机访问。因此它更可能受到存储系统的查询时间的制约而不是带宽(正如上面所描述的)。
大部分消息系统保留着关于代理者使用(消费)的消息的元数据。也就是说,当消息被交到客户手上时,代理者自己记录了整个过程。这是一个相当直观的选择,而且确实对于一个单机服务器来说,它(数据)能去(放在)哪里是不清晰的。又由于许多消息系统存储使用的数据结构规模小,所以这也是个实用的选择--因为代理者知道什么被消费了使得它可以立刻删除它(数据),保持数据大小不过大。
也许不显然的是,让代理和使用者这两者对消息的使用情况做到一致表述绝不是一件轻而易举的事情。如果代理每次都是在将消息发送到网络中后就将该消息记录为已使用的话,一旦使用者没能真正处理到该消息(比方说,因为它宕机或这请求超时了抑或别的什么原因),就会出现消息丢失的情况。为了解决此问题,许多消息系新加了一个确认功能,当消息发出后仅把它标示为已发送而不是已使用,然后代理需要等到来自使用者的特定的确认信息后才将消息记录为已使用。这种策略的确解决了丢失消息的问题,但由此产生了新问题。首先,如果使用者已经处理了该消息但却未能发送出确认信息,那么就会让这一条消息被处理两次。第二个问题是关于性能的,这种策略中的代理必须为每条单个的消息维护多个状态(首先为了防止重复发送就要将消息锁定,然后,然后还要将消息标示为已使用后才能删除该消息)。另外还有一些棘手的问题需要处理,比如,对于那些以发出却未得到确认的消息该如何处理?
消息传递语义(Message delivery semantics)
系统可以提供的几种可能的消息传递保障如下所示:
最多一次—这种用于处理前段文字所述的第一种情况。消息在发出后立即标示为已使用,因此消息不会被发出去两次,但这在许多故障中都会导致消息丢失。
至少一次—这种用于处理前文所述的第二种情况,系统保证每条消息至少会发送一次,但在有故障的情况下可能会导致重复发送。
仅仅一次—这种是人们实际想要的,每条消息只会而且仅会发送一次。
这个问题已得到广泛的研究,属于“事务提交”问题的一个变种。提供仅仅一次语义的算法已经有了,两阶段或者三阶段提交法以及Paxos算法的一些变种就是其中的一些例子,但它们都有与生俱来的的缺陷。这些算法往往需要多个网络往返(round trip),可能也无法很好的保证其活性(liveness)(它们可能会导致无限期停机)。FLP结果给出了这些算法的一些基本的局限。
Kafka对元数据做了两件很不寻常的事情。一件是,代理将数据流划分为一组互相独立的分区。这些分区的语义由生产者定义,由生产者来指定每条消息属于哪个分区。一个分区内的消息以到达代理的时间为准进行排序,将来按此顺序将消息发送给使用者。这么一来,就用不着为每一天消息保存一条元数据(比如说,将消息标示为已使用)了,我们只需为使用者、话题和分区的每种组合记录一个“最高水位标记”(high water mark)即可。因此,标示使用者状态所需的元数据总量实际上特别小。在Kafka中,我们将该最高水位标记称为“偏移量”(offset),这么叫的原因将在实现细节部分讲解。
使用者的状态
在Kafka中,由使用者负责维护反映哪些消息已被使用的状态信息(偏移量)。典型情况下,Kafka使用者的library会把状态数据保存到Zookeeper之中。然而,让使用者将状态信息保存到保存它们的消息处理结果的那个数据存储(datastore)中也许会更佳。例如,使用者也许就是要把一些统计值存储到集中式事物OLTP数据库中,在这种情况下,使用者可以在进行那个数据库数据更改的同一个事务中将消息使用状态信息存储起来。这样就消除了分布式的部分,从而解决了分布式中的一致性问题!这在非事务性系统中也有类似的技巧可用。搜索系统可用将使用者状态信息同它的索引段(index segment)存储到一起。尽管这么做可能无法保证数据的持久性(durability),但却可用让索引同使用者状态信息保存同步:如果由于宕机造成有一些没有刷新到磁盘的索引段信息丢了,我们总是可用从上次建立检查点(checkpoint)的偏移量处继续对索引进行处理。与此类似,Hadoop的加载作业(load job)从Kafka中并行加载,也有相同的技巧可用。每个Mapper在map任务结束前,将它使用的最后一个消息的偏移量存入HDFS。
这个决策还带来一个额外的好处。使用者可用故意回退(rewind)到以前的偏移量处,再次使用一遍以前使用过的数据。虽然这么做违背了队列的一般协约(contract),但对很多使用者来讲却是个很基本的功能。举个例子,如果使用者的代码里有个Bug,而且是在它处理完一些消息之后才被发现的,那么当把Bug改正后,使用者还有机会重新处理一遍那些消息。
Push和Pull
相关问题还有一个,就是到底是应该让使用者从代理那里吧数据Pull(拉)回来还是应该让代理把数据Push(推)给使用者。和大部分消息系统一样,Kafka在这方面遵循了一种更加传统的设计思路:由生产者将数据Push给代理,然后由使用者将数据代理那里Pull回来。近来有些系统,比如scribe和flume,更着重于日志统计功能,遵循了一种非常不同的基于Push的设计思路,其中每个节点都可以作为代理,数据一直都是向下游Push的。上述两种方法都各有优缺点。然而,因为基于Push的系统中代理控制着数据的传输速率,因此它难以应付大量不同种类的使用者。我们的设计目标是,让使用者能以它最大的速率使用数据。不幸的是,在Push系统中当数据的使用速率低于产生的速率时,使用者往往会处于超载状态(这实际上就是一种拒绝服务攻击)。基于Pull的系统在使用者的处理速度稍稍落后的情况下会表现更佳,而且还可以让使用者在有能力的时候往往前赶赶。让使用者采用某种退避协议(backoff protocol)向代理表明自己处于超载状态,可以解决部分问题,但是,将传输速率调整到正好可以完全利用(但从不能过度利用)使用者的处理能力可比初看上去难多了。以前我们尝试过多次,想按这种方式构建系统,得到的经验教训使得我们选择了更加常规的Pull模型。
分发
Kafka通常情况下是运行在集群中的服务器上。没有中央的“主”节点。代理彼此之间是对等的,不需要任何手动配置即可可随时添加和删除。同样,生产者和消费者可以在任何时候开启。 每个代理都可以在Zookeeper(分布式协调系统)中注册的一些元数据(例如,可用的主题)。生产者和消费者可以使用Zookeeper发现主题和相互协调。关于生产者和消费者的细节将在下面描述。
生产者
生产者自动负载均衡
对于生产者,Kafka支持客户端负载均衡,也可以使用一个专用的负载均衡器对TCP连接进行负载均衡调整。专用的第四层负载均衡器在Kafka代理之上对TCP连接进行负载均衡。在这种配置的情况,一个给定的生产者所发送的消息都会发送给一个单个的代理。使用第四层负载均衡器的好处是,每个生产者仅需一个单个的TCP连接而无须同Zookeeper建立任何连接。不好的地方在于所有均衡工作都是在TCP连接的层次完成的,因而均衡效果可能并不佳(如果有些生产者产生的消息远多于其它生产者,按每个代理对TCP连接进行平均分配可能会导致每个代理接收到的消息总数并不平均)。
采用客户端基于zookeeper的负载均衡可以解决部分问题。如果这么做就能让生产者动态地发现新的代理,并按请求数量进行负载均衡。类似的,它还能让生产者按照某些键值(key)对数据进行分区(partition)而不是随机乱分,因而可以保存同使用者的关联关系(例如,按照用户id对数据使用进行分区)。这种分法叫做“语义分区”(semantic partitioning),下文再讨论其细节。
下面讲解基于zookeeper的负载均衡的工作原理。在发生下列事件时要对zookeeper的监视器(watcher)进行注册:
加入了新的代理
有一个代理下线了
注册了新的话题
代理注册了已有话题。
生产者在其内部为每一个代理维护了一个弹性的连接(同代理建立的连接)池。通过使用zookeeper监视器的回调函数(callback),该连接池在建立/保持同所有在线代理的连接时都要进行更新。当生产者要求进入某特定话题时,由分区者(partitioner)选择一个代理分区(参加语义分区小结)。从连接池中找出可用的生产者连接,并通过它将数据发送到刚才所选的代理分区。
异步发送
对于可伸缩的消息系统而言,异步非阻塞式操作是不可或缺的。在Kafka中,生产者有个选项(producer.type=async)可用指定使用异步分发出产请求(produce request)。这样就允许用一个内存队列(in-memory queue)把生产请求放入缓冲区,然后再以某个时间间隔或者事先配置好的批量大小将数据批量发送出去。因为一般来说数据会从一组以不同的数据速度生产数据的异构的机器中发布出,所以对于代理而言,这种异步缓冲的方式有助于产生均匀一致的流量,因而会有更佳的网络利用率和更高的吞吐量。
语义分区
下面看看一个想要为每个成员统计一个个人空间访客总数的程序该怎么做。应该把一个成员的所有个人空间访问事件发送给某特定分区,因此就可以把对一个成员的所有更新都放在同一个使用者线程中的同一个事件流中。生产者具有从语义上将消息映射到有效的Kafka节点和分区之上的能力。这样就可以用一个语义分区函数将消息流按照消息中的某个键值进行分区,并将不同分区发送给各自相应的代理。通过实现kafak.producer.Partitioner接口,可以对分区函数进行定制。在缺省情况下使用的是随即分区函数。上例中,那个键值应该是member_id,分区函数可以是hash(member_id)%num_partitions。
对Hadoop以及其它批量数据装载的支持
具有伸缩性的持久化方案使得Kafka可支持批量数据装载,能够周期性将快照数据载入进行批量处理的离线系统。我们利用这个功能将数据载入我们的数据仓库(data warehouse)和Hadoop集群。
批量处理始于数据载入阶段,然后进入非循环图(acyclic graph)处理过程以及输出阶段。支持这种处理模型的一个重要特性是,要有重新装载从某个时间点开始的数据的能力(以防处理中有任何错误发生)。
对于Hadoop,我们通过在单个的map任务之上分割装载任务对数据的装载进行了并行化处理,分割时,所有节点/话题/分区的每种组合都要分出一个来。Hadoop提供了任务管理,失败的任务可以重头再来,不存在数据被重复的危险。
原文:http://kafka.apache.org/documentation/#design
译者:参与翻译 (4人) : fbm, 木川瓦兹, K6F, nesteaa
链接:https://www.oschina.net/translate/kafka-design
《》
《》
《》
以上是关于分布式发布订阅消息系统 Kafka 架构设计的主要内容,如果未能解决你的问题,请参考以下文章