Apache Doris 万亿数据秒级响应 在360数科实时数仓中的应用 (apache ii评分表)

Apache Doris 万亿数据秒级响应 在360数科实时数仓中的应用 (apache ii评分表)

作者:360 数科中间件团队

编辑整理:SelectDB

作为以人工智能驱动的金融科技平台,360 数科携手金融合作伙伴,为尚未享受到普惠金融服务的优质用户提供个性化的互联网消费金融产品,致力于成为连接用户与金融合作伙伴的科技平台。360 数科旗下产品主要有 360 借条、360 小微贷、360 分期等,截止目前,已累计帮助 141 家金融机构为 4300 万用户提供授信服务、为 2630 万用户提供借款服务、单季促成交易金额 1106.75 亿元。同时作为国内领先的信贷科技服务品牌,360 数科在三季度累计注册用户数首次突破 2 亿。

业务需求

随着金融科技业务的不断发展,对数据的安全性、准确性、实时性提出了更严格的要求,早期 Clickhouse 集群用于分析、标签业务场景,但是存在稳定性较低、运维复杂和表关联查询较慢等问题,除此之外,我们业务中有部分报表数据分散存储在各类 DB 中,这也导致维护管理复杂度较高,亟需做出优化和重构。

系统选型及对比

基于以上需求及痛点,我们对实时数仓的选型目标提出了明确的需求,我们希望新的 MPP 数据库具有以下几个特点:

2022 年 3 月开始,我们对符合以上特点的数据库Apache Doris展开了为期两个月的调研测试。以下是 Apache Doris 1.1.2 在各个方面的满足情况。

基于上述情况,我们决定采用 Apache Doris,除了可以满足上文提到的几个特点,我们还考虑以下几个方面:

平台架构

360 数科大数据平台(毓数)提供一站式大数据管理、开发、分析服务,覆盖大数据资产管理、数据开发及任务调度、自助分析及可视化、统一指标管理等多个数据生命周期流程。在整个 OLAP 中,目前 Apache Doris 主要运用离线数仓分析加速、自助 BI 报表等业务场景。

在引入 Doris 后,考虑已有数据分析业务以及数据规模,Doris 集群将先同步部分业务上优先级更高的数据。通过上述架构图可以看到,依托 Doris 强大的查询性能,我们将把 Doris 架设在 Hive 数仓的上层,为特定场景进行查询加速,这样的架构建设起来成本很低,只需要完成数据从 Hive 数仓到 Doris 集群的导入适配,因为 Doris 集群并没有产生任何新表,可以直接复用已经建设好的数据血缘关系。

数据导入方案 ,我们在调研了 Stream Load 和 Broker Load 之后,从导入性能、开发成本上进行了评估,在导入性能上,Broker Load 要比 Stream Load 略胜一筹,而在开发成本上两种方式并没有明显的差异。而且对于大表的同步,Broker Load 的导入方式可以做到单表一次导入一个事务,而 Stream Load 在单表数据量超 10G 时则需要拆分后进行数据导入。因此数据导入选择使用 Broker Load 来进行。

数仓即席查询方案 ,我们自行开发的查询引擎支持多查询引擎动态切换的机制,通过识别查询数据的元信息对本次查询做自动的查询引擎(Doris/Presto/Spark/Hive)路由和故障切换。

Doris 支持原生 MySql 协议,对标准 SQL 支持良好,使得 Doris 可以和一些 BI 工具(帆软、观远等)无缝结合,因此单独搭建了一个 Doris 报表分析集群作为 BI 工具数据源。

应用实践

Doris 对 Hive 数仓的查询加速方案

在即席查询场景中,传统的查询引擎(Hive/Spark/Presto)越来越满足不了数据开发者、数据分析师对查询响应性能提出的高要求,动辄几十秒甚者分钟级的查询耗时极大的限制了相关场景的开发效率。

为提高查询性能,我们通过架设的 Doris 数仓加速层来缩短查询耗时,目前我们在不开启 Doris 缓存、不开启用物化视图等优化策略的情况下,命中 Doris 即席查询平均耗时即可从几分钟缩短至 5 秒内。

未来我们将通过分析相关查询的特征,通过开启缓存、创建相关物化视图等策略来进一步优化 Doris 的查询性能。

实现 Doris 加速的核心是支持查询引擎动态切换,查询引擎动态切换的工作机制如下:

查询引擎会及时收集 Hive 和 Doris 的元信息,包括库、表、表字段、表行数等信息,在用户提交即席查询请求时,首先会解析出用户查询的表,并按照如下顺序判断:

