KStream-KStream inner join based on messages with matching composite-key
Posted: 2019-10-22 22:02:43
Question:
I am trying to perform an inner join between two KStreams. I have observed that when the messages in both KStreams have composite keys (for example, a Java POJO with several attributes), the join does not work, even though the POJO used as the composite key implements the hashCode() and equals(Object o) methods.
UniqueIdKey.java
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.util.Objects;

public class UniqueIdKey {
    private int id;

    public UniqueIdKey() {
    }

    public UniqueIdKey(int id) {
        this.id = id;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "UniqueIdKey{" +
                "id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        UniqueIdKey that = (UniqueIdKey) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}
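When the messages in both KStreams carry simple primitive keys (for example String, int, or double), the inner join works correctly.
I am using the latest spring-cloud-stream (Greenwich.SR1) with kafka-clients and kafka-streams version 2.2.1.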
MainApplication.java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
public class KafkaStreamsTableJoin {
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {
        @StreamListener
        public void process(@Input("person") KStream<PersonKey, Person> persons,
                            @Input("school") KStream<SchoolKey, School> schools) {
            // Join 1: messages with composite keys, e.g. the POJO UniqueIdKey.java
            persons.selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) -> System.out.println("Personkey1= " + key + ", PersonValue1= " + value))
                    .join(
                            schools.selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                                    .peek((key, value) -> System.out.println("SchoolKey1= " + key + ", SchoolValue1= " + value)),
                            (person, school) -> {
                                System.out.println("person1= " + person + ", school1= " + school); // This never gets called
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(new UniqueIdKeySerde(), new PersonSerde(), new SchoolSerde()));

            // Join 2: messages with primitive keys (the integer id)
            persons.selectKey((personKey, person) -> personKey.getId())
                    .peek((key, value) -> System.out.println("Personkey2= " + key + ", PersonValue2= " + value))
                    .join(
                            schools.selectKey((schoolKey, school) -> schoolKey.getId())
                                    .peek((key, value) -> System.out.println("Schoolkey2= " + key + ", SchoolValue2= " + value)),
                            (person, school) -> {
                                System.out.println("person2= " + person + ", school2= " + school); // This one works fine
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(Serdes.Integer(), new PersonSerde(), new SchoolSerde()));

            // Join 3: composite keys again, with an explicit ValueJoiner class
            persons.selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) -> System.out.println("Personkey3= " + key + ", PersonValue3= " + value))
                    .join(
                            schools.selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                                    .peek((key, value) -> System.out.println("SchoolKey3= " + key + ", SchoolValue3= " + value)),
                            new Joiner(), // This never gets called
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(new UniqueIdKeySerde(), new PersonSerde(), new SchoolSerde()));
        }
    }

    interface KStreamProcessorX {
        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();
    }
}
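Joiner.java
import org.apache.kafka.streams.kstream.ValueJoiner;

// The result type is written as Void here; the original post showed "Null".
public class Joiner implements ValueJoiner<Person, School, Void> {
    @Override
    public Void apply(Person person, School school) {
        System.out.println("Joiner person3= " + person + " ,Joiner school3= " + school);
        return null;
    }
}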
Person.java
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class Person {
    private double age;

    public Person() {
    }

    public Person(double age) {
        this.age = age;
    }

    @JsonGetter("age")
    public double getAge() {
        return age;
    }

    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                '}';
    }
}
PersonKey.java
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.util.Objects;

public class PersonKey {
    private String firstName;
    private String lastName;
    private int id;

    public PersonKey() {
    }

    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    @JsonGetter("firstName")
    public String getFirstName() {
        return firstName;
    }

    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @JsonGetter("lastName")
    public String getLastName() {
        return lastName;
    }

    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "PersonKey{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PersonKey personKey = (PersonKey) o;
        return id == personKey.id &&
                Objects.equals(firstName, personKey.firstName) &&
                Objects.equals(lastName, personKey.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, id);
    }
}
School.java
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class School {
    private String address;

    public School() {
    }

    public School(String address) {
        this.address = address;
    }

    @JsonGetter("address")
    public String getAddress() {
        return address;
    }

    @JsonSetter("address")
    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "School{" +
                "address='" + address + '\'' +
                '}';
    }
}
SchoolKey.java
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.util.Objects;

public class SchoolKey {
    private String name;
    private String country;
    private String city;
    private int id;

    public SchoolKey() {
    }

    public SchoolKey(String name, String country, String city, int id) {
        this.name = name;
        this.country = country;
        this.city = city;
        this.id = id;
    }

    @JsonGetter("name")
    public String getName() {
        return name;
    }

    @JsonSetter("name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonGetter("country")
    public String getCountry() {
        return country;
    }

    @JsonSetter("country")
    public void setCountry(String country) {
        this.country = country;
    }

    @JsonGetter("city")
    public String getCity() {
        return city;
    }

    @JsonSetter("city")
    public void setCity(String city) {
        this.city = city;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "SchoolKey{" +
                "name='" + name + '\'' +
                ", country='" + country + '\'' +
                ", city='" + city + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SchoolKey schoolKey = (SchoolKey) o;
        return id == schoolKey.id &&
                Objects.equals(name, schoolKey.name) &&
                Objects.equals(country, schoolKey.country) &&
                Objects.equals(city, schoolKey.city);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, country, city, id);
    }
}
The two KStreams consume data from the "person" and "school" topics, respectively. Person and school messages share the same "id", and the inner join is performed on that "id".
person.topic
CreateTime:1559902106959-{"firstName":"JONH","lastName":"wICK","id":1}-{"age":34.0}
CreateTime:1559902106986-{"firstName":"Harley","lastName":"valla","id":2}-{"age":42.0}
CreateTime:1559902106991-{"firstName":"Mike","lastName":"PENCE","id":3}-{"age":23.0}
CreateTime:1559902106996-{"firstName":"Ali","lastName":"Akbar","id":4}-{"age":53.0}
CreateTime:1559902107000-{"firstName":"Arslan","lastName":"Akhtar","id":5}-{"age":53.0}
CreateTime:1559902107005-{"firstName":"Will","lastName":"David","id":6}-{"age":13.0}
CreateTime:1559902107009-{"firstName":"Beoionca","lastName":"Christ","id":7}-{"age":64.0}
school.topic
CreateTime:1559902107055-{"name":"BMIA","country":"PK","city":"Islamabad","id":1}-{"address":"Sector F/8"}
CreateTime:1559902107068-{"name":"CMII","country":"Hk","city":"Rawalpindi","id":2}-{"address":"Sector G/8"}
CreateTime:1559902107073-{"name":"SCSV","country":"USA","city":"Lahore","id":3}-{"address":"Sector H/8"}
CreateTime:1559902107079-{"name":"NVS","country":"SW","city":"Faisalbad","id":4}-{"address":"Sector J/8"}
CreateTime:1559902107082-{"name":"SNVJ","country":"CH","city":"Shikarpur","id":5}-{"address":"Sector C/8"}
CreateTime:1559902107088-{"name":"DBJ","country":"CN","city":"Talaqand","id":6}-{"address":"Sector Z/8"}
CreateTime:1559902107092-{"name":"SCNJ","country":"SE","city":"Karachi","id":7}-{"address":"Sector S/8"}
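For readers unfamiliar with the semantics being asked for, here is a minimal plain-Java sketch of what a windowed inner join is expected to produce for data like this: a person record and a school record are paired when their keys match and their timestamps lie within the join window (5 seconds in the question). The class WindowedJoinSketch and its Rec type are hypothetical names made up for this illustration; this is not how Kafka Streams implements the join internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of the question's application.
final class WindowedJoinSketch {

    // A simplified record: join key, event timestamp, and a printable value.
    static final class Rec {
        final int id;
        final long timestampMs;
        final String value;

        Rec(int id, long timestampMs, String value) {
            this.id = id;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Pairs every left record with every right record that has the same id
    // and a timestamp at most windowMs away (inclusive).
    static List<String> innerJoin(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> joined = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.id == r.id;
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    joined.add(l.value + " + " + r.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Rec> persons = Arrays.asList(
                new Rec(1, 1559902106959L, "age=34.0"),
                new Rec(2, 1559902106986L, "age=42.0"));
        List<Rec> schools = Arrays.asList(
                new Rec(1, 1559902107055L, "Sector F/8"),
                new Rec(3, 1559902107073L, "Sector H/8"));
        // Only id 1 appears on both sides within 5 seconds.
        System.out.println(innerJoin(persons, schools, 5000L));
    }
}
```

With the sample topics above, every id from 1 to 7 appears on both sides within a fraction of a second, so seven joined results would be expected from each of the three joins.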
Console output
Personkey1= UniqueIdKey{id=1}, PersonValue1= Person{age=34.0}
Personkey2= 1, PersonValue2= Person{age=34.0}
Personkey3= UniqueIdKey{id=1}, PersonValue3= Person{age=34.0}
SchoolKey1= UniqueIdKey{id=1}, SchoolValue1= School{address='Sector F/8'}
Schoolkey2= 1, SchoolValue2= School{address='Sector F/8'}
SchoolKey3= UniqueIdKey{id=1}, SchoolValue3= School{address='Sector F/8'}
Personkey1= UniqueIdKey{id=2}, PersonValue1= Person{age=42.0}
Personkey2= 2, PersonValue2= Person{age=42.0}
Personkey3= UniqueIdKey{id=2}, PersonValue3= Person{age=42.0}
SchoolKey1= UniqueIdKey{id=2}, SchoolValue1= School{address='Sector G/8'}
Schoolkey2= 2, SchoolValue2= School{address='Sector G/8'}
SchoolKey3= UniqueIdKey{id=2}, SchoolValue3= School{address='Sector G/8'}
Personkey1= UniqueIdKey{id=3}, PersonValue1= Person{age=23.0}
Personkey2= 3, PersonValue2= Person{age=23.0}
Personkey3= UniqueIdKey{id=3}, PersonValue3= Person{age=23.0}
SchoolKey1= UniqueIdKey{id=3}, SchoolValue1= School{address='Sector H/8'}
Schoolkey2= 3, SchoolValue2= School{address='Sector H/8'}
SchoolKey3= UniqueIdKey{id=3}, SchoolValue3= School{address='Sector H/8'}
Personkey1= UniqueIdKey{id=4}, PersonValue1= Person{age=53.0}
Personkey2= 4, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=4}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=4}, SchoolValue1= School{address='Sector J/8'}
Schoolkey2= 4, SchoolValue2= School{address='Sector J/8'}
SchoolKey3= UniqueIdKey{id=4}, SchoolValue3= School{address='Sector J/8'}
Personkey1= UniqueIdKey{id=5}, PersonValue1= Person{age=53.0}
Personkey2= 5, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=5}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=5}, SchoolValue1= School{address='Sector C/8'}
Schoolkey2= 5, SchoolValue2= School{address='Sector C/8'}
SchoolKey3= UniqueIdKey{id=5}, SchoolValue3= School{address='Sector C/8'}
Personkey1= UniqueIdKey{id=6}, PersonValue1= Person{age=13.0}
Personkey2= 6, PersonValue2= Person{age=13.0}
Personkey3= UniqueIdKey{id=6}, PersonValue3= Person{age=13.0}
SchoolKey1= UniqueIdKey{id=6}, SchoolValue1= School{address='Sector Z/8'}
Schoolkey2= 6, SchoolValue2= School{address='Sector Z/8'}
SchoolKey3= UniqueIdKey{id=6}, SchoolValue3= School{address='Sector Z/8'}
Personkey1= UniqueIdKey{id=7}, PersonValue1= Person{age=64.0}
Personkey2= 7, PersonValue2= Person{age=64.0}
Personkey3= UniqueIdKey{id=7}, PersonValue3= Person{age=64.0}
SchoolKey1= UniqueIdKey{id=7}, SchoolValue1= School{address='Sector S/8'}
Schoolkey2= 7, SchoolValue2= School{address='Sector S/8'}
SchoolKey3= UniqueIdKey{id=7}, SchoolValue3= School{address='Sector S/8'}
person2= Person{age=34.0}, school2= School{address='Sector F/8'}
person2= Person{age=42.0}, school2= School{address='Sector G/8'}
person2= Person{age=23.0}, school2= School{address='Sector H/8'}
person2= Person{age=53.0}, school2= School{address='Sector J/8'}
person2= Person{age=53.0}, school2= School{address='Sector C/8'}
person2= Person{age=13.0}, school2= School{address='Sector Z/8'}
person2= Person{age=64.0}, school2= School{address='Sector S/8'}
UniqueIdKeySerde.java
import kafka.streams.join.UniqueIdKey;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Serde for the composite key, built from Spring Kafka's JSON serializer and deserializer.
public class UniqueIdKeySerde extends Serdes.WrapperSerde<UniqueIdKey> {
    public UniqueIdKeySerde() {
        super(new JsonSerializer<UniqueIdKey>(), new JsonDeserializer<UniqueIdKey>(UniqueIdKey.class));
    }
}
Sample spring-cloud-stream application with reproducible debugging steps
Comments:
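I skimmed this quickly and I am not sure I can follow the problem. It would help if you could provide a small sample application with reproducible debugging steps.
I have added a link to a sample application at the end of the edited question above. Thanks.
Answer 1:
When Kafka Streams computes a join, it does not compare the keys as Java objects. It compares the key byte[] arrays, i.e. the serialized keys. Therefore equals() and hashCode() are not used.
To make the join work, you need to make sure that the serializers you use write matching byte[] arrays for the keys.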
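To illustrate the point about byte-level comparison, here is a minimal plain-Java sketch. It encodes the same logical id in two different ways and shows that the resulting byte arrays only match when both sides use the same encoding. KeyBytesDemo and its methods are hypothetical names invented for this illustration; they stand in for whatever serializers the two sides of a join actually use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; none of these names come from the question.
final class KeyBytesDemo {

    // Encoding 1: the id as 4 big-endian bytes.
    static byte[] asFixedWidth(int id) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
    }

    // Encoding 2: the id as a small JSON document in UTF-8.
    static byte[] asJson(int id) {
        return ("{\"id\":" + id + "}").getBytes(StandardCharsets.UTF_8);
    }

    // Byte-level key comparison, which is the kind of match the answer describes.
    static boolean sameKeyBytes(byte[] left, byte[] right) {
        return Arrays.equals(left, right);
    }

    public static void main(String[] args) {
        // Same id, same encoding on both sides: the bytes match.
        System.out.println(sameKeyBytes(asJson(1), asJson(1)));
        // Same id, different encodings: the bytes differ, so the keys would not match.
        System.out.println(sameKeyBytes(asJson(1), asFixedWidth(1)));
    }
}
```

One way to apply this when debugging is to serialize the key from each side of the join with the actual Serde in use and compare the two arrays with Arrays.equals.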
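Comments:
Thanks for your reply. I have added the UniqueIdKeySerde.java code, the Serde I am using, to the edited question above. PersonKeySerde.java and SchoolKeySerde.java are similar. Should I only refactor UniqueIdKeySerde at the consumer level, since UniqueIdKey is the final matching composite key for both KStreams? Also, I would appreciate a simple code example showing how to implement a Serde for a custom POJO that writes matching byte[] arrays for the keys so that the join works. Many thanks.
Hmm... At first glance I cannot spot a problem with the Serde, but I am not a JSON expert and I am not familiar with Spring's JSON Serde. By the way, Kafka also ships a JSON serde. If your join on the primitive type works, you could also use that and add a selectKey() before writing the result, to change the primitive key into a JSON key (if you want JSON as the output format).
Maybe this helps as an example of how to write a serializer (the same package contains the deserializer and the Serde): github.com/apache/kafka/blob/trunk/streams/src/main/java/org/… The example may actually be more advanced than what you need, because it wraps other serializers; you could "embed" the code of the other serializers directly.
Answer 2: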
Serde example:
import org.springframework.kafka.support.serializer.JsonSerde;

public class StateProvinceKeySerde extends JsonSerde<StateProvinceKey> {
    public StateProvinceKeySerde() {
        super(StateProvinceKey.class);
    }
}
Key example:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StateProvinceKey {
    private String stateCode;
    private String countryCodeAlpha2;

    // No-argument constructor (the original post named it IBMStateProvinceKey, which does not match the class)
    public StateProvinceKey() {
    }

    public StateProvinceKey(String stateCode, String countryCodeAlpha2) {
        this.stateCode = stateCode;
        this.countryCodeAlpha2 = countryCodeAlpha2;
    }

    public String getStateCode() {
        return stateCode;
    }

    public void setStateCode(String stateCode) {
        this.stateCode = stateCode;
    }

    public String getCountryCodeAlpha2() {
        return countryCodeAlpha2;
    }

    public void setCountryCodeAlpha2(String countryCodeAlpha2) {
        this.countryCodeAlpha2 = countryCodeAlpha2;
    }

    // Serializes this key to JSON bytes; returns an empty array if serialization fails.
    public byte[] serialize() {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsBytes(this);
        } catch (JsonProcessingException e) {
            return new byte[0];
        }
    }
}
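For a key that carries only an integer id, as UniqueIdKey does in the question, a fixed-width binary encoding is one alternative to JSON that makes it easy to reason about the bytes: equal ids always produce identical arrays. The following is only a sketch under that assumption. FixedWidthIdCodec is a hypothetical name, and the two methods show the logic that would sit inside a Kafka Serializer and Deserializer rather than a complete Serde.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for illustration; it is not taken from the question or the answers.
final class FixedWidthIdCodec {

    // Encodes the id as exactly 4 big-endian bytes; null stays null.
    static byte[] serialize(Integer id) {
        if (id == null) {
            return null;
        }
        return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
    }

    // Decodes 4 big-endian bytes back into the id; null stays null.
    static Integer deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(7);
        // Round trip: the decoded value equals the original id.
        System.out.println(deserialize(bytes));
    }
}
```

In a real application these two methods could be wrapped in Serializer and Deserializer implementations and combined with Serdes.serdeFrom(serializer, deserializer). Whether this resolves the behaviour described in the question is not established by the thread.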
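Comments:
Thanks for your reply. I tried your Serde by changing UniqueIdKeySerde.java and UniqueIdKey.java, but the result is still the same. Nothing changed. The composite-key POJO still does not allow the KStream-KStream join.
Check your Spring Cloud Stream version and try the latest one. I built a sample that joins a GlobalKTable using a composite key, and it works. I do not know why your example does not work.
Could you try a KStream-KStream join with a composite key? For some reason, composite-key joins work for KStream-KTable joins with a simple serializer, but not for KStream-KStream joins.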