Kafka 高级API 实战

Posted 大数据小码农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 高级API 实战相关的知识,希望对你有一定的参考价值。

1. 环境

CDH 5.16.1
kafka版本 2.1.0-kafka-4.0.0

<!-- Kafka dependency; the version is the same string as the Kafka version listed in the environment section above. -->
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.1.0-kafka-4.0.0</version>
    </dependency>

2. 生产者

2.1 生产者,带回调函数

package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

/**
 * Producer thread that keeps sending numbered text messages to a Kafka topic and reports the
 * result of every send through a callback.
 *
 * <p>The send loop runs until {@link #shutdown()} is called; the underlying producer is closed
 * when the loop ends, whether it ends normally or because of an exception.
 *
 * @author wu ning
 * @className: NewKafkaProducer
 * @date 2019/12/15 15:21
 */
public class NewKafkaProducer extends Thread {
    private final String topic;
    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    // Keys are declared as String so the generic type agrees with the configured StringSerializer.
    private final KafkaProducer<String, String> producer;

    // Cleared by shutdown() to let run() leave its loop and close the producer.
    private volatile boolean running = true;

    /**
     * Creates the producer from the {@code bootstrap.servers} project property.
     *
     * @param topic name of the topic every record is sent to
     */
    public NewKafkaProducer(String topic) {
        this.topic = topic;

        Properties prop = new Properties();

        prop.put("bootstrap.servers", bootstrapServer);
        prop.put("acks", "1");
        // Key serializer.
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer.
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(prop);
    }

    /** Asks the send loop to stop; the producer is closed by the sending thread itself. */
    public void shutdown() {
        running = false;
    }

    /**
     * Sends {@code "new_kafkaproducer ==> <n>"} messages (n = 1, 2, ...) without a key until
     * {@link #shutdown()} is called, printing offset and partition for each acknowledged record.
     */
    @Override
    public void run() {
        int i = 1;

        try {
            while (running) {
                // Send to the topic given to the constructor (it used to be ignored in favour of
                // the "kafka.topic" property).
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topic, "new_kafkaproducer ==> " + i);
                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        // Include the cause so a failed send can be diagnosed.
                        System.out.println("发送失败: " + exception);
                    } else {
                        System.out.println("offset:" + metadata.offset());
                        System.out.println("partition:" + metadata.partition());
                    }
                });
                i++;
            }
        } finally {
            // Always release the producer's resources when the loop ends.
            producer.close();
        }
    }
}

2.2 生产者,自定义分区

1. 实现 Partitioner 接口,并重写其中的方法
package com.monk.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Partitioner that routes every record to one fixed partition, regardless of topic, key or value.
 *
 * @author wu ning
 * @className: CustomPartitioner
 * @date 2019/12/15 18:31
 */
public class CustomPartitioner implements Partitioner {
    /** Partition index returned for every record. */
    private static final int TARGET_PARTITION = 0;

    /** No configuration is read. */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }

    /**
     * Chooses the partition for a record.
     *
     * @return always {@code 0}; none of the arguments are inspected
     */
    @Override
    public int partition(
            String topic,
            Object key,
            byte[] keyBytes,
            Object value,
            byte[] valueBytes,
            Cluster cluster) {
        return TARGET_PARTITION;
    }

    /** Nothing to release. */
    @Override
    public void close() {
        // Intentionally empty.
    }
}
2. 在生产者配置中指定自定义分区类
// Set the custom partitioner class in the producer properties.
prop.put("partitioner.class", "com.monk.kafka.CustomPartitioner");
package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Producer thread that sends ten text messages through a producer configured with
 * {@code com.monk.kafka.CustomPartitioner}, then closes the producer.
 *
 * @author wu ning
 * @className: NewKafkaProducerWithPartitioner
 * @date 2019/12/15 18:33
 */
public class NewKafkaProducerWithPartitioner extends Thread {

    private final String topic;
    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    // Keys are declared as String so the generic type agrees with the configured StringSerializer.
    private final KafkaProducer<String, String> producer;

    /**
     * Creates the producer from the {@code bootstrap.servers} project property and registers the
     * custom partitioner.
     *
     * @param topic name of the topic every record is sent to
     */
    public NewKafkaProducerWithPartitioner(String topic) {
        this.topic = topic;

        Properties prop = new Properties();

        prop.put("bootstrap.servers", bootstrapServer);
        prop.put("acks", "1");
        // Key serializer.
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer.
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Use the custom partitioner.
        prop.put("partitioner.class", "com.monk.kafka.CustomPartitioner");

        producer = new KafkaProducer<>(prop);
    }

    /**
     * Sends {@code "NewKafkaProducerWithPartitioner ==> <n>"} for n = 1..10 without a key and
     * prints offset and partition for each acknowledged record.
     */
    @Override
    public void run() {
        try {
            for (int i = 1; i <= 10; i++) {
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topic, "NewKafkaProducerWithPartitioner ==> " + i);

                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        // Include the cause so a failed send can be diagnosed.
                        System.out.println("发送失败: " + exception);
                    } else {
                        System.out.println("offset:" + metadata.offset());
                        System.out.println("partition:" + metadata.partition());
                    }
                });
            }
        } finally {
            // Close even if send() throws, so the producer's resources are not leaked.
            producer.close();
        }
    }
}

3. 消费者

3.1 消费者,简单模式

