kafka 消息队列
Posted finley
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 消息队列相关的知识,希望对你有一定的参考价值。
kafka是使用Java和Scala编写的一个快速可扩展的高吞吐量的分布式消息队列系统。
kafka将数据持久化存储到磁盘上,自带分区和副本机制,因而具有较好的持久化保证。
但是kafka的消息消费没有确认机制,可能因为consumer崩溃导致消息没有完成处理。因此不建议将kafka用于一致性较高的业务场景,kafka经常被用做日志收集和数据仓库之间的缓存。
比如将网站的浏览日志缓存到kafka,然后从kafka中取出批量写入ElasticSearch, Hive或者HBase等数据仓库中。这样做可以极大的减轻离线分析系统的负载。
架构简介
kafka架构中有下列角色参与:
- broker: kafka 集群中的服务器实例称为broker
- producer: 向broker发送消息的客户端
- consumer: 向从borker中读取消息的客户端
- zookeeper: 存储集群状态的注册中心,不处理具体消息。在负载均衡和集群扩展等功能中有重要作用。
接下来介绍kafka的逻辑模型:
- message: 消息是kafka通信的基本单元
- topic: topic 在逻辑结构上类似于队列, 每条消息都属于一个 topic。
- consumer group: 每个group中可以包含若干 consumer 实例,每个topic可以被多个consumer group 订阅。
消费者组拥有唯一的 GroupID 进行标识, 每个 consumer 实例有且只有一个 GroupID。 - partition: topic 被分为若干个 partition 进行存储,每条消息都属于一个 partition。
- offset: 每条消息在 partition 中使用 offset (偏移量)作为唯一标识。
kafka 保证订阅某个 topic 的所有 consumer group 都会收到该 topic 中所有消息。
topic 中的一条消息在一个 consumer group 中都会被一个 consumer 读取,且仅会被该 consumer 读取。
若每个 consumer 都属于一个独立的 consumer group 那么消息会被所有 consumer 读取,即实现了消息广播。 若所有 consumer 属于同一个 consumer group, 那么消息只会被一个 consumer 读取,即实现消息单播。
kafka 不会主动将消息推送给消费者, 消费者需要主动从broker中读取数据。
kafka 没有消息确认机制,由 consumer 自行控制消费的消息。
partition与消息传递的实现
kafka 将一个 topic 中的数据存存储到多个 partition 中,每个 partition 分为多个段文件存储在 broker 节点上。
producer 会与 topic 下所有 partition 保持通信,并根据配置的算法(key-hash 或 round robin等)决定将消息写入哪个 partition 中。
partition 内部是有序的,但是同一个 topic 的多个 partition 之间不保证有序, 即 topic 不是整体有序的。
kafka 会为监听 topic 的 consumer 分配一个 partition。 在一个消费者组内,一个 partition 最多分配给一个 consumer。
当组内 consumer 数量大于 partition 数量时,可能有 consumer 分配不到数据。
一个 partition 可以被属于不同 group 的多个 consumer 监听。
consumer 监听不同 partition 的机制实现了消息只能被组内一个 consumer 消费的特性,避免使用锁机制极大提高了吞吐率简化了 broker 实现。
消费者通过 offset 标记自己读取的位置,主动读取 parttion 中的数据
消费者向 broker 发送包含 offset 和 max 参数的 fetch 请求来读取 partition中的数据。 因此,消费者可以自由设置 offset 来控制读取的位置,从而实现增量读取或从头读取等功能。
当消费者订阅某个 topic 时,kafka 会将最新的offset告知消费者。
消费者可以将自己当前的 offset 反馈给 kafka, kafka 会将状态保存到 zookeeper,使得消费者可以自由退出或者重新加入继续消费。
kafka 没有消息确认机制,完全由 consumer 设置 offset 来进行消费。因此,kafka broker 不需要维护消息状态,有利于提高吞吐率。
与很多消息队列系统不同的是, kafka 不会删除已消费的信息, 而是根据配置的超时时间或者文件大小限制,删除较早发送的消息或过大的partition文件。
replica
kafka 在0.8之后版本中支持了副本机制, 每个 topic 分为多个 partition, 每个 partition 存在多个 replica。
这些 replica 分布于不同的 broker 节点上, 降低单个 broker 宕机对系统可用性的影响。
kafka 的副本分布策略是: 在拥有 n 个 broker 节点的集群中, 将第 i 个 partition 的第 j 个 replica 存储在第 (i + j) % n 个 broker 上。
同一个 partition 的 replica 中存在一个 leader,生产者消费者只与 leader replica 进行交互, 其它 replica 从leader中同步数据。
kafka提供了两种主从复制机制:
- 同步复制:消息被 partition 的所有 alive 状态 replica 复制消息才会成功提交,这种方式保证一致性却极大影响吞吐率。
- 异步提交:消息被 partition 的 leader replica 写入即提交成功, 其它 replica 会异步同步数据。这种方式吞吐率较高但一致性较低,leader 崩溃可能导致消息丢失。
kafka通过两种机制判断alive状态:
- zookeeper的心跳机制:broker必须维护zookeeper的session
- slave 副本从 leader 复制数据的延迟不能超过阈值。
体验kafka
安装kafka
直接下载kafka二进制文件官方教程, zookeeper可以使用官方docker镜像:
docker run -d zookeeper
使用docker-compose运行wurstmeister/kafka-docker
使用Docker运行kafka可以参考:Docker下kafka学习,三部曲之一:极速体验kafka
在ubuntu系统上安装kafka可以参考: How To Install Apache Kafka on Ubuntu 14.04
在mac系统上可以直接使用Homebrew进行安装
这里作者选择用homebrew进行安装.
brew install kafka
配置文件在/usr/local/etc/kafka/server.properties
和/usr/local/etc/kafka/zookeeper.properties
。
启动zookeeper:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
启动kafka:
kafka-server-start /usr/local/etc/kafka/server.properties &
命令行工具
创建topic:
kafka-topics --zookeeper localhost:2181 --create --topic test --partitions 30 --replication-factor 2
- zookeeper: 集群依赖的zookeeper服务地址
- topic: topic 名称
- partitions: topic 的 partition 数
- replication-factor: 每个 partition 的副本数
查看 topic 信息:
kafka-topics --zookeeper localhost:2181 --describe --topic test
删除 topic:
kafka-topics --zookeeper localhost:2181 --delete --topic test
查看所有 topic:
kafka-topics --zookeeper localhost:2181 --list
发送消息:
kafka-console-producer --broker-list localhost:9092 --topic test
接收新消息:
kafka-console-consumer --zookeeper localhost:2181 --topic test
从头读取消息:
kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning
以上是关于kafka 消息队列的主要内容,如果未能解决你的问题,请参考以下文章