消息队列之kafka(API)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列之kafka(API)相关的知识,希望对你有一定的参考价值。

1.模拟实现kafka的生产者消费者(原生API)

解决相关依赖:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>

生产者:

packagecom.zy.kafka;

importjava.util.Properties;

importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.Producer;
importorg.apache.kafka.clients.producer.ProducerRecord;
publicclassKafkaTest {
    publicstaticvoidmain(String[] args) {
        //1.加载配置文件
        //1.1封装配置文件对象
        Properties prps=newProperties();
        //配置broker地址
        prps.put("bootstrap.servers", "hadoop02:9092");
        //配置ack级别:0 1 -1(all)
        prps.put("acks", "all");
        //重试次数
        prps.put("retries", 3);

        prps.put("batch.size", 16384);
        prps.put("linger.ms",1);
        prps.put("buffer.memory", 33554432);

        //指定(message的K-V)的序列化
        prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //2.创建生产者对象(指定的key和value的泛型)
        Producer<String, String>producer=new KafkaProducer<>(prps);
        //生产者发送消息
        for(inti=0;i<100;i++) {
            /**
             * ProducerRecord<String, String>(topic, value)
             * topic:主题名称
             * key:
             * value:
             */
            //消息的封装对象
            ProducerRecord<String, String>pr=newProducerRecord<String, String>("test_topic", "key"+i, "value"+i);
            producer.send(pr);
        }
producer.close();
    }
}

消费者:

packagecom.zy.kafka;

importjava.util.Arrays;
importjava.util.Properties;

importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.apache.kafka.clients.consumer.ConsumerRecords;
importorg.apache.kafka.clients.consumer.KafkaConsumer;
importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.Producer;
importorg.apache.kafka.clients.producer.ProducerRecord;
publicclassKafkaTest {
    publicstaticvoidmain(String[] args) {
        //1.加载配置文件
        //1.1封装配置文件对象
        Properties prps=newProperties();
        //配置broker地址
        prps.put("bootstrap.servers", "hadoop02:9092");
        //指定消费的组的ID
        prps.put("group.id", "test");
        //是否启动自动提交(是否自动提交反馈信息,向zookeeper提交)
        prps.put("enable.auto.commit", "true");
        //自动提交的时间间隔
        prps.put("auto.commit.interval.ms", "1000");

        //指定(message的K-V)的序列化
        prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //创建kafka的消费者
        KafkaConsumer<String, String>consumer=newKafkaConsumer<>(prps);
        //添加消费主题
        consumer.subscribe(Arrays.asList("kafka_test"));
        //开始消费
        while(true) {
            //设置从哪里开始消费,返回的是一个消费记录
            ConsumerRecords<String, String>poll = consumer.poll(10);
            for(ConsumerRecord<String, String>p:poll) {
                System.out.printf("offset=%d,key=%s,value=%s
",p.offset(),p.key(),p.value());
            }
        }
    }
}

2.以shell命令的方式API

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import kafka.admin.TopicCommand;

public class KafkaAPI {
    public static void main(String[] args) throws IOException {
        /* 
            kafka-topics.sh             --create             --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181             --replication-factor 3             --partitions 10             --topic kafka_test11
         */
        //创建一个topic
        String ops[]=new String []{
            "--create",
            "--zookeeper","hadoop01:2181,hadoop02:2181,hadoop03:2181",
            "--replication-factor","3",
            "--topic","zy_topic","--partitions","5"
        };
        String list[]=new String[] {
                "--list",
                "--zookeeper",
                "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        //以命令的方式提交
        TopicCommand.main(list);
    }
}

以上是关于消息队列之kafka(API)的主要内容,如果未能解决你的问题,请参考以下文章

消息队列kafka java API, 新版旧版消费代码

消息队列之zeroMQ、rabbitMQ、kafka

消息队列MQ 之 Kafka

消息队列之Kafka从入门到小牛

SpringBoot开发案例之整合Kafka实现消息队列

消息队列之kafka(核心架构)