Kafka Streams“Consumed.with()”与KafkaAvroDeserializer

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Streams“Consumed.with()”与KafkaAvroDeserializer相关的知识,希望对你有一定的参考价值。

我需要从KafkaAvroDeserializer使用的主题创建一个流,而不是标准的kafka反序列化器。这是因为它将向下发送到汇合JDBC Sink连接器(不支持标准序列化器/反序列化器)中使用的主题。在创建主题时,我使用了KafkaAvroSerializer来获取键和值。

我的原始代码(在我更改为使用Kafka Avro Serializers作为密钥之前)是:

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(Serdes.String(), uploadSerde));

注意:由于密钥是使用KafkaAvroSerializer序列化的,因此上面的Serdes.string不会正确反序列化。所以,也许有另一种形式的代码可以让我构建一个流,而不必设置密钥serdes(所以它默认为配置中的内容),我可以设置值serde(uploadSerde)?

如果没有,有人可以告诉我如何将“Serdes.String()”标准deserizlaizer更改为KafkaAvroDeserializer吗?例如

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(<What can I insert here for the KafkaAvroDeserializer.String???>, uploadSerde));

在我的消费者中,我正在设置正确的默认反序列化器:

streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

如果使用表单(并允许我的消费者中指定的默认值为KafkaAvro):

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC);

我得到以下内容:

2018-04-08 00:24:53,433] ERROR [fc-demo-client-StreamThread-1] stream-thread [fc-demo-client-StreamThread-1] Failed to process stream task 0_0 due to the following error:    (org.apache.kafka.streams.processor.internals.AssignedTasks)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
at     org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at    org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
....

我使用avsc文件中的Java生成的类,并使用avro架构中生成的Java类初始化uploadSerde。

谢谢。

答案

键和值的逻辑是相同的。因此,您可以以相同的方式处理。

您的困惑是关于在配置中设置消费者反序列化器。请注意,这些配置被忽略(由于内部原因)。您无法直接配置使用者的反序列化程序。你总是需要使用Serdes。因此,如果要为使用者设置默认反序列化器,则需要在配置中指定默认的Serde。

所以我创建了一个围绕KafkaAvroSerializer和KafkaAvroDeserializer的包装器,它实例化了这些包装器,然后在Consumed.with中使用包装器作为关键参数

究竟。或者您也可以在配置中将此Serde设置为默认值。

本来以为使用KafkaAvroSerialize'd字符串键从主题创建流是一个常见的用例

对此不确定。如果它是一个普通的字符串,我认为人们可以直接使用StringDeserializer而不是将字符串包装为Avro(不确定)。另请注意,建议在处理Avro时使用模式注册表。 Confluent的模式注册表附带了对抗Avro Serdes:https://github.com/confluentinc/schema-registry/(免责声明:我是Confluent的员工。)

以上是关于Kafka Streams“Consumed.with()”与KafkaAvroDeserializer的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams 开发单词计数应用

Kafka Streams入门指南

Kafka streams概览

初探Kafka Streams

Kafka Streams应用程序在kafka服务器上打开了太多文件

如何限制kafka-streams中的rocksdb内存使用量