事件采购 - Apache Kafka + Kafka Streams - 如何确保原子性/交易性

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了事件采购 - Apache Kafka + Kafka Streams - 如何确保原子性/交易性相关的知识,希望对你有一定的参考价值。

我正在使用Apache Kafka Streams评估事件采购,以了解复杂场景的可行性。与关系数据库一样,我遇到过一些案例,原子性/事务性是必不可少的:

购物应用程序有两个服务:

  • OrderService:有一个带有订单的Kafka Streams商店(OrdersStore)
  • ProductService:拥有Kafka Streams商店(ProductStockStore)的产品及其库存。

流:

  1. OrderService发布OrderCreated事件(带有productId,orderId,userId信息)
  2. ProductService获取OrderCreated事件并查询其KafkaStreams Store(ProductStockStore)以检查产品是否有库存。如果有库存,它会发布OrderUpdated事件(也包含productId,orderId,userId info)

关键是这个事件将由ProductService Kafka Stream监听,它会处理它以减少库存,到目前为止一直很好。

但是,想象一下:

  1. 客户1下订单,订单1(产品库存为1)
  2. 客户2同时放置另一个订单,order2,用于同一产品(库存仍为1)
  3. ProductService处理order1并发送消息OrderUpdated以减少库存。此消息放在order2 - > OrderCreated之后的主题中
  4. ProductService处理order2-OrderCreated并发送消息OrderUpdated以再次减少库存。这是不正确的,因为它会引入不一致(库存现在应为0)。

显而易见的问题是,当我们处理第一个OrderUpdated事件时,我们的物化视图(商店)应该直接更新。然而,更新Kafka Stream Store的唯一方法(我知道)是发布另一个由Kafka Stream处理的事件(OrderUpdated)。这样我们就无法以事务方式执行此更新。

我希望能够处理这样的场景。

更新:我将尝试澄清问题的有问题:

ProductService有一个Kafka Streams商店,产品与库存(productId=1, quantity=1)

OrderService在订单主题上发布两个OrderPlaced事件:

  • Event1 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")
  • Event2 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")

ProductService在订单主题上有一个消费者。为简单起见,我们假设一个分区来确保消息按顺序消耗。此使用者执行以下逻辑:

if("OrderPlaced".equals(event.get("eventType"))){

    Order order = new Order();
    order.setId((String)event.get("orderId"));
    order.setProductId((Integer)(event.get("productId")));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if(productStock > 0) {
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", order.getId());
        event.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic
        orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
    }else{
        //XXX CANCEL ORDER
    }
}

ProductService还有一个Kafka Streams处理器,负责更新库存:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");

Event1将首先被处理,因为仍有1个可用产品,它将生成ProductReserved事件。

现在,它已经是Event2了。如果ProductService使用者消耗它,那么在ProductService Kafka Streams Processor处理Event1生成的ProductReseved事件之前,消费者仍会看到product1的ProductStore库存为1,为Event2生成ProductReserved事件,然后在系统中产生不一致。

答案

对于你原来的问题,这个答案有点迟了,但我还是要回答完整性。

有很多方法可以解决这个问题,但我鼓励解决这个问题,这是一种事件驱动的方式。这意味着您(a)验证有足够的库存来处理订单,以及(b)将库存保留为单一库存,所有这些都在一个KStreams操作中。诀窍是通过productId重新生成密钥,这样您就知道同一产品的订单将在同一个线程上顺序执行(因此您无法进入Order1和Order2两次保留同一产品库存的情况)。

有一篇文章讨论了如何做到这一点:https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

也许更有用的是有一些示例代码也显示了如何完成它:https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76

请注意,在此KStreams代码中,第一行如何重新生成productId,然后使用Transformer(a)验证是否有足够的库存来处理订单,以及(b)通过更新状态存储来保留所需的库存。这是使用Kafka的交易功能以原子方式完成的。

另一答案

同样的问题在确保任何分布式系统的一致性方面是典型的。通常使用流程管理器/传奇模式,而不是强烈的一致性。这有点类似于分布式事务中的两阶段提交,但在应用程序代码中明确实现。它是这样的:

订购服务要求产品服务部门保留N个项目。产品服务接受命令并减少库存或拒绝该命令(如果没有足够的可用项目)。在对命令作出肯定回复后,Order Service现在可以发出OrderCreated事件(虽然我称之为OrderPlaced,因为“放置”声音模式是域惯用语,“created”更通用,但这是一个细节)。产品服务要么侦听OrderPlaced事件,要么向其发送显式ConfirmResevation命令。或者,如果发生了其他事情(例如,未能清除资金),则可以发出适当的事件或将CancelReservation命令显式发送到ProductService。为了满足特殊情况,ProductService还可能有一个调度程序(在KafkaStreams标点符号中可以派上用场)取消在超时期限内未确认或中止的预订。

两个服务的编排和处理错误条件以及补偿操作(在这种情况下取​​消保留)的技术性可以直接在服务中处理,或者在显式的Process Manager组件中处理以分离此责任。就个人而言,我会选择一个可以使用Kafka Streams Processor API实现的显式流程管理器。

以上是关于事件采购 - Apache Kafka + Kafka Streams - 如何确保原子性/交易性的主要内容,如果未能解决你的问题,请参考以下文章

电子书丨《Apache Kafka实战》

apache kafka性能测试命令使用和构建kafka-perf

Apache Kafka消息格式的演变(0.7.x~0.10.x)

kafka操作指南

kafka简单安装kafka单机版

Apache-Kafka-Connect , Confluent-HDFS-Connector , Unknown-magic-byte