如果以上要求均被满足,则会将该查询路由到 Doris,否则会依次按照 Presto、Spark、Hive 的顺序进行路由查询,当查询出现异常时,也会按照该顺序依次进行故障转移。

慢查询慢导入分析

对于慢查询和慢导入,Doris 提供了完善的 Profile 机制,在了解相关技术细节后,我们在线上集群开启了 Profile 收集,通过调度任务定时收集慢查询、慢导入的 Profile 信息并落库。

Doris 提供的 Profile 信息非常详细,例如 OLAP_SCAN_NODE 提供了原始的扫描行数,各个索引的过滤行数,每个 Instance 的 EXCHANGE_NODE 提供了接收的数据总行数和接收的数据量大小。这些信息为查询调优提供了详细的依据,我们在使用过程中针对快速定位查询性能的瓶颈进行了优化,取得了良好的效果。

建表规范

在我们的使用场景中,有下列类型的表:

由于当前 Doris 集群中所有的表都是基于 Hive 数仓中各层级的表同步而来,因此目前仅使用了 Duplcate 模型和 Unique 模型,对于 pda、pdi 和 a 表,为了降低 Doris 表的分区数,减轻 FE 元数据管理压力,我们在建 Doris 表时均启用了根据日期划分的动态分区特性,较久远的历史数据我们按年、月的维度分区归档,近期的数据按日、小时分区,未来我们计划通过程序自动识别完成历史分区的归档合并。

对于 pda 表使用场景 ,pda 表需要每日同步全量数据,我们采用了 Duplicate 模型,不考虑使用 Unique 模型数据去重的原因是 Doris 的导入模型本身就提供了基于任务 Label 的数据一致性保证,同步时一次调度周期的 pda 表的一个分区的导入任务能产生唯一且不变的 Label,因此我们可以保证即使错误执行了多次,该分区的数据仍然不会重复。另外,因为 Duplicate 模型相比于 Unique 模型,在导入和查询阶段均不会做预聚合去重,所以可以一定程度上加速导入和查询的性能。

对于 pdi 表使用场景 ,因在实际使用中 pdi 表存在少数对历史数据的部分更新场景(绝大部分是数据更新场景,基本没有数据删除场景),考虑到 Doris 数据表的分区可用性,我们采用了 Unique 模型,这样在更新历史分区的数据时不必做重建分区操作。

对于 a 表使用场景 ,因业务上可以接受短时间数据不可用情况,我们启用了动态分区,在做数据导入时,每次导入都会先删除历史分区,然后将全量数据导入今天的分区内,这样做的考虑是杜绝重建表操作,且实施成本相对比较低,因此我们没有采取动态更新视图绑定当日分区的方案。

在 Doris 之前的版本中,尚未实现 Hive 元数据变更同步和管理功能,为了提高效率开发了 Doris 建表工具,我们通过选择和配置数仓集群、Hive 表名、数据模型、Bucket 数量等参数,自动关联 Hive 表,解析表字段并生成对应的建表语句。经过与社区沟通得知,最近即将发布的 1.2 新版本中已经实现 Multi Catalog,支持 Hive 元数据的对接和 Schema 的自动同步,可以极大程度上减少这一部分的工作。

监控体系

当前 Doris 集群监控体系分为主机指标监控告警、日志告警和集群指标监控告警,总体监控体系如下。

主机指标监控 是基于 Open-Falcon 开发的监控告警平台,主要采集 Doris 集群节点的 CPU、IO、内存、磁盘等相关指标并进行监控告警。

集群指标监控 参考了 Doris 官方文档提供的基于Prometheus和和集群指标监控方案。

日志告警 仍然是基于我们的监控告警平台,主要用于监控 Doris 服务日志中容易识别但其他监控方式成本较高的监控、告警场景,是其他两种监控的补充。通过日志监控告警,我们能够准确识别数据导入任务的失败原因并能进行及时的推送通知。

问题排查和审计日志

为了及时排查一些极端的集群问题,上述针对 Doris 的监控体系建设仍然是不够的。为了在集群 BE 出现异常宕机时快速定位堆栈,需要在所有的 BE 节点开启 Core Dump。除此之外,审计日志在集群的日常运维中也发挥了重要作用。

对于 Doris 集群的审计日志收集一般可以通过 2 种方式:

