KStream-KStream inner join throws java.lang.ClassCastException
Posted: 2019-10-26 04:09:21
Question:
In the process method annotated with @StreamListener, I map the school KStream to a person KStream and populate the topic "person" via the .through() method. From that topic I create a KStream in another method, process1, which is also annotated with @StreamListener.
MainApplication.java
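import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
public class KafkaStreamsTableJoin {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {

        @StreamListener
        public void process(@Input("school") KStream<SchoolKey, School> schools) {
            // map every School record to a Person record and write it to the "person" topic
            schools.map((schoolKey, school) ->
                    KeyValue.pair(new PersonKey("Adam", "Smith", schoolKey.getId()), new Person(12)))
                .through("person", Produced.with(new PersonKeySerde(), new PersonSerde()));
        }

        @StreamListener
        public void process1(@Input("school_1") KStream<SchoolKey, School> schools,
                             @Input("person") KStream<PersonKey, Person> persons) {
            // re-key both streams by the integer id, then join them within a 1-second window
            schools.selectKey((schoolKey, school) -> schoolKey.getId())
                .join(persons.selectKey((personKey, person) -> personKey.getId()),
                    (school, person) -> {
                        System.out.println("school_app2= " + school + ", person_app2= " + person);
                        return null;
                    },
                    JoinWindows.of(Duration.ofSeconds(1)),
                    Joined.with(Serdes.Integer(), new SchoolSerde(), new PersonSerde()));
        }
    }

    interface KStreamProcessorX {
        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();

        @Input("school_1")
        KStream<?, ?> inputSchool1KStream();
    }
}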
In method process1, this KStream has to be joined with another KStream, but I get the following exception:
Exception in thread "stream-join-sample_2-654e8060-5b29-4694-9188-032a9779529c-StreamThread-1" java.lang.ClassCastException: class kafka.streams.join.School cannot be cast to class kafka.streams.join.Person (kafka.streams.join.School and kafka.streams.join.Person are in unnamed module of loader 'app')
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$reverseJoiner$0(AbstractStream.java:98)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:889)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
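I think this exception is related to an incorrect serde, but I can't figure out which serde is causing the problem or how to fix it. Or does the map() in the process method trigger a repartition that ends up using an incorrect serde?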
POJOs and Serdes:
Person.java
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class Person {

    private double age;

    public Person() {
    }

    public Person(double age) {
        this.age = age;
    }

    @JsonGetter("age")
    public double getAge() {
        return age;
    }

    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                '}';
    }
}
PersonSerde.java
public class PersonSerde extends Serdes.WrapperSerde<Person> {
    public PersonSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(Person.class));
    }
}
PersonKey.java
import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class PersonKey {

    private String firstName;
    private String lastName;
    private int id;

    public PersonKey() {
    }

    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    @JsonGetter("firstName")
    public String getFirstName() {
        return firstName;
    }

    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @JsonGetter("lastName")
    public String getLastName() {
        return lastName;
    }

    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "PersonKey{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PersonKey personKey = (PersonKey) o;
        return id == personKey.id &&
                Objects.equals(firstName, personKey.firstName) &&
                Objects.equals(lastName, personKey.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, id);
    }
}
PersonKeySerde.java
public class PersonKeySerde extends Serdes.WrapperSerde<PersonKey> {
    public PersonKeySerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(PersonKey.class));
    }
}
The serde and POJO for the School class are similar to those of the Person class.
application.yml
spring.application.name: stream-join-sample
spring.cloud.stream.bindings.school:
  destination: school
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.school:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_1
spring.cloud.stream.bindings.person:
  destination: person
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.person:
  consumer:
    keySerde: kafka.streams.serde.PersonKeySerde
    valueSerde: kafka.streams.serde.PersonSerde
    application-id: stream-join-sample_2
spring.cloud.stream.bindings.school_1:
  destination: school
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.school_1:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_2
spring.cloud.stream.kafka.streams.binder:
  brokers: localhost
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 100
Sample application with reproducible steps.
Comments:
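Maybe set a breakpoint at KStreamKStreamJoin.java:94 to see where the data comes from?
@MatthiasJ.Sax Thanks for your reply. I've added a screenshot of the debug window at the breakpoint for you to look at. I've also added a link to a sample application with reproducible steps.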
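That all sounds correct. The join uses two window stores, one for the left side and one for the right side, and both seem to be configured with the correct serdes. I downloaded your code from GitHub to investigate, and I believe this is actually a bug in the JSON serdes being used. The type is encoded in the record headers, but the headers are never cleaned up. As a result, two different types end up encoded, and when the data is read from the topic one of them is picked "at random", and it picks the wrong one.
As a workaround, you could replace selectKey() with transform() and clear the headers inside transform()... It's a hack, though. You should also file a ticket against the Spring Boot project.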
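The two-window-store structure described in the comment above can be pictured with a small in-memory sketch. It is a hypothetical simplification, not Kafka Streams code: the class and method names are made up, plain lists stand in for the window stores, and retention and grace periods are ignored. Each side buffers its records and probes the other side's buffer for records with the same key whose timestamps are at most the window size apart. A record arriving on the right side still calls the joiner as (left, right), which is roughly the job of the `reverseJoiner` frame in the stack trace; a value deserialized as the wrong class would only surface at that call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical, simplified model of a windowed KStream-KStream inner join.
// Plain lists stand in for the two window stores; retention is ignored.
final class WindowedJoinSketch<K, L, R, O> {

    // One buffered record: key, timestamp in ms, value
    private static final class Entry<A, B> {
        final A key;
        final long ts;
        final B value;

        Entry(A key, long ts, B value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    private final List<Entry<K, L>> leftStore = new ArrayList<>();  // "this" side
    private final List<Entry<K, R>> rightStore = new ArrayList<>(); // "other" side
    private final long windowMs;
    private final BiFunction<L, R, O> joiner;

    WindowedJoinSketch(long windowMs, BiFunction<L, R, O> joiner) {
        this.windowMs = windowMs;
        this.joiner = joiner;
    }

    // A left record is buffered, then probes the right store
    List<O> onLeft(K key, long ts, L value) {
        leftStore.add(new Entry<>(key, ts, value));
        List<O> out = new ArrayList<>();
        for (Entry<K, R> other : rightStore) {
            if (other.key.equals(key) && Math.abs(other.ts - ts) <= windowMs) {
                out.add(joiner.apply(value, other.value));
            }
        }
        return out;
    }

    // A right record is buffered, then probes the left store; the joiner is still
    // called as (left, right), i.e. the arguments are "reversed" back
    List<O> onRight(K key, long ts, R value) {
        rightStore.add(new Entry<>(key, ts, value));
        List<O> out = new ArrayList<>();
        for (Entry<K, L> other : leftStore) {
            if (other.key.equals(key) && Math.abs(other.ts - ts) <= windowMs) {
                out.add(joiner.apply(other.value, value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        WindowedJoinSketch<Integer, String, String, String> join =
                new WindowedJoinSketch<>(1000, (school, person) -> school + " + " + person);
        join.onLeft(7, 0, "School#7");                          // buffered, no match yet
        System.out.println(join.onRight(7, 400, "Person#7"));   // [School#7 + Person#7]
        System.out.println(join.onRight(7, 5000, "Person#7b")); // [] (outside the window)
    }
}
```

The window check here is symmetric, in the spirit of `JoinWindows.of(Duration.ofSeconds(1))`, which lets records join when they are up to one second before or after each other.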
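Thanks a lot @MatthiasJ.Sax. For now, as a workaround, I apply selectKey() before calling through() when producing the messages to the topic, so that we don't need selectKey() when consuming them, because the keys of the two KStreams already match. However, this makes the topic less reusable, since the messages carry a more specific key. I'll try your suggestion of using transform() and clearing the headers, and I'll raise a ticket to fix the bug. Thanks for all your support.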
Answer 1:
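Could there be some stale data somewhere in the topics or in the underlying changelog topics? Could you try with new topics and a different application ID to see whether that resolves your issue?
Here is a sample configuration: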
spring.cloud.stream.bindings.school:
  destination: school-abc
spring.cloud.stream.kafka.streams.bindings.school:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_diff_id_1
spring.cloud.stream.bindings.person:
  destination: person-abc
spring.cloud.stream.kafka.streams.bindings.person:
  consumer:
    keySerde: kafka.streams.serde.PersonKeySerde
    valueSerde: kafka.streams.serde.PersonSerde
    application-id: stream-join-sample_diff_id_2
spring.cloud.stream.bindings.school_1:
  destination: school-abc
spring.cloud.stream.kafka.streams.bindings.school_1:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_diff_id_2
spring.cloud.stream.kafka.streams.binder:
  brokers: localhost
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 100
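Note that I changed the topic names, application IDs, and so on. You may want to update any producers that populate the topics.
Also note that you don't need to specify the content type, set useNativeDecoding to false, and so on, because these are the defaults in the current version of the Kafka Streams binder.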
Comments:
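Thanks for your reply. I tried cleaning everything up and rerunning ZooKeeper and Kafka from scratch with your yml, but I still get the same exception. I've also added a link to a sample application with reproducible steps. I'd appreciate it if you could try running it. Thanks
Answer 2:
I downloaded your code from GitHub to dig into this, and it turns out that it is actually a bug in the JsonSerializer/JsonDeserializer being used. The types (School, Person, PersonKey, SchoolKey) are encoded in the record headers, but the headers are never cleared. Every time the type changes, a new header is simply appended (header keys are not unique; duplicates are allowed).
For some records the same type is just encoded multiple times, so for those this part of the code works. In other cases, however, different types are encoded, and when data is read from a topic one type is picked "at random" (the first header found). This happens before the join, when the data is received from the repartition topic. If the wrong type is picked, the code crashes later with a ClassCastException.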
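The failure mode described in this answer can be reproduced without Kafka by modelling record headers as an ordered list that allows duplicate keys. This is a hypothetical sketch that follows the answer's explanation, not spring-kafka's actual serializer or deserializer: `School` and `Person` are empty stand-ins, and `__TypeId__` is used as the header name on the assumption that it matches the name spring-kafka's JSON type mapper writes by default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: headers are an ordered list with duplicate keys allowed.
final class TypeHeaderDemo {

    static final String TYPE_HEADER = "__TypeId__"; // assumed default type header name

    static final class School { }
    static final class Person { }

    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Naive serializer: appends a type header without removing existing ones
    static void serialize(List<Header> headers, Object value) {
        headers.add(new Header(TYPE_HEADER, value.getClass().getSimpleName()));
    }

    // Naive deserializer: trusts the FIRST type header it finds
    static String resolveType(List<Header> headers) {
        for (Header h : headers) {
            if (h.key.equals(TYPE_HEADER)) {
                return h.value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Header> headers = new ArrayList<>();
        serialize(headers, new School()); // record first written to the "school" topic
        serialize(headers, new Person()); // after map(), the same headers travel to "person"
        System.out.println(resolveType(headers)); // School, although the payload is a Person
    }
}
```

A record that only ever carried one type resolves correctly however many duplicate headers it has; only records whose headers mix types hit the wrong class. The comments further down note that newer spring-kafka deserializers remove type headers by default, so treat this strictly as a model of the answer's analysis.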
New answer:
Following the discussion in this ticket https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/685, you should disable writing type information into the record headers:
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
Note that all manually created Serdes, i.e., those created by calling new, must be configured manually:
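import java.util.HashMap;
import java.util.Map;

import org.springframework.kafka.support.serializer.JsonSerializer;

// serdes created with `new` must be configured manually
Map<String, Object> config = new HashMap<>();
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

PersonKeySerde personKeySerde = new PersonKeySerde();
personKeySerde.configure(config, true);   // true: key serde
PersonSerde personSerde = new PersonSerde();
personSerde.configure(config, false);     // false: value serde
// ...
.through("person", Produced.with(personKeySerde, personSerde));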
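Why each serde created with `new` needs its own `configure(...)` call can be shown with a toy serializer whose type-header flag lives in the instance and defaults to on (spring-kafka's `JsonSerializer` also adds type headers unless told otherwise). Everything here is hypothetical: the class name and the config key string are placeholders, the key is deliberately not the real value of `JsonSerializer.ADD_TYPE_INFO_HEADERS`, and only the on/off behaviour is modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a JSON serializer with a type-header switch.
final class ConfigurableJsonSerializerSketch {

    // Placeholder key standing in for JsonSerializer.ADD_TYPE_INFO_HEADERS
    static final String ADD_TYPE_INFO_HEADERS = "sketch.add.type.headers";
    static final String TYPE_HEADER = "__TypeId__";

    private boolean addTypeInfo = true; // default: write the type header

    // Same shape as Kafka's Serde#configure; isKey is unused in this sketch
    void configure(Map<String, ?> configs, boolean isKey) {
        Object flag = configs.get(ADD_TYPE_INFO_HEADERS);
        if (flag != null) {
            addTypeInfo = Boolean.parseBoolean(flag.toString());
        }
    }

    // Headers this instance would attach to an outgoing record
    List<String> headersFor(Object value) {
        List<String> headers = new ArrayList<>();
        if (addTypeInfo) {
            headers.add(TYPE_HEADER + "=" + value.getClass().getName());
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(ADD_TYPE_INFO_HEADERS, false);

        ConfigurableJsonSerializerSketch configured = new ConfigurableJsonSerializerSketch();
        configured.configure(config, false);
        ConfigurableJsonSerializerSketch forgotten = new ConfigurableJsonSerializerSketch(); // never configured

        System.out.println(configured.headersFor("x").size()); // 0
        System.out.println(forgotten.headersFor("x").size());  // 1
    }
}
```

Because the flag is per instance, a single forgotten `configure(...)` call is enough for one serde to keep writing type headers while the others do not.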
Original answer:
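As a workaround, you can replace map() and selectKey() with transform() and clear the headers inside transform(). This is a hack, though. You should file a ticket against the Spring Boot project so that they can fix JsonSerializer/JsonDeserializer.
The following code removes the headers and makes sure the correct type is used, avoiding the ClassCastException: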
import java.time.Duration;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
public class KafkaStreamJoinApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamJoinApplication.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KafkaKStreamJoinApplication {

        @StreamListener
        public void process(@Input("school") KStream<SchoolKey, School> schools) {
            // replace map() with transform()
            schools.transform(new TransformerSupplier<SchoolKey, School, KeyValue<PersonKey, Person>>() {
                @Override
                public Transformer<SchoolKey, School, KeyValue<PersonKey, Person>> get() {
                    return new Transformer<SchoolKey, School, KeyValue<PersonKey, Person>>() {
                        ProcessorContext context;

                        @Override
                        public void init(final ProcessorContext context) {
                            this.context = context;
                        }

                        @Override
                        public KeyValue<PersonKey, Person> transform(final SchoolKey key, final School value) {
                            // clear all headers; removing only the type header would be sufficient
                            for (Header h : context.headers().toArray()) {
                                context.headers().remove(h.key());
                            }
                            // same as the "old" map() code:
                            return KeyValue.pair(new PersonKey("Adam", "Smith", key.getId()), new Person(12));
                        }

                        @Override
                        public void close() {
                        }
                    };
                }
            })
            .through("person", Produced.with(new PersonKeySerde(), new PersonSerde()));
        }

        @StreamListener
        public void process1(@Input("school_1") KStream<SchoolKey, School> schools,
                             @Input("person") KStream<PersonKey, Person> persons) {
            // replace selectKey() with transform()
            schools.transform(new TransformerSupplier<SchoolKey, School, KeyValue<Integer, School>>() {
                @Override
                public Transformer<SchoolKey, School, KeyValue<Integer, School>> get() {
                    return new Transformer<SchoolKey, School, KeyValue<Integer, School>>() {
                        ProcessorContext context;

                        @Override
                        public void init(final ProcessorContext context) {
                            this.context = context;
                        }

                        @Override
                        public KeyValue<Integer, School> transform(final SchoolKey key, final School value) {
                            // clear all headers; removing only the type header would be sufficient
                            for (Header h : context.headers().toArray()) {
                                context.headers().remove(h.key());
                            }
                            // effectively the same as the "old" selectKey() code:
                            return KeyValue.pair(key.getId(), value);
                        }

                        @Override
                        public void close() {
                        }
                    };
                }
            })
            // replace selectKey() with transform()
            .join(persons.transform(new TransformerSupplier<PersonKey, Person, KeyValue<Integer, Person>>() {
                @Override
                public Transformer<PersonKey, Person, KeyValue<Integer, Person>> get() {
                    return new Transformer<PersonKey, Person, KeyValue<Integer, Person>>() {
                        ProcessorContext context;

                        @Override
                        public void init(final ProcessorContext context) {
                            this.context = context;
                        }

                        @Override
                        public KeyValue<Integer, Person> transform(final PersonKey key, final Person value) {
                            // clear all headers; removing only the type header would be sufficient
                            for (Header h : context.headers().toArray()) {
                                context.headers().remove(h.key());
                            }
                            // effectively the same as the "old" selectKey() code:
                            return KeyValue.pair(key.getId(), value);
                        }

                        @Override
                        public void close() {
                        }
                    };
                }
            }),
            (school, person) -> {
                System.out.println("school_app2= " + school + ", person_app2= " + person);
                return null;
            },
            JoinWindows.of(Duration.ofSeconds(1)),
            Joined.with(Serdes.Integer(), new SchoolSerde(), new PersonSerde()));
        }
    }

    interface KStreamProcessorX {
        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();

        @Input("school_1")
        KStream<?, ?> inputSchool1KStream();
    }
}
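The comments in the workaround note that removing only the type header would be sufficient. In Kafka's `Headers` API, `remove(String key)` drops every header with that key, duplicates included. The sketch below models that targeted clean-up with plain lists so it runs without Kafka; the class name, the `trace-id` header and the `__TypeId__` header name are illustrative assumptions. Once the stale type header is gone, the next serializer write leaves a single type header, so a reader that trusts the first one resolves the right class while unrelated headers survive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the header clean-up in the transform() workaround.
// Headers are an ordered list with duplicate keys allowed.
final class StripTypeHeaderSketch {

    static final String TYPE_HEADER = "__TypeId__"; // assumed type header name

    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Remove every header with the given key (duplicates included), keep the rest in order
    static List<Header> removeAll(List<Header> headers, String key) {
        List<Header> kept = new ArrayList<>();
        for (Header h : headers) {
            if (!h.key.equals(key)) {
                kept.add(h);
            }
        }
        return kept;
    }

    // Naive reader that trusts the first type header, as described in the answer
    static String firstType(List<Header> headers) {
        for (Header h : headers) {
            if (h.key.equals(TYPE_HEADER)) {
                return h.value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Header> headers = new ArrayList<>();
        headers.add(new Header(TYPE_HEADER, "School")); // stale header from the input record
        headers.add(new Header("trace-id", "abc"));     // unrelated, hypothetical header

        List<Header> cleaned = removeAll(headers, TYPE_HEADER);
        cleaned.add(new Header(TYPE_HEADER, "Person")); // serializer writes the new type
        System.out.println(firstType(cleaned));         // Person
        System.out.println(cleaned.size());             // 2
    }
}
```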
Comments:
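Hmm, since Spring for Apache Kafka version 2.2 the type headers are removed by the deserializer by default, so I'm not sure yet what's happening here. I confirmed in the debugger that the headers are removed. I also disabled type headers completely in his application, and we still get the class cast exception. See here. It seems @muhammad-arslan-akhtar is using 2.1.5.RELEASE, though: github.com/arslanakhtar61/kafkaStreamJoin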
That's the Boot release; Boot 2.1.5 pulls in spring-kafka 2.2.6.