在奇富科技的统一 Apache 场景探索实践 Doris OLAP (在奇富科技的上市公司)

在奇富科技的统一 Apache 场景探索实践 Doris OLAP (在奇富科技的上市公司)

作者奇富科技 中间件团队 & SelectDB 技术团队

作为中国卓越的人工智能驱动的信贷科技服务平台,奇富科技(原 360 数科)致力于帮助金融机构提升智能化水平。经过多年金融领域实践,奇富科技以自身强大安全生态为依托,完成了在人工智能、大数据、云计算等技术方面的专业积累。目前,已与银行、消费金融公司、信托公司等建立广泛合作,针对不同类型金融机构的需求提供定制化解决方案,帮助客户完成数字化、智能化升级改造。

消费信贷对于促进消费起着越来越重要的作用,是助力消费恢复、激发潜在需求的重要手段之一。近年来,随着消费信贷规模快速增长,消费信贷的产品和服务也越来越丰富,个人信贷市场呈现的场景化、体验感强等特征。在此背景下,精准营销、精细化风险管理以及提升用户使用体验愈发重要。

为更贴合场景需求、提供精准运营数据支持,奇富科技自 22 年 3 月开始引入Apache Doris,希望通过 Apache Doris 融合用户多维价值数据,基于海量数据进行实时准确的数据分析,以提供个性化的营销方案及决策支持。截止目前 Apache Doris 已在奇富科技被多个业务线应用,每日线上运行数百同步工作流,日均新增及更新的数据规模达到数十亿,承载了上百万次的有效查询。Apache Doris 的应用不仅有效提高了数据处理与分析的速度,还降低了运维成本、确保了系统的稳定性,为奇富科技实现精细化运营和营销广告精准投放提供了强有力的数据支持。

业务场景及应用现状

奇富科技大数据平台提供一站式大数据管理、开发、分析服务,覆盖大数据资产管理、数据开发及任务调度、自助分析及可视化、统一指标管理等多个数据生命周期流程。其中,最典型的数据应用服务即在线报表分析和离线标签服务,离线标签服务又包含了用户标签点查、客群画像、触发式策略生成和人群圈选四大服务。为满足不同业务数据需求,早期架构将数据分散存储在 Clickhouse、Elasticsearch、MySQL 等多个组件中,带来了数据应用和系统运维的挑战:

除此之外,还存在一些性能及稳定性问题,具体表现为:

技术选型

为解决以上问题,奇富科技计划引入一款分析型数据库以满足大部分 OLAP 场景需求,该数据库应具备以下特性:

基于以上选型要求,奇富科技对 ClickHouse、Apache Doris 进行了调研。其中 Apache Doris 在导入、查询、运维等方面的性能符合选型要求,同时在社区规范性、活跃度、开放程度和可持续发展方向表现出色。因此,奇富科技选择 Apache Doris 作为整体 OLAP 场景统一的分析引擎。

接下来将结合真实业务场景,介绍 Apache Doris 在奇富科技的深度应用和优化实践,更好了解 Apache Doris 如何助力奇富科技实现精准营销,提升业务收益。

Apache Doris 在报表分析场景的应用

在报表分析场景中,数据分析师通常会基于重要业务报告、图表、当前业务状况评估报告进行数据分析,旨在为业务策略决策提供重要依据。