考虑到第二种方式操作更简单,因此采用此方式进行日志采集。不过在使用 Auditloader 插件的过程中,陆续发现和修复了一些插件问题,并向社区提交了 PR,与此同时,我们定制开发了内部控制台,便于查看集群的同步任务情况,数据分布情况以及进行审计日志的检索。

审计日志为集群 BE 崩溃时具体 SQL 定位、客户端访问统计、查询 SQL 耗时统计、访问 SQL 特征分析等提供了详细的信息。例如,数据开发曾经反馈查询 Doris SQL 失败,检索日志出现了大量连接数超限的异常,我们通过审计日志,迅速定位到了问题原因是由于上游导入工作流 Bug 在短时间内创建较多的数据库连接。另外,对于曾经使用的低版本 Doris 出现数次 BE 异常宕机问题,我们通过 gdb 调试工具定位到崩溃时 SQL 的后,配合审计日志也能快速的定位到导致崩溃的具体 SQL。

优化实践

数据导入实践和调优

初期数据源主要来自 Hive 数仓,因此大部分数据导入以 Broker Load 方式为主。大数据平台自助导入任务工作流适配了 Doris Broker Load 导入方式,数据开发零代码——通过简单的勾选配置即可完成自助的 Doris 数据导入工作流创建。

而在 Broker Load 的使用过程中,我们也陆续遇到了一些问题,这里拿出几个典型的问题和一些调优经验来分享。

在 Broker Load 导入时遇到的问题:

tablet writer write failed, tablet_id=xxx, txn_id=xxx, err=-238
复制代码

我们推测造成 -238 错误的原因可能是分桶设置太少,接着我们通过 BE 节点的挂载数据来查看单个 Tablet 下的文件大小,我们发现单个 Tablet 的文件占用空间远大于官方推荐的 10GB 上限范围,这也证明了我们的推测正确,因此我们通过适当提高 Doris 表的分桶数,使得这个问题有了较大的缓解。

顺便说一下,如果出现 -235(旧版本是-215)异常,一般是由于 Compaction 过慢导致 Tablet 版本堆积超过限制,这个时候通过 Grafana 看到 BE Compaction Score 在导入前后有明显的波动,而且绝对值很高。如果遇到此问题可以参阅 ApacheDoris 公众号文章:Doris 最佳实践-Compaction调优(3) 对Compaction过程进行调优。

Hive 表在使用过程中会有一些 DDL 的执行,从而导致表字段新增,我们数仓的 Hive 表均使用 ORC 格式存储,那么就会导致 Hive 表中部分历史分区的 ORC 文件中字段信息缺失(缺失新增字段),而新分区的 ORC 文件中字段是正常的,这个时候如果对历史数据重新导入,就会有下面的异常信息:

detailMessage: ParseError : Invalid column selected xxx
复制代码

在阅读了 Broker Load 相关代码后确认了问题原因:在一次 Broker Load 导入过程中,导入任务的字段解析器会读取一个 ORC 文件头解析字段信息,但解析器只会解析一次,如果一次导入过程中同时有新、历史分区的 ORC 文件,那么就可能导致任务失败。

修复的方法也很简单,只需针对每个 ORC 文件重新解析一次文件头的字段信息即可。在了解问题原因及分析解决思路后,我们也和社区的同学一起修复了这个问题并提交了相关 PR。

这个问题的错误表现和问题 2 比较类似,具体原因是 Broker Load 导入过程没有对 ORC 文件做判空,遇到空 ORC 文件仍会尝试解析 ORC 文件字段信息导致报错,我们把这个问题反馈给社区后,社区的同学很快修复了该问题。

创建 Broker Load 任务,使用 Kerberos 认证访问 HDFS 的 Hive 文件导入数据,Hive 文件路径中分区和下一级目录使用通配符 *,访问所有分区所有文件,任务提交后隔 40 多秒出现如下的错误:

type:ETL_RUN_FAIL; msg:errCode = 2, detailMessage = Broker list path exception. path=hdfs:xxx
复制代码

在阅读了 Broker Load 的访问 HDFS 相关代码后确认了问题原因,Broker Load 调用 HDFS 的 LS、DU 方法时会获取文件目录信息,由于路径下的文件过多导致耗时会超过 45 秒,而 Thrift 设置的 Socket 请求超时默认小于 40 秒,所以出现了上述的 RPC 异常,问题反馈社区后,对 FE 增加了配置参数 broker_timeout_ms ,设置为 90 秒后解决问题。

关于 Broker Load 的导入性能调优策略

