[译] 流式处理:使用 Apache Kafka 的 Streams API 实现 Rabobank 的实时财务告警

Posted K栈IO

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[译] 流式处理:使用 Apache Kafka 的 Streams API 实现 Rabobank 的实时财务告警相关的知识,希望对你有一定的参考价值。

原文:https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/

本文讨论使用 Apache Kafka 的 Streams API 向 Rabobank 的客户发送告警。Rabobank(荷兰合作银行)总部位于荷兰,在全球拥有 900 多个分支机构,48,000 名员工和 681 亿欧元的资产。Rabobank 是一家由客户和银行组成的合作银行,一家对社会负责的银行。其目标是成为荷兰金融市场的领导者。Rabobank 还致力于成为全球食品和农业领域的领先银行。Rabobank 向全球数百万客户提供金融产品和服务。


在过去的几年中,Rabobank 一直在为成为一家实时的,事件驱动的银行而进行积极的投资。如果你熟悉银行的业务流程,应该会明白这并不能一蹴而就。许多银行业务流程都是在非商用硬件上以批处理作业的形式进行的,因此迁移工作非常艰巨。但如前所述,Rabobank 接受了这一挑战
,并定义了一个业务事件总线(BEB,Business Event Bus),用于应用程序之间共享整个组织架构的业务事件。
Rabobank 选择 Apache Kafka 作为底层的主要引擎,并编写了自己的的 BEB 客户端库,以方便应用程序开发人员使用简单的消息生产/消费以及灾难恢复等功能。

Rabobank 采用 Active-Active 的 Kafka 设置,Kafka 集群在多个数据中心进行对称镜像。当数据中心出现故障或由操作人员干预后,BEB 客户端(包括本文讨论的基于 Kafka Streams 的应用程序)会切换到另一个 Kafka 集群,而无需重新启动。实现在灾难情景和计划维护时段内的 24×7 不间断运行。BEB 客户端库为生产者、消费者以及流式应用提供了这种切换机制。

Rabo Alerts 是一个由一系列生产、消费、流式消息等微服务组成的系统,基于 BEB 实现。下面讨论的所有数据类型和代码都可以在 GitHub 中找到。本文将在一定程度上简化源码清单(如删除未使用的字段),但这些清单仍反映了生产中实际运行的代码。

案例:Rabo Alerts

Rabo Alerts 服务可以让 Rabobank 的客户接收其关注的财务事件告警。例如某笔款项从账户中扣除或者记入账户,以及其它更复杂的事件。客户可以根据自己的偏好配置告警,并通过第三方渠道发送:如电子邮件、短信和移动推送通知。值得一提的是,Rabo Alerts 并不是一项新的或试用服务,它已经投产十多年,可供数百万账户持有者试用。

面临的问题

旧的 Rabo Alerts 实现主要是在大型机系统上。所有的处理步骤都是面向批处理的,大型机会根据告警类型派生告警,并每隔几分钟发送,但每天只发送几次。这种实现非常稳定可靠,但 Rabobank 希望解决两个问题:(1)灵活性不足;(2)告警发送速度慢。

由于对现有告警进行更改或添加新(更智能)的告警需要很大的工作量,因此旧的 Rabo Alerts 对适应新业务需求的灵活性很低。在过去几年中,Rabobank 在其在线环境中引入新功能的步伐大幅增加,旧有僵化的告警解决方案变得越来越成问题。

告警的传递速度也是一个问题,旧的 Rabo Alerts 可能需要 5 分钟到 4-5 小时才能向用户发送告警(取决于告警类型和批处理窗口)。如果在十年前,这个速度可能足够快了,但如今客户的期望值要高得多。现在 Rabobank 向客户提供“相关信息”的时间窗口要比过去十年小得多。

因此,如何重新设计现有的机制,使其具有更强的扩展性及更快的速度,便是摆在眼前的问题。当然,重新设计的 Rabo Aerts 也需要稳定可靠,以便能够正确地为现有数百万的用户群提供服务。

从小处着手

在过去的一年里,我们一直使用 Kafka 及其 Streams API 重新设计和实现告警机制。由于整个 Rabo Alerts 服务相当庞大,我们决定从四个简单但使用率高的告警开始:

  • 余额高于阈值

  • 余额低于阈值

  • 超过阈值的贷记(Credit)

  • 超过阈值的借记(Debit)