package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Consumer thread that subscribes to one topic and prints the value of every record it receives.
 *
 * <p>The poll loop runs until {@link #shutdown()} is called; the consumer is closed by the
 * polling thread when the loop ends, whether normally or because of an exception.
 *
 * @author wu ning
 * @className: NewKafkaConsumer
 * @date 2019/12/15 18:48
 */
public class NewKafkaConsumer extends Thread {
    private final String topic;

    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    // Keys are declared as String so the generic type agrees with the configured StringDeserializer.
    private final KafkaConsumer<String, String> kafkaConsumer;

    // Cleared by shutdown() to let run() leave its loop and close the consumer.
    private volatile boolean running = true;

    /**
     * Creates the consumer from the {@code bootstrap.servers} project property, in consumer group
     * {@code test1}, with auto-commit enabled and {@code auto.offset.reset=earliest}.
     *
     * @param topic name of the topic to subscribe to
     */
    public NewKafkaConsumer(String topic) {
        this.topic = topic;

        Properties props = new Properties();

        props.put("bootstrap.servers", bootstrapServer);
        // Consumer group id.
        props.put("group.id", "test1");

        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Auto-commit interval in milliseconds (may lead to records being consumed more than once).
        props.put("auto.commit.interval.ms", "1000");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Key and value deserializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafkaConsumer = new KafkaConsumer<>(props);
    }

    /** Asks the poll loop to stop; it exits after the current poll returns. */
    public void shutdown() {
        running = false;
    }

    /** Subscribes to the topic and prints each record's value until {@link #shutdown()} is called. */
    @Override
    public void run() {
        try {
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            while (running) {
                // Wait at most 100 ms for records; poll(Duration) replaces the deprecated poll(long).
                ConsumerRecords<String, String> records =
                        kafkaConsumer.poll(java.time.Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("==>  " + record.value());
                }
            }
        } finally {
            // Close on the polling thread so the consumer's resources are released.
            kafkaConsumer.close();
        }
    }
}

3.2 消费者,消费指定partition,定位offset

// Consume from the specified partition only.
kafkaConsumer.assign(Collections.singleton(topicPartition));

// Start reading at the specified offset.
kafkaConsumer.seek(topicPartition,3985);
package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

/**
 * Consumer thread that reads one specific partition of a topic, starting at a given offset, and
 * prints value, partition and offset of every record it receives.
 *
 * <p>The poll loop runs until {@link #shutdown()} is called; the consumer is closed by the
 * polling thread when the loop ends, whether normally or because of an exception.
 *
 * @author wu ning
 * @className: NewKafkaConsumerWithPartition
 * @date 2019/12/15 19:14
 */
public class NewKafkaConsumerWithPartition extends Thread {
    /** Partition used by the single-argument constructor. */
    private static final int DEFAULT_PARTITION = 0;

    /** Start offset used by the single-argument constructor. */
    private static final long DEFAULT_OFFSET = 3985L;

    private final String topic;
    private final int partition;
    private final long offset;

    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    // Keys are declared as String so the generic type agrees with the configured StringDeserializer.
    private final KafkaConsumer<String, String> kafkaConsumer;

    // Cleared by shutdown() to let run() leave its loop and close the consumer.
    private volatile boolean running = true;

    /**
     * Creates a consumer for partition 0 of {@code topic}, starting at offset 3985.
     *
     * @param topic name of the topic to read from
     */
    public NewKafkaConsumerWithPartition(String topic) {
        this(topic, DEFAULT_PARTITION, DEFAULT_OFFSET);
    }

    /**
     * Creates the consumer from the {@code bootstrap.servers} project property, in consumer group
     * {@code test11}, with auto-commit enabled.
     *
     * @param topic name of the topic to read from
     * @param partition index of the partition to assign
     * @param offset offset to seek to before the first poll
     */
    public NewKafkaConsumerWithPartition(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;

        Properties props = new Properties();

        props.put("bootstrap.servers", bootstrapServer);
        // Consumer group id.
        props.put("group.id", "test11");

        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Auto-commit interval in milliseconds (may lead to records being consumed more than once).
        props.put("auto.commit.interval.ms", "1000");

        // Key and value deserializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafkaConsumer = new KafkaConsumer<>(props);
    }

    /** Asks the poll loop to stop; it exits after the current poll returns. */
    public void shutdown() {
        running = false;
    }

    /**
     * Assigns the configured partition, seeks to the configured offset, then prints each record
     * until {@link #shutdown()} is called.
     */
    @Override
    public void run() {
        try {
            TopicPartition topicPartition = new TopicPartition(topic, partition);

            // Consume from the specified partition only.
            kafkaConsumer.assign(Collections.singleton(topicPartition));

            // Start reading at the specified offset.
            kafkaConsumer.seek(topicPartition, offset);

            while (running) {
                // Wait at most 100 ms for records; poll(Duration) replaces the deprecated poll(long).
                ConsumerRecords<String, String> records =
                        kafkaConsumer.poll(java.time.Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("==>  " + record.value());
                    System.out.println("partition ==> " + record.partition());
                    System.out.println("offset ==> " + record.offset());
                }
            }
        } finally {
            // Close on the polling thread so the consumer's resources are released.
            kafkaConsumer.close();
        }
    }
}

以上是关于Kafka 高级API 实战的主要内容,如果未能解决你的问题,请参考以下文章

Springboot集成kafka高级应用实战

Express实战 - 应用案例- realworld-API - 路由设计 - mongoose - 数据验证 - 密码加密 - 登录接口 - 身份认证 - token - 增删改查API(代码片段)

大数据云计算高级实战Hadoop,Flink,Spark,Kafka,Storm,Docker高级技术大数据和Hadoop技能

Kafka消息队列大数据实战教程-第四篇(Kafka客户端Producer API)

Kafka消息队列大数据实战教程-第四篇(Kafka客户端Producer API)

Kafka - 消费接口分析