文/糜利敏
大家下午好。很感谢大家参加全球 100 案例峰会预热沙龙关于 Doris 的线上 MeetUp。
下面我来介绍下 Doris 在作业帮实时数仓中的应用与实践。
这次的分享主要分三个主题:
1.首先是所在团队的业务与背景介绍
2.其次会介绍下基于 Doris,作业帮的查询系统是如何构建的,以及主要解决的问题
3.未来的规划
我所在团队是作业帮大数据团队,主要负责建设公司级数仓,向各个产品线提供面向业务的数据信息,如到课时长、答题情况等业务数据以及如 pv、uv、活跃等流量类数据,服务于拉新、教学、BI 等多个重要业务线。
在数仓体系中,大数据团队主要负责到 ODS-DWS 的建设,从 DWS 到 ADS 一般是数仓系统和业务线系统的边界。
在过去,由于缺失有效、统一的查询系统,我们探索了很多模式来支持各个业务线发展。
随着需求越来越多,系统也越来越难以维护,交付效率也特别低,需求排队非常严重。因此,提供有效而统一的查询系统,对于实时数仓建设在提高业务支持效率、降低维护成本上都具有非常重大的意义。
经过过去数月的探索与实践,我们确立了以 Doris 为基础的实时查询系统。同时也对整个实时数仓的数据计算系统做了一次大的重构,最终整体的架构图如下:
如图所示(从下到上),原始业务层日志经数据摄入系统进入数仓,在数据清洗计算层,我们将原来基 Spark 系统升级到了 Flink,并且基于 Flink-Sql 提供了统一的数据开发框架,从原有的代码开发升级到 Sql 开发来提升数据的研发效率。
其后查询系统将 Kafka 的数据实时同步到查询引擎内,并通过 OpenAPI 的统一接口对外提供查询服务。
基于 Doris 的查询系统上线后,我们面对一个需求,不用像过去一样做方案调研、开发接口、联调测试,现在只要把数据写入,业务层就可以基于 sql 自己完成数据查询、业务开发,交付效率(数据计算好到提供可读服务)从过去的数人周加快到小时级。
在性能方面,过去基于 ES 或者 mysql 来做,当查询的数据量较大时,我们只能忍受数十个小时到数分钟的延迟,基于 Doris 的方案,加快到分钟级甚至秒级。
Doris 的整体架构非常简单,不依赖任何第三方组件,社区支持度也非常好,从上线到今,我们只需做一些轻量级的运维规范,即可保证高稳定性。
所以说,通过引入 Doris,解决了作业帮内实时数仓查询交付慢、查询慢的痛点问题,对于后续数仓的系统发展起到了非常关键的作用。
接下来,重点讲下查询系统的工作,分两部分:查询系统的架构选型以及原理,以及应用 &实践.
在讲查询引擎之前,先讲下业务场景。
作业帮内,业务场景主要分两种:一种是传统的流量类,比如算 pv、uv、活跃……,作业帮内很多时候还需要看进一步的明细,比如作业帮主 App 在每天各个小时的活跃用户数,还要看 作业帮主 App 每个小时内各个版本的活跃用户数。
第二种是面向我们业务线的工作台,比如教学的老师。比如我们的老师上完课后,会看下自己班内的同学们的出勤数据、课堂测验数据等。
这两种场景下,考虑到调研成本、团队技术生态、维护成本等多种因素,我们最后选择了 Doris 作为我们的查询引擎。主要是 Doris 可在上述两种场景下都可以统一的满足业务的需求。
首先介绍下 Doris。
Doris 是 mpp 架构的查询引擎。
整体架构非常简单,只有 FE、BE 两个服务,FE 负责 Sql 解析、规划以及元数据存储,BE 负责 Sql-Plan 的执行以及数据的存储,整体运行不依赖任何第三方系统,功能也非常丰富如支持丰富的数据更新模型、Mysql 协议、智能路由等。对于业务线部署运维到使用都非常友好。
接下来讲下用 Doris 如何解决我们前面提到的业务场景下的问题。
Doris 有多种数据模型,流量类场景常用的是聚合模型。比如对于前面提到的场景,我们会吧作业帮主 App 各个版本的明细数据存到 base 表中,如果直接从 base 表中读取跨天级的聚合数据,由于数据行比较多,可能会出现查询延迟的问题,因此我们会对常用的天级数据做一次 rollup,这样通过预聚合,来减少查询的数据量,可以加快查询的延迟。
要高效的使用 Doris 的聚合模型,前提都是基于 key 列做数据行筛选,如果使用 value 列,Doris 需要把相关的行全部聚合计算后方可决策是否属于结果集,因此效率比较低。
而对于教研工作台,前面提到的都是基于 value 的筛选,因此使用了 Doris on ES 的模型。主要是考虑到 可以发挥 ES 的任意列检索的能力,来加快查询速度。
在我们的实践中,发现 Doris on ES 相比直接裸用 ES 或社区的其他方案如 Presto on ES 在性能上有很大的提升,接下来介绍下 Doris on ES 高性能的设计原理。
Doris on ES 整体的架构如图,FE 负责查询 ES 的元数据信息如 location、shard 等,BE 负责从 ES 数据节点扫描数据。
Doris on ES 高性能,相比裸用 ES,有几个优化点:
裸用 ES 时,ES 采用的是 Query then Fetch 的模式,比如请求 1000 条文档,ES 有 10 个分片,这时候每个分片都会给协调返回 1000 个 doc id,然后 协调节点其实拿到了 10 * 1000 个 doc id,然后选择 1000 个。这样其实每个分片多返回了 900 个.
Doris on ES 则绕过了协调节点直接去操作>
其次,Doris 从 ES 扫描数据时,也做了很多优化。比如在扫描速度上,采用了顺序扫描、列存优化、谓词下推等,在数据从 ES 传输到 Doris 时,采用就近原则如 BE 会优先访问本机的>
在我们的调研中,Doris on ES 的性能,比 Presto on ES 快了有数十倍。
在作业帮内,除了上面介绍的基于 Doris 的数据模型做的基础应用,要完整的支持业务、保证稳定性、提高效率,还需要其他周边的系统建设。
接下来介绍下基于 Doris,作业帮查询系统架构的整体设计以及工作模式。
这是作业帮查询系统的总体架构。
从上往下,首先是我们平台,包括各个报表平台、元数据管理平台等,主要来提高各个场景的人效。
其下红色部分为我们统一的 api 接口层,这里我们主要是制定了 api 的规范比如请求响应方式、返回码等,来减少系统之间对接的成本。
基于 api 除了提供了主要的读写接口外,也包含了周边的服务建设,比如元数据管理、调度系统等。
接下来就基于一个完整的流程来介绍下各部分系统。
首先是元数据。Doris 基于 mysql 语法建表,已经有元数据,我们这里做元数据,有几个额外的考虑:
要统一元数据,统一数据模型,就得抽象整个数据表的结构,来管理好不同存储上的表,我们基于 env、db、table 为基本单位来管理表,database、table 大家相对熟悉,env 是我们引入的新 namespace,主要用于提供不同集群/业务线的定义,如百度云的数仓集群、腾讯云的数仓集群,表单元下主要包含 field(列类型、值域)、index(如 rollup、bitmap 索引等)、storage(存储属性)。
关于列属性,主要是规范化类型系统,考虑到 json-schema 由于其校验规则丰富、描述能力强,因此对于列值的约束统一使用 json-schema 来做。
对于数据类型,我们设计了公共数据类型以及私有数据类型。公共类如 varchar、int 等,这些在不同的存储系统都有对应的实现,也支持私有类型如 Doris::bitmap,方便私有系统的兼容和扩展。通过这个模式可以将基于各个存储系统的表做了统一的管理。
这是我们线上的真实的一张表。里面包含了列信息以及对应的存储配置。
左图中的纵向红框是 json-schema 的描述,来规范化值域。横向红框为 ES 表的一些 meta 字段,比如 docid、数据更新时间。这些字段可以方便追查数据问题、以及用作数据筛选。
因为我们统一了数据模型,因此可以很方便的对所有表统一设置要增加这些 meta 字段。
通过元数据的统一管理,构建的表质量都非常高。所有的表都在最大化性能的提供查询服务,且由于数据导致的查询不可用 case 为 0。且对于任何业务线的同学,不管是否了解 Doris,都可以分钟级构建出这样一张高质量的表。
建好表后,就是数据的写以及读。统一基于 openapi 来做。
做 api 接口其实本质上也是为了在提供系统能力的前提下,进一步保障系统的稳定性和易用性。
比如要控制业务线的误用(如连接数打满),提供统一的入口方便写 es、Doris,且控制数据质量……
首先介绍下数据写接口。
由于统一了表模型,因此可以很方便的提供统一的写入接口协议。用户也无须关注实际表的存储是 es 还是 Doris 以及处理异构系统的系统。
第二,统一了写接口,就可以统一的对写入的数据会做校验检查,如数据的大小、类型等,这样可以保证数据写入的质量与准确性。这样对于数据的二次加工非常重要。
第三,接入协议中还增加了关键词,如数据的版本。可以解决数据的乱序问题,以及建立统一的写入监控。如下图是我们整个写入数据流的 qps 以及端到端(数据写入存储时间以及数据生产时间)的延迟分位值,这样可以让系统提高可观测性、白盒化。
接下来讲一个具体的场景,写入端是如何解决乱序问题的。
常态下我们的实时数据流是经过 flink 或 spark 计算后写入 kafka,然后由查询系统同步到 Doris/es 中。
当需要修数时,如果直接写入,会导致同一个 key 的数据被互相覆盖,因此为了避免数据被乱序覆盖,就得必须停掉实时流,这个会导致数据时效性式受损。
因此我们基于写入端做了改进,实时数据流、离线修复数据流各自写入不同的 topic,同步服务对每个 topic 做限速消费,如实时流时效性要求高,可以配额调的大些,保证配额,离线时效性则允许配额小点,或者在业务低峰期将配额调大,并基于数据 key&列版本存储做了过滤。这样可以保证时效性的前提下,修数也可以按照预期进行。
最后是读的部分。
在提供 sql 能力的前提下,我们也做了一些额外的方案,比如缓存、统一的系统配置。对于系统延迟、稳定性提升都有很大的改进。并且由于统一了读接口,上述的这些改造,对于业务线来说都是透明的。
除了常规下面向低延迟的读,还有一类场景面向吞吐的读。
介绍下场景,比如 要统计统计某个学部下(各个老师)的学生上课情况:上课人数、上课时长等。
在过去,我们是基于 spark/flink 来处理这类问题,如 spark 消费 kafka 中的课中数据,对于每一条数据,会去 redis 中查询教师信息来补全维度。
常态下,当课中数据到达的时候,教师信息是就绪的,因此没有什么问题。可是在异常下,如维度流迟到、存储查询失败等,会导致课中流到达时,无法获取对应的教师信息,也就无法计算相关维度如学部的统计。
过去面临这种情况时,只能遇到这种异常,如重试如果无法解决,只能丢弃或者紧急人工干预,比如在尾标就绪后再重新回刷课中表,一旦遇到上游 kafka 数据过期就只能从 ods 层或者离线修复,效率特别低,用户体验也非常差。
基于 Doris 模式下,我们使用微批调度的模式。
调度系统会定期(分钟级)执行一个调度任务,基于 sql join 完成数据的选取。这样哪怕在异常下,课中流查不到教师数据,这样 join 的结果只是包含了可以查到教师数据的信息,
待教师数据就绪后,即可自动补全这部分课中数据的维度。整个过程全部自动化来容错。效率非常高。
因此这个模式的主要好处:
最后,讲下其他方面的建议实践,这些相对简单,但是在实际的应用中非常容易忽视。
最后,讲下规划。
Doris 在作业帮实时数仓的建设中发挥了很关键的作用。
在实际的应用中,我们也发现了一些当前的一些不足。
如 Doris on ES 在面对大表的 join 查询时,目前延迟还比较大,因此需要进一步的优化解决;
Doris 自身的 olap 表可以做动态分区,对于 ES 表目前可控性还不足;
其次,当 ES 修改表后,如增加字段,只能删除 Doris 表重建,可能会有短暂的表不可用,需要自动化同步或者支持在线热修改;
最后 Doris on ES 可以支持更多的谓词下推,如 count 等。
我们也希望可以和社区一起,把 Doris 建设的越来越好。
好的。我的分享到此结束。谢谢大家。
问题 1:Doris on ES V.S. sparksql on ES,在功能上和性能上咱们调研过吗?对于使用那个您这边有什么建议吗?
问题 2:Doris 支持 Hive Metastore,和 Flink SQL 是什么关系?刚才讲的太快,有点没听懂
问题 3:_version 字段是一个内部字段?需要用户端写入的时候指定,还是系统自动创建?和 HBase 的 version 的应用场景有区别吗?