java版本的Kafka消息写入与读取

Posted guoyansi19900907

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java版本的Kafka消息写入与读取相关的知识,希望对你有一定的参考价值。

安装zookeeper:  https://www.cnblogs.com/guoyansi19900907/p/9954864.html

并启动zookeeper

安装kafka https://www.cnblogs.com/guoyansi19900907/p/9961143.html

并启动kafka.

1.创建maven  java项目

2.添加依赖

 <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

3.创建生产者:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put("bootstrap.servers","192.168.123.128:9092");

        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("gys"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
}

 

4.创建消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put("bootstrap.servers","192.168.123.128:9092");

        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("gys"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
}

5.先运行消费者,然后会出现一个监听的控制台,运行生产者。

以上是关于java版本的Kafka消息写入与读取的主要内容,如果未能解决你的问题,请参考以下文章

kafka消费者java版本读取不到消息怎么办

java设计模式书籍,全套教学资料

storm集成kafka的应用,从kafka读取,写入kafka

Flume 读取RabbitMq消息队列消息,并将消息写入kafka

java程序员日常工作内容,震撼来袭免费下载!

clickhouse与kafka集成