基于 Kafka 的实时数仓在搜索的实践应用
Posted vivo互联网技术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于 Kafka 的实时数仓在搜索的实践应用相关的知识,希望对你有一定的参考价值。
作者:vivo互联网服务器团队-Deng jie
一、概述
Apache Kafka 发展至今,已经是一个很成熟的消息队列组件了,也是大数据生态圈中不可或缺的一员。Apache Kafka 社区非常的活跃,通过社区成员不断的贡献代码和迭代项目,使得 Apache Kafka 功能越发丰富、性能越发稳定,成为企业大数据技术架构解决方案中重要的一环。
Apache Kafka 作为一个热门消息队列中间件,具备高效可靠的消息处理能力,且拥有非常广泛的应用领域。那么,今天就来聊一聊基于 Kafka 的实时数仓在搜索的实践应用。
二、为什么需要 Kafka
在设计大数据技术架构之前,通常会做一些技术调研。我们会去思考一下为什么需要 Kafka?怎么判断选择的 Kafka 技术能否满足当前的技术要求?
2.1 早期的数据架构
早期的数据类型比较简单,业务架构也比较简单,就是将需要的数据存储下来。比如将游戏类的数据存储到数据库(mysql、Oracle)。但是,随着业务的增量,存储的数据类型也随之增加了,然后我们需要使用的大数据集群,利用数据仓库来将这些数据进行分类存储,如下图所示:
但是,数据仓库存储数据是有时延的,通常时延为T+1。而现在的数据服务对象对时延要求均有很高的要求,例如物联网、微服务、移动端APP等等,皆需要实时处理这些数据。
2.2 Kafka 的出现
Kafka 的出现,给日益增长的复杂业务,提供了新的存储方案。将各种复杂的业务数据统一存储到 Kafka 里面,然后在通过 Kafka 做数据分流。如下图所示:
这里,可以将视频、游戏、音乐等不同类型的数据统一存储到 Kafka 里面,然后在通过流处理对 Kafka 里面的数据做分流操作。例如,将数据存储到数据仓库、将计算的结果存储到KV做实时分析等。
通常消息系统常见的有两种,它们分别是:
消息队列:队列消费者充当了工作组的角色,每条消息记录只能传递给一个工作进程,从而有效的划分工作流程;
生产&消费:消费者通常是互相独立的,每个消费者都可以获得每条消息的副本。
这两种方式都是有效和实用的,通过消息队列将工作内容分开,用于容错和扩展;生产和消费能够允许多租户,来使得系统解耦。而 Apache Kafka 的优点之一在于它将消息队列、生产和消费结合到了一个强大的消息系统当中。
同时,Kafka 拥有正确的消息处理特性,主要体现在以下几个方面:
可扩展性:当 Kafka 的性能(如存储、吞吐等)达到瓶颈时,可以通过水平扩展来提升性能;
真实存储:Kafka 的数据是实时落地在磁盘上的,不会因为集群重启或故障而丢失数据;
实时处理:能够集成主流的计算引擎(如Flink、Spark等),对数据进行实时处理;
顺序写入:磁盘顺序 I/O 读写,跳过磁头“寻址”时间,提高读写速度;
内存映射:操作系统分页存储利用内存提升 I/O 性能,实现文件到内存的映射,通过同步或者异步来控制 Flush;
零拷贝:将磁盘文件的数据复制到“页面缓存”一次,然后将数据从“页面缓存”直接发送到网络;
高效存储:Topic 和 Partition 拆为多个文件片段(Segment),定期清理无效文件。采用稀疏存储,间隔若干字节建立一条索引,防止索引文件过大。
2.3 简单的应用场景
这里,我们可以通过一个简单直观的应用场景,来了解 Kafka 的用途。
场景:假如用户A正在玩一款游戏,某一天用户A喜欢上了游戏里面的一款道具,打算购买,于是在当天 14:00 时充值了 10 元,在逛游戏商店时又喜欢上了另一款道具,于是在 14:30 时又充值了 30 元,接着在 15:00 时开始下单购买,花费了 20 元,剩余金额为 20 元。那么,整个事件流,对应到库表里面的数据明细应该是如下图所示:
三、Kafka解决了什么问题
早期为响应项目快速上线,在服务器或者云服务器上部署一个 WebServer,为个人电脑或者移动用户提供访问体验,然后后台在对接一个数据库,为 Web 应用提供数据持久化以及数据查询,流程如下图所示:
但是,随着用户的迅速增长,用户所有的访问都直接通过 SQL 数据库使得它不堪重负,数据库的压力也越来越大,不得不加上缓存服务以降低 SQL 数据库的荷载。
同时,为了理解用户行为,又开始收集日志并保存到 Hadoop 这样的大数据集群上做离线处理,并且把日志放在全文检索系统(比如 ElasticSearch)中以便快速定位问题。由于需要给投资方看业务状况,也需要把数据汇总到数据仓库(比如 Hive)中以便提供交互式报表。此时的系统架构已经具有一定的复杂性了,将来可能还会加入实时模块以及外部数据交互。
本质上,这是一个数据集成问题。没有任何一个系统能够解决所有的事情,所以业务数据根据不同用途,存放在不同的系统,比如归档、分析、搜索、缓存等。数据冗余本身没有任何问题,但是不同系统之间太过复杂的数据同步却是一种挑战。如下图所示:
而 Kafka 可以让合适的数据以合适的形式出现在合适的地方。Kafka 的做法是提供消息队列,让生产者向队列的末尾添加数据,让多个消费者从队列里面依次读取数据然后自行处理。如果说之前连接的复杂度是 O(N^2),那么现在复杂度降低到了 O(N),扩展起来也方便多了,流程如下图所示:
四、Kafka的实践应用
4.1 为什么需要建设实时数仓
4.1.1 目的
通常情况下,在大数据场景中,存储海量数据建设数据仓库一般都是离线数仓(时延T+1),通过定时任务每天拉取增量数据,然后创建各个业务不同维度的数据,对外提供 T+1 的数据服务。计算和数据的实时性均比较差,业务人员无法根据自己的即时性需求获取几分钟之前的实时数据。数据本身的价值随着时间的流逝会逐步减弱,因此数据产生后必须尽快的到达用户的手中,实时数仓的建设需求由此而来。
4.1.2 目标
为了适应业务高速迭代的特点,分析用户行为,挖掘用户价值,提高用户留存,在实时数据可用性、可扩展性、易用性、以及准确性等方面提供更好的支持,因此需要建设实时数仓。主要目标包含如下所示:
统一收敛数据出口:统一数据口径,减少数据重复性建设;
降低数据维护成本:提升数据准确性、及时性,优化数据使用体验和成本;
减少数据使用成本:提高数据复用率,避免实时数据重复消费。
4.2 如何构建实时数仓为搜索提供数据
当前实时数仓比较主流的架构一般来说包含三个大的模块,它们分别是消息队列、计算引擎、以及存储。结合上述对 Kafka 的综合分析,结合搜索的业务场景,引入 Kafka 作为消息队列,复用大数据平台(BDSP)的能力作为计算引擎和存储,具体架构如下图所示:
4.3 流处理引擎选择
目前业界比较通用的流处理引擎主要有两种,它们分别是Flink和Spark,那么如何选择流处理引擎呢?我们可以对比以下特征来决定选择哪一种流处理引擎?
Flink作为一款开源的大数据流式计算引擎,它同时支持流批一体,引入Flink作为实时数仓建设的流引擎的主要原因如下:
高吞吐、低延时;
灵活的流窗口;
轻量级容错机制;
流批一体
4.4 建设实时数仓遇到的问题
在建设初期,用于实时处理的 Kafka 集群规模较小,单个 Topic 的数据容量非常大,不同的实时任务都会消费同一个大数据量的 Topic,这样会导致 Kafka 集群的 I/O 压力非常的大。
因此,在使用的过程中会发现 Kafka 的压力非常大,经常出现延时、I/O能性能告警。因此,我们采取了将大数据量的单 Topic 进行实时分发来解决这种问题,基于 Flink 设计了如下图所示的数据分发流程。
上述流程,随着业务类型和数据量的增加,又会面临新的问题:
数据量增加,随着消费任务的增加,Kafka 集群 I/O 负载大时会影响消费;
不用业务之间 Topic 的消费没有落地存储(比如HDFS、HBase存储等),会产生重复消费的情况;
数据耦合度过高,迁移数据和任务难度大。
4.5 实时数仓方案进阶
目前,主流的实时数仓架构通常有2种,它们分别是Lambda、Kappa。
4.5.1 Lambda
随着实时性需求的提出,为了快速计算一些实时指标(比如,实时点击、曝光等),会在离线数仓大数据架构的基础上增加一个实时计算的链路,并对消息队列实现数据来源的流失处理,通过消费消息队列中的数据 ,用流计算引擎来实现指标的增量计算,并推送到下游的数据服务中去,由下游数据服务层完成离线和实时结果的汇总。具体流程如下:
4.5.2 Kappa
Kappa架构只关心流式计算,数据以流的方式写入到 Kafka ,然后通过 Flink 这类实时计算引擎将计算结果存放到数据服务层以供查询。可以看作是在Lambda架构的基础上简化了离线数仓的部分。具体流程如下:
在实际建设实时数仓的过程中,我们结合这2种架构的思想来使用。实时数仓引入了类似于离线数仓的分层理念,主要是为了提供模型的复用率,同时也要考虑易用性、一致性、以及计算的成本。
4.5.3 实时数仓分层
在进阶建设实时数仓时,分层架构的设计并不会像离线数仓那边复杂,这是为了避免数据计算链路过长造成不必要的延时情况。具体流程图如下所示:
ODS层:以Kafka 作为消息队列,将所有需要实时计算处理的数据放到对应的 Topic 进行处理;
DW层:通过Flink实时消费Topic中的数据,然后通过数据清理、多维度关联(JOIN)等,将一些相同维度的业务系统、维表中的特征属性进行关联,提供数据易用性和复用性能力,最终得到实时明细数据;
DIM层:用来存储关联的查询的维度信息,存储介质可以按需选择,比如HBase、Redis、MySQL等;
DA层:针对实时数据场景需求,进行高度聚合汇总,服务于KV、BI等场景。OLAP分析可以使用ClickHouse,KV可以选择HBase(若数据量较小,可以采用Redis)。
通过上面的流程,建设实时数仓分层时,确保了对实时计算要求比较高的任务不会影响到BI报表、或者KV查询。但是,会有新的问题需要解决:
Kafka 实时数据如何点查?
消费任务异常时如何分析?
4.5.4 Kafka监控
针对这些问题,我们调研和引入了Kafka 监控系统——Kafka Eagle(目前改名为EFAK)。复用该监控系统中比较重要的维度监控功能。
Kafka Eagle处理能够满足上诉两个维度的监控需求之外,还提供了一些日常比较实用的功能,比如Topic记录查看、Topic容量查看、消费和生产任务的速率、消费积压等。我们采用了 Kafka-Eagle 来作为对实时数仓的任务监控。Kafka-Eagle 系统设计架构如下图所示:
Kafka-Eagle 是一款完全开源的对 Kafka 集群及应用做全面监控的系统,其核心由以下几个部分组成:
数据采集:核心数据来源 JMX 和 API 获取;
数据存储:支持 MySQL 和 Sqlite 存储;
数据展示:消费者应用、图表趋势监控(包括集群状态、消费生产速率、消费积压等)、开发的分布式 KSQL 查询引擎,通过 KSQL 消息查询;
数据告警:支持常用的 IM 告警(微信,钉钉,WebHook等),同时邮件、短信、电话告警也一并支持。
部分预览截图如下:
1)Topic最近7天写入量分布
默认展示所有Topic的每天写入总量分布,可选择时间维度、Topic聚合维度,来查看写入量的分布情况,预览截图如下所示:
2)KSQL查询Topic消息记录
可以通过编写SQL语句,来查询(支持过滤条件)Topic中的消息记录,预览截图如下所示:
3)消费Topic积压详情
可以监控所有被消费的Topic的消费速率、消费积压等详情,预览截图如下所示:
五、参考资料
1.https://kafka.apache.org/documentation/
2.http://www.kafka-eagle.org/
3.https://github.com/smartloli/kafka-eagle
END
Kafka万亿级消息实战
Kafka 原理以及分区分配策略剖析
Kubernetes 集群无损升级实践
实时数仓在有赞的实践
前言
随着实时技术的不断发展和商家实时应用场景的不断丰富,有赞在实时数仓建设方面做了大量的尝试和实践。本文主要分享有赞在建设实时数仓过程中所沉淀的经验,内容包括以下五个部分:
- 建设背景
- 应用场景
- 方案设计
- 项目应用
- 未来展望
1. 建设背景
1.1 实时需求日趋迫切
产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能力来赋能。传统离线数仓的数据时效性是T+1,调度频率以天为单位,无法支撑实时场景的数据需求。即使能将调度频率设置成小时,也只能解决部分时效性要求不高的场景,对于实效性要求很高的场景还是无法优雅的支撑。因此实时使用数据的问题必须得到有效解决。
1.2 实时技术日趋成熟
实时计算框架已经经历了三代发展,分别是:Storm、SparkStreaming、Flink,计算框架越来越成熟。一方面,实时任务的开发已经能通过编写 SQL 的方式来完成,在技术层面能很好地继承离线数仓的架构设计思想;另一方面,有赞在线数据开发平台所提供的功能对实时任务开发、调试、运维的支持也日渐趋于成熟,开发成本逐步降低,有助于去做这件事。
2. 应用场景
2.1 实时BI看板
通过有赞BI工具基于实时数仓创建实时数据集,使用数据集配置柱状图、线图、饼图等图表来呈现实时汇总数据。目前BI工具所支持接入的实时数据源有Druid、MySQL。
2.2 实时OLAP
实时数仓基于 Druid 和 ClickHouse 等 OLAP 数据库,给用户提供实时数据分析能力。
2.3 实时数据服务
通过有赞统一数据服务平台OneServices将实时数仓沉淀的实时指标以通用接口的方式提供给业务方使用。目前OneServices所支持接入的实时数据源有MySQL、Kylin、Druid、ClickHouse、Tidb。
2.4 实时监控告警
通过实时接入应用程序运行的日志数据,经过结构化处理后,加工成预警指标,将异常信息及时推送。
2.5 实时推荐
通过实时数仓构建用户实时行为特征,提供给算法进行模型训练,给用户推送更加实时的个性化内容。
3. 方案设计
该部分主要介绍实时数仓的方案设计,内容包括:分层设计、ETL设计、数据验证、数据恢复。
3.1 分层设计
传统离线数仓的分层设计大家都很熟悉,为了规范的组织和管理数据,层级划分会比较多,在一些复杂逻辑处理场景还会引入临时层落地中间结果以方便下游加工处理。实时数仓考虑到时效性问题,分层设计需要尽量精简,降低中间流程出错的可能性,不过总体而言,实时数仓还是会参考离线数仓的分层思想来设计。实时数仓分层架构如下图所示 :
3.1.1 ODS(实时数据接入层)
ODS层,即实时数据接入层,通过数据采集工具收集各个业务系统的实时数据,对非结构化的数据进行结构化处理,保存原始数据,几乎不过滤数据;该层数据的主要来源有三个部分:第一部分是业务方创建的NSQ消息,第二部分是业务数据库的Binlog日志,第三部分是埋点日志和应用程序日志,以上三部分的实时数据最终统一写入Kafka存储介质中。
ODS层表命名规范:部门名称.应用名称.数仓层级_主题域前缀_数据库名/消息名
。
例如:接入业务库的 Binlog 实时数仓表命名:deptname.appname.ods_subjectname_tablename
;接入业务方的NSQ消息实时数仓表命名:deptname.appname.ods_subjectname_msgname
。
3.1.2 DWS(实时明细中间层)
DWS层,即实时明细中间层,该层以业务过程作为建模驱动,基于每个具体的业务过程事件来构建最细粒度的明细层事实表;比如交易过程,有下单事件、支付事件、发货事件等,我们会基于这些独立的事件来进行明细层的构建。在这层,事实明细数据同样是按照离线数仓的主题域来进行划分,也会采用维度建模的方式组织数据,对于一些重要的维度字段,会做适当冗余。基于有赞实时需求的场景,重点建设交易、营销、客户、店铺、商品等主题域的数据。该层的数据来源于ODS层,通过FlinkSQL进行ETL处理,主要工作有规范命名、数据清洗、维度补全、多流关联,最终统一写入Kafka存储介质中。
DWS层表命名规范:部门名称.应用名称.数仓层级_主题域前缀_数仓表命名
。
例如:实时事件A的中间层实时数仓表命名:deptname.appname.dws_subjectname_tablename_eventnameA
;实时事件B的中间层
实时数仓表命名:deptname.appname.dws_subjectname_tablename_eventnameB
。
3.1.3 DIM(实时维表层)
DIM层,即实时维表层,用来存放维度数据,主要用于实时明细中间层宽化处理时补全维度使用,目前该层的数据主要存储于HBase 中,后续会基于 QPS 和数据量大小提供更多合适类型的存储介质。
DIM层表命名规范:应用名称_数仓层级_主题域前缀_数仓表命名
。
例如:HBase存储,实时维度表实时数仓表命名:appname_dim_tablename
。
3.1.3 DWA(实时汇总层)
DWA层,即实时汇总层,该层通过DWS层数据进行多维汇总,提供给下游业务方使用,在实际应用过程中,不同业务方使用维度汇总的方式不太一样,根据不同的需求采用不同的技术方案去实现。第一种方式,采用FlinkSQL进行实时汇总,将结果指标存入HBase、MySQL等数据库,该种方式是我们早期采用的方案,优点是实现业务逻辑比较灵活,缺点是聚合粒度固化,不易扩展;第二种方式,采用实时OLAP工具进行汇总,该种方式是我们目前常用的方案,优点是聚合粒度易扩展,缺点是业务逻辑需要在中间层预处理。
DWA层表命名规范:应用名称_数仓层级_主题域前缀_聚合粒度_数据范围
。
例如:HBase存储,某域当日某粒度实时汇总表实时数仓表命名:appname_dwa_subjectname_aggname_daily
。
3.1.4 APP(实时应用层)
APP层,即实时应用层,该层数据已经写入应用系统的存储中,例如写入Druid作为BI看板的实时数据集;写入HBase、MySQL用于提供统一数据服务接口;写入ClickHouse用于提供实时OLAP服务。因为该层非常贴近业务,在命名规范上实时数仓不做统一要求。
3.2 实时ETL
实时数仓ETL处理过程所涉及的组件比较多,接下来盘点构建实时数仓所需要的组件以及每个组件的应用场景。如下图所示:
具体实时ETL处理流程如下图所示:
3.2.1 维度补全
创建调用 Duboo 接口的 UDF 函数在实时流里补全维度是最便捷的使用方式,但如果请求量过大,对 Duboo 接口压力会过大。在实际应用场景补全维度首选还是关联维度表,但关联也存在一定概率的丢失问题,为了弥补这种丢失,可以采用 Duboo 接口调用兜底的方式来补全。伪代码如下:
create function call_dubbo as 'XXXXXXX';
create function get_json_object as 'XXXXXXX';
case when cast( b.column as bigint) is not null then cast( b.column as bigint)
else cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl' ,'serviceName' ,'methodName' ,cast(concat('[',cast(a.column as varchar),']') as varchar),'key'),'rootId') as bigint),a.column)as bigint)
end
3.2.2 幂等处理
实时任务在运行过程中难免会遇到执行异常的情况,当任务异常重启的时候会导致部分消息重新发送和消费,从而引发下游实时统计数据不准确,为了有效避免这种情况,可以选择对实时消息流做幂等处理,当消费完一条消息,将这条消息的Key存入KV,如果任务异常重启导致消息重新发送的时候,先从KV判断该消息是否已被消费,如果已消费就不再往下发送。伪代码如下:
create function idempotenc as 'XXXXXXX';
insert into table
select order_no
from (
select
a.orderNo as order_no,
idempotenc('XXXXXXX', coalesce( order_no, '') ) as rid
from table1
) t
where t.rid = 0;
3.3 数据验证
由于实时数仓的数据是无边界的流,相比于离线数仓固定不变的数据更难验收。基于不同的场景,我们提供了2种验证方式,分别是:抽样验证与全量验证。如下图所示:
3.3.1 抽样验证方案
该方案主要应用在数据准确性验证上,实时汇总结果是基于存储在Kafka的实时明细中间层计算而来,但Kafka本身不支持按照特定条件检索,不支持写查询语句,再加上消息的无边界性,统计结果是在不断变化的,很难寻找参照物进行比对。鉴于此,我们采用了持久化消息的方法,将消息落盘到TiDB存储,基于TiDB的能力对落盘的消息进行检索、查询、汇总。编写固定时间边界的测试用例与相同时间边界的业务库数据或者离线数仓数据进行比对。通过以上方式,抽样核心店铺的数据进行指标准确性验证,确保测试用例全部通过。
3.3.2 全量验证方案
该方案主要应用在数据完整性和一致性验证上,在实时维度表验证的场景使用最多。大体思路:将存储实时维度表的在线HBase集群中的数据同步到离线HBase集群中,再将离线HBase集群中的数据导入到Hive中,在限定实时维度表的时间边界后,通过数据平台提供的数据校验功能,比对实时维度表与离线维度表是否存在差异,最终确保两张表的数据完全一致。
3.4 数据恢复
实时任务一旦上线就要求持续不断的提供准确、稳定的服务。区别于离线任务按天调度,如果离线任务出现Bug,会有充足的时间去修复。如果实时任务出现Bug,必须按照提前制定好的流程,严格按照步骤执行,否则极易出现问题。造成Bug的情况有非常多,比如代码Bug、异常数据Bug、实时集群Bug,如下图展示了修复实时任务Bug并恢复数据的流程。
4. 项目应用
4.1 视频号直播间实时数据统计
微信视频号的用户数已超过2亿,是微信生态内较大的流量入口,视频号直播成为一个重要的线上消费场景。目前,有赞已接入了小程序交易组件,直播时借助小程序的商品交易能力挂载有赞商品进行直播卖货。商家在直播过程中以及直播结束后都需要实时的关注直播的交易数据,根据直播间的实时数据反馈来动态调整经营策略。
本次项目涉及实时指标主要有支付订单数、支付订单金额、支付商品件数、支付人数、支付新客数等,汇总粒度有直播间粒度、商品粒度等。由于直播间的开始时间和结束时间存在跨天的情况,无法按照自然天实时预聚合;对于已经结束的直播计划商家需要能查看历史效果数据。基于以上的产品需求,对于存在跨天聚合的场景,我们采用ClickHouse存明细的方式进行实时汇总;对于支持查看历史效果数据,我们采用实时数据+离线数据的方式去组装最新的全量数据。
由于离线分区数据在凌晨跑批更新的时候存在执行和替换的时间,如何确保每时每刻服务都能查到最全的明细数据,这是实时数据+离线数据方案设计的关键。我们基于ClickHouse设计两种类型的分区:一种是实时分区,realtime_yyyymmdd;另一种是离线分区,offline_yyyymmdd。
- 当现在是T日时,见Step1
- 最新全量数据 = T日实时数据 + (T-1)日离线数据
- 当现在是T+1日时 (T日离线数据正在执行中还未产出),见Step2
- 最新全量数据 = T日实时数据 + (T+1)日实时数据 + (T-1)日离线数据
- 当现在是T+1日时 (T日离线数据已产出),见Step3
- 最新全量数据 = (T+1)日实时数据 + T日离线数据
具体过程如下图,何时从Step2切换到Step3,我们通过服务层代码判断,使用此方案能确保应用端接口在每时每刻的查询都是最新的全量数据。对于实时分区和离线分区分别都配置4天有效期,过期失效的历史分区会自动清理。如图下图所示:
关于新老客户数,这类指标在计算前涉及与历史数据进行比较,所以势必要维护一张动态更新的维度表,在计算新老客户数前,流入进来的消息先与动态维表进行关联,当消息的支付时间比维表的支付时间大则为老客数据;当消息无法关联上维表里的数据时则为新客数据,随后立即更新HBase维度表,当该用户再次支付所产生的消息则统计为老客数。伪代码如下:
insert into table
select
...
, case when b.rowkey is not null
then b.paid_order_first_time
else a.paid_order_time
end
from table1 a
left join table2 for SYSTEM_TIME as of a.proctime as b
on a.column1 = b.rowkey
具体过程如图下图所示:
以上是在项目开发过程中涉及的部分技术方案,项目上线后的效果如下图:
4.2 销售员实时分析
销售员是有赞推出的一款可帮助商家拓宽推广渠道的应用营销工具。商家可通过制定推广计划招募买家加入推广队伍,并在其成功推广后给予一定佣金奖励,以此给店铺带来更多的传播和促进销量提升。销售员分析功能可以让商家直观的看到销售员的经营数据,从而精细化运用销售员,提升管理效率。
本次项目涉及实时指标主要有今日新增销售员数、今日新增下级销售员数、今日销售员成交金额、今日成交客户数、今日支出佣金等。以上指标都是当日实时指标,不用考虑离线场景,所以使用纯实时数据部分就能支撑。结合业务数据量级我们选型使用MySQL存储实时预聚合的数据,后续业务方通过OneServices提供的接口来获取数据进行实时展示。
部分代码如下图所示:
下面这张图有赞OneServices所提供的能力,通过配置的方式,将底层在线存储引擎映射成Dubbo接口,提供上层应用访问。
public class Request implements Serializable
//请求唯一标识,填写uuid即可
String requestId;
//接入应用
String caller;
//秘钥
String secret;
//API名称
String apiName;
//请求参数
Map<String, Object> params;
//是否开启debug模式,如果开启,将返回更多的信息
Boolean debugMode = false;
//系统调用相关补充信息,可不填
Map<String, Object> additional;
以上是在项目开发过程中涉及的部分技术方案,项目上线后的效果如下图:
4.3 消费者机器人服务实时统计
有赞消费者机器人服务占整体服务量的比重已较高,但目前只能查看离线数据,较为滞后,无法针对随时爆发的线上问题及时发现并布防,因此需要开发实时BI看板,以便于实时监控各个来源的流入以及转人工情况,便于发现异常点,及时止损,降低转人工率。
本次项目涉及的实时指标主要有问题流入量、机器人会话量、机器人转人工会话量、直连人工会话量等。实时数据源涉及客户问题入口记录表和机器人会话表,通过监听Binlog的方式实时采集数据,在机器人会话中存储了问题编码,但通过问题编码无法确定问题来源类型,因此必须要将以上两个数据流通过双流JOIN的方式连接在一起,才能区分哪些是消费者机器人所服务的问题以及会话。
流A是客户问题流入的消息流,流B是基于客户问题机器人进行自动回复的会话消息流。客户问题产生会先于机器人服务会话,会话存在有效期,超过有效期问题未结束重算一次新的服务会话。问题会话量的统计周期是一天,超过一天即使问题未结束也不再计算会话量。以上需求是一个明显的双流JOIN场景,如图4.3.1所示,我们采用FlinkSQL提供的Interval JOIN功能。首先A流的服务ID必须等于B流的服务ID,其次A流的创建时间满足以下表达式 a.created_time between b.created_time and b.created_time + INTERVAL ‘1’ DAY ,伪代码如下:
from
(
select
key
, column
, created_time
from
table1
) a
inner join
(
select
key
, created_time
from
table2
) b
on
a.column = b.column
where
a.created_time between b.created_time and b.created_time + INTERVAL '1' DAY
出于性能和存储的考虑,要将过期的数据清除,通过所配置的INTERVAL参数,将过期的STATE自动清除。
在机器人会话表中如果客户多次关闭会话接着再次打开会话(在会话有效期内),在数据库层面会生成多条记录,但是会话编码不变。如果不对数据去重会导致实时统计结果偏高,但是精确去重对性能消耗较大,会影响到数据产出的时效性,与需求方沟通后,决定采用非精确去重,在使用Druid接入Kafka后,对去重字段使用hyperUnique算法处理,损失一定精度的准确性,确保实时汇总数据产出的时效性。
如下图所示:
以上是在项目开发过程中涉及的部分技术方案,项目上线后的效果如下图所示:
5. 未来展望
在实时数仓方面,我们未来有几个重点事情:
- 实时数仓主题域建设覆盖面更广,支撑更多的业务应用
- 建立实时数仓价值评估体系,量化投入与产出
- 推进在线平台能力的完善,优化实时任务血缘、简化参数配置、本地代码调试等提效的功能
以上是关于基于 Kafka 的实时数仓在搜索的实践应用的主要内容,如果未能解决你的问题,请参考以下文章