Kafka 2.8.0 JAVA API基本使用
Posted 没有Job的Steve
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 2.8.0 JAVA API基本使用相关的知识,希望对你有一定的参考价值。
以下测试皆在windows下进行,请根据自己情况酌情配置kafka zookeeper等环境
本人使用的是jdk11,代码中可能存在jdk9的新特性,使用jdk9以前的jdk的朋友请自行转换
kafka环境变量等暂时略过
1.java导入依赖
<!--导入kafka依赖-->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.8.0</version>
</dependency>
kafka Producer
导入相关依赖后,创建测试类ProducerDemo;
-
创建生产者对象
使用KafkaProducer 创建kafka生产者对象,这时可以发现kafka不允许我们使用空构造来创建对象;
那么我们就选用传入properties的方式创建kafka生产者
创建生产者的时候,跟控制台命令一样,我们需要指定集群名称以及序列化器,而这些相关设置都会存储在我们的配置文件中;
kafka给我们提供了ProducerConfig类,并在其中已经给我们提前准备好了我们所需要的key,在向properties中put键值时,可以直接使用producerConfig的静态常量作为key;并传入相应value
-
向kafka中发送信息
使用kafkaProducer向kafka中发送信息,可以使用其提供的send()方法 ;使用时可以看到其需要传入ProducerRecord以及一个可选的Callback
ProducerRecord: 即为每条数据所封装成的对象
CallBack:可选;获取函数的回调
-
close()
在真实生产环境中,我们可能不需要手动调用close方法关闭kafkaProducer,但是目前的测试阶段,如果不使用close关闭,可能会导致发送的信息在设置等待的时间内,不会被真正的发送;
流在关闭的时候会对数据进行回收操作
/**
* 描述:kafkaProducer生产者
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/10 23:14 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ProducerPartitionerDemo01 {
public static void main(String[] args) {
Properties properties = new Properties();
//自行修改为对应的集群地址 kafka默认为9092,此处我没有更改
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//需要传入序列化器的全类名,kafka需要通过反射全类名去获取序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer kafkaProducer = new KafkaProducer(properties);
for (int i = 0; i < 10; i++) {
//使用callBack收集回调信息,使用了lamdba表达式
kafkaProducer.send(new ProducerRecord("此处使用自己存在的主题","value"),((metadata, exception) ->{
if(exception==null){
System.out.println("没有错误,数据添加成功");
}
} ));
}
//关闭
kafkaProducer.close();
}
}
自定义分区器
如果想要自己根据业务需求编写自定义的分区规则,可以自定义分区器;
说到自定义,就势必需要去实现某个接口或者继承某个类
这里, 我们需要实现的是kafka给我们的Partitioner接口,实现后重写方法
/**
* 描述: 自定义分区器
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/10 22:24 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class MyPartitioner implements Partitioner {
/**
* 编写分区规则
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//根据业务需求编写分区规则
return 0;
}
@Override
public void close() {
}
/**
* 读取配置信息
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
}
}
在编写规则时可以参考Kafka对Partitioner的默认实现 DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
//如果key也不存在,则会对可用分区进行轮询
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
//如果没有指定分区,且存在key值,则会根据key的hash进行取模来选择分区
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
实现同步发送
正常情况下kafka生产者发送信息采用的是异步发送的方式,主线程将信息发送给共享变量 RecordAccumulator ,Sender线程不同的从共享变量中拉取数据发送到broker上;
实现逻辑
在两个线程其中一个执行的时候去阻塞另一个线程,实现串行
我们可以发现kafkaProducer的send()方法是存在返回值 Future 的;
而我们知道,当future对象调用get()方法时,不仅会获得当前线程回调的对象,还会阻塞当前线程
我们便使用这个方法来实现同步发送
同步发送的使用场景相对较少,我们可以使用同步发送来确保区内有序,即当上一条信息发送后,未接收到ack之前,阻塞发送线程,不继续发送,从而实现有序
消费者API
编写消费者api的逻辑与生产者十分的相像,使用kafka提供的 KafkaConsumer 来创建消费者对象
并在配置文件中传递对应信息,可以使用ConsumerConfig中的静态属性充当key值
/**
* 描述:kafka消费者
*
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/11 0:21 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
//连接的集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//生产者需要指定序列化器,那么消费者就需要指定对应的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交的延迟,提交的是消费者的offset
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroup01");
//创建消费者
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
//订阅主题
//此处可以添加多个集群
//可以看到这里没有返回值,也就是说,这里只是单纯的指定了主题,如果想获取主题中的信息,需要使用别的方法
kafkaConsumer.subscribe(List.of("你的主题"));
while (true){
//获取的类型与Producer类似,不过为ConsumerRecords类,想要得到单个数据,需要遍历输出
//新版本建议使用传入Duration的方式,直接传入毫秒数的方式以过时
ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ZERO);
consumerRecords.forEach(stringStringConsumerRecord -> {
//可以看到,使用consumerRecord去调用方法的时候,可以获取到Key,所以key并不只是用来划分分区之用,如果没有指定key,会输出null
System.out.println(stringStringConsumerRecord.key()+":"+stringStringConsumerRecord.value()+" :"+stringStringConsumerRecord.offset());
});
}
//consumer进行订阅拉去信息的时候不需要手动关闭,因为顺序执行完毕后,jvm会关闭;所以可以使用一个while循环来持续消费
}
}
启动消费者,会发现我们可以连接到对应的主题,但是不会获取到先前已经存在的信息;
在我们使用控制台调用消费者时,如果我们想获取该分区已经存在的信息,我们可以使用 --from-beginning指令将offset放到最前端从头获取;
java api中也是一样;
kafka中 命令行能做的事情,在配置文件中应该都有相关的配置
我们进入ConsumerConfig,可以查看到其已经给我们提供了 AUTO_OFFSET_RESET_CONFIG这个属性;
根据下方的doc描述,该属性默认值为lastest,这也是我们为什么在不设置的时候会无法获取已存在信息的原因,我们可以手动在配置文件中传入
earierlast
此处注意这个指定的生效条件:
-
只有当当前消费者/消费者组第一次消费(即还没有offset时),或当前的offset在这个server中不存在时,指令才会生效
这里解释一下为什么会不存在,kafka的数据默认时7天清空一次,如果我们拿着已经清空的数据的offset去寻找数据,就会出现offset在server中不存在的现象,此时AUTO_OFFSET_RESET_CONFIG就会生效
关于offset的手动提交
我们为什么需要手动提交? 自动提交无法保证准确的提交时机
如果设置的提交延时过短,会丢是数据
如果设置的延时过长,会导致数据重复
1.在配置文件中关闭自动提交
既然我们需要手动提交,则必然需要在配置文件中将自动提交置为false
ENABLE_AUTO_COMMIT_CONFIG,<----将它改成false
-
在消费结束后进行手动提交
使用consumer的 commitSync() 同步提交,或commitAsync() 异步提交
-
commitSyn:
相比于异步提交,因为其提交offset时自带失败重试的机制,相对更加可靠
-
commitAsync:
同步提交相对可靠,但是会阻塞当先线程,影响吞吐量;
在大多数情况下,我们会选用异步提交的方式
-
自定义存储offset
手动提交虽然可以解决丢是数据的问题,但是仍然会存在数据重复的现象;
kafka也早已考虑到这种情况,所以允许我们自定义存储offset的规则,(比如我们可以和mysql的写入操作进行事务绑定...)
但是相对于自定义分区器,自定义存储offset要相对麻烦一些;在0.9版本之后,kafka会将offset暂存在kafka内置的一个主题中,想要去维护一个offset,就需要考虑到消费者的Rebalance问题
即,如果当前消费者所消费的分区挂掉了,消费者需要转移到另一分区去消费,此时的offset需要定位到这个分区最近提交的offset
为此,我们需要实现kafka提供的ConsumerRebalanceListener
/**
* 描述: 自定义存储Offset
*
* <pre>
* HISTORY
* ****************************************************************************
* ID DATE PERSON REASON
* 1 2021/8/11 23:26 Bambi Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ConsumerConfigOffset {
//创建一个Map在暂存当前offset
private static Map<TopicPartition,Long> currentOffset = new ConcurrentHashMap<>();
public static void main(String[] args) {
//配置文件较为冗长,我写了个工具类进行配置,相关配置内容已经提到过,就不再赘述
PropertiesUtils propertiesUtils = new PropertiesUtils();
Properties properties = propertiesUtils.ConsumerProperties("localhost:9092", "bambiOffset", "false", "100", 1);
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
//在此处创建ConsumerRebalanceListener类
kafkaConsumer.subscribe(List.of("solo1"), new ConsumerRebalanceListener() {
//在Rebalance之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
//在Rebalance之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
partitions.forEach(partition -> {
//定位到分区中最近的offset,继续消费
kafkaConsumer.seek(partition,getOffset(partition));
});
}
});
while (true){
ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
poll.forEach(consumerRecord ->{
System.out.printf("offset = %d %n",consumerRecord.offset());
System.out.printf("key = %s %n",consumerRecord.key());
System.out.printf("value = %s %n",consumerRecord.value());
//将下标缓存到offset中
currentOffset.put(new TopicPartition(consumerRecord.topic(),consumerRecord.partition()),consumerRecord.offset());
});
commitOffset(currentOffset);
}
}
/**
* 提交当前offset
* @param currentOffset
*/
private static void commitOffset(Map<TopicPartition , Long> currentOffset){
//处理异步提交的业务逻辑
}
//获取当前分区的offset
private static long getOffset(TopicPartition partition){
return 0;
}
}
以上是关于Kafka 2.8.0 JAVA API基本使用的主要内容,如果未能解决你的问题,请参考以下文章