Kafka生产者开发

Posted 笨小孩撸代码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者开发相关的知识,希望对你有一定的参考价值。

主要介绍kafka生产者的基本使用,同步异步的操作

package xlucas.kafka;import java.util.Properties;import java.util.concurrent.ExecutionException;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.IntegerSerializer;import org.apache.kafka.common.serialization.StringSerializer;/** * Created by Xlucas on 2018/6/4. */public class KafkaProducerDemo {
    public static void main(String[] args){        //消息发送是异步还是同步发送
        boolean isAsync=args.length==0||!args[0].trim().equalsIgnoreCase("sync");
        Properties props=new Properties();        //kafka服务端的主机名和端口号
        props.put("bootstrap.servers","slx-PC:9092");        //kafka客户端的id
        props.put("client.id","kafkaproducer");        //消息的key和value都是字节数组,为了将java对象转化为字节数组,可以配置key.serializer和value.serializer两个序列化,完成转化
        props.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");        //生产者的核心类
        KafkaProducer producer1 = new KafkaProducer<>(props);        //向指定的topic发送消息
        String topic="test";        int messageNo=1;//定义key
        while(true){
            String messageStr="Message_"+messageNo;//消息的value
            long startTime=System.currentTimeMillis();            if(isAsync){//异步消息
                //第一个参数是ProducerRecord 类型的对象,封装了目标topic、消息的key和value
                //第二个参数是一个CallBack对象,当生产者接收到Kafka发来的ACK确认消息的时候,会调用此CallBack对象的onCompletion()方法,实现回调功能
                producer1.send(new ProducerRecord<>(topic,messageNo,messageStr),new ProducerCallBack(startTime,messageNo,messageStr));
            }else{//同步发送
                try {                    //KafkaProducer.send()方法的返回值类型是Future<RecordMetadata>
                    //这里通过Future.get()方法,阻塞当前线程,等待kafka服务端的ACK响应
                    producer1.send(new ProducerRecord<>(topic,messageNo,messageStr)).get();
                    System.out.println("sent message "+messageNo+", "+messageStr);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;//递增消息的key
        }

    }
}

class ProducerCallBack implements Callback{//回调函数
    private final long startTime;//开始发送消息的时间戳
    private final int key;//消息的key
    private  final String message;//消息的value
    public ProducerCallBack(long startTime,int key ,String message){        this.startTime=startTime;        this.key=key;        this.message=message;
    }    //生产者成功发送消息,收到kafka服务端发来的ACK确认消息后,会调用此回调函数,
    //recordMetadata 生产者发送的消息的元数据,  如果发送过程中出现了异常,此参数是null
    //e 发送过程中出现的异常,如果发送成功,则此参数为null
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {        long elapsedTime=System.currentTimeMillis()-startTime;        if(recordMetadata!=null) {            //recordMetadata 这个中间包含了分区信息,offset等信息
            System.out.println(message + "send to partition " + recordMetadata.partition() + " offset " + recordMetadata.offset() + " " + elapsedTime);
        }else{
            e.printStackTrace();
        }


    }
}
 
   
   
 



以上是关于Kafka生产者开发的主要内容,如果未能解决你的问题,请参考以下文章

Kafka基于Windows的Kafka有关环境搭建以及使用.NET环境开发的案例代码与演示...

Kafka生产者开发

Kafka生产者开发,原理分析,以及参数配置

kafka生产者开发方式

Kafka生产者——结合spring开发

kafka之消息生成者基本知识