Kafka Consumer API Example

Posted by eugene0


Since you have found your way here, I will assume you already have a working grasp of Kafka basics; this post walks through one example of using the Kafka Consumer API. Kafka has a legacy consumer API (through 0.8) and a new one (0.9 onward); everything below uses the new API. The official KafkaConsumer documentation has a more detailed introduction if you want to go deeper.

1 Environment Setup

  • OS: Ubuntu 16.04
  • Kafka: kafka_2.11-0.10.2.2
  • JDK: 1.8.0_181
  • IDE: IntelliJ IDEA (Maven project)
  • VNC

2 Walkthrough

The pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafka.test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>maven-kafka</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.2.6</version>
        </dependency>

    </dependencies>

</project>

Consuming data with the Kafka consumer, without SASL authentication. This code only sees records that producers push while it is running, because a brand-new consumer group starts from the latest offset by default (auto.offset.reset defaults to latest):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class KafkaConsumerTest {
    public static void main(String[] args) throws Exception {

        // Kafka consumer configuration settings
        String topicName = "XXXX";
        Properties props = new Properties();
        
        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("offsets.storage", "kafka");

        // To receive custom objects, specify their deserializer classes here
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
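The "live data only" behavior above follows from auto.offset.reset. If you want a brand-new consumer group to read from the earliest retained offset instead, you can set it to earliest (it only applies while the group has no committed offsets). A minimal sketch, with a hypothetical group id:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class EarliestOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "make.kafka.com:9092");
        props.put("group.id", "history-reader"); // hypothetical new group id
        // "earliest" makes a group with no committed offsets start from the
        // oldest retained record; the default "latest" only sees new records
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("XXXX"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}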

Consuming data with the Kafka consumer, with SASL authentication; this version also consumes historical data by seeking each assigned partition back to its beginning:

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;


public class KafkaConsumerTest {
    public static void main(String[] args) throws Exception {

      
        String topicName = "XXXX";
        Properties props = new Properties();



        System.setProperty("java.security.auth.login.config", "/opt/kafka/kafka1/kafka_2.11-0.10.2.2/config/kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");        
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("offsets.storage", "kafka");
        props.put("max.poll.records",1000);

        // To receive custom objects, specify their deserializer classes here
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                // nothing to clean up here
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Equivalent of the console consumer's --from-beginning flag:
                // rewind every newly assigned partition to its earliest offset
                consumer.seekToBeginning(collection);
            }
        });

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(20000);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
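The kafka_client_jaas.conf file referenced above supplies the client credentials for the PLAIN mechanism. A minimal sketch, with hypothetical username and password:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="client"
    password="client-secret";
};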

Finally, a common reason to put Kafka in the middle of a pipeline is to persist the data locally or to HDFS for later analysis and mining. Spark Streaming can write it to HDFS; the snippet below simply redirects console output to a local file on disk.

import java.io.*;


public class IO2File {
    public static void main(String[] args) throws IOException {
        File f = new File("out.json");
        f.createNewFile();
        FileOutputStream fileOutputStream = new FileOutputStream(f);
        PrintStream printStream = new PrintStream(fileOutputStream);
        // Redirect System.out to the file: combined with the consumer code
        // above, records printed to the console now land in out.json
        System.setOut(printStream);
        System.out.println("xxxxxxx out.json");
    }
}
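
Redirecting System.out works, but it hijacks all console output. An alternative sketch writes record values straight to the file inside the poll loop; the group id and file name here are illustrative:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class Consumer2File {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "make.kafka.com:9092");
        props.put("group.id", "file-sink"); // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("XXXX"));

        // try-with-resources closes the writer if the loop ever exits
        try (BufferedWriter out = new BufferedWriter(new FileWriter("out.json", true))) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    out.write(record.value());
                    out.newLine();
                }
                out.flush(); // flush once per poll batch
            }
        }
    }
}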

When I find the time, I will also write up Kafka fundamentals: storage, configuration, SASL authorization, and Spark integration.