这些告警的每一个都可以从当前账户系统的支付信息流中派生出来。例如:“当我的余额低于 100 欧元时向我发送短信”或“当有人给我超过 1000 欧元时向我推送消息”(通常用于存款通知)。

以下截图说明如何通过手机银行 app 配置 Rabo Alerts ——

告警拓扑

我们的第一步是重新设计告警过程,基本流程如下:

  1. 挖掘来自支付工厂的交易流,产生一连串的 AccountEntry(账户会计条目)。注意,每一个支付交易总是由两个 AccountEntry 组成,即借记(Debit)和贷记(Credit)。

  2. 对每个 AccountEntry 执行以下步骤:

    • i. 查看客户是否为给定账号配置了 Rabo Alert。

    • ii. 如果是,检查此 AccountEntry 是否符合客户设置的告警条件。

    • iii. 如何符合,通过客户配置的渠道(电子邮件、短信、消息推送)发送告警。

    • a. 将具有账户读取权限的账号转换为一个客户列表。

    • b. 对每个客户执行以下步骤:

步骤 1 需要与执行交易的核心银行系统建立链接。
步骤 2a 需要建立一个查询表,其中包含所有账户的所有客户权限。
步骤 2b 需要建立一个查询表,其中包含所有客户的 Rabo Alert 设置。

该流程的使用及其需求见下图:

图中所有白色框都是 Kafka 主题(Topic),其中列出了它们的 Avro 键/值数据类型。大部分数据类型都是不言自明的,但以下数据类型值得一提:

  • CustomerAlertSettings:特定客户的告警设置,这些设置包括:

    • CustomerAccountAlertSettings:客户为特定账户设定的告警配置列表。这个列表指定了客户希望接收特定账户的哪些告警及其阈值。

  • ChannelType:可用的渠道类型枚举,当前为 EMAIL、PUSH 和 SMS。

  • AccountEntry:一条支付账户的会计记账。一个记账条目是一个支付交易的一半,可以是一个借记条目(Debit),也可以是一个贷记条目(Credit)。

  • OutboundMessage:发送给客户的消息内容。包含消息类型和参数,但不包含其寻址。这些信息由 Outbound topic 的 Key 承载。

蓝色框表示独立的应用程序(或称微服务),是使用 Spring Boot 实现的可执行 jar,并部署在托管平台上。它们一起组成了实现 Rabo Alerts 的所有必要功能:

  • Alert Settings Manager:告警配置管理器。向一个 compacted topic (开启了 Log Compaction 的 Kafka 主题)发布每个客户的所有自定义告警设置。

  • Account Authorization Manager:账户授权管理器。账户并不是和客户一对一绑定,而是可以由不同的用户查看。例如,配偶之间共享账户;或企业账户针对不同员工的不同授权。这些情况下可能会产生任意的账户/用户间的授权关系。该应用程序向一个 compacted topic 发布账号和授权客户ID的关系。它是实时的,以便授权的变化在发送告警时能立即生效。

  • Account Entry Bridge:通过 IBM MQ 从 Rabobank 基于大型机的支付工厂中检索所有支付流,并转发到 Kafka 的 topic。

  • Alerting:核心告警服务,参见下文。

  • Device Resolver:设备解析器,辅助应用。从外部系统查找所有客户的移动设备,并将相同的告警消息写入各个设备对应的 topic 中(PushId)。客户移动设备的查找可以通过一个 compacted topic 完成,但由于各种不同的原因,此处是通过远程服务调用的方式实现的。

  • Senders:每一个 Sender 消费其绑定的渠道 topic 的消息,并发送给寻址客户。每种渠道都被分配了各自的 Kafka topic,以使各种渠道的故障能彼此分离。例如,当电子邮件服务器关闭时,告警消息依然可以通过消息推送的方式发送出去。

废话少说,放码过来

使用 Kafka Streams 编码实现告警只需要 2 个类。

第一个类是 BalanceAlertsTopology。这个类使用给定的 KStreamBuilder 定义主要的 Kafka Streams 拓扑(Topology)。它实现了 BEB 的 TopologyFactory,是一个 BEB 客户端库使用的自定义接口,用于在应用程序启动后或 Kafka 集群切换(如数据中心切换/故障转移)时生成新的 Kafka Streams Topology。

KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
    builder.<AccountId, AccountEntry>stream(accountEntryStream)
        .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
          if (isNull(customerIds)) {
            return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
          } else {
            return customerIds.getCustomerIds().stream()
                .map(customerId -> KeyValue.pair(customerId, accountEntry))
                .collect(toList());
          }
        })
        .flatMap((accountId, accountentryByCustomer) -> accountentryByCustomer)
        .through(customerIdToAccountEntryStream)
        .leftJoin(alertSettings, Pair::with)
        .flatMapValues(
            (Pair<AccountEntry, CustomerAlertSettings> accountEntryAndSettings) ->
                BalanceAlertsGenerator.generateAlerts(
                    accountEntryAndSettings.getValue0(),
                    accountEntryAndSettings.getValue1())
        );

// Send all Email messages from addressedMessages
addressedMessages
    .filter((e, kv) -> kv.key instanceof EmailAddress)
    .map((k, v) -> v)
    .to(emailMessageStream);

// Send all Sms messages from addressedMessages
addressedMessages
    .filter((e, kv) -> kv.key instanceof PhoneNumber)
    .map((k, v) -> v)
    .to(smsMessageStream);

// Send all Push messages from addressedMessages
// (CustomerId is later resolved to a list of customer's mobile devices)
addressedMessages
    .filter((e, kv) -> kv.key instanceof CustomerId)
    .map((k, v) -> v)
    .to(customerPushMessageStream);

该 Topology 定义了以下几个步骤:

  • 1-13 行,从消费 AccountEntry 流开始,当检索到一个 AccountEntry 时,会查找哪些客户有权访问该账户,并将结果存储在一个中间 topic 中,以 CustomerId 为 Key,AccountEntry 为 Value。该 topic 的意思是“这个客户(Key)的这个 AccountEntry(Value)需要处理”。

  • 14-20 行,针对每个客户执行。检查客户的告警设置,如果 AccountEntry 符合客户的告警设置,会要求辅助类生成 OutboundMessage

  • 22-39 行,遍历所有的 OutboundMessage,并将它们分配到各自的渠道 topic。

告警消息是在 17 行调用辅助类 BalanceAlertsGenerator 生成的。其主要方法是 generateAlerts(),该方法获取一个 AccountEntry,并从具有该账户查看权限的客户中获取告警配置。以下是它的代码:

public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry,
                                                                             CustomerAlertSettings settings) {
      /* 使用告警设置为一个 AccountEntry 生成完成寻址的告警,步骤如下:
      *  1) 使用特定账户的告警设置,过滤掉不属于该账户的 AccountEntry
      *  2) 匹配告警设置中的每一项设置,生成适当的消息
      *  3) 为生成的消息添加寻址信息
      */

  if (settings == null) {
    return new ArrayList<>();
  }

  return settings.getAccountAlertSettings().stream()
      .filter(accountAlertSettings -> matchAccount(accountEntry, accountAlertSettings))
      .flatMap(accountAlertSettings -> accountAlertSettings.getSettings().stream())
      .flatMap(accountAlertSetting -> Stream.of(
          generateBalanceAbove(accountEntry, accountAlertSetting),
          generateBalanceBelow(accountEntry, accountAlertSetting),
          generateCreditedAbove(accountEntry, accountAlertSetting),
          generateDebitedAbove(accountEntry, accountAlertSetting))
      )
      .filter(Optional::isPresent).map(Optional::get)
      .flatMap(messageWithChannels -> mapAddresses(messageWithChannels.getValue0(), settings.getAddresses())
          .map(address -> KeyValue.pair(address, messageWithChannels.getValue1())))
      .collect(toList());
}

该方法执行以下步骤:

  • 13 行,流化所有账户相关的告警设置(一个账户一个对象)。

  • 14 行,将告警设置中的账号和 AccountEntry 中的账号进行匹配。

  • 15 行,流化告警设置中的各项设置。

  • 16-21 行,构造要发送的一系列消息,以及用于发送消息的渠道列表(这里对每种告警类型都使用了单独的方法),结果是一个 Pair<List, OutboundMessage> 流。

  • 22 行,过滤空结果。

  • 25 行,收集所有结果,并作为 List 返回。

这个类的其它辅助方法:

  • matchAccount():通过比较账号和币种,来匹配 AccountEntry 和账户告警设置。

  • generateBalanceAbove/Below():生成 BalanceAbove/Below 告警消息(余额高于/低于阈值)。

  • generateDebited/CreditedAbove():生成 Debited/CreditedAbove 告警消息(超出阈值的借记/贷记)。

  • buildMessage():构建一个 OutboundMessage

再加上一些其它额外的类来将这个功能包装在一个独立的应用程序中,这就是它的全部功能!

