Apache Doris 在 日增百亿数据 360商业化的统一 应用实践 查询结果秒出 OLAP (apache ii评分表)

Apache Doris 在 日增百亿数据 360商业化的统一 应用实践 查询结果秒出 OLAP (apache ii评分表)

作者|360 商业化数据团队 窦和雨、王新新

360 公司致力于成为互联网和安全服务提供商,是互联网免费安全的倡导者,先后推出 360 安全卫士、360 手机卫士、360 安全浏览器等安全产品以及 360 导航、360 搜索等用户产品。

360 商业化依托 360 产品庞大的用户覆盖能力和超强的用户粘性,通过专业数据处理和算法实现广告精准投放,助力数十万中小企业和 KA 企业实现价值增长。360 商业化数据团队主要是对整个广告投放链路中所产生的数据进行计算处理,为产品运营团队提供策略调整的分析数据,为算法团队提供模型训练的优化数据,为广告主提供广告投放的效果数据。

业务场景

在正式介绍Apache Doris在 360 商业化的应用之前,我们先对广告业务中的典型使用场景进行简要介绍:

实时数仓演进

为提升各场景下数据服务的效率,助力相关业务团队更好推进商业化增长,截至目前实时数仓共经历了三种模式的演进,分别是 Storm + Druid + MySQL 模式、Flink + Druid + TIDB 的模式 以及 Flink + Doris 的模式,本文将为大家进行详细介绍实时数仓演进过程以及新一代实时数仓在广告业务场景中的具体落地。

第一代架构

该阶段的实时数仓是基于 Storm + Druid + MySQL 来构建的,Storm 为实时处理引擎,数据经 Storm 处理后,将数据写入 Druid ,利用 Druid 的预聚合能力对写入数据进行聚合。

架构痛点:

最初我们试图依靠该架构解决业务上所有的实时问题,经由 Druid 统一对外提供数据查询服务,但是在实际的落地过程中我们发现 Druid 是无法满足某些分页查询和 Join 场景的,为解决该问题,我们只能利用 MySQL 定时任务的方式将数据定时从 Druid 写入 MySQL 中(类似于将 MySQL 作为 Druid 的物化视图),再通过 Druid + MySQL 的模式对外提供服务。通过这种方式暂时可以满足某些场景需求,但随着业务规模的逐步扩大,当面对更大规模数据下的查询分析需求时,该架构已难以为继,架构的缺陷也越发明显:

第二代架构

基于第一套架构存在的问题,我们进行了首次升级,这次升级的主要变化是将 Storm 替换成新的实时数据处理引擎 Flink ,Flink 相较于 Storm 不仅在许多语义和功能上进行了扩展,还对数据的一致性做了保证,这些特性使得报表的时效性大幅提升;其次我们使用 TiDB 替换了 MySQL ,利用 TIDB 分布式的特性,一定程度上解决了分库分表难以维护的问题(TiDB 在一定程度上比 MySQL 能够承载更大数据量,可以拆分更少表)。在升级完成后,我们按照不同业务场景的需求,将 Flink 处理完的数据分别写入 Druid 和 TiDB ,由 Druid 和 TIDB 对外提供数据查询服务。

架构痛点:

虽然该阶段的实时数仓架构有效提升了数据的时效性、降低了 MySQL 分库分表维护的难度,但在一段时间的使用之后又暴露出了新的问题,也迫使我们进行了第二次升级:

新一代实时数仓架构

第二次升级我们引入 Apache Doris 结合 Flink 构建了新一代实时数仓架构,借鉴离线数仓分层理念对实时数仓进行分层构建,并统一 Apache Doris 作为数仓 OLAP 引擎,由 Doris 统一对外提供服务。

