一、前言
随着大数据技术的飞速发展,海量数据存储和计算的解决方案层出不穷,生产环境和大数据环境的交互日益密切。数据仓库作为海量数据落地和扭转的重要载体,承担着数据从生产环境到大数据环境、经由大数据环境计算处理回馈生产应用或支持决策的重要角色。
数据仓库的主题覆盖度、性能、易用性、可扩展性及数据质量都是衡量数据仓库解决方案好坏的重要指标。携程机票部门数据仓库也在不断摸索向着这些目标砥砺前行。
二、携程机票数据仓库技术栈
携程机票部门的数据仓库建设主要基于公司公共部门的大数据基础环境及数据调度平台,辅以部分自运维的开源存储引擎和基于开源组件二次开发的数据同步工具和运维工具。
2.1 数仓技术演进历史
机票部门的数据仓库源于 2008 年,当时生产环境数据落地主要使用 SQLServer,数据仓库处理的目标数据体量不大,因此选择的 SQLServer、Informaticas、Kettle 这样的数据仓库方案,数据模型设计及报表定制使用 SAP 的商用平台 BO。
随着机票业务系统的日益复杂,特别是生产环境引入消息中间件 Kafka 存储日志数据后,这套方案不可扩展性的缺点日趋明显,SQLServer 的存储和计算能力很大程度上限制了数仓数据的主题覆盖度及性能。
在 2014 年,公司公共部门 hadoop 集群部署上线,并且引入了 zeus 调度平台及>
随着生产业务对实时监控、流量回放的需求增强,2016 年机票部门部署了 ElasticSearch,用以实时落地从 Kafka 同步的各个主流程服务日志,并通过统一的交易标识 (transactionID) 串联用户的一次完整的搜索、下单等行为,用于生产排障和流量回放。基于 Hive 的搜索性能一直被广泛诟病,特别是针对 adhoc 查询,机票部门在 2016 年调研并部署了 Facebook 开源的基于内存和 Pipeline 的查询引擎 Presto,在没有享受到 local 数据获取的前提下,查询性能较原生的 Hive 引擎或者 Spark 引擎都有很大的提升。
在 2018 年,为了支持数仓数据的可视化运营平台,我们先后引入了 ClickHouse 和 CrateDB 作为后台的存储和查询引擎,特别是引入 CrateDB 以后,亿级体量的表四个维度的聚合耗时 P90 下降到了 4 秒。
实时数据处理技术也经过了 Esper,Storm,Spark Streaming 和 Flink 的迭代,并慢慢收敛到 Flink。总体的技术演进历史如图 1 所示。
图 1 数仓技术演进历史
2.2 当前技术栈
生产环境的数据可以大致分成三类:
1)业务数据,主要存储在 MySQL 和 SQLServer,在这些关系型数据库里面有数以万计的表承接着各种生产服务的业务数据写入;
2)基础数据,也是存储在 MySQL 和 SQLServer 中,生产应用时一般会建立一层中心化缓存(如 Redis)或者本地缓存;
3)日志数据,这类数据的特点是”append only”,对已经生成的数据不会有更新的操作,考虑到这类数据的高吞吐量,生产环境一般会用消息队列 Kafka 暂存;
数据仓库在实施数据同步时,会根据需求在实时、近实时以及 T+1 天等不同的频率执行数据同步,并且在大数据环境会用不同的载体承接不同频率同步过来的数据。在携程机票,实时同步的目标载体是 ElasticSearch、CrateDB 或者 HBase,近实时(一般 T+1 小时)或者 T+1 天的目标载体是 Hive。
从生产的数据载体来讲,主要包括 DB 和消息队列,他们的数据同步方案主要是:
1)生产 DB 到 Hive 的同步使用 taobao 开源的>
2)从 Kafka 到 Hive 同步使用 Camus,但是由于 Camus 的性能问题及消费记录和消费过期较难监控的问题,我们基于 spark-sql-kafka 开发了 hamal,用于新建的 Kafka 到 Hive 的同步;Kafka 实时同步的载体主要是 ElasticSearch 或者 CrateDB,主要通过 Flink 实施。
生产数据被同步数据仓库后,会在数仓内完成数据清洗、信息整合、聚合计算等数据扭转流程,最终数据出仓导入到其它载体,这一系列的流程调度由公司 DP 团队运维的调度平台 Zeus 完成。
图 2 携程机票数仓技术栈
2.3 实时 VS 离线
当前机票部门的数据仓库建设主要基于离线数据,一方面跟 OTA 销售产品不属于快消品相关,实时当前并不是刚需;另一方面实时处理场景下需要对计算资源、存储资源稳定性有更高的要求,保持数据一致性的代价很大。结合两方面,如果业务对实时需求不高就铺开做实时数仓,ROI 很难达标。
当然,随着携程业务体量的增长,数据使用方对数据实时性要求日益增高,我们团队在 2020 年也会探索实时数据仓库的实施方案,并在一两个重要的数据主题域上先行试点。
三、数据仓库建设时涉及的共性问题
从团队职能上来讲,数据仓库团队需要负责从生产环境同步数据,在内部完成各层级的扭转计算,参与所有数仓流程及报表的运维,并基于数仓公共数据层和应用数据层数据开发相关应用。
3.1 数据同步
为了保持数仓数据主题覆盖足够全面,我们部门几乎将所有生产表和 Kafka topics 都同步到了 Hive。以下会对同步最常见的两种场景 DB->Hive 和 Kafka->Hive 相关的实践做介绍。
3.1.1 DB 同步到 Hive
特别对生产表到 Hive 的同步,人工配置脚本的方式显然不能处理数以万计的表,因此需要一个自动化的同步方案。自动同步方案需要不仅仅要解决自动创建表脚本、创建对应的同步脚本问题,还需要在当表结构发生变更的时候,能够自动地感知表结构的变化,并且修改表结构和对应的同步脚本。
DB 到 Hive 同步需要依赖两个数据源,1)Schema 表的元数据信息,简单地包括各个字段信息、字段类型及主键定义;2)统计数据,它主要描述的是这个表在数据产生后有没有 UPDATE 和 DELETE,这个决定着后续表的分区方式。
对业务型数据,一条数据生成后可能会有 Update,因为在数仓里绝大部分场景需要用到数据的最新状态,所以我们会用一个分区存放所有历史数据的最新状态,这类表我们称之为历史切片表。对日志型数据,生产上数据产生后就不会有任何修改,我们会选择使用增量分区,每个分区会放当天的增量数据。对基础数据,整个表的数据增加、更新的频率都非常低,在 ods 层我们会每天全量同步一份到最新数据分区,并且会建立一个无分区的下游维表,将数据状态为有效的数据放到这张下游无分区维表中方便流程使用。
有了上述这两个数据源以后,我们会根据 DBA Schema 服务返回的元数据信息生成 Hive 表的脚本,并调度执行生成新的 Hive 表,再依据统计数据决定表的分区方式,进而生成对应新建表的同步脚本。当表创建或者表结构发生变更的时候,通过 Schema 服务两天输出的比对,我们会发现表结构的变更并映射到对应 Hive 表结构变更,同时可以改变对应的同步脚本。还有一种思路是可以通过 DB 发布系统的日志,获知每天 DB 创建、表创建以及表结构变化的增量。
图 3 生产 DB 到 Hive 的同步
有一个坑点就是生产物理删除,如果出现了物理删除并且需要在 Hive 表里将删除数据识别并标记出来,当前可能需要通过全量同步的方法(考虑到从生产环境取数的代价,全量同步业务主键字段即可)解决,特别对 SQLServer。因此可以跟生产的开发协商尽量使用逻辑删除,这样数仓对删除数据的感知代价会小很多。
3.1.2 Kafka 同步到 Hive
当前我们非实时同步主要在使用 Linkedin 很久以前的一个工具 Camus,当然 DP 团队经过优化和企业本地化二次开发。但从使用感受来看,Camus 会有如下可能不足的地方:
1)基于 mapreduce,mapreduce 在 yarn 集群上抢占资源的能力较弱,在资源竞争高峰会有同步变慢的情况发生;
2)消费记录存储在 HDFS 各个文件里,这样对消费记录的获取和针对消费过期的监控都很不方便;
3)Kafka Topic 和 Hive 表的血缘关系获取不方便;
因此,我们基于 spark-sql-kafka 开发 hamal,旨在解决如上痛点并且让配置更加的简洁。实现的过程大概包括,spark-sql-kafka 会根据输入的任务从 Kafka 各个 Partition 消费出 payload 数据,对每条 payload 执行解编码、解压、magic code 等操作,此时会将 payload 数据转化成 json 字符串,这个 json 字符串可以直接作为一个字段写入到 Hive 表里,也可以根据事先配置提取出对应的节点和值作为列和列值写入到 Hive 中,甚至可以通过 Json 的 Schema 推断出 Hive 表结构,并将 Json 各节点对应写到 Hive 表的各列中。
图 4 转化为 json 字符串 RDD 代码示例
如果选择推断的模式,实现的时候可以使用 sampling 的方式,类似 spark jsonRDD 第二个参数,比如说 0.001,Hamal 可以直接指定采样数据条数,从 Kafka topic 中拉取出来,通过 jsonRDD 推断出 StructType,并映射成 Hive 建表语句。对于建好的表,通过表的字段匹配获取数据,最终写入 Hive 表,最后会提交消费记录到一张 Hive 的 ConsumerRecord 表里面。这样其实基于这个表,我们既可以获取 Kafka topic 和 Hive 表的血缘,也可以方便地监控每次同步的数据量。
图 5 Kafka 同步至 Hive Hamal 设计
3.2 数仓分层
分层设计主要参考公司推行的数据规范,将数据仓库的流程分成了生产镜像层(ods)、中间层(edw)、公共数据层(cdm)及应用数据层(adm)。在中间层对 ods 表做异常数据剔除、NULL 值处理、枚举值统一等数据清理和绑定维表信息工作,在公共数据层对中间层表进行进一步的整合,丰富表主题的维度和度量,一般以宽表的形式呈现,用以后续的 adhoc 取数、报表。
根据机票本身的业务特点,我们将数据划分成流量、产量、收益、生产 KPI、业务考核等几大主题域,对数据表的业务分类和有效管理有重要意义。
图 6 数仓分层设计
3.3 数据解析
数据在同步至数据 ods 层后,产品经常会提的一个需求是将 ods 层某个含报文字段的表按照字段设计展开,如果要支持此类需求,数据开发就需要了解生产上这个表各个字段含义及报文字段的契约定义,而这些对应表的写入开发非常熟悉。因此,为了提高整体的工作效率,我们开发了一套数据解析框架,对业务开发封装了大数据组件的 API 调用及相关参数调整,让业务开发更高效地完成熟悉的单条数据解析开发。
图 7 数据解析框架
3.4 数仓运维工具
数据仓库拥有所有生产表的镜像表、数以万计的生产数据同步流程、数据扭转流程以及后续报表,对如此规模的数仓实体的管理和运维需要一个不断迭代的系统支持,从而可以大幅度提高数据工程师的效率。
我们根据数仓建设中遇到的一些费力度较高且需要重复做的操作,开发了一套运维工具集合,目前还在持续迭代中。运维工具集功能主要包括数据实体通用搜索,报表收件人批量变更,维表导入,Oncall 录入,脚本模板生成,序列化与反序列化等等。工具开发难度不大,但对提高效率的帮助很大。
四、数据质量体系
对庞大的数据仓库实体建设完善的数据质量监控体系,光靠人工 one by one 设置检验规则是不够的,需要对几乎所有的实体建立相应的监控,并且不能给大数据集群带来很多额外的计算代价。当这样的覆盖面很广的监控完善后,配合着元数据信息,就有可能在故障的 Root Cause 点第一时间发现故障,并可以清晰地知晓故障的影响范围以及故障恢复的流程优先级调度。
因此,建立完善的数据质量体系需要完善元数据管理,建立轻量的覆盖面广的质量监控,并且对特别重要的流程,需要增加额外的业务相关校验。
4.1 元数据管理
在生产环境和大数据环境存在多种实体,这些实体包括应用、各类表(如 SQLServer、MySQL、MongoDB 的表等)、消息队列 topic、ElasticSearch 的 index、Hive 的表等等,这些实体相互关联,共同支撑着线上的系统,线下的分析。对这些信息的治理,实体的元数据管理至关重要。
在数仓体系中,元数据主要包含基础信息、血缘关系以及标签。基础信息跟数据表相关,具体包括表的字段、存储、分区类型等;血缘会涉及到各类的实体,表、流程、报表、邮件推送等,这些实体之间存在着上下游调用与被调用关系,成体系地管理好这些实体之间的关系,可以清晰地了解到数仓边界,使得对故障的 Root Cause 追溯以及该 Root Cause 带来的影响面评估非常便捷。标签是对实体的分类描述,如层级是属于哪一层,安全是否有涉密,重要等级,是否有非常重要的流程在上面,业务标签是属于订单、前端还是订后。
4.2 数据质量相关因素
数据质量的问题其实一般可以在流程执行的日志中看出端倪,因为人工排查故障的时候,除了常规通过 SQL 查询验证表的增量、业务主键、某些字段值是否正常,另外一个有效手段就是分析运行日志。
从运行日志中可以获取以下信息,流程的开始时间、截止时间流程执行时间、完成状态、每天增量的字节数、增量条数,引擎执行的参数,在用 Spark 或者 MapReduce 执行时消耗资源的情况等等一系列特征。通过对各类计算引擎产生日志的分析,可以获得各类引擎下记录日志数据的 pattern,从而提取出相关的特征信息。遇到特殊的流程或者引擎,可以借用其他手段补齐特征数据,如用 SQL,用 Hadoop 的命令。
图 8 数据质量相关特征
这是我们简单的一个日志输出,第一张是 Spark 的执行日志,下面一张是 MapReduce 的执行日志。
图 9 MR 和 Spark 引擎执行日志示例
有了数据质量特征提取的逻辑,实时流程异常发现可以如下实施:我们可以将质量特征数据计算分成两块,一块是实时的针对单个流程日志的解析出相关特征,一块是离线的基于历史特征数据的统计。我们从消息队列中消费实时获取执行完成的流程 id 和 actionid,通过运维团队提供的详情日志查询接口获取完整日志,通过特征解析逻辑,解析出实时的流程质量相关特征,匹配历史数据,应用规则。当满足异常规则,可以通过元数据信息中的血缘判断影响的范围,推送告警信息。
图 10 实时流程异常监控实施方案
五、应用案例
携程作为平台方,对机票价格没有定价权,价格由产品提供方来提供。在每年航班计划换季的时候,产品提供方会有一小部分概率将价格录入错。错误的运价,特别是很低的错误运价会让航司或供应商蒙受超大的损失。本着公平交易的原则,携程作为销售平台,做了机票价格监控系统。上线至今,发现了数十起价格异常事件。
在生产的消息队列中,我们落地了用户查询返回的所有航班组合和价格信息,数据仓库完成近实时同步,将数据解析处理成异常价格相关特征集,每天的增量在百亿级别。我们从 Kafka 实时消费两类日志数据,一类是查询日志,一类是下单日志,建立匹配,建立规则集发现可疑的低价交易标识,并且进一步监控跟交易标识是否进入下单流程。当某个疑似异常特征带来的订单超过一定阈值时,系统会对这疑似异常特征对应的查询进行自动禁售。
图 11 价格监控系统
六、小结
一套完整的数据仓库实施方案应该包括但不局限于上面介绍的数据同步方案、数据存储方案、数据规范、元数据建设、数据质量体系、运维工具等,每个实施团队应该根据面临的实际情况选择针对每个点的具体技术方案。
携程机票数据仓库团队也正朝着建设全面、规范、易用、高效、精准的数仓路上探索前行,当前在数据同步、数仓数据扭转以及出仓应用方面的实践方案还在随着需求的变化而迭代。接下来,我们团队会着重在数据仓库规范彻底落地以及实时数仓实施这些方向上努力。
致谢
数据仓库建设离不开各兄弟团队的大力支持和配合,感谢机票大数据基础架构团队和公司 DP 团队在机票数仓实践过程中提供的平台、工具、运维、接口方面的支持。
作者介绍 :
华智,携程高级研发经理,现负责数据仓库技术架构、性能优化、数仓规范制定、数据模型设计以及数据应用开发。
原文链接 :