kafkaAPI使用

Posted 不疯魔难成活

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafkaAPI使用相关的知识,希望对你有一定的参考价值。

相关依赖

<!-- Kafka 依赖包 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>

 

一个简单的Kafka生产者一般步骤如下:

创建 Properties 对象,设置生产者级别配置。以下3个配置是必须指定的。

(1)

  bootstrap.servers 配置连接 Kafka 代理列表,不必包含 Kafka 集群所有的代理地址,当 连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能 够成功连上 Kafka 集群 在多代理集群的情况下建议至少配置两个代理。

  key.serializer :配置用于序列化消息 Key 的类。

  value.serializer :配置用于序列化消息实际数据的类。

(2)根据 Properties 对象实例化一个 KafkaProducer 对象。

(3)实例化 ProducerRecord 对象, 每条消息对应一个 ProducerRecord 对象。

(4)调用 KafkaProducer 发送消息的方法将 ProducerRecord 发送到 Kafka 相应节点。 Kafka提供了两个发送消息的方法,即 send(ProducerRecord <String,String> record 方法和sendσroducerRecord<string,string> record,Callback callback)方法,带有回调函数的 send() 方法要实现 org.apache kafka.clients.producer Callback 接口。如果消息发送发生异常, Callback 接口的 onCompletion会捕获到相应异常。 KafkaProducer 默认是异步发送消息, 会将消息缓存到消息缓冲区中,当消息 在消息缓冲区中累计到一定数量后作为一个 RecordBatch 再发迭。生产者发送消息实质分两个阶段:第一阶段是将消息发送到消息缓冲区;第二阶段是 Sender 线程负责将缓冲区的消息发送到代理,执行真正的I/O操作,而在第一阶段执行完后就返回一个Future 象,根据对Future对象处理方式的不同,KafkaProducer 支持两种发送消息方式。

 

 demo完整网址:

链接:https://pan.baidu.com/s/1xpXXESCBsn7jjGOJQ8X9Eg 密码:17h5

  

 

以上是关于kafkaAPI使用的主要内容,如果未能解决你的问题,请参考以下文章

kafkaAPI使用

一文详解Kafka API

95-10-160-启动-KafkaApis

一文详解Kafka API

Kafka Consumer API示例

真香!kafkaapi获取topic的groupid