Kafka Streams - SerializationException:未知的魔术字节

Posted

技术标签:

【中文标题】Kafka Streams - SerializationException:未知的魔术字节【英文标题】:Kafka Streams - SerializationException: Unknown magic byte 【发布时间】:2019-05-19 00:19:42 【问题描述】:

我正在尝试创建一个处理 Avro 记录的 Kafka Streams 应用程序,但我收到以下错误:

Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我不确定是什么导致了这个错误。我只是想先将 Avro 记录放入应用程序,然后处理它们,然后输出到另一个主题,但它似乎不起作用。我在下面的应用程序中包含了代码。谁能看到它为什么不起作用?

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    Serde<String> stringSerde = Serdes.String();
    Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();

    specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);


    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));


    KStream<String, trackingReport> outputreports = inputreports;

    String outputTopic = "outtesttopic";
    outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));

    Topology topology = builder.build();

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();

【问题讨论】:

【参考方案1】:

未知的魔法字节!

意味着您的数据不符合 Schema Registry 预期的有线格式。

或者,换句话说,您尝试读取的数据不是 Avro,正如 Confluent Avro 反序列化器所期望的那样。

顺便说一句,运行kafka-avro-console-consumer 可能会出现同样的错误,因此您可能也想使用它进行调试

如果您确定您的数据确实是 Avro,并且架构实际上是作为消息的一部分发送的(需要查看您的生产者代码),那么您不应该使用需要特定字节的 Confluent Avro 反序列化器消息中的格式。相反,您可以使用 ByteArrayDesrializer 并自己读取 Avro 记录,然后将其传递给 Apache Avro BinaryDecoder class。作为奖励,您可以将该逻辑提取到您自己的 Deserialzer 类中

另外,如果输入主题是 Avro,我认为你不应该使用这个属性来读取字符串

DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

【讨论】:

是的,我刚刚检查了运行命令,它也没有工作。我的制作人与***.com/questions/53781639/…的制作人相同 是的,我理解该属性,但我认为可以覆盖,就像我对 Consumed.with 所做的那样 intesttopic 与上一篇文章中发送的主题不同 顺便说一下,outputreports 是一个不必要的变量。无需将 KStream 变量复制到新名称 您的反序列化程序需要反转您在生产者中使用的任何序列化程序。在 Kafka Streams 中,您有一个结合了两者的 Serde 类...我不确定这是否能回答您的问题

以上是关于Kafka Streams - SerializationException:未知的魔术字节的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams 开发单词计数应用

Kafka Streams入门指南

Kafka streams概览

初探Kafka Streams

Kafka Streams应用程序在kafka服务器上打开了太多文件

如何限制kafka-streams中的rocksdb内存使用量