Kafka Producer - org.apache.kafka.common.serialization.StringSerializer could not be found


【Posted】2016-09-18 16:31:55 【Question】:

I created a simple Kafka producer and consumer. I am using kafka_2.11-0.9.0.0. Here is my producer code.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest {
    public static String topicName = "test-topic-2";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                StringSerializer.class.getName());
        props.put("value.serializer",
                StringSerializer.class.getName());

        // The diamond operator avoids the raw-type warning of new KafkaProducer(props).
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                    topicName, Integer.toString(i), Integer.toString(i));
            System.out.println(producerRecord);
            producer.send(producerRecord);
        }

        producer.close();
    }
}

When I start the bundle, I get the following error:

2016-05-20 09:44:57,792 | ERROR | nsole user karaf | ShellUtil                        | 44 - org.apache.karaf.shell.core - 4.0.3 | Exception caught while executing command
org.apache.karaf.shell.support.MultiException: Error executing command on bundles:
    Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.karaf.shell.support.MultiException.throwIf(MultiException.java:61)
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:69)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.bundle.command.BundlesCommand.execute(BundlesCommand.java:54)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.shell.impl.action.command.ActionCommand.execute(ActionCommand.java:83)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:67)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:87)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.executeCmd(Closure.java:480)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.executeStatement(Closure.java:406)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Pipe.run(Pipe.java:108)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:182)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:119)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.CommandSessionImpl.execute(CommandSessionImpl.java:94)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.ConsoleSessionImpl.run(ConsoleSessionImpl.java:270)[44:org.apache.karaf.shell.core:4.0.3]
    at java.lang.Thread.run(Thread.java:745)[:1.8.0_66]
Caused by: java.lang.Exception: Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:66)[24:org.apache.karaf.bundle.core:4.0.3]
    ... 12 more
Caused by: org.osgi.framework.BundleException: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.felix.framework.Felix.activateBundle(Felix.java:2276)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.Felix.startBundle(Felix.java:2144)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:998)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.karaf.bundle.command.Start.executeOnBundle(Start.java:38)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:64)[24:org.apache.karaf.bundle.core:4.0.3]
    ... 12 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:317)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at com.NewKafka.NewKafkaArtifact.KafkaProducerTest.main(KafkaProducerTest.java:25)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
    at com.NewKafka.NewKafkaArtifact.StartKafka.start(StartKafka.java:11)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
    at org.apache.felix.framework.util.SecureAction.startActivator(SecureAction.java:697)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.Felix.activateBundle(Felix.java:2226)[org.apache.felix.framework-5.4.0.jar:]
    ... 16 more

I tried setting key.serializer and value.serializer as follows:

props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());

I also tried the following, but I still get the same error. What am I doing wrong here?

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

【Comments on the question】:

Hey @Sanjeev, try using props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); I have posted a complete example.

【Answer 1】:

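It is related to the version you are using. Version 0.8.2.2_1 has also been recommended, so I suggest changing the Kafka version you use and trying again. As for the code, I cross-checked it against many code samples from the Kafka dev list, and it looks like you wrote it correctly.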

Thread.currentThread().setContextClassLoader(null);

【Comments】:

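Sorry for the late reply. Updating the Kafka version to 0.8.2.2_1 or to the latest version did not work. Adding the following line before setting the properties did the trick: Thread.currentThread().setContextClassLoader(null);

【Answer 2】: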

I recently found a solution. Setting the thread context class loader to null solved the problem for me. Thanks.

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer<>(props);

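【Comments】:

【Answer 3】:

I found the cause by reading the Kafka client source code.

The Kafka client calls Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()) to obtain the Class object and then creates an instance from it. The key is the class loader, which is given by the last argument. The method Utils.getContextOrKafkaClassLoader() is implemented as: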

public static ClassLoader getContextOrKafkaClassLoader() {
    // Prefer the current thread's context class loader when one is set.
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null)
        return getKafkaClassLoader();
    else
        return cl;
}

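So, by default, the Class object for org.apache.kafka.common.serialization.StringSerializer is looked up through the current thread's context class loader, which is normally the application class loader. If that loader cannot see the target class (for example in an OSGi container, where the Kafka classes are loaded by a different bundle's class loader), this problem occurs.

To fix it, set the current thread's context class loader to null before creating the new KafkaProducer instance, so that the lookup falls back to the class loader that loaded Kafka itself: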

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer<>(props);

I hope my answer helps you understand what is going on.
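The lookup described in this answer can be reproduced with the JDK alone, which may help when checking a class-loading theory without a broker or container. The following is a minimal sketch, not Kafka code: the class ContextLoaderLookup, its nested Target class (a stand-in for StringSerializer) and both helper methods are hypothetical names, and the empty URLClassLoader only imitates a context class loader that cannot see the library's classes.

```java
import java.net.URL;
import java.net.URLClassLoader;

// JDK-only sketch; every name here is hypothetical and nothing is taken from Kafka.
class ContextLoaderLookup {

    // Stand-in for a class such as StringSerializer that ships with the library.
    static class Target { }

    // Same fallback idea as the method quoted above: use the context class loader
    // when one is set, otherwise the loader that loaded this class.
    static ClassLoader contextOrOwnLoader() {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return (cl == null) ? ContextLoaderLookup.class.getClassLoader() : cl;
    }

    // Returns true if the class name resolves through the loader chosen above.
    static boolean canResolve(String className) {
        try {
            Class.forName(className, true, contextOrOwnLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = Target.class.getName();
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        try {
            // A loader with no URLs and no parent only sees JDK bootstrap classes.
            thread.setContextClassLoader(new URLClassLoader(new URL[0], null));
            System.out.println("isolated context loader: " + canResolve(name));

            // With a null context loader the lookup falls back to the class's own loader.
            thread.setContextClassLoader(null);
            System.out.println("null context loader: " + canResolve(name));
        } finally {
            thread.setContextClassLoader(original);
        }
    }
}
```

The first lookup fails because the isolated loader cannot see Target, and the second succeeds through the fallback. This is the same effect that setting the context class loader to null has before constructing the producer.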

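【Comments】:

If I add Thread.currentThread().setContextClassLoader(null); I run into problems registering other beans.

@Sheel You can save the current thread's context class loader in a variable, set the context class loader to null, create the Kafka producer, and then restore the saved class loader from the variable. There is an example: https://***.com/a/53653490/1673775. This is still a workaround. If you need a real solution, consider using a different Kafka version or try to find out what is happening with class loading in your application.

Thanks, buddy. Actually I was importing the wrong file. That problem is solved now. I am working on another issue and will let you know.

【Answer 4】:

Try using these properties instead of yours.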

  props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");

  props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");

Here is a complete Kafka producer example:

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FxDateProducer {

   public static void main(String[] args) throws Exception {

      if (args.length == 0) {
         System.out.println("Enter topic name");
         return;
      }

      String topicName = args[0];
      Properties props = new Properties();

      // Address of the Kafka broker
      props.put("bootstrap.servers", "localhost:9092");

      // Wait for acknowledgement from all in-sync replicas
      props.put("acks", "all");

      // Number of automatic retries when a request fails (0 disables retries)
      props.put("retries", 0);

      // Batch size in bytes
      props.put("batch.size", 16384);

      // Wait up to 1 ms for more records so that fewer requests are sent
      props.put("linger.ms", 1);

      // Total memory in bytes available to the producer for buffering
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);

      for (int i = 0; i < 10; i++) {
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
      }
      System.out.println("Message sent successfully");
      producer.close();
   }
}

【Comments】:

【Answer 5】:

The problem appears to be related to the class loader, as @Ram Ghadiyaram pointed out in his answer. To get it working with kafka-clients 2.x, I had to do the following:

public Producer<String, String> createProducer() {
    // Remember the current context class loader so it can be restored afterwards.
    ClassLoader original = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(null);
    try {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
        // ... etc ...

        return new KafkaProducer<>(props);
    } finally {
        // Restore the original loader even if the constructor throws.
        Thread.currentThread().setContextClassLoader(original);
    }
}

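This lets the system keep loading other classes with the original class loader. It was necessary in Wildfly/JBoss (the specific application I am working with is Keycloak).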
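If the save, set and restore steps are needed in more than one place, they could be wrapped in a small helper. The sketch below is only an illustration using the JDK: ContextClassLoaderScope and callWith are hypothetical names, not part of kafka-clients, and the lambda in main stands in for the producer construction.

```java
import java.util.function.Supplier;

// Hypothetical helper; not part of kafka-clients or any other library.
final class ContextClassLoaderScope {

    private ContextClassLoaderScope() { }

    // Runs the action with the given context class loader (which may be null)
    // and restores the previous loader afterwards, even if the action throws.
    static <T> T callWith(ClassLoader temporary, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(temporary);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        // Stand-in for creating the producer: report what the action observes.
        boolean sawNull = callWith(null,
                () -> Thread.currentThread().getContextClassLoader() == null);

        System.out.println("context loader was null inside the action: " + sawNull);
        System.out.println("context loader restored afterwards: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

With such a helper, the body of createProducer() could shrink to building the properties and returning callWith(null, () -> new KafkaProducer<String, String>(props)). The try/finally matters because other code on the same thread, such as bean registration, keeps relying on the original loader.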

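【Comments】:

【Answer 6】:

This happens because of a Kafka version problem. Make sure you are using the correct Kafka version. The version I am using is 'kafka_2.12-1.0.1'.

In any case, try using the following properties in your code. This solved my problem.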

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

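Previously I used the following properties, which caused the problem. Note the lowercase "s" in Stringserializer: class names are case-sensitive, so this name does not match StringSerializer.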

//props.put("key.serializer","org.apache.kafka.common.serialization.Stringserializer");
//props.put("value.serializer","org.apache.kafka.common.serialization.Stringserializer");
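A class name passed as a string is only checked at runtime, and the check is case-sensitive, so a one-letter slip produces the same "could not be found" message as a class-loader problem. The JDK-only sketch below illustrates this with java.lang.StringBuilder standing in for StringSerializer. ClassNameCaseCheck and resolves are hypothetical names.

```java
// JDK-only sketch with hypothetical names; StringBuilder stands in for StringSerializer.
class ClassNameCaseCheck {

    // Returns true if the fully qualified name resolves to a class.
    static boolean resolves(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Wrong capitalization: there is no class called java.lang.Stringbuilder.
        System.out.println(resolves("java.lang.Stringbuilder"));

        // Taking the name from the class literal lets the compiler check the spelling.
        System.out.println(resolves(StringBuilder.class.getName()));
    }
}
```

This suggests that in this answer the corrected class name made the difference rather than the switch to the ProducerConfig constants, which hold the same "key.serializer" and "value.serializer" keys. Writing StringSerializer.class.getName(), as the question does, is one way to rule out this kind of typo.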

【Comments】:
