消费者消费消息分析

Posted lisin-lee-cooper

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消费者消费消息分析相关的知识,希望对你有一定的参考价值。

消费者者实例代码

public class KafkaConsumerClient 
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig()
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    

    public static void main(String[] args) 
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        try 
            while (isRunning.get()) 
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) 
                    System.out.println("topic = " + record.topic() 
                            + ", partition = "+ record.partition() 
                            + ", offset = " + record.offset());
                    System.out.println("key = " + record.key()
                            + ", value = " + record.value());
                    //do something to process record.
                
            
         catch (Exception e) 
            log.error("occur exception ", e);
         finally 
            consumer.close();
        
    

订阅主题和分区

public void subscribe(Collection<String> topics, 
    ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

消费者消息的类型对象

public class ConsumerRecord<K, V> 
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
	

位移自动提交,拉取的一批消息没有消费完成发生异常会丢失消息;
手动提交,拉取的一批消息没有消费完成发生异常,最前面的消息重新消费的时候会重复消费。

以上是关于消费者消费消息分析的主要内容,如果未能解决你的问题,请参考以下文章

kafka原理分析

kafka源码分析 消费消息

消费者消费消息分析

源码分析RocketMQ消息消费机制----消费者拉取消息机制

34 生产案例:从RocketMQ底层原理分析为什么会发生重复发优惠券

AWS SQS:当消费者发生错误时移动到死信队列