我们的数据主要源自于维表物料数据和业务打点日志。维表物料数据会定时全量同步到 Redis 或者 Aerospike (类似于 Redis 的 KV 存储)中,通过 Binlog 变更进行增量同步。业务数据由各个团队将日志收集到 Kafka,内部称为 ODS 原始数据(ODS 原始数据不做任何处理),我们对 ODS 层的数据进行归一化处理,包括字段命名、字段类型等,并对一些无效字段进行删减,并根据业务场景拆分生成 DWD 层数据,DWD 层的数据通过业务逻辑加工以及关联 Redis 中维表数据或者多流 Join,最后生成面向具体业务的大宽表(即 DWT 层数据),我们将 DWT 层数据经过聚合、经由 Stream Load 写入 Doris 中,由 Doris 对外提供数据查询服务。在离线数仓部分,同样也有一些场景需要每日将加工完的 DWS 数据经由 Broker Load 写入到 Doris 集群中,并利用 Doris 进行查询加速,以提升我们对外提供服务的效率。

选择 Doris 的原因

基于 Apache Doris 高性能、极简易用、实时统一等诸多特性,助力 360 商业化成功构建了新一代实时数仓架构,本次升级不仅提升了实时数据的复用性、实现了引擎的统一,而且满足了各大业务场景严苛的数据查询分析需求,使得整体实时数据流程架构变得简单,大大降低了其维护和使用的成本。我们选择 Doris 作为统一 OLAP 引擎的重要原因大致可归结为以下几点:

在 AB 实验平台的具体落地

Apache Doris 目前广泛应用于 360 商业化内部的多个业务场景。比如在实时大盘场景中,我们利用 Doris 的 Aggregate 模型对请求、曝光、点击、转化等多个实时流进行事实表的 Join ;依靠 Doris 事务特性保证数据的一致性;通过多个物化视图,提前根据报表维度聚合数据、提升查询速度,由于物化视图和 Base 表的一致关系由 Doris 来维护保证,这也极大的降低了使用复杂度。比如在账户实时消费场景中,我们主要借助 Doris 优秀的查询优化器,通过 Join 来计算同环比......

接下来仅以 AB 实验平台这一典型业务场景为例,详尽的为大家介绍 Doris 在该场景下的落地实践,在上述所举场景中的应用将不再赘述。

AB 实验在广告场景中的应用非常广泛,是衡量设计、算法、模型、策略对产品指标提升的重要工具,也是精细化运营的重要手段,我们可以通过 AB 实验平台对迭代方案进行测试,并结合数据进行分析和验证,从而优化产品方案、提升广告效果。

在文章开头也有简单介绍,AB 实验场景所承载的业务相对比较复杂,这里再详细说明一下:

基于以上特点,我们在 AB 实验场景中一方面需要保证数据算的快、数据延迟低、用户查询数据快,另一方面也要保证数据的准确性,保障数据不丢不重。

数据落地

当面对一条流量可能包含几十个实验标签 ID 的情况时,从分析角度出发,只需要选中一个实验标签和一个对照实验标签进行分析;而如果通过的方式在几十个实验标签中去匹配选中的实验标签,实现效率就会非常低。

最初我们期望从数据入口处将实验标签打散,将一条包含 20 个实验标签的流量拆分为 20 条只包含一个实验标签的流量,再导入 Doris 的聚合模型中进行数据分析。而在这个过程中我们遇到一个明显的问题,当数据被打散之后会膨胀数十倍,百亿级数据将膨胀为千亿级数据,即便聚合模型会对数据再次压缩,但这个过程会对集群造成极大的压力。因此我们放弃该实现方式,开始尝试将压力分摊一部分到计算引擎,这里需要注意的是,如果将数据直接在 Flink 中打散,当 Job 全局 Hash 的窗口来 Merge 数据时,膨胀数十倍的数据也会带来几十倍的网络和 CPU 消耗。

接着我们开始第三次尝试,这次尝试我们考虑在 Flink 端将数据拆分后立刻进行 Local Merge,在同一个算子的内存中开一个窗口,先将拆分的数据进行一层聚合,再通过 Job 全局 Hash 窗口进行第二层聚合,因为 Chain 在一起的两个算子在同一个线程内,因此可以大幅降低膨胀后数据在不同算子之间传输的网络消耗。该方式 通过两层窗口的聚合,再结合 Doris 的聚合模型,有效降低了数据的膨胀程度 ,其次我们也同步推动实业务方定期清理已下线的实验,减少计算资源的浪费。

