消费者消费消息分析
Posted lisin-lee-cooper
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消费者消费消息分析相关的知识,希望对你有一定的参考价值。
消费者者实例代码
public class KafkaConsumerClient
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig()
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
public static void main(String[] args)
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try
while (isRunning.get())
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
System.out.println("topic = " + record.topic()
+ ", partition = "+ record.partition()
+ ", offset = " + record.offset());
System.out.println("key = " + record.key()
+ ", value = " + record.value());
//do something to process record.
catch (Exception e)
log.error("occur exception ", e);
finally
consumer.close();
订阅主题和分区
public void subscribe(Collection<String> topics,
ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
消费者消息的类型对象
public class ConsumerRecord<K, V>
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
位移自动提交,拉取的一批消息没有消费完成发生异常会丢失消息;
手动提交,拉取的一批消息没有消费完成发生异常,最前面的消息重新消费的时候会重复消费。
以上是关于消费者消费消息分析的主要内容,如果未能解决你的问题,请参考以下文章
源码分析RocketMQ消息消费机制----消费者拉取消息机制