Error producing to embedded kafka


Posted: 2016-06-27 18:48:18

Question:

I am trying to embed a Kafka server in my code. I used the sample code below to try to learn how to do this, but for some reason my producer can't send messages to the embedded server (it times out after 60 seconds). I am using Kafka 0.8.2.2. Can anyone tell me what I'm doing wrong?

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.TopicMetadata; 
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.Utils;
import org.apache.commons.collections.functors.ExceptionPredicate;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Properties;

public class KafkaLocalBroker {

    public static final String TEST_TOPIC = "test-topic";

    public KafkaConfig kafkaConfig;
    public KafkaServer kafkaServer;
    public TestingServer zookeeper;

    public KafkaLocalBroker() throws Exception {
        zookeeper = new TestingServer(true);
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper.getConnectString());
        props.put("broker.id", 0);
        kafkaConfig = new KafkaConfig(props);

        kafkaServer = new KafkaServer(kafkaConfig, new Time() {
            public long nanoseconds() {
                return System.nanoTime();
            }

            public long milliseconds() {
                return System.currentTimeMillis();
            }

            public void sleep(long ms) {
                try {
                    Thread.sleep(ms);
                } catch (InterruptedException e) {
                    // Do Nothing
                }
            }
        });
        kafkaServer.startup();
        System.out.println("embedded kafka is up");
    }

    public void stop() {
        kafkaServer.shutdown();
        System.out.println("embedded kafka stop");
    }

    /**
     * a main that tests the embedded kafka
     * @param args
     */
    public static void main(String[] args) {

        KafkaLocalBroker kafkaLocalBroker = null;
        //init kafka server and start it:
        try {
            kafkaLocalBroker = new KafkaLocalBroker();
        } catch (Exception e) {
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //send one message to local kafka server:
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TEST_TOPIC, "test-message" + i);
            producer.send(data, (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("Failed to write log message: " + exception.getMessage());
                } else {
                    System.out.println("Successful write to offset " + metadata.offset() +
                            " in partition " + metadata.partition() + " on topic " + metadata.topic());
                }
            });
        }

        //consume messages from Kafka:
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "clientId");
        long offset = 0L;
        while (offset < 160) { //exit criteria just for this test so we are not stuck in an endless loop
            // create a fetch request for topic "test", partition 0, current offset, and fetch size of 1MB
            FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(TEST_TOPIC, 0, offset, 100000).build();

            // get the message set from the consumer and print them out
            FetchResponse messages = consumer.fetch(fetchRequest);
            for (MessageAndOffset msg : messages.messageSet(TEST_TOPIC, 0)) {
                ByteBuffer payload = msg.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                try {
                    System.out.println(new String(bytes, "UTF-8"));
                } catch (Exception e) {
                }
                // advance the offset after consuming each message
                offset = msg.offset();
            }
        }

        producer.close();
        //close the consumer
        consumer.close();
        //stop the kafka broker:
        if (kafkaLocalBroker != null) {
            kafkaLocalBroker.stop();
        }
    }
}
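One detail worth making explicit in a setup like this: the broker properties above only set zookeeper.connect and broker.id, so the embedded broker relies on defaults for its listening port (9092) and log directory. A minimal sketch that pins both down (the helper name and the temp-directory choice here are illustrative, not from the original post):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class EmbeddedBrokerProps {
    // Builds broker properties with the port and log directory set explicitly,
    // instead of relying on the 0.8 defaults.
    static Properties brokerProps(String zkConnect) throws IOException {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("broker.id", "0");
        props.put("port", "9092"); // must match the producer's bootstrap.servers
        props.put("log.dirs", Files.createTempDirectory("kafka-logs").toString());
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties p = brokerProps("localhost:2181");
        System.out.println(p.getProperty("port")); // prints 9092
    }
}
```

With the port explicit, a mismatch between the broker's listener and the producer's bootstrap.servers is easy to rule out when chasing a metadata timeout.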

EDIT: I have included the exception returned from the producer below:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Comments:

How would you characterize the failure? Any specific error message? Are you sure it's the producer and not the consumer that has the problem?

producer.send hangs for 60 seconds and then spits out the TimeoutException I included in the edit above.

Did you ever solve this?

I upgraded my version to 0.10.0.0 and it works fine.

Answer 1:

The properties used to create the Kafka producer are not valid for 0.8. Go through ProducerConfig and change the property names accordingly, or upgrade your Kafka version.
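For reference, the old-style (0.8) Scala producer used different property names than the new Java client. A sketch of the legacy configuration it expects (the broker address is an assumption matching the post's localhost:9092):

```java
import java.util.Properties;

public class OldProducerConfigSketch {
    // Property names expected by the legacy kafka.javaapi.producer.Producer:
    // metadata.broker.list instead of bootstrap.servers, and serializer.class
    // instead of key.serializer / value.serializer.
    static Properties legacyProducerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // assumed broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties p = legacyProducerProps();
        System.out.println(p.getProperty("metadata.broker.list")); // prints localhost:9092
        // With the 0.8.2.2 jars on the classpath these properties would be passed to
        // new ProducerConfig(p) and then new kafka.javaapi.producer.Producer<>(config).
    }
}
```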

Comments:

Should I use 'org.apache.kafka.clients.producer.KafkaProducer' or 'org.apache.kafka.producer.Producer'?

When I used the configuration given in the link I got this exception: Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

It looks like the kafka producer requires at least bootstrap.servers, key.serializer and value.serializer to be configured.
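The minimal required set mentioned in the last comment can be sketched as follows (serializer class names as in the question; the broker address is an assumption):

```java
import java.util.Properties;

public class MinimalNewProducerProps {
    // The three configs the new (0.8.2+) Java producer will not start without
    // when serializers are supplied via properties rather than the constructor.
    static Properties minimalProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(minimalProps().size()); // prints 3
    }
}
```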
