Kafka简介+Kafka Tool使用简介+使用实例
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka简介+Kafka Tool使用简介+使用实例相关的知识,希望对你有一定的参考价值。
参考技术A
详细安装访问: https://www.jianshu.com/p/c74e0ec577b0
macOS 可以用homebrew快速安装,访问地址: https://www.jianshu.com/p/cddd25da8061
原文链接: https://www.jianshu.com/p/06884c5bf3f1
查看topic列表:
创建topic:
--create :创建命令;
--topic :后面指定topic名称;
--replication-factor :后面指定副本数;
--partitions :指定分区数,根据broker的数量决定;
--zookeeper :后面指定 zookeeper.connect 的zk链接
查看某个topic:
Kafka 作为消息系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。
1、引入依赖
2、消息生产者
示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。
bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。
key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是发送文本消息到服务器 , 所以使用的是StringSerializer。
key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。
zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。
有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :
ProducerRecord(topic,partition,key,value);
ProducerRecord(topic,key,value);
ProducerRecord(topic, value) ;
其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。
3、消息消费者
和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。
bootstrap. servers 和生产者一样,表示 Kafka 集群。
group.id 表示消费者的分组 ID。
enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。
auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。
key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。
消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。
以上是关于Kafka简介+Kafka Tool使用简介+使用实例的主要内容,如果未能解决你的问题,请参考以下文章