Kafka producer and consumer interaction, with the core parameters explained
Goals
- Explain the interaction between producers and consumers, using Kafka 3.1 as the environment;
- Get familiar with the core parameters of Kafka producers and consumers.
Hands-on
Producer: sending messages
Dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.1</version>
</dependency>
Producer code
package com.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Kafka producer official documentation:
 * https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
 */
public class MyProducer {

    // For a cluster, separate the broker addresses with commas.
    public static final String KAFKA_BROKER_LIST = "kafka-broker-address:port";
    // Topic
    public static final String TOPIC_NAME = "liNingShoesTopic";

    public static void main(String[] args) throws Exception {
        new MyProducer().synSend();
    }
    /**
     * Synchronous send: the next message is sent only after the previous one has been acknowledged.
     */
    public void synSend() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Kafka broker addresses; separate multiple brokers with commas.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Keys travel over the network, so they must be serialized to bytes.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Values travel over the network, so they must be serialized to bytes.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 3; i++) {
            Future<RecordMetadata> send = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, "this is my key", "this is my message"));
            RecordMetadata recordMetadata = send.get();
            System.out.println("topic=" + recordMetadata.topic() + ";offset=" + recordMetadata.offset() + ";partition=" + recordMetadata.partition());
        }
        producer.close();
    }
    /**
     * Asynchronous send.
     * Note: make sure the producer is not closed while messages are still in flight.
     */
    public void asySend() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Kafka broker addresses; separate multiple brokers with commas.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Keys travel over the network, so they must be serialized to bytes.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Values travel over the network, so they must be serialized to bytes.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        int msgNum = 3;
        // Because sends are asynchronous, a CountDownLatch keeps the producer open until every callback has fired.
        CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 0; i < msgNum; i++) {
            producer.send(new ProducerRecord<String, String>(TOPIC_NAME, "this is my key", "this is my message"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Failed to send message: " + exception);
                    }
                    if (recordMetadata != null) {
                        System.out.println("topic=" + recordMetadata.topic() + ";offset=" + recordMetadata.offset() + ";partition=" + recordMetadata.partition());
                    }
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }
    /**
     * Send messages to a specific partition.
     * If no partition is specified, the default partitioner hashes the key and takes it modulo the partition count:
     * org.apache.kafka.clients.producer.internals.DefaultPartitioner:
     * public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
     *     return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     * }
     */
    public void choosePartitionSend() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Kafka broker addresses; separate multiple brokers with commas.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Keys travel over the network, so they must be serialized to bytes.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Values travel over the network, so they must be serialized to bytes.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 3; i++) {
            // Explicitly send to partition 1.
            Future<RecordMetadata> send = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, 1, "this is my key", "this is my message"));
            RecordMetadata recordMetadata = send.get();
            System.out.println("topic=" + recordMetadata.topic() + ";offset=" + recordMetadata.offset() + ";partition=" + recordMetadata.partition());
        }
        producer.close();
    }
}
Consumer: consuming messages
Dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.1</version>
</dependency>
Consumer code
package com.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

/**
 * Kafka consumer official documentation:
 * https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
 */
public class MyConsumer {

    // Topic
    public static final String TOPIC_NAME = "liNingShoesTopic";
    // Consumer group
    public static final String GROUP_ID_CONFIG = "liNingGroup";
    // For a cluster, separate the broker addresses with commas.
    public static final String KAFKA_BROKER_LIST = "kafka-broker-address:port";

    public static void main(String[] args) {
        new MyConsumer().assign();
    }
    /**
     * Consume messages with automatic offset commits.
     */
    public void autoCommitOffset() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // If true, the consumer's offsets are committed periodically in the background.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // With auto-commit enabled, commit offsets every 1 second.
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the given topics to get dynamically assigned partitions.
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            /*
             * poll() is a long poll for fetching records; here it waits up to 100 ms per iteration.
             * Calling poll() regularly also:
             * 1. signals that the consumer is alive and still processing;
             * 2. keeps the consumer in its group so it continues to consume its assigned partitions.
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ";key=" + record.key() + ";value=" + record.value());
            }
        }
    }
    /**
     * Manually commit offsets synchronously.
     */
    public void manualCommitSync() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // Disable automatic offset commits; offsets are committed manually below.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Only takes effect when auto-commit is enabled, so it is irrelevant here.
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the given topics to get dynamically assigned partitions.
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            // Wait up to 100 ms per poll.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ";key=" + record.key() + ";value=" + record.value());
                buffer.add(record);
            }
            // Commit offsets once at least 5 records have been buffered.
            if (buffer.size() >= 5) {
                System.out.println("Committing offsets.");
                // In practice, wrap this in try/catch and retry the commit on failure.
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
    /**
     * Manually commit offsets asynchronously.
     */
    public void manualCommitAsy() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // Disable automatic offset commits; offsets are committed manually below.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Only takes effect when auto-commit is enabled, so it is irrelevant here.
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the given topics to get dynamically assigned partitions.
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            // Wait up to 100 ms per poll.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ";key=" + record.key() + ";value=" + record.value());
                buffer.add(record);
            }
            // Commit offsets once at least 5 records have been buffered.
            if (buffer.size() >= 5) {
                System.out.println("Committing offsets.");
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                        // e != null means the asynchronous commit failed; it can be retried here.
                        // In production, synchronous commits are generally preferred.
                        if (e != null) {
                            System.out.println("Asynchronous offset commit failed.");
                        }
                    }
                });
                buffer.clear();
            }
        }
    }
    /**
     * Consume the subscribed topics partition by partition, and commit each partition's offset manually.
     */
    public void manualCommitOffsetByPartition() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // Disable automatic offset commits; offsets are committed manually below.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Only takes effect when auto-commit is enabled, so it is irrelevant here.
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the given topics to get dynamically assigned partitions.
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                // Iterate over the partitions contained in this batch of records.
                for (TopicPartition partition : records.partitions()) {
                    // Records of the given partition.
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    // Print every record of this partition.
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    // Offset of the last record consumed from this partition.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // Commit the given offset for this partition only.
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }
    /**
     * Consume from explicitly assigned partitions.
     * What are the advantages of assigning partitions manually?
     * Per the official docs: a consumer failure does not trigger a rebalance of the assigned partitions;
     * each consumer acts independently, even if it shares a groupId with another consumer.
     */
    public void assign() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // If true, the consumer's offsets are committed periodically in the background.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // With auto-commit enabled, commit offsets every 1 second.
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Manually assign a partition list to this consumer.
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ";key=" + record.key() + ";value=" + record.value());
            }
        }
    }
    /**
     * Consume messages produced within a time window on the assigned partitions; here, start from one hour ago.
     */
    public void assignByTime() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // If true, the consumer's offsets are committed periodically in the background.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // With auto-commit enabled, commit offsets every 1 second.
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
        // Start consuming from a given point in time.
        Map<TopicPartition, Long> map = new HashMap<>();
        // Consume messages produced in the last hour: current timestamp minus one hour, in milliseconds.
        Long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        for (PartitionInfo par : topicPartitions) {
            map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
        }
        // Look up the offsets of the given partitions by timestamp.
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
        List<TopicPartition> topicPartitionList = new ArrayList<>();
        parMap.forEach((k, v) -> {
            if (v != null) {
                System.out.println("Within the last hour: partition " + k.partition() + " has messages, offset=" + v.offset());
            } else {
                System.out.println("Within the last hour: partition " + k.partition() + " has no messages.");
            }
            topicPartitionList.add(k);
        });
        // Manually assign the partition list to this consumer, then seek each partition to the offset found for the timestamp.
        consumer.assign(topicPartitionList);
        parMap.forEach((k, v) -> {
            if (v != null) {
                consumer.seek(k, v.offset());
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ";key=" + record.key() + ";value=" + record.value());
            }
        }
    }
    /**
     * Consume an assigned partition from the beginning.
     */
    public void consumeFromBeginning() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Manually assign partition 0 to this consumer, then seek to the beginning of that partition.
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ";key=" + record.key() + ";value=" + record.value());
            }
        }
    }
    /**
     * Consume an assigned partition starting from a specific offset.
     */
    public void consumeFromOffset() {
        Properties props = new Properties();
        // Kafka broker addresses and ports.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
        // Consumer group id.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        // Deserializers for keys and values.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Manually assign partition 0 to this consumer.
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        // Start consuming partition 0 from offset 20, i.e. skip offsets 0 through 19.
        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 20);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ";key=" + record.key() + ";value=" + record.value());
            }
        }
    }
}
Producer core parameters
acks
Official documentation
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:

acks=0
If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.

acks=1
This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

acks=all
This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting. Note that enabling idempotence requires this config value to be 'all'. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.

Type: string Default: all Valid Values: [all, -1, 0, 1] Importance: low

min.insync.replicas
When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.

Type: int Default: 1 Valid Values: [1,...] Importance: high Update Mode: cluster-wide
Interpretation
acks=all or acks=-1
The behavior also depends on min.insync.replicas (minimum number of in-sync replicas, default 1). For example, with min.insync.replicas=2, at least 2 Kafka nodes must persist the write before success is returned to the client. A value of 2 or more is recommended.
acks=0
The producer does not wait for any acknowledgment from the broker before sending the next message.
acks=1
The message is written to the leader, and the producer does not wait for the followers to replicate it before sending the next message. If the leader crashes before the followers have replicated the message, the message is lost. Note that acks=all with min.insync.replicas=1 only degenerates to this behavior when the leader is the sole in-sync replica, because acks=all always waits for every replica currently in the ISR.
In summary:
- acks=all (or acks=-1) with min.insync.replicas > 1 gives the strongest durability but the lowest send throughput; suitable for financial workloads.
- acks=0 gives the highest send throughput but the weakest durability; suitable for unimportant logs.
- acks=1 sits between the two.
Usage
props.put(ProducerConfig.ACKS_CONFIG,"1");
Note: with acks=all or acks=-1, min.insync.replicas must also be set, either broker-wide in Kafka's server.properties or per topic, as sketched below.
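A minimal sketch of that note (this helper class is not part of the article; the topic name, partition count and replication factor are illustrative assumptions): it creates a topic with a topic-level min.insync.replicas=2 via the Admin API and configures a producer with acks=all.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Collections;
import java.util.Properties;

public class AcksAllSetup {
    public static void main(String[] args) throws Exception {
        // Create a topic with min.insync.replicas=2 (3 partitions and replication factor 3 are illustrative).
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-address:port");
        try (AdminClient admin = AdminClient.create(adminProps)) {
            NewTopic topic = new NewTopic("liNingShoesTopic", 3, (short) 3)
                    .configs(Collections.singletonMap("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }

        // Producer side: require acknowledgement from all in-sync replicas.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-address:port");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // ... serializers and sends as in MyProducer above.
    }
}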
retries&&retry.backoff.ms
Official documentation
retries
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior. Enabling idempotence requires this config value to be greater than 0. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to greater than 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

Type: int Default: 2147483647 Valid Values: [0,...,2147483647] Importance: high

retry.backoff.ms
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

Type: long Default: 100 Valid Values: [0,...] Importance: low

enable.idempotence
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'. Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown.

Type: boolean Default: true Valid Values: Importance: low
Interpretation
retries
The number of times the producer retries a failed send to the broker; valid range [0, 2147483647], default 2147483647. Note that enabling idempotence (the default in Kafka 3.x) requires retries to be greater than 0; with idempotence disabled, a retry may write a duplicate message.
retry.backoff.ms
The time to wait before retrying a failed request to a given topic partition, in milliseconds; default 100.
Usage
/**
 * Retry up to 3 times. Note: a send may succeed on the broker while the acknowledgment is lost or delayed by the network,
 * in which case the retry produces a duplicate message, so the consumer side must handle duplicate consumption.
 */
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// Retry interval: 500 milliseconds.
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
Note: enable.idempotence is a producer-side setting (configured via ProducerConfig), not a server.properties setting. It is enabled by default in Kafka 3.x; when enabled, acks must be all (or -1) and retries must be greater than 0. If it is explicitly set to false, retries may write duplicates.
buffer.memory&&batch.size&&linger.ms
Official documentation
buffer.memory
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception. This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

Type: long Default: 33554432 Valid Values: [0,...] Importance: high

batch.size
The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
No attempt will be made to batch records larger than this size.
Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated for this partition, we will 'linger' for the linger.ms time waiting for more records to show up. This linger.ms setting defaults to 0, which means we'll immediately send out a record even the accumulated batch size is under this batch.size setting.

Type: int Default: 16384 Valid Values: [0,...] Importance: medium

linger.ms
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record, the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

Type: long Default: 0 Valid Values: [0,...] Importance: medium
Interpretation
buffer.memory
Messages are first written to a local buffer, which improves send performance. Default 33554432 bytes, i.e. 32 MB.
batch.size
A background sender thread drains the buffer and sends records in batches; the default batch size is 16384 bytes, i.e. 16 KB.
linger.ms
Default 0, meaning each record is sent immediately, which hurts batching. A common setting is around 100 ms: if the batch has not filled up within that time, it is sent anyway.
Usage
//Messages are first written to a local buffer to improve send performance; default 33554432 bytes (32 MB).
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//The sender thread drains the buffer and sends records in batches; default 16384 bytes (16 KB) per batch.
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//Default 0 (send immediately). Around 100 ms is common: if the batch is not full within 100 ms, it is sent anyway.
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
Consumer core parameters
enable.auto.commit&&auto.commit.interval.ms
Official documentation
enable.auto.commit
If true the consumer's offset will be periodically committed in the background.

Type: boolean Default: true Valid Values: Importance: medium

auto.commit.interval.ms
The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.

Type: int Default: 5000 (5 seconds) Valid Values: [0,...] Importance: low
Interpretation
enable.auto.commit
If true, the consumer's offsets are committed automatically in the background at a fixed interval. Default true.
auto.commit.interval.ms
Only takes effect when enable.auto.commit=true; it is the auto-commit interval in milliseconds. Default 5000 (5 seconds).
Usage
//If true, the consumer's offsets are committed periodically in the background.
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//With auto-commit enabled, commit offsets every 1 second.
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
How a consumer can lose messages: suppose auto-commit is enabled with a 2-second interval. If the consumer fetches a record and its server crashes while it is still processing the record 2 seconds later, the offset has already been auto-committed even though the processing never finished, so the record will not be delivered again on the next poll.
How a consumer can consume a message twice: suppose auto-commit is enabled with a 2-second interval. If the consumer finishes processing within 1 second but the server crashes before the offset is committed, the same record will be delivered again the next time it consumes.
In summary:
- Prefer manual commits, i.e. set enable.auto.commit=false.
- Prefer synchronous commits issued only after the business logic has completed, as in the sketch below.
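A minimal sketch of that recommendation, reusing the props, TOPIC_NAME and imports from MyConsumer above; handle() is a hypothetical placeholder for the business logic:
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        handle(record); // placeholder: process the record first
    }
    if (!records.isEmpty()) {
        try {
            // Commit only after the whole batch has been processed, so a crash before this line
            // causes redelivery (duplicate consumption) rather than message loss.
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // The commit can be logged and retried here.
            System.err.println("Offset commit failed: " + e.getMessage());
        }
    }
}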
Consumer core methods
assign
Official documentation
assign
public void assign(Collection<TopicPartition> partitions)
Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment and will replace the previous assignment (if there is one). If the given list of topic partitions is empty, it is treated the same as unsubscribe().
Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change. Note that it is not possible to use both manual partition assignment with assign(Collection) and group assignment with subscribe(Collection, ConsumerRebalanceListener).
If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new assignment replaces the old one.
Specified by: assign in interface Consumer<K,V>
Parameters: partitions - The list of partitions to assign this consumer
Throws:
IllegalArgumentException - If partitions is null or contains null or empty topics
IllegalStateException - If subscribe() is called previously with topics or pattern (without a subsequent call to unsubscribe())
Interpretation
- This method assigns partitions manually and does not use the consumer's group management functionality;
- No rebalance is triggered when group membership or cluster/topic metadata changes;
- If auto-commit is enabled, an asynchronous commit (based on the old assignment) is triggered before the new assignment replaces the old one;
- It cannot be combined with subscribe.
subscribe
Official documentation
subscribe
public void subscribe(Collection<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions. Topic subscriptions are not incremental. This list will replace the current assignment (if there is one). It is not possible to combine topic subscription with group management with manual partition assignment through assign(Collection). If the given list of topics is empty, it is treated the same as unsubscribe().
This is a short-hand for subscribe(Collection, ConsumerRebalanceListener), which uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer subscribe(Collection, ConsumerRebalanceListener), since group rebalances will cause partition offsets to be reset. You should also provide your own listener if you are doing your own offset management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
Specified by: subscribe in interface Consumer<K,V>
Parameters: topics - The list of topics to subscribe to
Throws:
IllegalArgumentException - If topics is null or contains null or empty elements
IllegalStateException - If subscribe() is called previously with pattern, or assign is called previously (without a subsequent call to unsubscribe()), or if not configured at-least one partition assignment strategy
Interpretation
- Subscribes to the given list of topics to get dynamically assigned partitions;
- Cannot be used together with assign;
- To seek to particular offsets, prefer the overload that takes a ConsumerRebalanceListener;
- Rebalances can occur and will reset partition offsets; the listener gives you a chance to commit offsets before a rebalance finishes (see the sketch below).
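A minimal sketch of the listener-based overload, assuming the same consumer configuration and imports as MyConsumer above; the currentOffsets bookkeeping map is an illustrative choice, not something the article prescribes. The listener commits whatever has been processed before the partitions are taken away by a rebalance.
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(TOPIC_NAME), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit what has been processed so far before the partitions are reassigned.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Offsets could be looked up and seek() called here when managing offsets externally.
    }
});
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("offset=" + record.offset() + ";value=" + record.value());
        // Remember the next offset to commit for this record's partition.
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    consumer.commitAsync(currentOffsets, null);
}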