一.Kafka入门到精通-Kafka快速入门
Posted 墨家巨子@俏如来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一.Kafka入门到精通-Kafka快速入门相关的知识,希望对你有一定的参考价值。
前言
在内卷严重的程序员圈子中,原地踏步就是退步,所以不能再躺平啦,赶紧爬起来学习,接下来博主将推出《Kafka入门到精通》系列文章,让你可以在企业中玩起Kafka来得心应手,此乃升职加薪必备呀。
Kafka认识
Kafka 是由Linkedin公司开发的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,是一款基于发布订阅模式的开源消息引擎系统。相对于其他的消息组件来说Kafka拥有更好的吞吐量、内置分区、具有复制和容错的功能,这使它成为一个非常理想的大型消息处理应用。使用场景如:网页浏览记录,日志收集,监控数据等等。
Kafka 的标准定位是分布式流式处理平台,早期的定位是以消息引擎的身份出现的,随着 Kafka 的不断演进, Kafka 开发团队日益发现经 Kafka 交由下游数据处理平台做的事情 Kafka 自己也可以做,因此在 Kafka 0.10.0.0 版本正式推出了 Kafka Streams ,即流式处理组件 。自 Kafka 正式成为了 个流式处理框架,而不仅仅是消息引擎了。
如图所示,Kafka的工作流程为
- 生产者发送消息到Kafka集群
- 消费者从Kafka拉取消息
- Kafka依赖于Zookeeper处理服务的协调
Kafka快速安装(windows)
第一步:下载kafka ,https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
下载后解压,由于Kafka 使用 ZooKeeper作为服务协调工具, 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器,Kafka 内置提供了 ZooKeeper 服务器以及 组相关的管理脚本,我们直接使用这个内置ZooKeeper 即可,进入到bin\\windows 目录,执行如下列命令(windows):
zookeeper-server-start.bat ../../config/zookeeper.properties
zookeeper.properties是作为zookeeper的配置文件,比如你想修改zookeeper的默认端口通过配置文件修 clientPort=2181项即可 ,如下:
如果是在linux启动,cd到bin目录,执行下面命令
zookeeper-server-start.sh ../config/zookeeper.properties
启动效果如下
对于运行 Kafka 言,至少要求安装Java 7版本 ,Zookeeper的端口是2181接下来我们启动 Kafka 务器,进入到bin\\windows 目录,执行如下列命令(windows):
kafka-server-start.bat ../../config/server.properties
server.properties作为kafka的配置文件,我们关注三个配置,你也可以根据情况进行修改
broker.id=0 #如果要集群部署,broker.id不能重复
advertised.listeners=PLAINTEXT://127.0.0.1:9092 #kafka的地址和端口
zookeeper.connect=127.0.0.1:2181 #zookeeper的地址和端口
启动效果如下
控制台输出结尾处的“ Kafka Server O], started ,标志 Kafka 服务器启动成功,默认的服务端口是 9092
Kafka快速入门
下面我们基于SpringBoot来快速入门Kafka,做一个发送消息和接收消息的案例
第一步:搭建SpringBoot工程,导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
第二步,创建启动类,yml配置如下
spring:
application:
name: application-kafka
kafka:
bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-consumer-group #消费者的ID,这个对应 config/consumer.properties中的group.id
第三步:创建测试类,发送消息, 通过 注入 KafkaTemplate 发送
@SpringBootTest(classes = KafkaApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Test
public void send() throws InterruptedException
//发送消息:第一个参数是topic,第二个参数是消息的内容。Topic我们可以理解为对消息的分类
kafkaTemplate.send("topic-hello", "你好kafka");
System.out.println("发送完成");
Thread.sleep(2000);
第四步:创建消费者,接收消息, 通过 :@KafkaListener(topics = “topic”) 监听topic中的消息
@Component
public class HelloConsumer
@KafkaListener(topics = "topic-hello")
public void handler(String message)
System.out.println("收到消息:"+message);
第五步:使用命令 创建Topic,否则会找不到Topic
创建名为“topic-hello”的TOPIC , 进入 bin/windows目录,cmd执行:
kafka-topics.bat --create --zookeeper localhost:2181 --topic topic-hello --partitions 1 --replication-factor 1
执行测试方法,查看控制台效果
文章结束,希望对你有所帮助,喜欢的话点赞收藏加评论哦
以上是关于一.Kafka入门到精通-Kafka快速入门的主要内容,如果未能解决你的问题,请参考以下文章
三.Kafka入门到精通-SpringBoot整合Kafka(上)