Kafka --- Producer and Consumer API
Posted by Shall潇
1. Producer
package Kafka.stu;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @Author shall潇
 * @Date 2021/5/20
 * @Description Minimal producer: send a single message to a topic
 */
public class MyProducer2 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");            // Kafka broker address and port
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);   // key serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value serializer
        properties.put(ProducerConfig.ACKS_CONFIG, "1");                                      // wait for the leader's acknowledgement

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);             // load the config
        ProducerRecord<String, String> record = new ProducerRecord<>("secDemo", "456hello 122222"); // topic and message
        producer.send(record);  // send the message
        producer.close();       // release resources
    }
}
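The send above is fire-and-forget: with acks=1 the leader acknowledges the write, but the client never looks at that acknowledgement. As a minimal sketch, the Callback overload of send() surfaces success or failure in the client (the class name is hypothetical; broker address and topic are reused from the example above):

package Kafka.stu;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Hypothetical variant of MyProducer2: same config, but send() takes a Callback
// so the broker's acknowledgement (or a failure) becomes visible in the client.
public class MyProducerWithCallback {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092"); // broker address assumed from the example
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("secDemo", "hello with callback");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // the send failed or timed out
                } else {
                    System.out.printf("acked: partition=%d offset=%d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        } // try-with-resources flushes and closes the producer
    }
}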
Reading messages from the console in a loop
package Kafka.stu;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

/**
 * @Author shall潇
 * @Date 2021/5/20
 * @Description Producer that reads messages from the console in a loop
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        /*
         * acks (acknowledgement) settings:
         * 1 (default): the producer waits until the leader of the ISR has received the data
         *   and confirmed it. If the leader then crashes, data can still be lost.
         * 0: the producer sends the next batch without waiting for any acknowledgement from
         *   the broker. Highest throughput, lowest reliability.
         * -1: the producer waits until every follower in the ISR has confirmed receipt.
         *   Highest reliability, but still no absolute guarantee: ISR membership grows and
         *   shrinks (as covered in the earlier ISR section), and if it has shrunk to the
         *   leader alone, this degrades to the acks=1 case.
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "0");

        // Create the producer and scanner once, outside the loop, and close the producer when done.
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        Scanner sc = new Scanner(System.in);
        String content = "1";
        while (content.equals("1")) {
            System.out.print("Enter a message: ");
            String s = sc.next();
            ProducerRecord<String, String> record = new ProducerRecord<>("secDemo", s);
            producer.send(record);
            System.out.print("Exit? 0: quit  1: send another ");
            content = sc.next();
        }
        producer.close(); // release resources
    }
}
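The caveat in the acks comment above (acks=-1 degrading to acks=1 when the ISR shrinks to the leader alone) is commonly addressed with the topic-level setting min.insync.replicas, which is not part of the original example. A hedged sketch of the more durable producer configuration, extending the same Properties object:

// Producer side: wait for the full ISR and retry transient failures.
properties.put(ProducerConfig.ACKS_CONFIG, "all");     // "all" is equivalent to "-1"
properties.put(ProducerConfig.RETRIES_CONFIG, "3");    // retry transient broker errors
// Topic side (assumed setup, run once from the shell, not Java):
// setting min.insync.replicas=2 makes the broker reject acks=all writes
// when the ISR has shrunk to the leader alone, e.g.
//   kafka-configs.sh --bootstrap-server hadoop101:9092 --alter \
//       --entity-type topics --entity-name secDemo --add-config min.insync.replicas=2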
2. Consumer
package Kafka.stu;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

/**
 * @Author zhangxiaoxiao
 * @Date 2021/5/21
 * @Description Consumer: one consumer per partition of the topic
 */
public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");  // session timeout
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // disable automatic offset commits
        // properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval (when enabled)
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        /*
         * earliest: if a partition has a committed offset, consume from it;
         *           otherwise consume from the beginning of the partition.
         * latest:   if a partition has a committed offset, consume from it;
         *           otherwise consume only data produced after the consumer starts.
         * none:     if every partition has a committed offset, consume from those offsets;
         *           if any partition lacks one, throw an exception.
         */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1"); // consumer group id

        // Single consumer:
        /*
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singleton("secDemo"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> r : records) {
                System.out.println(r.offset() + " " + r.key() + " " + r.value());
            }
        }
        */

        // secDemo has four partitions, but the single consumer above leaves group_1
        // underpowered; the optimal layout is one consumer per partition in the group.
        // With auto-commit enabled, consumers in the same group see the data only once:
        // after the offsets are committed, re-running reads nothing new. To re-read
        // from the beginning, switch to a different group id.
        // Multiple consumers, one thread each:
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // each thread owns its own consumer; KafkaConsumer is not thread-safe
                    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
                    kafkaConsumer.subscribe(Collections.singleton("secDemo"));
                    while (true) {
                        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                        for (ConsumerRecord<String, String> r : records) {
                            System.out.println(r.offset() + " " + r.key() + " " + r.value());
                        }
                    }
                }
            }).start();
        }
    }
}
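Note that the example disables enable.auto.commit but never commits manually, so no offsets are ever stored and every restart re-reads from earliest. A minimal sketch of committing manually after each processed batch, reusing the same properties, topic, and group id as above:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("secDemo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> r : records) {
        System.out.println(r.offset() + " " + r.key() + " " + r.value()); // process the record
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // commit offsets only after the whole batch is processed
    }
}

Committing after processing (rather than auto-committing on a timer) trades a small chance of re-delivery after a crash for the guarantee that no record is marked consumed before it was actually handled.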