早期架构在应对该场景时,会出现数据表同步时效 SLA 达标率(每日统计)波动较大的问题,经过深入分析是由数据同步过程所涉及组件繁多导致。不仅如此,组件繁多还增加了数据同步的复杂性,对数据源(MySQL、ClickHouse)和同步任务(如 top="2579.359375">报表引擎升级

为了解决这些问题,我们使用 Apache Doris 替换了 ClickHouse 和 MySQL ,由 Doris 统一为报表提供服务,降低报表数据源的复杂度。从下图可知,Doris 架设于 Hive 数仓上层,可充分利用 Doris 强大的查询性能,为报表分析场景提供查询加速。同时 Apache Doris 支持原生 MySQL 协议,可与帆软、观远等 BI 工具无缝结合,提升了报表分析的灵活性与易用性。

导入任务优先级

在原先的逻辑中,Doris 对于导入任务的处理没有优先级的概念。当有大量任务同时提交时,Doris 会按照先进先出的逻辑执行任务。因此,我们主导设计并向社区贡献了导入任务优先级功能。该功能可以根据用户设定的优先级来处理导入任务,从而可以保证部分高优任务的时效性。详细流程如下图所示:

通过优先级设置,即使在任务高峰期,依然可以保证高优任务的时效性,极大提升了数据表同步时效 SLA 达标率。为数据的传输及应用提供了更灵活、高效的支持。

异地双机房高可用方案

为保障 Doris 集群的可用性及稳定性、确保在机房故障时集群仍然可读,并在故障恢复后主集群的数据能够回写,为 Doris 集群配备双机房容灾能力势在必行。经过方案调研,决定通过自行开发 Replicator 主从同步插件来实现双机房容灾建设,以下为具体的架构设计:

在主集群安装 Replicator 插件,该插件可以拦截并解析主集群执行的全量 SQL,经过滤操作,可筛选涉及库、表结构变更和数据增、删、改相关的 SQL,并将相关 SQL(部分 SQL 需改写)发送到从集群进行回放。

更进一步的,为了应对主集群可能遭遇意外情况,奇富科技设计了一套灵活的 DNS 解析地址切换机制。当主集群不可用时,切换 DNS 解析地址到从集群,完成主从集群的切换。

除此之外,我们还在 Doris 控制台(内部自研的 Doris 集群维护管理服务)开发了 Validator 数据校验模块,可定时对 Doris 数据表主从集群一致性情况进行校验并上报,包括元信息、行数等。

应用收益

以表同步耗时为例,利用 Doris Broker Load 方式有效降低数据导入的复杂度,提高数据导入时效性,成功将报表数据同步时效 SLA 达标率提升至99% 以上。

以查询时效为例,下图为引入 Apache Doris 后报表分析集群的查询时效监控,平均耗时稳定在 10s 以内、P90 查询耗时稳定在 30 秒以内 ,相比旧架构平均耗时降低了 50% 以上。随着应用程度的加深和数据规模的增长,查询性能还得以进一步提升,获得更多内部业务线的认可。

Apache Doris 在离线标签服务场景的应用

在标签服务场景中,数据分析师会根据用户的多重维度信息,基于一定策略进行用户标签的打标,标签主要包括行为标签和属性标签。离线标签服务则以上述标签为基础,为下方四个场景提供服务:

由于离线标签服务需同时满足四个场景的需求,因此对数据导入时效性和可用性有较高要求,以确保服务持续稳定运行。同时,需要保证用户标签查询的点查性能,支持实时查询标签信息。在人群圈选方面,还需具备精准去重和集合交并集运算能力,保证营销活动和广告投放的精准性和有效性。

在原先的架构中(如上左图所示),导入的数据会逐步生成标签信息,并对标签信息进行加工、合并为 JSON 文件(合并操作是为了减少 Elasticsearch 的更新次数及负载),合并后的 JSON 文件导入到 Elasticsearch 中提供数据服务。然而,这种方式带来两个问题:

为解决上述问题,我们基于 Apache Doris 对架构进行了改造,如上图右图所示,在标签数据加工过程中,利用 Apache Doris 进行标签宽表的存储和处理,当上游标签准备就绪时,可使用 Aggregate Key + replace_if_not_null 实现标签合并和部分列更新的效果。当所有标签均更新到标签宽表后,继续将其同步到 Duplicate Key 模型的标签明细宽表中,以提升查询性能。

这样的改造使得我们能更加及时地处理标签数据, 标签数据的导入时效从 4 小时缩短至 1 小时以内 。此外,借助 Doris 完善的 Bitmap 索引以及高并发查询性能, 实现了秒级人群圈选 。在点查询方面, QPS 达到 700+ ,确保了查询的快速响应。

基于 Apache Doris 的湖仓查询加速实践

为缩短业务决策周期、提升数据分析师的工作效率,对于离线湖仓查询加速的重要性日益凸显。因此自 22 年 9 月起,我们开始将 ApacheDoris 应用在离线查询加速场景中,彼时 Doris 仅支持以 Hive 外部表的形式进行查询,由于外部表需要配置映射关系,当 Hive 元数据发生变更时需要手动更新,人工维护成本较高,因此选择将 Hive 中数据导入进 Doris 中以实现查询加速。

具体而言,会先根据查询频率对数据表优先级进行排序,通过 Doris Broker Load 将查询频率 Top 500 的数据表从 Hive 导入到 Doris。通过路由引擎定时调度收集数据表在 Doris 和 Hve 中表的元信息,包括表的字段、字类型以及数据规模。当收到查询语句时,路由器检测数据的是否在 Doris 中,如存在则会路由到 Doris 引擎,从而实现查询加速。

而该方案并不完美,依赖于对于 Hive 数据的导入。当面对大规模数据导入任务时,可能造成集群资源紧张导致查询和导入性能下降的问题,甚至可能引起集群不稳定,对业务使用产生负面影响。

为解决上述问题,我们对架构进行了进一步升级。在 Apache Doris 1.2 版本支持了 Multi-Catalog 多源数据目录,基于该能力简单配置即可将 Hive 集群的数据表完整同步到 Doris 中,解决了配置外表繁琐、元信息无法自动同步等问题。以下是数仓查询加速新方案:

在新方案中,主要通过以下几个步骤完成数仓查询加速方案:

当用户提交查询语句后,路由引擎(即查询网关)会根据收集的数据元信息进行判断,为查询语句匹配最优查询路径。路由引擎还会分析查询涉及的分区数和表行数,来匹配最佳执行引擎( Doris 、Presto、SparkSQL ),例如当查询语句所依赖数据表都存在于 Doris 内表中且元数据一致时,将直接路由至 Doris 内表进行查询,无法命中则将查询下压,由 Doris Hive Catalog 进行查询或者通过 Presto 进行查询。

通过这种策略,用户无需关心查询由哪个引擎执行,也无需关注查询引擎间的语法差异。查询总能以最高效的方式执行,实现查询加速,并能保障结果的准确性 。使用上述方案也遭遇了一些问题,接下来分享应对策略及优化方案。

Hive 元数据准实时同步

使用上述查询加速方案时,如果 Hive 元信息更新后无法准实时同步至 Doris ,用户查询时会出现结果不一致问题。为避免该问题发生,可使用 Doris 元数据自动刷新特性(基于 HMS Event Listener 实现)实现元信息准实时同步。这种方式需要在 Hive 端开启 HMS Event Listener ,并在 Doris 端开启 HMS Event 消费开关。具体实现如下图所示:

对于 Hive 端 ,当开启 Listener 后,因 Hive 本身具有内置的 Listener 实现,会对 Hive 集群中 DDL 操作以及 Insert 操作进行拦截。在发起拦截操作后,生成对应的 HMS Event(事件),并写入 Hive 元信息数据库中。

对于 Doris 端 ,当开启消费 HMS Event 开关后,Doris 的 Master FE 节点会启动后台线程,调度式批量拉取 HMSEvent, 通过消费 HMS Event 获取 Hive 端的元数据变化,并将该变化同步到 Doris 内部维护的 Hive 元信息中,实现数据的准实时同步 。在实施过程中也遇到了一些新的问题,在此对优化方案进行分享:

优化方案 1:Hive DDL 长时间阻塞

启动 Hive 端的 HMS Event Listener 后,Hive 处理新增表和分区时需要获取表、分区名和文件信息,生成相应的 HMS Event,而 Doris 回放 HMS Event 时并不需要文件信息。因此当 Hive 表、分区文件数过多或集群繁忙时,获取文件信息的操作会延长 HMS Event 生成时间,导致 Hive DDL 操作耗时增加。

基于此,我们重写了 Hive DbNotificationListener,屏蔽对 Hive 表、表分区文件信息收集,有效减少不必要信息的拉取与采集,缩短了 Hive 端的执行效率。

优化方案 2:消费速率与事件产生速率不匹配

Hive 集群繁忙时,DDL 或 Insert 事件频率高,且 Doris 采用单线程串行模式消费 HMS Event,可能会出现 Doris 消费速率与 Hive 时间产生速率不匹配的问题。

为了提高 Doris 消费速度,我们在 Doris 端增加了预先事件合并(HMS Event Merge)步骤,过滤掉一部分 HMS Event,大幅降低了需要消费的 HMS Even 数量。预先事件合并的核心思想是判断批量 HMS Event 中是否存在可以抵消前面某个 HMS Event 产生的效果,如果是则跳过前面的 HMS Event。 通过这样的方式,HMS Event 数量能够大幅度降低,进而提升了 Doris 处理 HMS Event 的消费速度

预先事件合并原理如上图所示,第一个事件 DropPartitionEvent(db1.tbl1)和最后一个事件 DropTableEvent(db1.tbl1)针对同一张 Hive 表。DropPartitionEvent 发生在前,DropTableEvent 发生在后。在处理 DropPartitionEvent 时,Doris 会删除该 Hive 表的应分区缓存信息;在处理 DropTableEvent 时,Doris 会删除该 Hive 表的表缓存信息。由于后一个 HMS Event 产生的效果可以完全抵消前一个 HMS Event 产生的效果,因此可以跳过前面的 HMS Event。

我们对集群 4 天内所处理 HMS Event 处理情况进行抽取,从上图可知, 经 HMS Event Merge 操作后,Doris 消费事件数量时显著降低,较原来减少约 2-5 倍的数量。完成优化后,再未出现消费速率低于产生速率的问题

优化方案 3:HMS Event 类型缺失,影响 FE 稳定性

针对 CDH 或者 HDP 集群 Hive 版本产生压缩格式的 Event ,新增了 GzipJSONMessageDeserializer 反序列化器进行处理。原因是 Slave FE 节点一般通过 Journal Log 方式消费 HMS Event ,如果消费失败会导致 FE 服务直接退出, 通过新增 Fallback 策略可以提升消费逻辑的健壮性,尽量避免 FE 服务退出的问题

引入弹性计算节点,Deploy on Yarn 实现弹性扩缩容

当需要处理大规模数据或进行高性能计算时,需要更多计算资源提供支持。若部署的 Doris BE 集群时有上百个节点,每一个节点需要多个 CPU,那么在运行过程中需要庞大的资源支撑,从而带来较大的资源成本和部署成本。为降低资源和部署成本,我们选择引入 Doris 的弹性计算节点(Elastic Compute Node),并选择将弹性计算节点与 Hadoop 集群的其他组件(如>

在此基础上,基于 Doris 计算节点的无状态特性,可通过 Skein 进一步简化 Doris 计算节点 Deploy on Yarn 的部署流程,实现节点弹性扩缩容。在实际运行过程中,我们依据用户的查询习惯,在夜间查询较少时缩容、在白天业务高峰时扩容,最大化利用集群资源、提高资源利用率。

如上图,在 Yaml 文件中定义 Doris 计算节点的数量和所需资源信息,并将安装包、配置文件、启动脚本统一打包至分布式文件系统。当需进行版本升级或集群启停时,只需一行命令即可在分钟内完成整个集群上百个计算节点的启停操作。

Hive 视图查询优化

在 Hive Catalog 查询运行过程中会有偶发的查询失败问题,因此我们对数百业务用户查询失败的原因进行了深度分析,发现 28% 是由查询视图引起,24% 是由于用户使用了 Doris 无法匹配的函数导致。

如下图所示,当用户发送查询请求后,在 Doris 内部将经历词法分析、语法分析、逻辑执行计划和分布式执行计划四个阶段。Hive 视图识别在第三阶段进行,即当逻辑执行计划阶段生成 Hive ScanNode 后,对其进行初始化时,Doris 会识别该表是否为 Hive 视图。如果是 Hive 视图,将直接抛出查询异常导致查询失败。

因此我们对 Hive 查询执行进行了优化,将 Hive 视图的识别提前至语法分析阶段(第二阶段)进行,如果识别为 Hive 视图,则通过 HMS Client 客户端获取该视图定义的 DDL ,并将 DDL 带入 Doris 解析视图依赖的 Hive 实体表。在逻辑执行计划生成阶段(第三阶段),将生成与 Hive 实体表相对应的 HiveScanNode,查询将继续进行。通过简单改造,Doris 即可通过 Hive Catalog 实现对 Hive 视图的查询支持。

应用收益

查询加速主要得益于 Apache Doris 的 Multi-Catalog 特性,相对于外部表,Multi-Catalog 无需创建表与表之间的映射关系,可以实现元数据层的对接,如上文所述 只需简单的配置,实现将整个 Hive 集群的数据表完整地同步到 Doris 中,彻底解决了以往配置外表繁琐、不支持元信息自动同步等问题

此外,借助 Doris Multi-Catalog 替代了早期架构中多个数据组件,统一了数据源入口和数据查询出口,降低架构的复杂度、缩短数据处理与查询的流程,大大提高数据查询的效率。

结束语

从 22 年引入 Doris 以来,凭借其优异的性能、较低的运维复杂度和较高稳定性,迅速在奇富科技内部多个业务场景得到大规模的应用。截止目前, 生产环境共有近十套集群、上百 BE 节点、CPU Core 超过 1000+,总数据规模达到数十 TB ,每日有数百个同步工作流在运行,日新增数据或更新的规模达近百亿,每天支持业务方百万次的有效查询量 。通过 Doris 的应用极大的降低了架构的复杂度,有效提高了数据处理与分析的速度,降低了运维成本,确保了系统的稳定性。

未来我们将继续深入使用 Apache Doris 、并引入 2.0 版本,为用户带来更加实时、统一的数据处理和分析体验。后续我们将重点关注存算分离架构、数据湖分析特性以及 Doris Manager 组件的应用。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。