第一次测试运行

在第一次的初步实现后,我们进行了测试运行。事实令我们惊讶,并且期望值高涨。从支付订单确认到移动设备收到告警的整个过程只需要一到两秒钟,且一秒的情况居多。这个过程还包括了支付工厂所花费的时间(验证支付订单,交易处理),因此响应时间可能会依当时的支付工厂工作量而有所不同。整个告警链——从 AccountEntry 在 Kafka 上发布,到将消息推送给客户——通常在 120 毫秒内完成。在发送阶段,推送(PUSH)告警是最快的,仅需 100-200 毫秒即可到达客户的移动设备。电子邮件(EMAIL)和短信(SMS)稍慢,通常在发出消息后的 2-4 秒到达。相比之下,旧有的体系通常需要几分钟的时间才能提供告警。

下面的视频演示了使用我的个人测试账户进行告警传输的速度。请注意,虽然是测试用的,但这也是一个正常运行的 Rabobank 支付账户!

【只是一段演示视频,markdown 插入视频比较麻烦,原文看吧,或看下面的文字解说】

首先我在我的设备上启用了告警,并配置了阈值为 0 的 DebitedAboveThreshold 告警(“More withdrawn than”)。这意味着超过 0 欧元的任何支付都会向我发送告警。我设置了 PUSH 和 SMS 两种渠道告警(视频中未演示),因此告警会通过两个渠道发给我。保存设置并返回主屏幕后,我开始向我的同事 Joris Meijer 转账 1 欧元,并通过指纹验证。之后付款订单被发送到支付工厂进行处理。在订单确认关闭之前,推送(PUSH)告警已经在屏幕顶部弹出,如通知窗口所示。几秒钟后,相同的告警消息也以 SMS 的方式到达。

回顾

新机制简洁而优雅,只需要少数 Java 类组成。这个逻辑大约四个星期写完,但要使整个拓扑工作需要大约六个月的时间。这主要是因为 Alert Settings ManagerAccount Authorizations Manager 以及 Account Entry Bridge 需要和银行的其它业务模块达成一致。

在团队内部的告警测试之后,需要更彻底更大规模的测试。毕竟我们希望确保客户不会错过告警或接收到不该接收告警。我们选用了 25,000 名 Rabobank 的员工作为试点小组,对这个新机制进行了为期两个月的试用。这样可以更好的观察系统在生产数据及高负载下的运行表现。另外,Rabobank 的员工比付费客户更能容忍告警失败(有时确实会失败)的情况。在试用期间,我们优化了告警生成并消除了一些外围应用的边界错误。

经批准,新体系于 6 月 8 日上线为数百万 Rabobank 客户提供服务。这对我们来说是非常激动人心的时刻——不仅因为它有效,而且因为我们永远不可能回头。我们通过延迟几秒而不是几分钟或几小时的告警,有效提升了客户的期望值。如果由于某种原因导致某个组件服务失败,客户会立即注意到,因为告警会延迟。因此我们密切关注这套体系,但到目前为止,它一直运行良好且可预测。

下一步

新体系提供了实时告警,且易于扩展,满足了 Rabobank 对于速度和灵活性的要求。但这里提到的四种告警类型并不是全部。客户还可以配置其它约 10 种告警,例如“当我收到来自指定账户的付款时提醒我”和“当付款单无法执行时提醒我”。下一步是将这些告警从大型机迁移到新体系,但这需要连接更多的支付系统,例如支付订单执行引擎。我们将在未来的几个月为此努力,且不会止于此,新的实现也激发了大量新的想法,我们将很快公开讨论(甚至展示)。

关于作者

关于 Apache Kafka Streams API

如果你喜欢本文,可能会希望继续使用以下资源了解有关 Apache Kafka Streams API 的更多信息:

关于译文

本译文经原作者授权后,首发于 K栈(kweny.io)。转载请注明原作者以及原文和译文出处。


以上是关于[译] 流式处理:使用 Apache Kafka 的 Streams API 实现 Rabobank 的实时财务告警的主要内容,如果未能解决你的问题,请参考以下文章

Spark streaming + Kafka 流式数据处理,结果存储至MongoDBSolrNeo4j(自用)

使用 apache spark 流式处理实时日志

译Flink + Kafka 0.11端到端精确一次处理语义的实现

Streaming SQL for Apache Kafka

使用 QuestDB 和 Apache Kafka 处理时间序列数据

译Apache Kafka最佳实践