OLAP Doris 在京东搜索实时 Apache 中的应用实践 (olive动作)

OLAP Doris 在京东搜索实时 Apache 中的应用实践 (olive动作)

流式计算在近些年的热度与日俱增,从 Google>

2、搜索业务形态

京东搜索作为电商平台的一个入口,为众多商家与用户提供连接的纽带。京东搜索发挥着导流的作用,给用户提供表达需求的入口;为了正确理解用户意图,将用户的需求进行高效的转化,线上同时运行着多个 AB 实验算法,遍及 POP 形态与自营形态的多个商品,而这些商品所属的品类、所在的组织架构以及品牌店铺等属性,都需要在线进行监控,以衡量转化的效果和承接的能力。

3、实时技术的挑战

目前搜索上层应用业务对实时数据的需求,主要包含三部分内容:

1、搜索整体数据的实时分析。

2、 AB 实验效果的实时监控。

3、热搜词的 Top 榜单以反映舆情的变化。

这三部分数据需求,都需要进行深度的下钻,维度细化需要到 SKU 粒度。同时我们也承担着搜索实时数据平台的建设任务,为下游用户输出不同层次的实时流数据。

我们的用户包括搜索的运营、产品、算法以及采销人员。虽然不同用户关心的数据粒度不同、时间频率不同、维度也不同,但是我们希望能够建立统一的实时 OLAP 数据仓库,并提供一套安全、可靠的、灵活的实时数据服务。

目前每日新增的曝光日志达到几亿条记录,而拆分到 SKU 粒度的日志则要翻 10 倍,再细拆到 AB 实验的 SKU 粒度时,数据量则多达上百亿记录,多维数据组合下的聚合查询要求秒级响应时间,这样的数据量也给团队带来了不小的挑战。

4、实时技术架构演进

我们之前的方案是以 Apache Storm 引擎进行点对点的数据处理,这种方式在业务需求快速增长的阶段,可以快速的满足实时报表的需求。但是随着业务的不断发展、数据量逐渐增加以及需求逐渐多样化,弊端随之产生。例如灵活性差、数据一致性无法满足、开发效率较低、资源成本增加等。

为解决之前架构出现的问题,我们首先进行了架构升级,将 storm 引擎替换为 Apache Flink,用以实现高吞吐、exactly once 的处理语义。同时根据搜索数据的特点,将实时数据进行分层处理,构建出 PV 流明细层、SKU 流明细层和 AB 实验流明细层,期望基于不同明细层的实时流,构建上层的实时 OLAP 层。

OLAP 层的技术选型,需要满足以下几点:

1:数据延迟在分钟级,查询响应时间在秒级

2:标准 SQL 交互引擎,降低使用成本

3:支持 join 操作,方便维度增加属性信息

4:流量数据可以近似去重,但订单行要精准去重

5:高吞吐,每分钟数据量在千万级记录,每天数百亿条新增记录

6:前端业务较多,查询并发度不能太低

通过对比目前业界广泛使用的支持实时导入的 OLAP 引擎,我们在 druid、ES、clickhouse 和 doris 之间做了横向比较:

通过对比开源的几款实时 OLAP 引擎,我们发现 doris 和 clickhouse 能够满足我们的需求,但是 clickhouse 的并发度太低是个潜在的风险,而且 clickhouse 的数据导入没有事务支持,无法实现 exactly once 语义,对标准 sql 的支持也是有限的。

最终,我们选定 doris 作为聚合层,用于实时 OLAP 分析。对于流量数据,使用聚合模型建表;对于订单行,我们将聚合模型换成 Uniq 模型,保证同一个订单最终只会存储一条记录,从而达到订单行精准去重的目的。在 flink 处理时,我们也将之前的任务拆解,将反复加工的逻辑封装,每一次处理都生成新的 topic 流,明细层细分了不同粒度的实时流。新方案如下:

目前的技术架构中,flink 的任务是非常轻的,state 状态非常小,并没有使用 KeyedState 自定义状态,而 OperatorState 中只包含 kafka 的 offset 信息,这样保证任务的运行开销很小,稳定性大大提升。同时基于生产的数据明细层,我们直接使用了 doris 来充当聚合层的功能,将原本可以在 flink 中实现的窗口计算,下沉到 doris 中完成。利用 doris 的 routine load 消费实时数据,虽然数据在导入前是明细粒度,但是基于聚合模型,导入后自动进行异步聚合。而聚合度的高低,完全根据维度的个数与维度的基数决定。通过在 base 表上建立 rollup,在导入时双写或多写并进行预聚合操作,这有点类似于物化视图的功能,可以将数据进行高度的汇总,以提升查询性能。

