kafka生产者java客户端

Posted weishao-lsv

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka生产者java客户端相关的知识,希望对你有一定的参考价值。

producer

  包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息。
  位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群。如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露。
 

常用配置

bootstrap.servers

 用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2; 

 

acks

生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;
acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。

 

retries

生产者发送失败后,重试的次数 

 


batch.size

当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。 

 


linger.ms

默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。
可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。

 

batch.size

batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。 

 


buffer.memory

生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。 

 

key.serializer,value.serializer

说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

 

Producer

  

public class Producer {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);  
    private KafkaProducer<String, String> kafkaProducer;  
    private Random random = new Random();  
    private String topic;  
    private int retry;  
    
    public Producer() {
        this("my_init");
    }
    
    public Producer(String topic) {
        this(topic,3);  
    }
    
    public Producer(String topic,int retry) {
         this.topic = topic;  
         this.retry = retry; 
         if (null == kafkaProducer) {  
             Properties props = new Properties();  
             InputStream inStream = null;  
             try {  
                 inStream = this.getClass().getClassLoader().getResourceAsStream("kafka-producer.properties");  
                 props.load(inStream);  
                 kafkaProducer = new KafkaProducer<String, String>(props);  
             } catch (IOException e) {  
                 LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);  
             } finally {  
                 if (null != inStream) {  
                     try {  
                         inStream.close();  
                     } catch (IOException e) {  
                         LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);  
                     }  
                 }  
             }  
         }  
    }
    
    /** 
     * 通过kafkaProducer发送消息 
     * @param topic 消息接收主题 
     * @param partitionNum  哪一个分区 
     * @param retry  重试次数 
     * @param message 具体消息值 
     */  
    public RecordMetadata sendKafkaMessage(final String message) {  
  
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", message);  
        Future<RecordMetadata> meta = kafkaProducer.send(record, new Callback() {  
//send方法是异步的,添加消息到缓存区等待发送,并立即返回,这使生产者通过批量发送消息来提高效率 public void onCompletion(RecordMetadata recordMetadata,Exception exception) { if (null != exception) { LOGGER.error("kafka发送消息失败:" + exception.getMessage(),exception); retryKakfaMessage(message); } } }); RecordMetadata metadata = null; try { metadata = meta.get(); } catch (InterruptedException e) { } catch (ExecutionException e) {} return metadata; } /** * 当kafka消息发送失败后,重试 */ private void retryKakfaMessage(final String retryMessage) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", retryMessage); for (int i = 1; i <= retry; i++) { try { kafkaProducer.send(record); return; } catch (Exception e) { LOGGER.error("kafka发送消息失败:" + e.getMessage(), e); retryKakfaMessage(retryMessage); } } } /** * kafka实例销毁 */ public void close() { if (null != kafkaProducer) { kafkaProducer.flush(); kafkaProducer.close(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getRetry() { return retry; } public void setRetry(int retry) { this.retry = retry; } }

 

TestProducer

  

public class TestProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);  
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for(int i=0;i<3;i++){
            executor.submit(new Runnable() {
                @Override
                public void run() {
                     String topic = "2017-11-6-test";
                     Producer p = new Producer(topic);
                     for(int n=1;n<=5;n++){
                         String str = "hello world => "+n;
                         RecordMetadata message = p.sendKafkaMessage(str);
                         LOGGER.info("发送信息: "+message.topic()+"---"+message.partition()+"---"+message.offset());
                     }
                     p.close();
                }
            });
        }
        System.out.println("this is main");
        executor.shutdown();//这个表示 线程执行完之后自动退出
        System.out.println("hello world");
    }
}

 









以上是关于kafka生产者java客户端的主要内容,如果未能解决你的问题,请参考以下文章

Kafka的Java客户端-生产者

4 kafka集群部署及生产者java客户端编程 + kafka消费者java客户端编程

kafka消息中间件及java示例

kafka生产者java客户端

kafka基础篇——kafka生产者客户端

java面试笔试题代码,先收藏了