我们针对 Broker Load 导入调优的主要方向在确保 Doris 集群不承压的情况下尽可能提高导入并发度,下面根据 2 个典型的案例来说明:

部分 pdi/pda 表数据规模在 T 级别,在进行全量导入时,如果只提交一个 Broker Load Job ,将因为导入任务的并发不够,导致导入耗时达到 5-6 小时。针对此问题,我们可以对导入任务进行 Job 拆分,在大数据平台也适配这种场景,支持任务的自动拆分和重试机制,具体的拆分方式如下图:

不过要注意的是,拆分后可能会对集群有较高的写入压力,要及时监控导入任务和集群的状态,特别针对 -235 的情况可能需要进行 Compaction 调优。

数据开发对部分报表的同步时效提出了很高的要求,我们在针对性的优化表同步时效时,发现一些表导入耗时较长,但通过集群监控体系发现相关表同步期间,BE、FE 节点的 CPU、内存、磁盘 IO 、网卡 IO 并没有达到瓶颈,集群的 Compaction Score 在此期间也一直稳定在低位,且整个同步过程同步任务均未出现-235、-238 等相关的错误,我们推测瓶颈可能还是在导入任务的并发程度上。

因为有些表在 Hive 数仓是非分区的表,所以第 1 种通过划分分区范围拆分多个导入 Job 的方式就行不通了,理论上仍然可以通过划分不同的 HDFS 文件来拆分 Job,但是这种方式在毓数大数据平台还需要进一步去适配,所以我们还是优先考虑通过调整集群配置的方式彻底解决此问题:

首先可以通过适当调高 FE 的 max_broker_concurrency 去提高 Scan HDFS 文件阶段的并发度(最高调高至 BE 节点数),而对于 Table Sink 阶段,可通过调高 FE 的 default_load_parallelism (设置,可调整到 BE 节点数)和 send_batch_parallelism 参数( SQL Session 执行 set global send_batch_parallelism=5 或在提交 Broker Load 中的 PROPERTIES 中指定,最高调整到 5,如果超过此值,需要同步调整的 max_send_batch_parallelism_per_job 参数),提高该阶段并发度。通过提高 Broker Load Job 各阶段导入的并发度,相关报表的同步时效显著提升,这里我们选取 5 张典型表为例,优化前后的同步时效表现如下:

双机房容灾建设

为了保障 Doris 集群的可用性,我们需要为 Doris 集群提供双机房容灾能力。Doris 目前虽然可以通过不同的 Tag 将 BE 分组部署在多个机房,但是无法解决机房出现问题时的 FE 可用性问题。经过方案调研分析,我们决定通过自行开发 Replicator 主从同步插件去实施双机房容灾建设,具体的架构如下:

通过在主集群安装 Replicator 插件,Replicator 插件会拦截并解析主集群执行的全量 SQL,然后经过过滤操作,筛选涉及库、表结构变更和数据增、删、改相关的 SQL,并将相关 SQL(部分 SQL 需要改写)发送到备集群进行重放。除此之外,我们在 Doris 控制台开发了 Validator 数据校验程序,定期校验主备集群间的数据结构差异和数据差异并上报,在主集群因各种问题导致不可用时,直接通过切换 DNS 解析地址到备集群 LVS 地址完成主备集群的切换。

总结规划

效果总结

从 2022 年 3 月份开始进行对实时数仓沟通进行调研,7 月份正式上线生产,集群数据规模快速增长。目前,生产环境共有 2 个集群,数百张表,几十 TB 数据,每日有数百个同步工作流在运行,几十亿规模的数据新增/更新。在此规模下,Doris 对业务支持良好,稳定运行。

未来规划

在近期的规划中,我们希望 Doris 能支撑更多的业务场景、发挥更大价值,例如基于 Doris 建立实时数仓、基于 Doris 重构用户行为画像、Doris HIVE 外表特性等。同时我们计划通过分析用户的查询 SQL 特征,结合 Doris 的查询缓存和物化视图特性,进一步提升查询效率。通过开发集群探查工具,实时探测集群数据表的数据分布情况,比如 Tablet 有没有过大,Tablet 数据分布是否均匀等,综合探查集群的运行情况并自动给出优化建议。

目前我们使用了 Doris 有大半年时间,在这半年期间一直保持和社区同学进行交流(提交 Issues 和 PRs),非常感谢 SelectDB 团队一直以来对我们的技术支持。最后祝 Apache Doris 越来越好,为基础软件建设添砖加瓦。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。