Spring Kafka - Event sourcing - Example of how to query some entity state using Kafka + KafkaStreams API

Posted: 2017-12-15 14:03:15

Question:

I am using Kafka to implement an architecture based on event sourcing.

Suppose I store events in JSON format:

{"name": "ProductAdded", "productId": "1", "quantity": 3, "dateAdded": "2017-04-04"}

I would like to run a query that returns the quantity of the product with productId=X on a given date.

Could you show a rough implementation of this query using Spring Kafka KStreams?
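For context, the core of such a query is independent of Kafka Streams: an event-sourced state is just a fold over the event log. A minimal plain-Java sketch (class and method names are hypothetical, not from any library) that replays ProductAdded events and answers "quantity of product X as of date D":

```java
import java.time.LocalDate;
import java.util.List;

// Hypothetical sketch: replaying an event log into queryable state.
// In Kafka Streams this same fold becomes a KTable backed by a state store.
public class EventReplay {

    record ProductAdded(String productId, int quantity, LocalDate dateAdded) {}

    // Quantity of a product accumulated up to (and including) a given date.
    static int quantityAsOf(List<ProductAdded> log, String productId, LocalDate asOf) {
        return log.stream()
                .filter(e -> e.productId().equals(productId))
                .filter(e -> !e.dateAdded().isAfter(asOf))
                .mapToInt(ProductAdded::quantity)
                .sum();
    }

    public static void main(String[] args) {
        List<ProductAdded> log = List.of(
                new ProductAdded("1", 3, LocalDate.parse("2017-04-04")),
                new ProductAdded("1", 2, LocalDate.parse("2017-04-06")),
                new ProductAdded("2", 5, LocalDate.parse("2017-04-04")));

        System.out.println(quantityAsOf(log, "1", LocalDate.parse("2017-04-05"))); // prints 3
        System.out.println(quantityAsOf(log, "1", LocalDate.parse("2017-04-07"))); // prints 5
    }
}
```

The answers below show how the same fold is expressed as a KTable so that it can be served with interactive queries instead of re-reading the whole log per request.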

UPDATE: I have made some progress using Spring Kafka KStreams, but I am running into a deserialization error.

This is my Spring Cloud Stream Kafka producer:

public interface ProductProducer {

    final String OUTPUT = "productsOut";

    @Output(ProductProducer.OUTPUT)
    MessageChannel output();
}

Configuration:

spring:
  application:
    name: product-generator-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsOut: 
          destination: orders
          content-type: application/json

I send messages with the following code, which correctly serializes the Map as a JSON object:

Map<String, Object> event = new HashMap<>();
event.put("name", "ProductCreated");
event.put("productId", product.getId());
event.put("quantity", product.getQuantity());
event.put("dateAdded", new Date());
productProducer.output().send(MessageBuilder.withPayload(event).build(), 500);

MessageBuilder.withPayload(event).build() -> GenericMessage [payload={quantity=1, productId=1, name=ProductCreated, dateAdded="xxxxx"}, headers={id=fc531176-e3e9-61b8-40e3-08074fabee4d, timestamp=1499845483095}]

In the ProductService application I can read this message with a Spring Cloud Stream listener:

@Component
public class ProductListener {

    @StreamListener(ProductConsumer.INPUT)
    public void handleProduct(Map<String, Object> event) {
        // ...
    }
}

But with a KStream I get a deserialization error:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-kstream");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1);
        stream.print();
        return stream;
    }
}

Exception:

org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:30)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

UPDATE 2:

To find out what was actually in the KStream, I changed both the key and the value to string deserializers; this is what gets printed:

KStream<Integer, String> stream = kStreamBuilder.stream(null, integerSerde, stringSerde, STREAMING_TOPIC1);

打印值:

[KSTREAM-SOURCE-0000000000]: null , �contentType

Why am I not getting a JSON string?

UPDATE 3: I fixed the deserialization problem. The cause was that the message producer (Spring Cloud Stream) adds some headers as part of the payload by default. I only had to disable this header embedding to start receiving the messages correctly in Kafka Streams:

spring:
  application:
    name: product-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsIn:
          group: product-service 
          destination: orders
          consumer:
            max-attempts: 5
            header-mode: raw
        productsOut: 
          destination: orders
          content-type: application/json
          producer:
            header-mode: raw
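As a side note on why the parser reported the token 'ÿ': with embedded headers enabled, Spring Cloud Stream 1.x prepends a binary frame to the payload, and its first byte (0xff, rendered as 'ÿ') is not valid JSON, so any plain JSON deserializer fails immediately. A minimal sketch of the effect; the real frame layout is internal to the binder and contains more than this single marker byte, so treat it purely as an illustration:

```java
import java.nio.charset.StandardCharsets;

// Illustration only: model the embedded-header frame as a single 0xff
// marker byte prepended to the JSON body. That first byte is already
// enough to make any JSON parser reject the record.
public class EmbeddedHeaderDemo {

    // Hypothetical framing helper (the real binder format is richer).
    static byte[] frame(byte[] body) {
        byte[] framed = new byte[body.length + 1];
        framed[0] = (byte) 0xff;
        System.arraycopy(body, 0, framed, 1, body.length);
        return framed;
    }

    public static void main(String[] args) {
        byte[] json = "{\"productId\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] framed = frame(json);

        // A JSON parser expects '{', '[', a digit or a literal as the first
        // token; 0xff is none of these, hence the JsonParseException above.
        System.out.println(Integer.toHexString(framed[0] & 0xff)); // prints ff
        System.out.println((char) framed[1]);                      // prints {
    }
}
```

Setting header-mode: raw on both the producer and the consumer bindings removes this frame, so the topic carries plain JSON that the Kafka Streams serde can read.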

KStream definition:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

Output:

[KSTREAM-SOURCE-0000000000]: null , {"quantity":0,"productId":0,"name":"ProductCreated","dateAdded":1499930385450}

Now that everything is set up correctly: how do I implement the interactive query I need, i.e. get the quantity of the product with productId=X on a given day?

Comments:

See the example here: github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/… Also have a look at this blog post: confluent.io/blog/…

Hi Matthias, thanks for pointing out that example. I have made some progress, but now I am running into a deserialization error.

What happens if you change KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1); to KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);?

The same thing: Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')

Are you sure the data is written correctly? Maybe you can verify with a console consumer?

Answer 1:

I managed to solve this by mixing Spring Cloud Stream (to produce the messages) with Spring Kafka (to handle Kafka Streams and implement the interactive query). Important: note UPDATE 3 in the question, which is required to combine the two:

Kafka Streams configuration:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

        stream.map((key, value) -> {
            return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
        }).groupByKey().reduce((v1, v2) -> v1 + v2, "ProductsStock");

        stream.print();
        return stream;
    }
}

Note how I generate the KTable state store ProductsStock, which I query later from the service.

Product service:

@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}
Comments:

The trick of getting the store from the Spring context saved me a lot of time!

Answer 2:

The upcoming 1.3.0.M1 release of the Spring Cloud Stream Kafka binder will support KStream bindings. There is a PR where you can track the progress of that initiative.

Here is a more general example (word count) using the KStream binder: WordCount Sample using Spring Cloud Stream support for Kafka Streams.

With that, you can achieve what you are looking for as follows.

This StreamListener method will listen to a Kafka topic and, for products with ID equal to 123, write the count over a 30-second time window to another topic.

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ProductCountApplication {

  public static final int PRODUCT_ID = 123;

  @StreamListener("input")
  @SendTo("output")
  public KStream<?, String> process(KStream<?, Product> input) {

        return input
                .filter((key, product) -> product.getID() == PRODUCT_ID)
                .map((k, v) -> new KeyValue<>(v, v))
                .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
                .count(TimeWindows.of(30000), "product-store")
                .toStream()
                .map((w, c) -> new KeyValue<>(null, "Product with id 123 count: " + c));
  }
}

Here is the application.yml used:

spring.cloud.stream.kstream.binder.streamConfiguration:
  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the key
  value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the value
spring.cloud.stream.bindings.output.producer:
  headerMode: raw # Outbound data has no embedded headers
  useNativeEncoding: true # Write data using the native Serde
spring.cloud.stream.bindings.input.consumer:
  headerMode: raw # Incoming data has no embedded headers

When running the program, you need to pass in the input/output destinations (topics):

--spring.cloud.stream.bindings.input.destination=products 
--spring.cloud.stream.bindings.output.destination=counts

Comments:

Thanks for the update about the next Spring Cloud Stream release. While this could be a solution, I would like to use interactive queries (a ReadOnlyKeyValueStore to query the values) rather than another topic. With this approach, could we use something like .groupByKey().reduce((v1, v2) -> v1 + v2, "ProductsStock"); to generate a queryable KTable? We would also need access to the KafkaStreams object to reach the store: streams.store("ProductsStock", QueryableStoreTypes.keyValueStore()); How can we access it?
