KStream-KStream inner join based on messages with matching composite-key

Posted: 2019-10-22 22:02:43

Question:

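I am trying to perform an inner join between two KStreams. I observed that when the messages from both KStreams have a composite key (e.g., a Java POJO with several attributes), the join does not work, even though the POJO used as the composite key implements the hashCode() and equals(Object o) methods.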

UniqueIdKey.java

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

import java.util.Objects;

public class UniqueIdKey {

    private int id;

    public UniqueIdKey() {
    }

    public UniqueIdKey(int id) {
        this.id = id;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "UniqueIdKey{" +
                "id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        UniqueIdKey that = (UniqueIdKey) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}

When the messages of both KStreams carry simple primitive keys (e.g., String, int, double), the inner join works fine.

I am using the latest spring-cloud-stream (Greenwich.SR1) with kafka-clients and kafka-streams version 2.2.1.

MainApplication.java

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
public class KafkaStreamsTableJoin {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {

        @StreamListener
        public void process(@Input("person") KStream<PersonKey, Person> persons,
                            @Input("school") KStream<SchoolKey, School> schools) {

            // 1) Composite key: re-key both streams by the POJO UniqueIdKey
            persons.selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) -> System.out.println("Personkey1= " + key + ", PersonValue1= " + value))
                    .join(
                            schools.selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                                    .peek((key, value) -> System.out.println("SchoolKey1= " + key + ", SchoolValue1= " + value)),
                            (person, school) -> {
                                System.out.println("person1= " + person + ", school1= " + school); // never gets called
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(
                                    new UniqueIdKeySerde(),
                                    new PersonSerde(),
                                    new SchoolSerde())
                    );

            // 2) Primitive key: re-key both streams by the Integer id
            persons.selectKey((personKey, person) -> personKey.getId())
                    .peek((key, value) -> System.out.println("Personkey2= " + key + ", PersonValue2= " + value))
                    .join(
                            schools.selectKey((schoolKey, school) -> schoolKey.getId())
                                    .peek((key, value) -> System.out.println("Schoolkey2= " + key + ", SchoolValue2= " + value)),
                            (person, school) -> {
                                System.out.println("person2= " + person + ", school2= " + school); // this one works fine
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(
                                    Serdes.Integer(),
                                    new PersonSerde(),
                                    new SchoolSerde())
                    );

            // 3) Composite key again, this time with a named ValueJoiner class
            persons.selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) -> System.out.println("Personkey3= " + key + ", PersonValue3= " + value))
                    .join(
                            schools.selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                                    .peek((key, value) -> System.out.println("SchoolKey3= " + key + ", SchoolValue3= " + value)),
                            new Joiner(), // never gets called
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(
                                    new UniqueIdKeySerde(),
                                    new PersonSerde(),
                                    new SchoolSerde())
                    );
        }
    }

    interface KStreamProcessorX {

        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();
    }
}

Joiner.java

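import org.apache.kafka.streams.kstream.ValueJoiner;

// The joined result is discarded, so Void is used as the result type.
public class Joiner implements ValueJoiner<Person, School, Void> {

    @Override
    public Void apply(Person person, School school) {
        System.out.println("Joiner person3= " + person + " ,Joiner school3= " + school);
        return null;
    }
}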

Person.java

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class Person {

    private double age;

    public Person() {
    }

    public Person(double age) {
        this.age = age;
    }

    @JsonGetter("age")
    public double getAge() {
        return age;
    }

    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                '}';
    }
}

PersonKey.java

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

import java.util.Objects;

public class PersonKey {

    private String firstName;
    private String lastName;
    private int id;

    public PersonKey() {
    }

    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    @JsonGetter("firstName")
    public String getFirstName() {
        return firstName;
    }

    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @JsonGetter("lastName")
    public String getLastName() {
        return lastName;
    }

    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "PersonKey{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PersonKey personKey = (PersonKey) o;
        return id == personKey.id &&
                Objects.equals(firstName, personKey.firstName) &&
                Objects.equals(lastName, personKey.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, id);
    }
}

School.java

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class School {

    private String address;

    public School() {
    }

    public School(String address) {
        this.address = address;
    }

    @JsonGetter("address")
    public String getAddress() {
        return address;
    }

    @JsonSetter("address")
    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "School{" +
                "address='" + address + '\'' +
                '}';
    }
}

SchoolKey.java

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

import java.util.Objects;

public class SchoolKey {

    private String name;
    private String country;
    private String city;
    private int id;

    public SchoolKey() {
    }

    public SchoolKey(String name, String country, String city, int id) {
        this.name = name;
        this.country = country;
        this.city = city;
        this.id = id;
    }

    @JsonGetter("name")
    public String getName() {
        return name;
    }

    @JsonSetter("name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonGetter("country")
    public String getCountry() {
        return country;
    }

    @JsonSetter("country")
    public void setCountry(String country) {
        this.country = country;
    }

    @JsonGetter("city")
    public String getCity() {
        return city;
    }

    @JsonSetter("city")
    public void setCity(String city) {
        this.city = city;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "SchoolKey{" +
                "name='" + name + '\'' +
                ", country='" + country + '\'' +
                ", city='" + city + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SchoolKey schoolKey = (SchoolKey) o;
        return id == schoolKey.id &&
                Objects.equals(name, schoolKey.name) &&
                Objects.equals(country, schoolKey.country) &&
                Objects.equals(city, schoolKey.city);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, country, city, id);
    }
}

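The two KStreams consume data from the "person" and "school" topics, respectively. Person and school messages share the same "id", and the inner join is performed on that "id".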

person topic

CreateTime:1559902106959-{"firstName":"JONH","lastName":"wICK","id":1}-{"age":34.0}
CreateTime:1559902106986-{"firstName":"Harley","lastName":"valla","id":2}-{"age":42.0}
CreateTime:1559902106991-{"firstName":"Mike","lastName":"PENCE","id":3}-{"age":23.0}
CreateTime:1559902106996-{"firstName":"Ali","lastName":"Akbar","id":4}-{"age":53.0}
CreateTime:1559902107000-{"firstName":"Arslan","lastName":"Akhtar","id":5}-{"age":53.0}
CreateTime:1559902107005-{"firstName":"Will","lastName":"David","id":6}-{"age":13.0}
CreateTime:1559902107009-{"firstName":"Beoionca","lastName":"Christ","id":7}-{"age":64.0}

school topic

CreateTime:1559902107055-{"name":"BMIA","country":"PK","city":"Islamabad","id":1}-{"address":"Sector F/8"}
CreateTime:1559902107068-{"name":"CMII","country":"Hk","city":"Rawalpindi","id":2}-{"address":"Sector G/8"}
CreateTime:1559902107073-{"name":"SCSV","country":"USA","city":"Lahore","id":3}-{"address":"Sector H/8"}
CreateTime:1559902107079-{"name":"NVS","country":"SW","city":"Faisalbad","id":4}-{"address":"Sector J/8"}
CreateTime:1559902107082-{"name":"SNVJ","country":"CH","city":"Shikarpur","id":5}-{"address":"Sector C/8"}
CreateTime:1559902107088-{"name":"DBJ","country":"CN","city":"Talaqand","id":6}-{"address":"Sector Z/8"}
CreateTime:1559902107092-{"name":"SCNJ","country":"SE","city":"Karachi","id":7}-{"address":"Sector S/8"}
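For each id, the person and school records above were produced less than 100 ms apart, so the 5-second join window should not be what blocks the composite-key join (the Integer-keyed join over the same records does fire, per the console output). The plain-Java check below does that arithmetic. It assumes stream time comes from the CreateTime values shown and that `JoinWindows.of(Duration.ofSeconds(5))` accepts a timestamp difference of up to 5 s in either direction; the class and method names are illustrative only.

```java
// Illustrative check (not Kafka code): are matching person/school records within 5 s?
class JoinWindowCheck {
    // CreateTime values copied from the person and school topic dumps, ids 1..7 in order.
    static final long[] PERSON_TS = {
        1559902106959L, 1559902106986L, 1559902106991L, 1559902106996L,
        1559902107000L, 1559902107005L, 1559902107009L
    };
    static final long[] SCHOOL_TS = {
        1559902107055L, 1559902107068L, 1559902107073L, 1559902107079L,
        1559902107082L, 1559902107088L, 1559902107092L
    };
    // Assumed symmetric window: up to 5 000 ms before or after.
    static final long WINDOW_MS = 5_000L;

    // True when two same-key records are close enough in time to be joined.
    static boolean withinWindow(long leftTs, long rightTs) {
        return Math.abs(leftTs - rightTs) <= WINDOW_MS;
    }

    // Largest person/school timestamp gap across ids 1..7.
    static long maxDiffMillis() {
        long max = 0;
        for (int i = 0; i < PERSON_TS.length; i++) {
            max = Math.max(max, Math.abs(PERSON_TS[i] - SCHOOL_TS[i]));
        }
        return max;
    }

    public static void main(String[] args) {
        for (int i = 0; i < PERSON_TS.length; i++) {
            long diff = Math.abs(PERSON_TS[i] - SCHOOL_TS[i]);
            System.out.println("id=" + (i + 1) + " diff=" + diff + " ms, joinable="
                    + withinWindow(PERSON_TS[i], SCHOOL_TS[i]));
        }
        System.out.println("max diff = " + maxDiffMillis() + " ms"); // max diff = 96 ms
    }
}
```

Since the largest gap is 96 ms, timing is an unlikely explanation for the missing composite-key join results.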

Console output

Personkey1= UniqueIdKey{id=1}, PersonValue1= Person{age=34.0}
Personkey2= 1, PersonValue2= Person{age=34.0}
Personkey3= UniqueIdKey{id=1}, PersonValue3= Person{age=34.0}
SchoolKey1= UniqueIdKey{id=1}, SchoolValue1= School{address='Sector F/8'}
Schoolkey2= 1, SchoolValue2= School{address='Sector F/8'}
SchoolKey3= UniqueIdKey{id=1}, SchoolValue3= School{address='Sector F/8'}
Personkey1= UniqueIdKey{id=2}, PersonValue1= Person{age=42.0}
Personkey2= 2, PersonValue2= Person{age=42.0}
Personkey3= UniqueIdKey{id=2}, PersonValue3= Person{age=42.0}
SchoolKey1= UniqueIdKey{id=2}, SchoolValue1= School{address='Sector G/8'}
Schoolkey2= 2, SchoolValue2= School{address='Sector G/8'}
SchoolKey3= UniqueIdKey{id=2}, SchoolValue3= School{address='Sector G/8'}
Personkey1= UniqueIdKey{id=3}, PersonValue1= Person{age=23.0}
Personkey2= 3, PersonValue2= Person{age=23.0}
Personkey3= UniqueIdKey{id=3}, PersonValue3= Person{age=23.0}
SchoolKey1= UniqueIdKey{id=3}, SchoolValue1= School{address='Sector H/8'}
Schoolkey2= 3, SchoolValue2= School{address='Sector H/8'}
SchoolKey3= UniqueIdKey{id=3}, SchoolValue3= School{address='Sector H/8'}
Personkey1= UniqueIdKey{id=4}, PersonValue1= Person{age=53.0}
Personkey2= 4, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=4}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=4}, SchoolValue1= School{address='Sector J/8'}
Schoolkey2= 4, SchoolValue2= School{address='Sector J/8'}
SchoolKey3= UniqueIdKey{id=4}, SchoolValue3= School{address='Sector J/8'}
Personkey1= UniqueIdKey{id=5}, PersonValue1= Person{age=53.0}
Personkey2= 5, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=5}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=5}, SchoolValue1= School{address='Sector C/8'}
Schoolkey2= 5, SchoolValue2= School{address='Sector C/8'}
SchoolKey3= UniqueIdKey{id=5}, SchoolValue3= School{address='Sector C/8'}
Personkey1= UniqueIdKey{id=6}, PersonValue1= Person{age=13.0}
Personkey2= 6, PersonValue2= Person{age=13.0}
Personkey3= UniqueIdKey{id=6}, PersonValue3= Person{age=13.0}
SchoolKey1= UniqueIdKey{id=6}, SchoolValue1= School{address='Sector Z/8'}
Schoolkey2= 6, SchoolValue2= School{address='Sector Z/8'}
SchoolKey3= UniqueIdKey{id=6}, SchoolValue3= School{address='Sector Z/8'}
Personkey1= UniqueIdKey{id=7}, PersonValue1= Person{age=64.0}
Personkey2= 7, PersonValue2= Person{age=64.0}
Personkey3= UniqueIdKey{id=7}, PersonValue3= Person{age=64.0}
SchoolKey1= UniqueIdKey{id=7}, SchoolValue1= School{address='Sector S/8'}
Schoolkey2= 7, SchoolValue2= School{address='Sector S/8'}
SchoolKey3= UniqueIdKey{id=7}, SchoolValue3= School{address='Sector S/8'}
person2= Person{age=34.0}, school2= School{address='Sector F/8'}
person2= Person{age=42.0}, school2= School{address='Sector G/8'}
person2= Person{age=23.0}, school2= School{address='Sector H/8'}
person2= Person{age=53.0}, school2= School{address='Sector J/8'}
person2= Person{age=53.0}, school2= School{address='Sector C/8'}
person2= Person{age=13.0}, school2= School{address='Sector Z/8'}
person2= Person{age=64.0}, school2= School{address='Sector S/8'}

UniqueIdKeySerde.java

import kafka.streams.join.UniqueIdKey;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class UniqueIdKeySerde extends Serdes.WrapperSerde<UniqueIdKey> {
    public UniqueIdKeySerde() {
        super(new JsonSerializer<UniqueIdKey>(), new JsonDeserializer<UniqueIdKey>(UniqueIdKey.class));
    }
}

Sample spring-cloud-stream application with reproducible debugging steps

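Comments on the question:

At a quick glance, I'm not sure I can follow the issue. It would help if you could provide a small sample application with reproducible debugging steps.

I have added a link to the sample application at the end of the edited question above. Thanks.

Answer 1: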

When Kafka Streams computes a join, it does not compare keys as Java objects; it compares the key byte[] arrays, i.e., the serialized keys. Hence, equals() and hashCode() are not used.
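The point can be illustrated with a toy model in plain Java. This is not Kafka's actual store code: it only mimics a store indexed by serialized key bytes. `ByteBuffer` is used as the map key because its `equals()`/`hashCode()` depend on the buffer's byte content, so a lookup succeeds only when the serialized bytes match exactly. The two JSON encodings are hypothetical and differ by a single space.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy model (not Kafka code): a store indexed by serialized key bytes.
class ByteKeyedLookup {
    private final Map<ByteBuffer, String> store = new HashMap<>();

    // ByteBuffer equality compares remaining bytes, so keys match only when
    // their serialized forms are byte-for-byte identical.
    void put(byte[] keyBytes, String value) {
        store.put(ByteBuffer.wrap(keyBytes.clone()), value);
    }

    String get(byte[] keyBytes) {
        return store.get(ByteBuffer.wrap(keyBytes));
    }

    // Two hypothetical encodings of the same logical key {id=1}.
    static byte[] compactJson(int id) {
        return ("{\"id\":" + id + "}").getBytes(StandardCharsets.UTF_8);
    }

    static byte[] spacedJson(int id) {
        return ("{\"id\": " + id + "}").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteKeyedLookup lookup = new ByteKeyedLookup();
        lookup.put(compactJson(1), "Person{age=34.0}");
        System.out.println(lookup.get(compactJson(1))); // Person{age=34.0}
        System.out.println(lookup.get(spacedJson(1)));  // null: same id, different bytes
    }
}
```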

For the join to work, you need to make sure that the serializer you use writes matching byte[] arrays for equal keys.
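A minimal sketch of a serializer that meets that requirement, assuming a fixed binary layout instead of JSON: the id is written as 4 big-endian bytes, which is the same layout Kafka's `IntegerSerializer` uses. The codec class and its method names are hypothetical and use only the JDK; in a real application the two methods would sit behind Kafka `Serializer`/`Deserializer` implementations combined with `Serdes.serdeFrom(...)`. The `UniqueIdKey` here is a stripped-down stand-in for the question's class.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;

// Stripped-down stand-in for the question's UniqueIdKey.
final class UniqueIdKey {
    private final int id;
    UniqueIdKey(int id) { this.id = id; }
    int getId() { return id; }
    @Override public boolean equals(Object o) {
        return o instanceof UniqueIdKey && ((UniqueIdKey) o).id == id;
    }
    @Override public int hashCode() { return Objects.hash(id); }
    @Override public String toString() { return "UniqueIdKey{id=" + id + "}"; }
}

// Hypothetical fixed-layout codec: equal ids always produce identical bytes.
final class UniqueIdKeyCodec {
    static byte[] serialize(UniqueIdKey key) {
        if (key == null) return null; // Kafka serializers conventionally map null to null
        return ByteBuffer.allocate(Integer.BYTES).putInt(key.getId()).array(); // big-endian
    }

    static UniqueIdKey deserialize(byte[] bytes) {
        if (bytes == null) return null;
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
        }
        return new UniqueIdKey(ByteBuffer.wrap(bytes).getInt());
    }

    public static void main(String[] args) {
        byte[] fromPerson = serialize(new UniqueIdKey(1));
        byte[] fromSchool = serialize(new UniqueIdKey(1));
        System.out.println(Arrays.equals(fromPerson, fromSchool)); // true
        System.out.println(deserialize(fromPerson));               // UniqueIdKey{id=1}
    }
}
```

A fixed layout like this makes byte equality depend only on the field values, not on how a JSON library chooses to format them.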

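Comments:

Thanks for your reply. I have added the UniqueIdKeySerde.java code, which is the Serde I am using, to the edited question above. PersonKeySerde.java and SchoolKeySerde.java are similar. Should I refactor UniqueIdKeySerde only at the consumer level, since UniqueIdKey is the final matching composite key of both KStreams? Also, I would appreciate a simple code example of how to implement a Serde for a custom POJO (one that writes matching byte[] arrays for the keys so the join works). Thanks a lot.

Hmm... at first glance I can't spot a problem with the Serde, but I am not a JSON expert and I'm not familiar with Spring's JSON Serde. BTW: Kafka also contains a JSON serde. If your primitive-type join works, you could also use it and add a selectKey() that changes the primitive key into the JSON key before writing the result (if you want JSON as the output format).

Maybe this helps as an example of how to write a serializer (the same package contains the deserializer and the Serde): github.com/apache/kafka/blob/trunk/streams/src/main/java/org/… The example may actually be more advanced than you need, because it wraps other serializers; you can directly "inline" the code of the other serializers.

Answer 2: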

Serde example:

import org.springframework.kafka.support.serializer.JsonSerde;

public class StateProvinceKeySerde extends JsonSerde<StateProvinceKey> {

  public StateProvinceKeySerde() {
    super(StateProvinceKey.class);
  }
}

Key example:

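import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.UncheckedIOException;

public class StateProvinceKey {

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  private String stateCode;
  private String countryCodeAlpha2;

  public StateProvinceKey() {
  }

  public StateProvinceKey(String stateCode, String countryCodeAlpha2) {
    this.stateCode = stateCode;
    this.countryCodeAlpha2 = countryCodeAlpha2;
  }

  public String getStateCode() {
    return stateCode;
  }

  public void setStateCode(String stateCode) {
    this.stateCode = stateCode;
  }

  public String getCountryCodeAlpha2() {
    return countryCodeAlpha2;
  }

  public void setCountryCodeAlpha2(String countryCodeAlpha2) {
    this.countryCodeAlpha2 = countryCodeAlpha2;
  }

  // Serializes this key to JSON bytes. Fails loudly instead of returning an empty
  // array, which would make every unserializable key collide on the same bytes.
  public byte[] serialize() {
    try {
      return OBJECT_MAPPER.writeValueAsBytes(this);
    } catch (JsonProcessingException e) {
      throw new UncheckedIOException(e);
    }
  }
}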
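For multi-field keys such as StateProvinceKey, a fixed binary layout is one alternative to JSON (it is not what this answer or Spring's JsonSerde does). The hypothetical codec below writes each field as a 4-byte length followed by its UTF-8 bytes, so equal field values always yield identical bytes and field boundaries stay unambiguous ("AB"+"C" differs from "A"+"BC"). It uses only the JDK and assumes both fields are non-null; wiring it into Kafka `Serializer`/`Deserializer` classes is left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical length-prefixed codec for (stateCode, countryCodeAlpha2).
// Assumes both fields are non-null.
final class StateProvinceKeyCodec {
    static byte[] encode(String stateCode, String countryCodeAlpha2) {
        byte[] state = stateCode.getBytes(StandardCharsets.UTF_8);
        byte[] country = countryCodeAlpha2.getBytes(StandardCharsets.UTF_8);
        // Layout: [len(state)][state bytes][len(country)][country bytes]
        return ByteBuffer.allocate(Integer.BYTES * 2 + state.length + country.length)
                .putInt(state.length).put(state)
                .putInt(country.length).put(country)
                .array();
    }

    // Returns {stateCode, countryCodeAlpha2}; fields are read back in write order.
    static String[] decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        String state = readString(buf);
        String country = readString(buf);
        return new String[] {state, country};
    }

    private static String readString(ByteBuffer buf) {
        byte[] field = new byte[buf.getInt()];
        buf.get(field);
        return new String(field, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String[] fields = decode(encode("ON", "CA"));
        System.out.println(fields[0] + "/" + fields[1]); // ON/CA
    }
}
```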

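Comments:

Thanks for your reply. I tried your Serde by making the corresponding changes in UniqueIdKeySerde.java and UniqueIdKey.java, but the result is still the same; nothing changed. The composite-key POJO still does not allow a KStream-KStream join.

Check your Spring Cloud Stream version and try the latest one. I made a sample that joins a GlobalKTable using a composite key, and it works. Not sure why your example doesn't work.

Could you try a KStream-KStream join with a composite key? For some reason, the composite-key join works for a KStream-KTable join using a simple serializer, but not for a KStream-KStream join.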
