Kafka1.0X之基础API的使用

Posted 笨小孩撸代码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka1.0X之基础API的使用相关的知识,希望对你有一定的参考价值。

今天在部署Kafka1.0.1的时候,发现Kafka的很多API结果有所调整,所以基于1.0.1版本开发了一个基本的producer和consumer 
安装和之前基本类似,没有很多区别,需要注意的是很大地方需要填写主机名不要填写ip了

记得需要修改window的hosts文件,将主机名配置进去

测试

生产端

package xlucas.kafka;//依赖的API也变化import org.apache.kafka.clients.producer.*;import java.util.Properties;/** * Created by Xlucas on 2018/3/20. */public class Kafka_Producer {
    public static void main(String[] args){        //这个是用来配置kafka的参数
        Properties prop=new Properties();        //这里不是配置broker.id了,这个是配置bootstrap.servers
        prop.put("bootstrap.servers","cdh2:9092,cdh3:9092");        //下面是分别配置 key和value的序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        //这个地方和1.0X之前的版本有不一样的,这个是使用kafkaproducer 类来实例化
        Producer<String, String> producer=new KafkaProducer<>(prop);        for (int i = 0; i < 100; i++) {            //这个send 没有什么变化
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));

        }
            producer.close();
    }
}

消费端

package xlucas.kafka;//依赖的API也变化
import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;/** * Created by Xlucas on 2018/3/20. */public class Kafka_Consumer {
        public static void main(String [] args){
            //这个是用来配置kafka的参数
            Properties props=new Properties();
            //这里不是配置zookeeper了,这个是配置bootstrap.servers
            props.put("bootstrap.servers","cdh2:9092,cdh3:9092");
            //这个配置组,之前我记得好像不配置可以,现在如果不配置那么不能运行
            props.put("group.id","test");
            //序列化
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            Consumer<String,String> consumer =new KafkaConsumer<String, String>(props);
            //配置topic
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                //这里是得到ConsumerRecords实例
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    //这个有点好,可以直接通过record.offset()得到offset的值
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
}
 
   
   
 

创建的test topic的信息

[hadoop@cdh5 bin]$ ./kafka-topics.sh --topic test --describe --zookeeper cdh3:2181,cdh4:2181,cdh5:2181Topic:test      PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: test     Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: test     Partition: 1    Leader: 2       Replicas: 2,3,4 Isr: 2,3,4
        Topic: test     Partition: 2    Leader: 3       Replicas: 3,4,1 Isr: 3,4,1

测试我们可以看到打印出这样的结果


以上是关于Kafka1.0X之基础API的使用的主要内容,如果未能解决你的问题,请参考以下文章

GraphQL 响应类型/片段之争

Java基础之方法的调用重载以及简单的递归

openGL之API学习(一六七)默认着色器 顶点属性索引 别名索引

java基础之二十一-;Stream api

DOM探索之基础详解——学习笔记

Android组件之Fragment(一)---基础知识与运用