考虑到 AB 实验分析场景的特点,我们将实验 ID 作为 Doris 的第一个排序字段,利用前缀索引可以很快定位到目标查询的数据。另外根据常用的维度组合建立物化视图,进一步缩小查询的数据量, Doris 物化视图基本能够覆盖 80% 的查询场景 ,我们会定期分析查询 SQL 来调整物化视图。 最终我们通过模型的设计、前缀索引的应用,结合物化视图能力,使大部分实验查询结果能够实现秒级返回。

数据一致性保障

数据的准确性是 AB 实验平台的基础,当算法团队呕心沥血优化的模型使广告效果提升了几个百分点,却因数据丢失看不出实验效果,这样的结果确实无法令人接受,同时这也是我们内部不允许出现的问题。那么我们该如何避免数据丢失、保障数据的一致性呢?

自研 Flink Sink Doris 组件

我们内部已有一套 Flink Stream API 脚手架,因此借助 Doris 的幂等写特性和 Flink 的二阶段提交特性,自研了 Sink To Doris 组件,保证了数据端到端的一致性,并在此基础上新增了异常情况的数据保障机制。

在 Doris 0.14 版本中(初期使用的版本),我们一般通过“同一个 Label ID 只会被写入一次”的机制来保证数据的一致性;在 Doris 1.0 版本之后,通过 “Doris 的事务结合 Flink 二阶段提交”的机制来保证数据的一致性。这里详细分享使用 Doris 1.0 版本之后,通过 “Doris 的事务结合 Flink 二阶段提交”机制保证数据的一致性的原理与实现。

如右图所示,我们首先在数据写入阶段先将数据写入本地文件,一阶段过程中将数据预提交到 Doris,并保存事务 ID 到状态,如果 Checkpoint 失败,则手动放弃 Doris 事务;如果 Checkpoint 成功,则在二阶段进行事务提交。对于二阶段提交重试多次仍然失败的数据,将提供数据以及事务 ID 保存到 HDFS 的选项,通过 Broker Load 进行手动恢复。为了避免单次提交数据量过大,而导致 Stream Load 时长超过 Flink Checkpoint 时间的情况,我们提供了将单次 Checkpoint 拆分为多个事务的选项。 最终成功通过二阶段提交的机制实现了对数据一致性的保障。

应用展示

下图为 Sink To Doris 的具体应用,整体工具屏蔽了 API 调用以及拓扑流的组装,只需要通过简单的配置即可完成 Stream Load 到 Doris 的数据写入 。

集群监控

在集群监控层面,我们采用了社区提供的监控模板,从集群指标监控、主机指标监控、数据处理监控三个方面出发来搭建 Doris 监控体系。其中集群指标监控和主机指标监控主要根据社区监控说明文档进行监控,以便我们查看集群整体运行的情况。除社区提供的模板之外,我们还新增了有关 Stream Load 的监控指标,比如对当前 Stream Load 数量以及写入数据量的监控,如下图所示:

除此之外,我们对数据写入 Doris 的时长以及写入的速度也比较关注,根据自身业务的需求,我们对任务写入数据速度、处理数据耗时等数据处理相关指标进行监控,帮助我们及时发现数据写入和读取的异常情况,借助公司内部的报警平台进行监控告警,报警方式支持电话、短信、推推、邮件等

总结与规划

目前 Apache Doris 主要应用于广告业务场景, 已有数十台集群机器,覆盖近 70% 的实时数据分析场景,实现了全量离线实验平台以及部分离线 DWS 层数据查询加速。当前日均新增数据规模可以达到百亿级别,在大部分实时场景中,其查询延迟在 1s 内 。同时,Apache Doris 的成功落地使得我们完成了实时数仓在 OLAP 引擎上的统一。Doris 优异的分析性能及简单易用的特点,也使得数仓架构更加简洁。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。