Kafka Series: Common Kafka Java APIs
Add the Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>
1. Sending messages synchronously
package com.example.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SynProducer {

    // Builds the producer configuration.
    private static Properties getProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1000);
        props.put("buffer.memory", 33554432);
        props.put("client.id", "producer-syn-1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // try-with-resources closes the producer when the loop finishes
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProps())) {
            for (int i = 0; i < 1000; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test-1", "topic_" + i, "test-" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // get() blocks until the send completes, which makes it synchronous
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("Send succeeded!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (InterruptedException | ExecutionException e) {
                    System.out.println("Send failed!");
                    e.printStackTrace();
                }
            }
        }
    }
}
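What makes the listing above synchronous is the call to get() on the returned Future, not send() itself. The following is a minimal sketch of that pattern using only the JDK, so it runs without a broker. The class BlockingSendSketch and the helpers fakeSend and sendAllSync are hypothetical stand-ins invented for illustration; they are not part of the Kafka API, and the "offset" is just a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

final class BlockingSendSketch {

    // Hypothetical stand-in for producer.send(record): the work finishes on a
    // background thread and the Future yields the offset given to the message.
    static Future<Long> fakeSend(ExecutorService io, AtomicLong nextOffset) {
        Callable<Long> task = () -> nextOffset.getAndIncrement();
        return io.submit(task);
    }

    // Sends `count` messages one at a time, waiting for each result in turn.
    static List<Long> sendAllSync(int count) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        AtomicLong nextOffset = new AtomicLong();
        List<Long> offsets = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                Future<Long> pending = fakeSend(io, nextOffset);
                // get() blocks until this send has finished, so the next
                // message is not submitted before this one has a result.
                offsets.add(pending.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting for a send", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        } finally {
            io.shutdown();
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(sendAllSync(3)); // prints [0, 1, 2]
    }
}
```

Because every iteration waits for its own result, throughput is bounded by one round trip per message. With the real producer and linger.ms set to 1000 as in the listing, each blocking get() may additionally wait up to about a second for the batch to be flushed, so a smaller linger.ms is probably worth considering when sending one record at a time.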