Karaf - Kafka OSGI 捆绑包 - 生产者问题

Posted

技术标签:

【中文标题】Karaf - Kafka OSGI 捆绑包 - 生产者问题【英文标题】:Karaf - Kafka OSGI bundle - Producer issue 【发布时间】:2016-04-16 13:13:54 【问题描述】:

我正在尝试在 apache Karaf 版本 4.0.3 中为 Kafka 生产者创建一个简单的捆绑包。

这是我的 Java 代码

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
//props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class","org.apache.kafka.clients.producer.internals.DefaultPartitioner");
Producer<String, String> producer = new KafkaProducer<String,String>(props,new StringSerializer(),new StringSerializer());
//for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("test","data", outputData));

producer.close();

我已经在 pom.xml 中明确声明了各自的依赖

<dependency>
        <groupId>org.apache.servicemix.bundles</groupId>
        <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
        <version>0.9.0.0_1</version>
</dependency>

我也部署了那个 kafka 客户端包。

但在启动制作人时,我在 first Attempt 看到以下异常。

Exception in thread "pool-135-thread-1" java.lang.ExceptionInInitializerError
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
    .
    .
    .
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.clients.producer.internals.DefaultPartitioner for configuration partitioner.class: Class org.apache.kafka.clients.producer.internals.DefaultPartitioner could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:78)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:94)
    at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:206)
    ... 12 more

然后连续这个...

Exception in thread "pool-136-thread-1" java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.producer.ProducerConfig
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
.
.
.
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745

有人遇到过类似的问题吗?

【问题讨论】:

OSGi 运行时不关心您的 Maven 依赖项。它只关心你包中的清单。如果您按名称加载类,就像您似乎所做的那样,那么您需要添加构建时配置,以便将相关包添加到导入包列表中。或者您将DynamicImport-Package: * 添加到您的清单中。你如何做到这一点取决于你如何构建你的包。与maven-bundle-plugin? 是的,我也遇到了同样的问题。有趣的是,如果我嵌入 kafka jar(servicemix 之一或原始 kafka 之一),当我实例化 KafkaProducer 时,我会立即收到此错误,即使我可以很好地引用 KafkaProducer。要么是一些奇怪的东西弄乱了错误消息,要么是一些黑暗的类加载正在发生...... 【参考方案1】:

使用 Kafka 客户端版本 0.8.2.2_1,解决了这个问题。

【讨论】:

【参考方案2】:

我在 0.9.0 中看到了同样的问题。事实证明,设置了一个 Thread 上下文加载器,在这种情况下,Kafka 使用该类加载器来解析。所以线程上下文类加载器应该是:

一个可以解析所有 Kafka 相关内容的类加载器 null

不知道这是否会咬我,但补充:

Thread.currentThread().setContextClassLoader(null);

成功了。

【讨论】:

您的解决方案与 JIRA 中建议的解决方法一致:issues.apache.org/jira/browse/KAFKA-3218 当这个拉取请求被合并时,应该有另一个可用的解决方案:github.com/apache/kafka/pull/1421【参考方案3】:

可能我对其他人有用,在JIRA 中建议解决方法

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);

【讨论】:

以上是关于Karaf - Kafka OSGI 捆绑包 - 生产者问题的主要内容,如果未能解决你的问题,请参考以下文章

启动捆绑包时缺少要求 osgi.wiring.package

Osgi 环境中的 NoClassDefFoundError

OSGi:无法在 Apache Karaf 中找到 UserAdmin

在 Netbeans 和 Karaf 中开发 OSGI 包时如何管理依赖关系?

高效的 OSGi 开发工作流程

jdo/OSGi:捆绑更新后构建 JDO PersistenceManagerFactory 时出错