Kafka Multi-Dimensional Deep Dive — Kafka Core API: Consumer Study Notes (Important)



Kafka Consumer Basics
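
The code snippets below are methods lifted out of a larger sample class, so they refer to a TOPIC_NAME constant and rely on several imports that are not shown. A minimal enclosing skeleton might look like this (the class name and topic name are placeholders of mine, not from the original notes):

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerSample {
    // Topic name is an assumption; use whatever topic exists in your cluster
    private static final String TOPIC_NAME = "test-topic";

    public static void main(String[] args) {
        helloworld();
    }

    // ... the consumer methods from these notes go here ...
}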


/**
 * Auto-commit consumer. You will see this in real projects, but it is not recommended:
 * offsets are committed on a timer regardless of whether processing succeeded, so a
 * crash can lose messages or reprocess them.
 */
private static void helloworld() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

Kafka Consumer: Manually Committing Offsets

/**
 * Manually commit offsets
 */
private static void commitedOffset() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("auto.commit.interval.ms", "1000"); // ignored once auto-commit is disabled
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            // Suppose each record is written to a database; the write may or may not succeed
            // TODO record 2 db
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
            // On failure, roll back and do not commit the offset
        }

        // On success, manually commit the offsets
        consumer.commitAsync();
    }
}

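Note that commitAsync() does not retry, and a failed commit goes unnoticed unless you pass a callback. If you at least want to see failed commits, the consumer also accepts an OffsetCommitCallback; a minimal sketch that can replace the bare consumer.commitAsync() call above (the logging is illustrative):

// Same loop as above, but report failed offset commits through a callback
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // The commit failed; 'offsets' is what we tried to commit.
        // A common pattern is to log here and do a final commitSync() on shutdown.
        System.err.println("offset commit failed for " + offsets + ": " + exception);
    }
});
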
Kafka Consumer: Manually Committing Offsets per Partition

/**
 * Manually commit offsets, handling each partition separately
 */
private static void commitedOffsetWithPartition() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("auto.commit.interval.ms", "1000"); // ignored once auto-commit is disabled
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        // Process each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecord = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecord) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            // Commit lastOffset + 1: the committed offset is the position of the next record to consume
            long lastOffset = pRecord.get(pRecord.size() - 1).offset();
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            // Commit the offset for this single partition synchronously
            consumer.commitSync(offset);
            System.out.println("=============partition - " + partition + " end================");
        }
    }
}

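The example above still leaves partition assignment to the group coordinator (it uses subscribe()); "controlling partitions" here only means committing them one by one. If you want to pick the partitions yourself, the consumer also offers assign(), which bypasses group rebalancing for those partitions. A minimal sketch, assuming TOPIC_NAME has at least two partitions (the partition numbers and method name are illustrative, not from the original notes):

private static void consumeAssignedPartitions() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Manually pick partitions 0 and 1 instead of subscribing to the whole topic
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    consumer.assign(Arrays.asList(p0, p1));
    // Optionally control where reading starts, e.g. from the beginning of partition 0
    consumer.seekToBeginning(Arrays.asList(p0));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
        // Offsets are still committed manually, as in the patterns above
        consumer.commitSync();
    }
}
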

Link: https://pan.baidu.com/s/1F-ySQ8chot6ro9sAn7Re0A  Extraction code: ib0i
