无法通过java代码向kafka主题发送消息

Posted

技术标签:

【中文标题】无法通过java代码向kafka主题发送消息【英文标题】:Not able to send messages to kafka topic through java code 【发布时间】:2016-11-10 15:01:52 【问题描述】:

我正在使用卡夫卡。这是我的代码,我想向 kafka 服务器发送消息,主题名称为“west”,消息为“message1”。虽然我没有看到主题中发送的消息,但我没有收到任何错误在这里?

class SimpleProducer 

  public static void main(String[] args) throws Exception       
    Properties props = new Properties();
    props.put("bootstrap.servers","172.xxxxxxxxx:9092");
    props.put("serializer.class", "kafka.serializer.DefaultEncoder");
    props.put("acks", "1");
    props.put("retries", 1);
    props.put("batch.size", 16384);
    props.put("linger.ms", 0);
    props.put("client.id", "foo");
    props.put("buffer.memory", 33554432);
    props.put("timeout.ms", "500");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "500"); 
    props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");

    System.out.println("ready to send msg");

    try 
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("west","message1"));

        System.out.println("Message sent successfully");
        producer.close();
    
    catch(Exception e)
    
        System.out.println("Messgae doesn't sent successfully");
        e.printStackTrace();

    
  

【问题讨论】:

您如何检查写入并提交给代理的消息? 到目前为止,我正在将消息发送到某个特定主题并使用以下命令进行检查:- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-名字——从头开始 我正在编写一个自定义生产者来将我的消息从 Java 获取到主题。调用显示的 main 不会导致有关 Kafka 主题的消息,也不会打印任何错误消息。有人知道为什么消息没有到达我的主题吗? 【参考方案1】:

您用来发送消息的 API 是异步的。使用有两个参数的 send() 形式。第二个参数是一个回调,您可以使用它来查看发送是否真的有效或某处是否存在错误。

【讨论】:

【参考方案2】:
   producer.send(yourRecord,
                 new Callback() 
                     public void onCompletion(RecordMetadata metadata, Exception e) 
                         if(e != null) 
                            e.printStackTrace();
                          else 
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                         
                     
                 );

【讨论】:

请不要提供仅是代码的答案。解释应该如何使用这段代码以及为什么它可以解决问题。

以上是关于无法通过java代码向kafka主题发送消息的主要内容,如果未能解决你的问题,请参考以下文章

预验证发送到 Kafka 主题的消息

无法使用 kafka-node 向 kafka Producer 发送消息

如何从同一生产者向不同的 Kafka 主题和模式注册表生成消息

发送到 kafka 主题时序列化消息时出错

Java程序员面试必备——kafka的专业术语

java客户端向单机版kafka发送消息没有接收到