在明细层采用 kafka 直接对接到 doris,还有一个好处就是这种方式天然的支持数据回溯。数据回溯简单说就是当遇到实时数据的乱序问题时,可以将“迟到”的数据进行重新计算,更新之前的结果。这是因为我们导入的是明细数据,延迟的数据无论何时到达都可以被写入到表中,而查询接口只需要再次进行查询即可获得最新的计算结果。最终方案的数据流图如下:

5、技术取舍

实时数据处理的技术实现,需要在低延迟、准确性和成本之间进行取舍。我们采用 doris 作为实时仓库的聚合层,其实也是在多方面进行了取舍。例如:

1、routine load 的导入任务,为了达到更高的写入吞吐量,我们将实时导入任务的最大时间间隔设置了 30s,即增加了导入延迟,换来了更大的吞吐

2、为了降低开发成本,节省计算资源,我们通过建立 rollup 来支持快速的查询需求,但是却增加了存储压力以及写入时的 IO 压力

3、PV、UV 等流量指标在聚合时采用的是 HLL 计算,降低了精度,换来了更短的查询响应时间

以上几点取舍,是结合业务场景与需求的要求而决定的,并非绝对的情况。所以,面对实时数据大规模、无界、乱序等特点,实时流计算的选型,最终考虑的就是如何取舍。

6、Doris 在大促期间的优化

上文提到我们在 doris 中建立了不同粒度的聚合模型,包括 PV 粒度、SKU 粒度以及 AB 实验粒度。我们这里以每日生产数据量最大的曝光 AB 实验模型为例,阐述在 doris 中如何支持大促期间每日新增百亿条记录的查询的。

AB 实验的效果监控,业务上需要 10 分钟、30 分钟、60 分钟以及全天累计等四个时间段,同时需要根据渠道、平台和一二三级品类等维度进行下钻分析,观测的指标则包含曝光 PV、UV、曝光 SKU 件次、点击 PV、点击 UV 等基础指标,以及 CTR 等衍生指标。

在数据建模阶段,我们将曝光实时数据建立聚合模型,其中 K 空间包含日期字段、分钟粒度的时间字段、渠道、平台、一二三级品类等,V 空间则包含上述的指标列,其中 UV 和 PV 进行 HLL 近似计算,而 SKU 件次则采用 SUM 函数,每到来一条记录则加 1。由于 AB 实验数据都是以 AB 实验位作为过滤条件,因此将实验位字段设置为分桶字段,查询时能够快速定位 tablet 分片。值得注意的是,HLL 的近似度在目前 PV 和 UV 的基数下,实际情况误差在 0.8%左右,符合预期。

目前 doris 的集群共 30+台 BE,存储采用的是支持 NVMe 协议的 SSD 硬盘。AB 实验曝光 topic 的分区数为 40+,每日新增百亿条数据。在数据导入阶段,我们主要针对导入任务的三个参数进行优化:最大时间间隔、最大数据量以及最大记录数。当这 3 个指标中任何一个达到设置的阈值时,任务都会触发导入操作。为了更好的了解任务每次触发的条件,达到 10 分钟消费 6 亿条记录的压测目标,我们通过间隔采样的方法,每隔 3 分钟采样一次任务的情况,获取 Statistic 信息中的 receivedBytes、cimmittedTaskNum、loadedRows 以及 taskExecuteTimeMs 数值。通过对上述数值在前后 2 个时间段的差值计算,确定每个任务触发的条件,并调整参数,以在吞吐和延迟之间进行平衡,最终达到压测的要求。

为了实现快速的多维数据查询,基于 base 表建立了不同的 rollup,同时每个 rollup 的字段顺序,也要遵循过滤的字段尽可能放到前面的原则,充分利用前缀索引的特性。这里并不是 rollup 越多越好,因为每个 rollup 都会有相应的物理存储,每增加一个 rollup,在写入时就会增加一份 IO。最终我们在此表上建立了 2 个 rollup,在要求的响应时间内尽可能多的满足查询需求。

7、总结与展望

京东搜索是在今年 5 月份引入 doris 的,第一个应用的上线到现在已经运行半年时间。目前集群版本是 0.11.33,规模是 30+台 BE,线上同时运行着 10+个 routine load 任务,每日新增数据条数在 200 亿+,已经成为京东体量最大的 doris 用户。从结果看,用 doris 替换 flink 的窗口计算,既可以提高开发效率,适应维度的变化,同时也可以降低计算资源,用 doris 充当实时数据仓库的聚合层,并提供统一的接口服务,保证了数据的一致性和安全性。

我们在使用中也遇到了查询相关的、任务调度相关的 bug,也在推动京东 OLAP 平台升级到 0.12 版本。接下来待版本升级后,我们计划使用 bitmap 功能来支持 UV 等指标的精准去重操作,并将推荐实时业务应用 doris 实现。除此之外,为了完善实时数仓的分层结构,为更多业务提供数据输入,我们也计划使用适当的 flink 窗口开发聚合层的实时流,增加数据的丰富度和完整度。

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。