Accessing Kafka with a Java Client

Add the Maven dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>

The producer and consumer examples below are written against the legacy Scala client API (kafka.javaapi.*, kafka.producer.*, kafka.consumer.*) that ships in kafka_2.10; kafka-clients carries the newer Java producer and consumer API and is not strictly required for them.

Create the package structure

  Create the package structure for the project. In these examples the classes live in the package com.juyun.kafka, and log4j.properties is placed under com/juyun/logs.


  Put the following in log4j.properties:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

Producer code

package com.juyun.kafka;

import java.util.Properties;

import org.apache.log4j.PropertyConfigurator;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducerExample extends Thread {
    private String topic;

    public KafkaProducerExample(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        for (int i = 1; i < 10; i++) {
            String message = "message" + i;
            producer.send(new KeyedMessage<Integer, String>(topic, message)); // send the message to the topic
            System.out.println("Sent: " + message);
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close(); // release the producer's connections
    }

    public Producer<Integer, String> createProducer() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "172.16.0.157:2181"); // ZooKeeper connection string
        props.setProperty("serializer.class", StringEncoder.class.getName()); // key.serializer.class defaults to serializer.class
        props.setProperty("metadata.broker.list", "172.16.0.157:9092"); // Kafka brokers, in host1:port1,host2:port2 form
        props.put("request.required.acks", "1"); // wait for the partition leader to acknowledge each write
        return new Producer<Integer, String>(new ProducerConfig(props)); // build the producer from the config
    }

    public static void main(String[] args) {
        PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // load the log4j properties file
        new KafkaProducerExample("test").start(); // pass in the topic and start the thread
    }
}
KafkaProducerExample.java
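
The kafka-clients dependency listed above also carries the newer Java producer API (org.apache.kafka.clients.producer), which talks to the brokers directly rather than going through ZooKeeper. For comparison, here is a minimal sketch of the same send loop against that API; the class name NewApiProducerExample is just for illustration, and the broker address and topic are carried over from the example above.

package com.juyun.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.157:9092"); // the new API connects to brokers directly, not to ZooKeeper
        props.put("acks", "1"); // same semantics as request.required.acks=1 above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", "message" + i));
        }
        producer.close(); // flushes any buffered records before shutting down
    }
}

Note that send() here is asynchronous and returns a Future, so close() is what guarantees buffered records are flushed before the JVM exits.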

Consumer code

package com.juyun.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.log4j.PropertyConfigurator;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumerExample extends Thread {
    private String topic;

    public KafkaConsumerExample(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer(); // create the consumer connection
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // request one stream (thread) for this topic
        // in Map<String, List<KafkaStream<byte[], byte[]>>>, the key is the topic and the value is its list of streams
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        // take the single stream registered for this topic
        KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
        // iterate over the stream; hasNext() blocks until a new message arrives
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("Received: " + message);
        }
    }

    public ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "172.16.0.157:2181");
        properties.put("zookeeper.connection.timeout.ms", "6000"); // ZooKeeper connection timeout in ms
        properties.setProperty("group.id", "group1"); // the consumer group this consumer belongs to
        // as long as the ConsumerConnector is alive, the consumer keeps waiting for new messages instead of exiting
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // load the log4j properties file
        new KafkaConsumerExample("test").start();
    }
}
KafkaConsumerExample.java
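
Likewise, kafka-clients 0.9 includes a new consumer API (org.apache.kafka.clients.consumer) that fetches from the brokers directly, so the client needs no ZooKeeper address at all. Below is a minimal sketch using the same broker, group id, and topic as the example above; the class name NewApiConsumerExample is again just for illustration.

package com.juyun.kafka;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.157:9092"); // brokers, not ZooKeeper
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            // in 0.9 poll() takes a timeout in milliseconds
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received: " + record.value());
            }
        }
    }
}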

Run the program

  ZooKeeper needs to be started first:

# change to ZooKeeper's bin directory
cd /opt/zookeeper-3.4.8/bin
# start the service
./zkServer.sh start

  Then start Kafka:

# run from the Kafka installation directory
bin/kafka-server-start.sh config/server.properties
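
  The examples all use a topic named test. Kafka creates missing topics automatically on first use by default, but the topic can also be created explicitly to control its partition and replication settings:

# create the test topic with one partition and one replica
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test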

  You can also start a console producer and consumer in separate terminals to verify the setup; since the Java examples use the topic test, point the console tools at the same topic:

# start a console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# start a console consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

 
