KStream-KStream inner join throws java.lang.ClassCastException

Posted: 2019-10-26 04:09:21

Question:

In the @StreamListener method `process`, I map the school KStream to a person KStream and use `.through()` to write it to a topic "person". In another @StreamListener method, `process1`, I consume that topic as a KStream.

MainApplication.java

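import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
public class KafkaStreamsTableJoin {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {

        @StreamListener
        public void process(@Input("school") KStream<SchoolKey, School> schools) {

            // Turn every School record into a Person record and write it to the "person" topic
            schools.map((schoolKey, school) -> {
                return KeyValue.pair(new PersonKey("Adam", "Smith", schoolKey.getId()), new Person(12));
            })
            .through("person", Produced.with(new PersonKeySerde(), new PersonSerde()));
        }

        @StreamListener
        public void process1(@Input("school_1") KStream<SchoolKey, School> schools, @Input("person") KStream<PersonKey, Person> persons) {

            // Re-key both streams by the integer id (this triggers a repartition) and join them within 1 second
            schools.selectKey((schoolKey, school) -> schoolKey.getId())
                    .join(persons.selectKey((personKey, person) -> personKey.getId()),
                            (school, person) -> {
                                System.out.println("school_app2= " + school + ", person_app2= " + person);
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(1)),
                            Joined.with(Serdes.Integer(), new SchoolSerde(), new PersonSerde())
                    );
        }
    }

    interface KStreamProcessorX {

        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();

        @Input("school_1")
        KStream<?, ?> inputSchool1KStream();

    }
}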


In `process1`, this KStream has to be joined with another KStream, but the following exception is thrown:

Exception in thread "stream-join-sample_2-654e8060-5b29-4694-9188-032a9779529c-StreamThread-1" java.lang.ClassCastException: class kafka.streams.join.School cannot be cast to class kafka.streams.join.Person (kafka.streams.join.School and kafka.streams.join.Person are in unnamed module of loader 'app')
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$reverseJoiner$0(AbstractStream.java:98)
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:889)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
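The frames `KStreamJoinWindow` and `KStreamKStreamJoin` in this trace are the two halves of a windowed stream-stream join. As a rough mental model only, here is a simplified in-memory sketch using just the JDK. It is not the Kafka Streams implementation, which uses persistent window stores, event time and retention. Each side buffers its records in its own store, and every arriving record probes the other side's store for records with the same key whose timestamps differ by at most the window size. A 1000 ms window stands in for `JoinWindows.of(Duration.ofSeconds(1))`. The class and method names are hypothetical. The joiner always receives `(leftValue, rightValue)`, so when a right-side record triggers the match, the arguments are swapped; that swap appears to be what `AbstractStream.reverseJoiner` in the trace does. The exception surfacing there therefore suggests that a value arriving on the `persons` side had been deserialized as a `School`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Simplified, in-memory model of a windowed KStream-KStream inner join (hypothetical names).
public class WindowedJoinSketch {

    // A buffered record: key, value and timestamp in milliseconds.
    static final class Rec<K, V> {
        final K key;
        final V value;
        final long ts;

        Rec(K key, V value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    static final class Join<K, L, R, O> {
        private final long windowMs;
        private final BiFunction<L, R, O> joiner;
        private final List<Rec<K, L>> leftStore = new ArrayList<>();   // stands in for the left window store
        private final List<Rec<K, R>> rightStore = new ArrayList<>();  // stands in for the right window store
        final List<O> output = new ArrayList<>();

        Join(long windowMs, BiFunction<L, R, O> joiner) {
            this.windowMs = windowMs;
            this.joiner = joiner;
        }

        // A left record is stored, then matched against buffered right records.
        void left(K key, L value, long ts) {
            leftStore.add(new Rec<>(key, value, ts));
            for (Rec<K, R> r : rightStore) {
                if (r.key.equals(key) && Math.abs(r.ts - ts) <= windowMs) {
                    output.add(joiner.apply(value, r.value));
                }
            }
        }

        // A right record is stored, then matched against buffered left records.
        // The joiner still receives (left, right), so the arguments are swapped here.
        void right(K key, R value, long ts) {
            rightStore.add(new Rec<>(key, value, ts));
            for (Rec<K, L> l : leftStore) {
                if (l.key.equals(key) && Math.abs(l.ts - ts) <= windowMs) {
                    output.add(joiner.apply(l.value, value));
                }
            }
        }
    }

    public static void main(String[] args) {
        // 1000 ms window, mirroring JoinWindows.of(Duration.ofSeconds(1)) in the question
        Join<Integer, String, String, String> join =
                new Join<>(1000, (school, person) -> school + "+" + person);
        join.left(1, "school-1", 0);
        join.right(1, "person-1", 500);      // same key, 500 ms apart: joined
        join.right(2, "person-2", 600);      // different key: no match
        join.right(1, "person-late", 2500);  // same key, 2500 ms apart: outside the window
        System.out.println(join.output);     // prints [school-1+person-1]
    }
}
```

Because each side only ever sees values produced by its own deserializer, a record that was decoded with the wrong type is not caught until the joiner is invoked.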

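I suspect this exception is related to an incorrect serde, but I can't tell which serde causes the problem or how to fix it. Or does the `map()` in the `process` method trigger a repartition whose serde is the incorrect one?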

POJOs and Serdes:

Person.java

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class Person {

    private double age;

    public Person() {
    }

    public Person(double age) {
        this.age = age;
    }

    @JsonGetter("age")
    public double getAge() {
        return age;
    }

    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                '}';
    }
}


PersonSerde.java

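import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// JSON serde for Person values, built from Spring Kafka's JsonSerializer/JsonDeserializer
public class PersonSerde extends Serdes.WrapperSerde<Person> {
    public PersonSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(Person.class));
    }
}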

PersonKey.java

import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class PersonKey {

    private String firstName;
    private String lastName;
    private int id;

    public PersonKey() {
    }

    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    @JsonGetter("firstName")
    public String getFirstName() {
        return firstName;
    }

    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @JsonGetter("lastName")
    public String getLastName() {
        return lastName;
    }

    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "PersonKey{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PersonKey personKey = (PersonKey) o;
        return id == personKey.id &&
                Objects.equals(firstName, personKey.firstName) &&
                Objects.equals(lastName, personKey.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, id);
    }
}

PersonKeySerde.java

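import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// JSON serde for PersonKey keys
public class PersonKeySerde extends Serdes.WrapperSerde<PersonKey> {
    public PersonKeySerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(PersonKey.class));
    }
}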

The serdes and POJOs for School are analogous to those for Person.

application.yml

spring.application.name: stream-join-sample

spring.cloud.stream.bindings.school:
  destination: school
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.school:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_1

spring.cloud.stream.bindings.person:
  destination: person
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.person:
  consumer:
    keySerde: kafka.streams.serde.PersonKeySerde
    valueSerde: kafka.streams.serde.PersonSerde
    application-id: stream-join-sample_2

spring.cloud.stream.bindings.school_1:
  destination: school
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.school_1:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_2

spring.cloud.stream.kafka.streams.binder:
  brokers: localhost
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 100

Sample application with steps to reproduce:

Comments:

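Maybe set a breakpoint at KStreamKStreamJoin.java:94 to see where the data comes from?

@MatthiasJ.Sax Thanks for the reply. I added a screenshot of the debug window at the breakpoint for you to look at. I also added a link to a sample application with steps to reproduce.

This sounds completely right. The join uses two window stores, one for the left side and one for the right side, and both seem to be configured with the correct serdes. I downloaded your code from GitHub to look into this, and I believe it is actually a bug in the JsonSerdes being used. The type is encoded in the record headers, but the headers are never cleaned up. As a result, two different types end up encoded, one of them is picked "randomly" when the data is read back from the topic, and it picks the wrong one.

As a workaround, you could replace selectKey() with transform() and clear the headers inside transform()... That is a hack, though. You should file a ticket against the Spring Boot project.

Thanks a lot @MatthiasJ.Sax. As a workaround for now, I apply selectKey() before calling through() when producing the messages to the topic, so selectKey() is no longer needed when consuming, because the keys of both KStreams already match. However, this makes the topic less reusable, since the messages carry a more specific key. I will try your suggestion of using transform() and clearing the headers, and file a ticket for the bug. Thanks for all your support.

Answer 1: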

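Could there be some stale data somewhere in the topics or in the underlying changelog topics? Could you try with new topics and a different application ID to see whether that resolves your issue?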

Here is an example configuration:

spring.cloud.stream.bindings.school:
  destination: school-abc
spring.cloud.stream.kafka.streams.bindings.school:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_diff_id_1

spring.cloud.stream.bindings.person:
  destination: person-abc
spring.cloud.stream.kafka.streams.bindings.person:
  consumer:
    keySerde: kafka.streams.serde.PersonKeySerde
    valueSerde: kafka.streams.serde.PersonSerde
    application-id: stream-join-sample_diff_id_2

spring.cloud.stream.bindings.school_1:
  destination: school-abc
spring.cloud.stream.kafka.streams.bindings.school_1:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_diff_id_2

spring.cloud.stream.kafka.streams.binder:
  brokers: localhost
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 100

Note that I changed the topic names, application IDs, etc. You may want to update any producers that populate the topics.

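Also note that you don't need to specify the content type, set useNativeDecoding to false, etc., because these are the defaults in the current version of the Kafka Streams binder.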

Comments:

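Thanks for your reply. I cleaned everything up and reran the whole ZooKeeper and Kafka setup with your yml, but I still get the same exception. I also added a link to a sample application with steps to reproduce; I'd appreciate it if you could try running it. Thanks

Answer 2: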

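I downloaded your code from GitHub to dig deeper, and it turns out to be a bug in the JsonSerializer/JsonDeserializer being used. The type (School, Person, PersonKey, SchoolKey) is encoded in a record header, but the headers are never cleared. Every time the type changes, a new header is simply appended (header keys are not unique; duplicates are allowed).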

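For some records, the same type is merely encoded multiple times, so for those the code works. In other cases, however, different types are encoded, and when the data is read from the topic one of them is picked "randomly" (the first header found). This happens upstream of the join, when the data is read back from the repartition topic. If the wrong type is picked, the code crashes later with a ClassCastException.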
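The failure mode described above can be reproduced without Kafka. The following JDK-only sketch is a model of the behavior as this answer describes it, not the Spring Kafka API. Record headers are an ordered list that allows duplicate keys. A serializer appends a type header without removing earlier ones. A deserializer trusts the first type header it finds and otherwise falls back to its configured target type. The header name `__TypeId__` is assumed to be Spring Kafka's default type header name. All class and method names are hypothetical. The sketch also shows why the two remedies discussed in this answer help: clearing headers before re-serializing, and not writing type headers at all.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeHeaderSketch {

    // Assumption: Spring Kafka's default type header name
    static final String TYPE_HEADER = "__TypeId__";

    static final class School {}
    static final class Person {}

    // Stand-in for Kafka record headers: ordered, duplicate keys allowed.
    static final class Headers {
        private final List<Map.Entry<String, String>> entries = new ArrayList<>();

        void add(String key, String value) {
            entries.add(new SimpleEntry<>(key, value));
        }

        // Returns the value of the first header with this key, or null.
        String first(String key) {
            for (Map.Entry<String, String> e : entries) {
                if (e.getKey().equals(key)) {
                    return e.getValue();
                }
            }
            return null;
        }

        void clear() {
            entries.clear();
        }
    }

    // Hypothetical serializer: appends a type header (if enabled) but never removes older ones.
    static void serialize(Object value, Headers headers, boolean addTypeInfo) {
        if (addTypeInfo) {
            headers.add(TYPE_HEADER, value.getClass().getSimpleName());
        }
    }

    // Hypothetical deserializer: first type header wins, else the configured target type is used.
    static String resolveType(Headers headers, String configuredType) {
        String fromHeader = headers.first(TYPE_HEADER);
        return fromHeader != null ? fromHeader : configuredType;
    }

    public static void main(String[] args) {
        // 1) Bug: the School header from upstream is carried along after map(),
        //    and the Person serializer only appends a second header.
        Headers h = new Headers();
        serialize(new School(), h, true);
        serialize(new Person(), h, true);
        System.out.println(resolveType(h, "Person")); // prints School (wrong type)

        // 2) Clear the headers before re-serializing.
        h.clear();
        serialize(new Person(), h, true);
        System.out.println(resolveType(h, "Person")); // prints Person

        // 3) Do not write type headers at all; the configured target type is used.
        Headers noInfo = new Headers();
        serialize(new School(), noInfo, false);
        serialize(new Person(), noInfo, false);
        System.out.println(resolveType(noInfo, "Person")); // prints Person
    }
}
```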

New answer:

Following the discussion in this ticket https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/685, you should disable writing type information into the record headers:

props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

Note that all Serdes you create manually, i.e. by calling `new`, must be configured manually:

Map<String, Object> config = new HashMap<>();
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

PersonKeySerde personKeySerde = new PersonKeySerde();
personKeySerde.configure(config, true);

PersonSerde personSerde = new PersonSerde();
personSerde.configure(config, false);

// ...
.through("person", Produced.with(personKeySerde, personSerde));

Original answer:

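As a workaround, you can replace map() and selectKey() with transform() and clear the headers inside transform(). This is a hack, though. You should file a ticket with the Spring Boot project so that they can fix JsonSerializer/JsonDeserializer.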

The following code removes the headers, which ensures the correct type is used and avoids the ClassCastException:

import java.time.Duration;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
public class KafkaStreamJoinApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamJoinApplication.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KafkaKStreamJoinApplication {

        @StreamListener
        public void process(@Input("school") KStream<SchoolKey, School> schools) {
            // replace map() with transform()
            schools.transform(new TransformerSupplier<SchoolKey, School, KeyValue<PersonKey, Person>>() {
                @Override
                public Transformer<SchoolKey, School, KeyValue<PersonKey, Person>> get() {
                    return new Transformer<SchoolKey, School, KeyValue<PersonKey, Person>>() {
                        ProcessorContext context;

                        @Override
                        public void init(final ProcessorContext context) {
                            this.context = context;
                        }

                        @Override
                        public KeyValue<PersonKey, Person> transform(final SchoolKey key, final School value) {
                            // clear all headers; removing only the type header would be sufficient
                            for (Header h : context.headers().toArray()) {
                                context.headers().remove(h.key());
                            }
                            // same as the "old" map() code:
                            return KeyValue.pair(new PersonKey("Adam", "Smith", key.getId()), new Person(12));
                        }

                        @Override
                        public void close() {
                        }
                    };
                }
            })
            .through("person", Produced.with(new PersonKeySerde(), new PersonSerde()));
        }

        @StreamListener
        public void process1(@Input("school_1") KStream<SchoolKey, School> schools, @Input("person") KStream<PersonKey, Person> persons) {

            // replace selectKey() with transform()
            schools.transform(new TransformerSupplier<SchoolKey, School, KeyValue<Integer, School>>() {
                @Override
                public Transformer<SchoolKey, School, KeyValue<Integer, School>> get() {
                    return new Transformer<SchoolKey, School, KeyValue<Integer, School>>() {
                        ProcessorContext context;

                        @Override
                        public void init(final ProcessorContext context) {
                            this.context = context;
                        }

                        @Override
                        public KeyValue<Integer, School> transform(final SchoolKey key, final School value) {
                            // clear all headers; removing only the type header would be sufficient
                            for (Header h : context.headers().toArray()) {
                                context.headers().remove(h.key());
                            }
                            // effectively the same as the "old" selectKey() code:
                            return KeyValue.pair(key.getId(), value);
                        }

                        @Override
                        public void close() {
                        }
                    };
                }
            })
            // replace selectKey() with transform()
            .join(persons.transform(new TransformerSupplier<PersonKey, Person, KeyValue<Integer, Person>>() {
                    @Override
                    public Transformer<PersonKey, Person, KeyValue<Integer, Person>> get() {
                        return new Transformer<PersonKey, Person, KeyValue<Integer, Person>>() {
                            ProcessorContext context;

                            @Override
                            public void init(final ProcessorContext context) {
                                this.context = context;
                            }

                            @Override
                            public KeyValue<Integer, Person> transform(final PersonKey key, final Person value) {
                                // clear all headers; removing only the type header would be sufficient
                                for (Header h : context.headers().toArray()) {
                                    context.headers().remove(h.key());
                                }
                                // effectively the same as the "old" selectKey() code:
                                return KeyValue.pair(key.getId(), value);
                            }

                            @Override
                            public void close() {
                            }
                        };
                    }
                }),
                (school, person) -> {
                    System.out.println("school_app2= " + school + ", person_app2= " + person);
                    return null;
                },
                JoinWindows.of(Duration.ofSeconds(1)),
                Joined.with(Serdes.Integer(), new SchoolSerde(), new PersonSerde())
            );
        }
    }

    interface KStreamProcessorX {
        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();

        @Input("school_1")
        KStream<?, ?> inputSchool1KStream();
    }
}

Comments:

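Hmm, since Spring for Apache Kafka version 2.2 the type headers are removed by the deserializer by default. So I'm not sure yet what is going on here.

I confirmed in the debugger that the headers are removed. I also disabled type headers completely in his application, and we still get the class cast exception. See here.

It seems @muhammad-arslan-akhtar is using 2.1.5.RELEASE, though: github.com/arslanakhtar61/kafkaStreamJoin

That is the Boot release; Boot 2.1.5 pulls in spring-kafka 2.2.6.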
