Kafka
Posted hanchaoyue
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka相关的知识,希望对你有一定的参考价值。
课程内容:
1. 简单的操作一下集群
2. 简单的介绍几个工具(企业)
3. Producer的原理(核心,重点)
4. 简单kafka的代码
5. 介绍里面的核心参数(重点)
========================
消费者原理
--replica-factor 2
--partitions 2我们一般设置分区数,建议是节点的倍数
=========================================
Producer的原理
*******************kafka************************************
topic:TopicA
多个分区
p0:leader parititon hdp1
p1:leader partition hdp2
需要把数据发送到leader partition
生产者,生产数据,需要把数据封装成ProducerRecord
①ProducerRecord
②序列化
③partitioner(获取元数据,找到一个broker就可以
知道有多少个分区,并清除哪个是leader partition,并把数据发送到哪)
存入
缓冲区***
④Sender (一个线程)从缓冲区取出消息,封装为一个批次(Batch)
Batch
Batch
Batch
--------------------------------------------
zookeeper
------------------------------------------------
broker(去zookeeper注册,选举controller)
controller(监听zookeeper元数据,动态变化,进行同步)
(hdp1)
去zookeeper同步元数据信息,(并分发给其他节点)
p0
-------------------------------------------------
broker(去zookeeper注册,选举controller)
(hdp2)
p1
---------------------------------------------------
//创建了一个配置文件的对象
Properties props = new Properties();
//这个参数,目的就是为了获取kafka集群的元数据
//我们写一个主机也行,写多个的话,更安全
//使用的是主机名,原因是server.properties里面填进去的是主机名,必须配置hosts文件
props.put("bootstrap.servers","hadoop1:9092,hadoop2:9092,hadoop3:9092");
//设置序列化器==》kafka的数据是用的网路传输的,所以里面都是二进制的数据
//我们发送消息的时候,默认的情况下就是发送一个消息就可以了
//但是你也可以给你的每条消息都指定一个key也是可以的
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
//调优的参数,后面解释
// acks
//-1: 如何判断一条消息发送成功?首先消息要写入leader partition,这些消息还需要被另外的所有的这个分区的副本同步了,才算发送成功
// 1: 发送的消息写到leader partition 就算写入成功,然后服务器端就返回响应就可以了,默认就是这个参数,有可能会丢数据
// 0: 消息主要发送出去了,就认为是成功的(允许丢数据,只是处理一些不重要的日志,不需要得到准确的数据)
// kafka里面的分区是有副本的,比如一个主题TopicA,这个主题有两个parittion,每个partition有三个副本
// p0: leader parittion ,follower parititon,follower partition
// p1: leader partition,follower partition,follower partition
props.put("acks","-1")
//重试次数,网络抖动(5-10次)
props.put("retries",3)
//每隔多久重试一次 2s
props.put("retry.backoff.ms",2000)
//提升消息吞入量
//设置压缩格式,lz4,
props.put("compression.type","lz4")
//适当增大缓冲区大小,32M(基本上这个参数不需要设置)
props.put("buffer.memory",33554432)
//批次大小,默认16k,这里设置32k;设置这个批次的大小 还跟我们的消息的大小有关
//假设一条消息1k==》设置100k
props.put("batch.size",323840)
//比如我们设置的一个批次的大小是32k,但是size没有满,无论如何到了这个时间都要把消息发送出去了
//默认是0,100ms
props.put("linger.ms",100)
//这个值,默认是1M,代表的是,生产者发送消息的时候,最大的一条消息(注意说的不是批次)
// byte,如果消息超过1M,程序会报错,可以设置为10M
props.put("max.request.size",1024*1024*10)
// 消息发送出去后,多久没有响应,默认为超时
// 如果网络不稳定,可以适当增大
props.put("request.timeout.ms",3000)
//创建生产者
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
//创建消息
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"xoxo","this_is_key","this_is_value")
//发送消息,
//异步发送:性能比较好,也是我们生产里面使用的方式
//同步发送:等到返回响应,发送下一条,性能不好,我们生产里面一般不用
producer.send(record,new Callback(){
public void onCompletion(RecordMetadata metadata,Exception exception){
if(exception == null){
sout("success")
} else {
sout("error")
}
}
})
憨
以上是关于Kafka的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Flink 1.14.0 消费 kafka 数据自定义反序列化器
kafkaThe group member needs to have a valid member id before actually entering a consumer group(代码片段
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段
Camel-Kafka java.io.EOFException - NetworkReceive.readFromReadableChannel