Kafka1.0X之基础API的使用
Posted 笨小孩撸代码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka1.0X之基础API的使用相关的知识,希望对你有一定的参考价值。
今天在部署Kafka1.0.1的时候,发现Kafka的很多API结果有所调整,所以基于1.0.1版本开发了一个基本的producer和consumer
安装和之前基本类似,没有很多区别,需要注意的是很大地方需要填写主机名不要填写ip了
记得需要修改window的hosts文件,将主机名配置进去
测试
生产端
package xlucas.kafka;//依赖的API也变化import org.apache.kafka.clients.producer.*;import java.util.Properties;/** * Created by Xlucas on 2018/3/20. */public class Kafka_Producer {
public static void main(String[] args){ //这个是用来配置kafka的参数
Properties prop=new Properties(); //这里不是配置broker.id了,这个是配置bootstrap.servers
prop.put("bootstrap.servers","cdh2:9092,cdh3:9092"); //下面是分别配置 key和value的序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //这个地方和1.0X之前的版本有不一样的,这个是使用kafkaproducer 类来实例化
Producer<String, String> producer=new KafkaProducer<>(prop); for (int i = 0; i < 100; i++) { //这个send 没有什么变化
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
消费端
package xlucas.kafka;//依赖的API也变化
import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;/** * Created by Xlucas on 2018/3/20. */public class Kafka_Consumer {
public static void main(String [] args){
//这个是用来配置kafka的参数
Properties props=new Properties();
//这里不是配置zookeeper了,这个是配置bootstrap.servers
props.put("bootstrap.servers","cdh2:9092,cdh3:9092");
//这个配置组,之前我记得好像不配置可以,现在如果不配置那么不能运行
props.put("group.id","test");
//序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String,String> consumer =new KafkaConsumer<String, String>(props);
//配置topic
consumer.subscribe(Arrays.asList("test"));
while (true) {
//这里是得到ConsumerRecords实例
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
//这个有点好,可以直接通过record.offset()得到offset的值
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
创建的test topic的信息
[hadoop@cdh5 bin]$ ./kafka-topics.sh --topic test --describe --zookeeper cdh3:2181,cdh4:2181,cdh5:2181Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4
Topic: test Partition: 2 Leader: 3 Replicas: 3,4,1 Isr: 3,4,1
测试我们可以看到打印出这样的结果
以上是关于Kafka1.0X之基础API的使用的主要内容,如果未能解决你的问题,请参考以下文章