运行自定义构建的 Kafka 流 DSL 应用程序返回 java.lang.ClassNotFoundException
Posted
技术标签:
【中文标题】运行自定义构建的 Kafka 流 DSL 应用程序返回 java.lang.ClassNotFoundException【英文标题】:running custom built Kafka streams DSL app returns java.lang.ClassNotFoundException 【发布时间】:2017-08-10 18:30:27 【问题描述】:我正在尝试从包含 json 数据的 kafka 主题中读取数据,并根据“实体”字段的值写入新主题。我正在使用以下代码从 kafka 读写
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class entityDataLoader
public static void main(final String[] args) throws Exception
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Set up serializers and deserializers, which we will use for overriding the default serdes
// specified above.
final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
// In the subsequent lines we define the processing topology of the Streams application.
final KStreamBuilder builder = new KStreamBuilder();
// Read the input Kafka topic into a KStream instance.
final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events");
String content = textLines.toString();
String entity = JSONExtractor.returnJSONValue(content, "entity");
System.out.println(entity);
textLines.to(entity);
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.cleanUp();
streams.start();
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
有什么办法可以成功运行此应用程序吗?
使用 Netbeans,我使用依赖项构建并将 jar 文件放在 /home/kafka 路径中,并尝试将其作为类路径运行并指定我创建的类(使用命令 java -cp mavenproject.jar postilionkafka.entityDataLoader
)。我收到以下错误
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/streams/processor/TopologyBuilder
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.streams.processor.TopologyBuilder
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
感谢@James,我已经能够解决这个问题。我无法从主题中的记录中提取实体数据。主题中的记录是JSON,一个例子是"date":"string":"2017-03-20","time":"string":"20:04:13:563","event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":"string":"ISWSnk","msg_param_2":"string":"Application startup","msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":"long":1490040253563,"transmit_count":"int":1,"team_id":null,"app_id":"int":4,"logged_by_app_id":"int":4,"entity_type":"int":3,"binary_data":null
我想根据 entity 字段的值写入主题(对于下面的 json 示例,它应该写入主题 Transaction Manager。如果我运行我当前的代码,我会收到以下错误
SLF4J:无法加载类“org.slf4j.impl.StaticLoggerBinder”。 SLF4J:默认为无操作(NOP)记录器实现 SLF4J:有关详细信息,请参阅http://www.slf4j.org/codes.html#StaticLoggerBinder。 org.apache.kafka.streams.kstream.internals.KStreamImpl@568db2f2 未找到对象 位置 0 处的意外字符 (o)。 空值 线程“主”java.lang.NullPointerException 中的异常:主题不能为空 在 java.util.Objects.requireNonNull(Objects.java:228) 在 org.apache.kafka.streams.kstream.internals.KStreamImpl.to(KStreamImpl.java:353) 在 org.apache.kafka.streams.kstream.internals.KStreamImpl.to(KStreamImpl.java:337) 在 postilionkafka.dataload.main(dataload.java:35)
JSONExtractor 类定义为
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import org.json.simple.parser.JSONParser;
class JSONExtractor
/**
*
*/
public static String returnJSONValue(String args, String value)
JSONParser parser = new JSONParser();
String app= null;
System.out.println(args);
try
Object obj = parser.parse(args);
JSONObject JObj = (JSONObject)obj;
app= (String) JObj.get(value);
return app;
catch(ParseException pe)
System.out.println("No Object found");
System.out.println(pe);
return app;
【问题讨论】:
我能够从您使用JSONExtractor
类提供的示例 JSON 中提取实体。尝试记录 args
输入参数,以确保您传递的是您认为传递的内容。
【参考方案1】:
这看起来像一个简单的类路径问题,尝试在classpath参数中添加所有非标准java的jar,例如:
java -cp kafka-stream.jar:mavenproject.jar postilionkafka.entityDataLoader
这往往会很快变得过于复杂,这也是我们使用 Maven 管理依赖项的原因之一。我通常直接从 IDE 运行我正在处理的任何应用程序,这也是一种更简单的调试方法。如果我确实必须在我的 IDE 之外启动,我仍然会从我的 IDE 中尝试,IntelliJ 会注销包含所需依赖项的执行命令,并节省我重新建立可能是什么以及如何从我的本地 Maven 仓库。
如果从 IDE 运行不适合您,另一种方法是使用 Maven exec。请在 running a project from maven 上查看此答案。
【讨论】:
我注意到错误是针对TopologyBuilder
的,但您的代码中没有引用它。但是,当我尝试在类路径上仅运行生成的 jar 来复制您的问题时,我看到了完全相同的错误。所以问题是 kafka-streams jar 不在你的类路径中,但仅仅添加它是不够的。您还需要包含该 jar 的所有运行时依赖项,以及它的依赖项(等等)。
谢谢 James... 稍微测试一下。我几分钟后到家
你好 James...它成功了....我的代码有一些问题需要解决,但非常感谢您的努力
你好@James 我的代码有问题,我无法弄清楚。我在下面添加了一个新答案。
嗨@Zigmaphi,您的异常似乎是由JSON ParseException
引起的,然后您从returnJSONValue
返回null
以上是关于运行自定义构建的 Kafka 流 DSL 应用程序返回 java.lang.ClassNotFoundException的主要内容,如果未能解决你的问题,请参考以下文章
Transflow:Quake 是如何构建以 DSL 为核心的低代码系统?
Transflow:Quake 是如何构建以 DSL 为核心的低代码系统?