初识中间件Kafka

Posted DotNet开发跳槽

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初识中间件Kafka相关的知识,希望对你有一定的参考价值。

初识中间件Kafka

Author:SimplelWu

什么是消息中间件?
  • 非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件

  • 关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统。

什么是Kafka?

Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。

kafka官方:http://kafka.apache.org/

Kafka作为一个分布式的流平台,这到底意味着什么?

我们认为,一个流处理平台具有三个关键能力:

  • 发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统。

  • 容错的方式存储消息(流)。

  • 在消息流发生时处理它们。

什么是kakfa的优势?

它应用于2大类应用:

  • 构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。

  • 构建实时流的应用程序,对数据流进行转换或反应。

kafka有四个核心API
  • 应用程序使用 Producer API 发布消息到1个或多个topic(主题)。

  • 应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。

  • 应用程序使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。

  • Connector API允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,一个关系数据库的连接器可捕获每一个变化。

lient和Server之间的通讯,是通过一条简单、高性能并且和开发语言无关的TCP协议。并且该协议保持与老版本的兼容。Kafka提供了Java Client(客户端)。除了Java Client外,还有非常多的其它编程语言的Client。

主流消息中间件比较

ActiveMQ RabbitMQ Kafka
跨语言 支持(Java优先) 语言无关 支持(Java优先)
支持协议 OpenWire,Stomp, XMPP,AMQP AMQP
优点 遵循JMS规范,安装部署方便。 继承Erlang天生的并发性,最初用于金融行业,稳定性,安全性有保障。 依赖zk,可动态扩展节点,高性能,高吞吐量,无线扩容消息可指定追溯。
缺点 根据其他用户反馈,会莫名丢失消息,目前重心在下一代的apolle上,目前社区不活跃,对5.X维护较少。 Erlang语言难度较大,不支持动态扩展。 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移。
综合评价 适合中小企业消息应用场景,不适合上千个队列的应用场景。 适合对稳定性要求较高的企业应用。 一般应用在大数据日志处理或对实时性,可靠性要求稍低的场景。
Kafka好处
  • 可靠性 - Kafka是分布式,分区,复制和容错的。

  • 可扩展性 - Kafka消息传递系统轻松缩放,无需停机。

  • 耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。

  • 性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。

Kafka非常快,并保证零停机和零数据丢失。

应用场景
  • 指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。

  • 日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。

  • 流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。

Kafka相关术语
序号 组件和说明
1 Topics(主题)属于特定类别的消息流称为主题。 数据存储在主题中。主题被拆分成分区。 对于每个主题,Kafka保存一个分区的数据。 每个这样的分区包含不可变有序序列的消息。 分区被实现为具有相等大小的一组分段文件。
2 Partition(分区)主题可能有许多分区,因此它可以处理任意数量的数据。
3 Partition offset(分区偏移)每个分区消息具有称为 offset 的唯一序列标识。
4 Replicas of partition(分区备份)副本只是一个分区的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
5 Brokers(经纪人)代理是负责维护发布数据的简单系统。 每个代理中的每个主题可以具有零个或多个分区。 假设,如果在一个主题和N个代理中有N个分区,每个代理将有一个分区。假设在一个主题中有N个分区并且多于N个代理(n + m),则第一个N代理将具有一个分区,并且下一个M代理将不具有用于该特定主题的任何分区。假设在一个主题中有N个分区并且小于N个代理(n-m),每个代理将在它们之间具有一个或多个分区共享。 由于代理之间的负载分布不相等,不推荐使用此方案。
6 Kafka Cluster(Kafka集群)Kafka有多个代理被称为Kafka集群。 可以扩展Kafka集群,无需停机。 这些集群用于管理消息数据的持久性和复制。
7 Producers(生产者)生产者是发送给一个或多个Kafka主题的消息的发布者。 生产者向Kafka经纪人发送数据。 每当生产者将消息发布给代理时,代理只需将消息附加到最后一个段文件。 实际上,该消息将被附加到分区。 生产者还可以向他们选择的分区发送消息。
8 Consumers(消费者)Consumers从经纪人处读取数据。 消费者订阅一个或多个主题,并通过从代理中提取数据来使用已发布的消息。
9 Leader(领导者) Leader 是负责给定分区的所有读取和写入的节点。每个分区都有一个服务器充当Leader 。
10 Follower(追随者)跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。
使用Kafka
  1. 安装jdk

  2. 安装zookepper 官方:http://zookeeper.apache.org/

  3. 安装kafka

我这里jdk是已经安装好的。

安装zookepper:

tar -zxvf zookeeper-3.4.13.tar.gz #解压
cd zookeeper-3.4.13/config #进入配置目录#zookeeper运行需要config里有config文件。但是解压后默认只有zoo_sample.cfg,我们将名字修改下即可mv zoo_sample.cfg zoo.cfg #修改配置文件名字

启动zookeper,来到bin目录:

./zkServer.sh start #启动zookepper

停止zookeper,来到bin目录:

./zkServer.sh start #停止zookepper

kafka下载:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

使用Kafka,

tar -zxvf kafka_2.11-2.1.0.tgz #解压kafka

启动zookpper服务,来到kafka的bin目录:

./zookeeper-server-start.sh config/zookeeper.properties #启动服务

启动kafka服务:

./kafka-server-start.sh config/server.properties #启动kafka服务

创建一个主题:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #topic_name

topic_name:主题的名字'test'。

创建好后查看主题:

kafka-topics.sh --list --zookeeper localhost:2181

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。

发送消息:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test #主题为test

进入之后就可发送消息!!!

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。

消费消息:

#topic主题需要与被消费的主题对应上./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Kafka常用命令
#查看所有主题列表kafka-topics.sh --zookeeper localhost:2181 --list#查看指定topic信息kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_name#控制台向topic生产数据kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name 
#控制台消费topic的数据kafka-console-consumer.sh  --zookeeper localhost:2181  --topic topic_name --from-beginning 
#查看topic某分区偏移量最大(小)值kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable  --time -1 --broker-list localhost:9092 --partitions 0 
#增加topic分区数kafka-topics.sh --zookeeper localhost:2181  --alter --topic topic_name --partitions 10#删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic topic_name

来源:https://www.cnblogs.com/SimpleWu/p/10137494.html