如何使用 Spring Cloud Stream Kafka 和每个服务的数据库实现微服务事件驱动架构
Posted
技术标签:
【中文标题】如何使用 Spring Cloud Stream Kafka 和每个服务的数据库实现微服务事件驱动架构【英文标题】:How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service 【发布时间】:2017-06-27 16:11:01 【问题描述】:我正在尝试实现一个事件驱动架构来处理分布式事务。每个服务都有自己的数据库,并使用 Kafka 发送消息以通知其他微服务有关操作。
一个例子:
Order service -------> | Kafka |------->Payment Service
| |
Orders MariaDB DB Payment MariaDB Database
订单收到订单请求。它必须将新订单存储在其数据库中并发布一条消息,以便支付服务意识到它必须为商品收费:
私人订单业务订单业务;
@PostMapping
public Order createOrder(@RequestBody Order order)
logger.debug("createOrder()");
//a.- Save the order in the DB
orderBusiness.createOrder(order);
//b. Publish in the topic so that Payment Service charges for the item.
try
orderSource.output().send(MessageBuilder.withPayload(order).build());
catch(Exception e)
logger.error("", e);
return order;
这些是我的疑惑:
-
步骤 a.-(保存在 Order DB 中)和 b.-(发布消息)应在事务中以原子方式执行。我怎样才能做到这一点?
这和上一个有关:我发送消息是:orderSource.output().send(MessageBuilder.withPayload(order).build());此操作是异步的,并且始终返回 true,无论 Kafka 代理是否关闭。我如何知道消息已到达 Kafka 代理?
【问题讨论】:
【参考方案1】:我认为实现事件溯源的正确方法是让 Kafka 直接从从 RDBMS 二进制日志中读取的插件推送的事件中填充,例如使用 Confluent BottledWater (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) 或更活跃的 Debezium (http://debezium.io/)。然后消费微服务可以监听这些事件,消费它们并对它们各自的数据库进行操作,最终与 RDBMS 数据库保持一致。
在此处查看我的完整答案以获取指南: https://***.com/a/43607887/986160
【讨论】:
【参考方案2】:步骤 a.-(保存在 Order DB 中)和 b.-(发布消息)应该是 在事务中以原子方式执行。我怎样才能做到这一点?
Kafka 目前不支持事务(因此也不支持回滚或提交),您需要同步这样的事情。简而言之:你不能做你想做的事。这将在不久的将来发生变化,当KIP-98 被合并时,但这可能还需要一些时间。此外,即使使用 Kafka 中的事务,跨两个系统的原子事务也是一件非常困难的事情,接下来的一切只会通过 Kafka 中的事务支持得到改进,它仍然不能完全解决您的问题。为此,您需要考虑在您的系统中实现某种形式的 two phase commit。
您可以通过配置生产者属性获得一些接近,但最终您必须为您的一个系统选择至少一次或最多一次( MariaDB 或 Kafka)。
让我们从您可以在 Kafka 中执行的操作开始,确保消息传递,然后我们将深入探讨您对整个流程流程的选择以及后果。
保证送达
您可以使用参数 acks 配置在将请求返回给您之前必须有多少个代理确认收到您的消息:通过将其设置为 all 您告诉代理要等到所有副本都确认了您的消息,然后再向您返回答案。这仍然不能 100% 保证您的消息不会丢失,因为它只被写入页面缓存,并且理论上存在代理在将其持久化到磁盘之前失败的情况,其中消息可能仍然丢失。但这与您将获得的一样好保证。 您可以通过降低经纪人强制 fsync 到磁盘的间隔(强调文本和/或flush.ms)进一步降低数据丢失的风险,但请注意,这些值会带来严重的性能损失。
除了这些设置之外,您还需要等待您的 Kafka 生产者将您的请求的响应返回给您,并检查是否发生异常。这种关系与您问题的第二部分有关,因此我将进一步讨论。 如果响应是干净的,您可以尽可能确定您的数据已到达 Kafka 并开始担心 MariaDB。
到目前为止,我们所讨论的所有内容都只涉及如何确保 Kafka 收到您的消息,但您还需要将数据写入 MariaDB,这也可能会失败,这将需要调用您可能已经发送的消息卡夫卡——这是你不能做的。
所以基本上你需要选择一个系统,在这个系统中你能够更好地处理重复/缺失值(取决于你是否重新发送部分失败),这会影响你做事的顺序。
选项 1
在此选项中,您在 MariaDB 中初始化事务,然后将消息发送到 Kafka,等待响应,如果发送成功,您在 MariaDB 中提交事务。如果发送到 Kafka 失败,您可以在 MariaDB 中回滚您的事务,一切都很好。 但是,如果发送到 Kafka 成功并且您对 MariaDB 的提交由于某种原因失败,那么就无法从 Kafka 取回消息。因此,如果您稍后重新发送所有内容,您将在 MariaDB 中丢失一条消息或在 Kafka 中有一条重复的消息。
选项 2
这几乎正好相反,但您可能更好地删除在 MariaDB 中编写的消息,具体取决于您的数据模型。
当然,您可以通过跟踪失败的发送并稍后重试这些方法来缓解这两种方法,但所有这些更多的是针对更大问题的创可贴。
我个人会采用方法 1,因为提交失败的机会应该比发送本身要小一些,并在 Kafka 的另一端实施某种重复检查。
这与上一个有关:我发送消息是: orderSource.output().send(MessageBuilder.withPayload(order).build()); 此操作是异步的,并且始终返回 true,无论是否 卡夫卡经纪人已关闭。我怎么知道消息已经到达 卡夫卡经纪人?
首先,我承认我对 Spring 不熟悉,所以这可能对您没有用处,但以下代码 sn-p 说明了一种检查异常响应的方法。 通过调用 flush 你阻塞直到所有发送完成(失败或成功),然后检查结果。
Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();
for(MessageType message : messages)
producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback()
@Override
public void onCompletion(RecordMetadata metadata, Exception exception)
if (exception != null)
exceptionList.add(exception);
);
producer.flush();
if (!exceptionList.isEmpty())
// do stuff
【讨论】:
我用我遵循的新方法编辑了这个问题,但是鉴于你的回答非常清楚,我将其恢复为原始版本并使用编辑后的版本开始一个新的问题。我会回来反馈你的答案。谢谢! Sönke,一切都清楚了,我很欣赏详尽的解释。对于那些对如何使用 Spring Cloud Stream 确保消息传递感兴趣的人:github.com/spring-cloud/spring-cloud-stream/issues/795 @codependent 如何让 api 网关或任何东西先发送到 kafka,然后两个微服务订阅 kafka 消息.. 这不可行吗?或者您不这样做是因为您希望获得一致的数据而不是最终一致的数据?以上是关于如何使用 Spring Cloud Stream Kafka 和每个服务的数据库实现微服务事件驱动架构的主要内容,如果未能解决你的问题,请参考以下文章
spring-cloud-stream kafka 消费者并发
如何使用 Stream 为 Spring Cloud Dataflow 中的子任务设置全局属性 - Task-Launcher-Dataflow