Message Queues: Kafka (API)
1. A Kafka producer and consumer with the native API
Add the required dependency:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
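Note that `kafka_2.12` pulls in the entire broker plus its Scala runtime. For producer/consumer code alone, the lighter `kafka-clients` artifact is usually enough (a suggested alternative, not part of the original setup; section 2's `TopicCommand` does need the full `kafka_2.12` jar, since that class lives in the broker):

```xml
<!-- Client-only dependency: sufficient for the producer/consumer examples below -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
```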
Producer:
package com.zy.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. Build the configuration
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "hadoop02:9092");
        // Ack level: 0, 1, or -1 (all)
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 3);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // How long to wait before sending a batch (ms)
        props.put("linger.ms", 1);
        // Total memory for buffering unsent records (bytes)
        props.put("buffer.memory", 33554432);
        // Serializers for the message key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer (key and value types as generics)
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send messages
        for (int i = 0; i < 100; i++) {
            // ProducerRecord(topic, key, value):
            //   topic: topic name; key/value: the message key and value
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test_topic", "key" + i, "value" + i);
            producer.send(record);
        }
        producer.close();
    }
}
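`producer.send(record)` is asynchronous and returns a `Future`; any broker-side error is silently dropped unless you check the result. A minimal sketch of sending with a callback (broker address and topic carried over from the example above; actually running it still requires a live broker):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop02:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<>("test_topic", "key1", "value1");
        // The callback runs on the producer's I/O thread once the broker
        // acknowledges the record (or the send fails)
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to partition=%d, offset=%d%n",
                            metadata.partition(), metadata.offset());
                }
            }
        });
        producer.close();
    }
}
```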
Consumer:
package com.zy.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 1. Build the configuration
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "hadoop02:9092");
        // Consumer group ID
        props.put("group.id", "test");
        // Auto-commit offsets (committed to Kafka's internal __consumer_offsets topic)
        props.put("enable.auto.commit", "true");
        // Auto-commit interval (ms)
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic the producer writes to
        consumer.subscribe(Arrays.asList("test_topic"));
        // Poll in a loop; each call returns a batch of records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n",
                        r.offset(), r.key(), r.value());
            }
        }
    }
}
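With `enable.auto.commit=true`, offsets are committed on a timer, so a crash between commits can replay records that were already processed. A common alternative is to commit manually after each batch. The sketch below mirrors the consumer configuration above (same assumed broker and topic; it needs a live cluster to actually receive anything):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop02:9092");
        props.put("group.id", "test");
        // Turn auto-commit off; we commit explicitly after processing each batch
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("offset=%d, value=%s%n", r.offset(), r.value());
                }
                // Commit only after the whole batch has been handled:
                // at-least-once delivery instead of the timer-based auto commit
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
```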
2. Invoking the shell-command API from Java
import java.io.IOException;

import kafka.admin.TopicCommand;

public class KafkaAPI {
    public static void main(String[] args) throws IOException {
        /*
         * Equivalent shell command:
         * kafka-topics.sh --create --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 --replication-factor 3 --partitions 10 --topic kafka_test11
         */
        // Arguments for creating a topic
        String[] ops = new String[] {
                "--create",
                "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181",
                "--replication-factor", "3",
                "--topic", "zy_topic", "--partitions", "5"
        };
        // Arguments for listing topics
        String[] list = new String[] {
                "--list",
                "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        // Submit the commands exactly as the shell script would
        TopicCommand.main(ops);
        TopicCommand.main(list);
    }
}
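`TopicCommand` is an internal broker class that talks to ZooKeeper directly. Since Kafka 0.11 the supported route is the `AdminClient` in `kafka-clients`, which goes through the brokers instead. A hedged sketch of the same create-and-list operations (the broker address is assumed, and it needs a running cluster):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminClientSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop02:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Same topic spec as the shell arguments above:
            // 5 partitions, replication factor 3
            NewTopic topic = new NewTopic("zy_topic", 5, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
            // List existing topics, the programmatic equivalent of --list
            System.out.println(admin.listTopics().names().get());
        }
    }
}
```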