kafka入门demo
Posted zgzf
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka入门demo相关的知识,希望对你有一定的参考价值。
1.引入jar
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency>
2.kafka producer
package com.xq.kafka; /** * @author duanxiaoqiu * @Date 2019-07-04 09:55 **/ import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer private static final String TOPIC="education-info"; private static final String BROKER_LIST="localhost:9092"; private static KafkaProducer<String,String> producer = null; static Properties configs = initConfig(); producer = new KafkaProducer<String, String>(configs); private static Properties initConfig() Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST); properties.put(ProducerConfig.ACKS_CONFIG,"all"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); return properties; public static void main(String[] args) try String message = "hello world"; ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message); producer.send(record, new Callback() @Override public void onCompletion(RecordMetadata metadata, Exception exception) if(null==exception) System.out.println("perfect!"); if(null!=metadata) System.out.print("offset:"+metadata.offset()+";partition:"+metadata.partition()); ).get(); catch (Exception e) e.printStackTrace(); finally producer.close();
3.consumer
package com.xq.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; /** * @author duanxiaoqiu * @Date 2019-07-04 09:56 **/ public class Consumer private static final String TOPIC="education-info"; private static final String BROKER_LIST="localhost:9092"; private static KafkaConsumer<String,String> kafkaConsumer = null; static Properties properties = initConfig(); kafkaConsumer = new KafkaConsumer<String, String>(properties); kafkaConsumer.subscribe(Arrays.asList(TOPIC)); private static Properties initConfig() Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST); properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"test"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); return properties; public static void main(String[] args) try while(true) ConsumerRecords<String,String> records = kafkaConsumer.poll(100); for(ConsumerRecord record:records) try System.out.println(record.value()); catch(Exception e) e.printStackTrace(); catch(Exception e) e.printStackTrace(); finally kafkaConsumer.close();
以上是关于kafka入门demo的主要内容,如果未能解决你的问题,请参考以下文章
Storm入门经典文章:本地模式运行storm的demo 单机模式跑直一个Word Count & kafka to Storm