应用地图功能时在同一类上获取Kafka Streams Class Cast异常

Posted

技术标签:

【中文标题】应用地图功能时在同一类上获取Kafka Streams Class Cast异常【英文标题】:Getting Kafka Streams Class Cast Exception on the same Class while applying map function 【发布时间】:2018-09-25 16:06:30 【问题描述】:

UserRecord.java(由 Maven Avro 插件自动生成)

UserRecord extends SpecificRecordBase implement SpecificRecord

UserRecordSerde.java

UserRecordSerde extends SpecificAvroSerde

application.yml

spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde:UserRecordSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: LongSerdespring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: SpecificAvroSerde

类 - StreamListener - 原始流在 avro 中带有 null 键和 UserRecord 对象

@StreamListener
        public KStream<Long, ArrayList<UserRecord>> handleUserRecords (@Input KStream<?, UserRecord> userRecordStream)  <br/>
        Map<String, Object> serdeConfig = new HashMap();
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); <br/>
        Serde<ArrayList<UserRecord>> userRecordListSerde = new SpecificAvroSerde();
        userRecordListSerde.configure(serdeConfig, false); <br/>
        return userRecordStream
            .map((key, value) -> new KeyValue(value.getUserID, value)
            .groupByKey(Serialized.with(Serdes.Long(), userRecordSerde))
            .aggregate(ArrayList::new, Long key, UserRecord value, ArrayList agg ->
            
               agg.add(value);
               return agg;
            , userRecordListSerde)
        .toStream();
    

例外

java.lang.ClassCastException: com.example.UserRecord cannot be cast to com.example.UserRecord
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)

【问题讨论】:

听起来你的 UserRecordSerde 加载 UserRecordClassLoader 与使用 handleUserRecords() 方法的此类不同。 【参考方案1】:

为什么不直接从配置中删除它呢? spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde:UserRecordSerde 并通过默认配置直接使用SpecificAvroSerdespring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

【讨论】:

我也尝试过使用 SpecificAvroSerde,但没有成功。为了避免类加载器问题 - 我添加了 - System.setProperty("spring.devtools.restart.enabled", "false"); 现在我遇到了另一个异常 - java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord 你能在 Github 上分享你的示例应用吗?我会仔细看看。 这里是我们使用 Avro Serdes 的示例。看看application.yml。为什么不比较? github.com/spring-cloud/spring-cloud-stream-samples/tree/master/… 不错。你能否评论一下你是如何让它工作的,以便其他人有同样的问题可以参考这个?

以上是关于应用地图功能时在同一类上获取Kafka Streams Class Cast异常的主要内容,如果未能解决你的问题,请参考以下文章

从具有相同类的所有元素中获取文本

我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?

关于百度地图

如何在 Spring Boot 应用程序的同一个域类上同时使用 Spring Data JPA 和 Spring Data Elasticsearch 存储库?

同一类上的两组序列化属性

我可以设置 Kafka Stream 消费者 group.id 吗?