写在前面
近些年,大数据背后的价值也开始得到关注和重视,越来越多的企业开始保存和分析数据,希望从中挖掘大数据的价值。大数据产生的根本还是增量数据,单纯的用户数据不足以构成大数据,然而用户的行为或行为相关的日志的数据量,加之随着物联网的发力,产生的增量数据将不可预估,存储和查询增量数据尤为关键。所以,在笔者的工作经历中,本着以下的目标,寻找更优的大数据存储和查询方案:
目前大数据存储查询方案大概可以分为:Hbase 系、Dremel 系、预聚合系、Lucene 系,笔者就自身的使用经验说说这几个系的优缺点,如有纰漏,欢迎一起探讨。
数据查询包括大体可以分为两步,首先根据某一个或几个字段筛选出符合条件的数据,然后根据关联填充其他所需字段信息或者聚合其他字段信息,本文中提到的大数据技术,都将围绕这两方面。
一、Hbase 系
Hbase 的表包含的的概念有 rowkey、列簇、列限定符、版本 (timestamp) 和值,对应实际 Hdfs 的存储结构可以用下图做一个简单总结:
(点击放大图像)
Hbase 表中的每一个列簇会对应一个实际的文件,某种层面来说,Hbase 并非真正意义的列式存储方案,只是列簇存储。每个文件有若干个>二、Dremel 系
Google 的 Dremel,其最早用于网页文档数据分析,所以设计为嵌套的数据结构,当然它也可以用于扁平的二维表数据存储。开源技术中,Parquet 算是 Dremel 系的代表,各种查询引擎 (Hive/Impala/Drill)、计算框架甚至一些序列化结构数据(如 ProtoBuf)都对其进行了支持,甚至 Spark 还专门针对 Parquet 的数据格式进行了优化,前途一片光明,本文主要结合 Parquet 来展开论述。
可以用下图简单表示 Parquet 的文件格式:
(点击放大图像)
Parquet 的数据水平切分为多个 Row Group,Row Group 为数据读写的缓存单元,每个 Row Group 包含各个的数据列 (Column Chunk),数据列有若干 Page,Page 是压缩和编码的单元,其相应存储的信息包括元数据信息 (PageHeader)、重复深度 (Repetition Levels)、定义深度 (Definition Levels) 和列值 (Values) 信息。
Page 实际有三种类型:数据 Page、字典 Page 和索引 Page。数据 Page 用于存储当前行组中该列的值,字典 Page 存储该列值的编码字典,每一个列块中最多包含一个字典 Page,索引 Page 目前还不支持,未来可能会引入 Bloom Filter,能够判断列值是否存在,更有利于判断搜索条件,提升查询速度。
从 Parquet 的存储结构来看,Parquet 没有严格意义上的索引,在查询的过程中需要直接对 Row Group 的列数据进行扫描,有两方面来保证查询优化,一个是映射下推 (Project PushDown),另外一个是谓词下推 (Predicate PushDown)。
映射下推主要是利用列式存储的优势,查询数据时只需要扫描查询中需要的列,由于每一列的所有值都是连续存储的,所以分区取出每一列的所有值就可以实现 TableScan 算子,而避免扫描整个文件内容。
谓词下推在数据库之类的查询系统中最常用的优化手段之一,通过将一些过滤条件尽可能的在最底层执行,减少上层交互的数据量,从而提升性能。另外,针对谓词下推 Parquet 做了更进一步的优化,优化的方法是对每一个 Row Group 的每一个 Column Chunk 在存储的时候都计算对应的统计信息,包括该 Column Chunk 的最大值、最小值和空值个数。通过这些统计值和该列的过滤条件可以判断该 Row Group 是否需要扫描。未来还会增加诸如 Bloom Filter 和 Index 等优化数据,更加有效的完成谓词下推。
通过这两方面的优化,Parquet 的查询时扫描数据的性能能够得到大幅度提升。那 Parquet 如果填充数据(不同的列拼成一行记录)和聚合数据呢?
主要是使用了 Striping/Assembly 算法实现的,该算法的思想是将数据的值分为三部分:重复深度 (Repetition Levels)、定义深度 (Definition Levels) 和列值 (Values)。通过重复深度可以在读取的时候结合 Schema 的定义可以知道需要在哪一层创建一个新的 repeated 节点(如第一层的的为 0,表示是新记录,否则则表示 repeat 的数据),然后通过定义深度知道该值的路径上第几层开始是未定义,从而还原出数据的嵌套结构,如此便能清楚的把一行数据还原出来。由于缺少行号对应的列正向索引,没有办法直接寻址,单纯的依赖于 Striping/Assembly 算法还原数据或者聚合处理,相对来说成本会高很多。
另外,Parquet 的实时写方面是硬伤,基于 Parquet 的方案基本上都是批量写。一般情况,都是定期生成 Parquet 文件,所以数据延迟比较严重。为了提高数据的实时性,还需要其他解决方案来解决数据实时的查询,Parquet 只能作为历史数据查询的补充。
Parquet 存储是相对索引的存储来说,是一种折中处理,没有倒排索引,而是通过 Row Group 水平分割数据,然后再根据 Column 垂直分割,保证数据 IO 不高,直接 Scan 数据进行查询,相对 Hbase 的方案,Scan 的性能更好。这种方式,避免了存储索引和生成索引的开销,随着索引 Page 的完善,相信查询性能值得信赖。而对于数据还原和聚合也没有利用正向索引,而是通过 Striping/Assembly 算法来解决,这种方式更好能够很取巧的解决数据嵌套填充的问题, 但是相对直接使用正向索引来说成本会很高。
另外,由于是基于 Row Group 为读写的基本单元,属于粗粒度的数据写入,数据生成应该还是以离线处理为主,很难提高数据写的实时性,而引入其他的解决方案又会带来存储架构的复杂性,维护成本都会相应增加。
三、预聚合系
笔者认为,这种方式需要以有损数据为代价,虽然能够满足短期的 OLAP 需求,但是对于数据存储是非常不利的,会丢掉数据本身存在的潜在价值。另外,查询的指标也相对固定,没有办法灵活的自由定义所需的指标,只能查询提前聚合好的指标。
四、Lucene 系
Lucene 中把一条数据对应为一个 Document,数据中的字段对应 Lucene 的 Field,Field 的信息可以拆分为多个 Term,同时 Term 中会包含其所属的 Field 信息,在 Lucene 中每一个 Document 都会分配一个行号。然后在数据接入时建立 Term 和行号的对应关系,就能够根据字段的信息快速的搜索出相应的行号,而 Term 与行号的对应关系我们称之为字典。大部分时候查询是多个条件的组合,于是 Lucene 引入了跳表的思想,来加快行号的求交和求并。字典和跳表就共同组成了 Lucene 的倒排索引。Lucene 从 4 开始使用了 FST 的数据结构,即得到了很高的字典压缩率,又加快了字典的检索。
为了快速的还原数据信息和聚合数据,Lucene 还引入了列正向索引和行正向索引。列正向索引主要是行号和 Term 的对应关系,行正向主要是行号和 Document 的对应关系。这两种索引都是可以根据需要配置使用,例如只有单纯的查询,只是用行正向索引就可以,为了实现数据的聚合则必须列正向索引。
有了这些索引后,就可以通过 Term 来查询出行号,利用正向索引根据行号还原数据信息,或者对数据进行聚合。
另外,为了满足全文检索的需求,Lucene 还引入了分词、词向量、高亮以及打分的机制等等。总的来看,Lucene 的整个索引体系比较臃肿,其设计的根本还是搜索引擎的思想,满足全文检索的需求。
Lucene 本身是单机版的,没有办法分布式,也就以为着其能处理的还是小数据量。ElasticSearch 提供了 Lucene 的分布式处理的解决方案,其核心思想是将 Lucene 的索引分片。
在写入场景中,对于同一个 index 的数据,会按照设定的分片数分别建立分片索引,这些分片索引可能位于同一台服务器,也可能不同。同时,各分片索引还需要为自己对应的副本进行同步,直到副本写入成功,一次写入才算完整的完成。当然,单个文档的写入请求只会涉及到一个分片的写入。搜索场景则大致是逆过程,接受请求的节点将请求分发至所有承担该分片查询请求的节点,然后汇总查询请求。这里值得注意的是,任意一个搜索请求均需要在该 index 的所有分片上执行。
由于 ElasticSearch 是一个搜索框架,对于所有的搜索请求,都必须搜索所有的分片。对于一个针对内容的搜索应用来说,这显然没有什么问题,因为对应的内容会被存储到哪一个分片往往是不可知的。然而对于日志、行为类数据则不然,因为很多时候我们关注的是某一个特定时间段的数据,这时如果我们可以针对性的搜索这一部分数据,那么搜索性能显然会得到明显的提升。
同时,这类数据往往具有另一个非常重要的特征,即时效性。很多时候我们的需求往往是这样的:对于最近一段时间的热数据,其查询频率往往要比失去时效的冷数据高得多,而 ElasticSearch 这样不加区分的分片方式显然不足以支持这样的需求。
而另外一方面,ElasticSearch 对于聚合分析场景的支持也是软肋,典型的问题是,使用 Hyperloglog 这类求基数的聚合函数时,非常容易发生 oom。这固然跟这类聚合算法的内存消耗相对高有关(事实上,hll 在基数估计领域是以内存消耗低著称的,高是相对 count,sum 这类简单聚合而言)。
五、Tindex
数果智能根据开源的方案自研了一套数据存储的解决方案,该方案的索引层通过改造 Lucene 实现,数据查询和索引写入框架通过扩展 Druid 实现。既保证了数据的实时性和指标自由定义的问题,又能满足大数据量秒级查询的需求,系统架构如下图,基本实现了文章开头提出的几个目标。
(点击放大图像)
Tindex 主要涉及的几个组件
Tindex-Segment, 负责文件存储格式,包括数据的索引和存储,查询优化,以及段内数据搜索与实时聚合等。Tindex 是基于 Lucene 的思想重构实现的,由于 Lucene 索引内容过于复杂,但是其索引的性能在开源方案中比较完善,在数据的压缩和性能之间做了很好的平衡。我们通过改造,主要保留了其必要的索引信息,比原有的 Lucene 节省了更多的存储空间,同时也加快了查询速度。主要改进有以下几点:
1、高效压缩存储格式
对于海量行为数据的存储来说,存储容量无疑是一个不容忽视的问题。对于使用索引的方案来说,索引后的数据容量通常相对原有数据会有一定程度的膨胀。针对这类情况,Tindex 针对索引的不同部分,分别使用了不同形式的压缩技术,保障了能够支持高效查询的同时仅仅需要较少的容量。对于数据内容部分,使用字典的方式编码存储,每条记录仅仅存储文档编号。对于字典本身的存储,使用了前缀压缩的方式,从而降低高基数维度的空间消耗。实际情况下,使用 Tindex 压缩后的数据占用的存储容量仅仅为原始数据的 1/5 左右。
2、列式倒排和正向索引的存储
由于实际使用中,往往需要同时支持搜索和聚合两种场景,而这两种方式对于索引结构的需求是完全相反的。针对这两种情况,Tindex 结合了倒排索引和列正向索引这两种不同类型的索引。对于倒排索引部分,使用字典和跳表等技术,实现了数据的快速检索,而对于正向部分,则通过高效的压缩技术,实现了对于海量行下指定列的快速读取。同时,根据不同的情况,可以选择性的只建立其中一种索引(默认情况对于每一列均会同时建两种索引),从而节省大约一般的存储空间和索引时间。
Tindex-Druid,负责分布式查询引擎、指标定义引擎、数据的实时导入、实时数据和元数据管理以及数据缓存。之所以选择 Druid 是因为我们发现其框架扩展性、查询引擎设计的非常好,很多性能细节都考虑在内。例如:
框架可灵活的扩展,也是我们考虑的一个很重要的元素,在我们重写了索引后,Druid 社区针对高基数维度的查询上线了 groupByV2,我们很快就完成了 groupByV2 也可见其框架非常灵活。
在我们看来,Druid 的查询引擎很强大,但是索引层还是针对 OLAP 查询的场景,这就是我们选择 Druid 框架进行索引扩展的根本原因。 另外其充分考虑分布式的稳定性,HA 策略,针对不同的机器设备情况和应用场景,灵活的配置最大化利用硬件性能来满足场景需要也是我们所看重的。
在开源的 Druid 版本上自研,继承了 Druid 所有优点的同时,对查询部分代码全部重新实现,从而在以下几个方面做了较大改进:
1、去掉指标预聚合,指标可以在查询时自由定义:
对于数据接入来说,不必区分维度和指标,只需要定义数据类型即可,数据使用原始数据的方式进行存储。当需要聚合时,在查询时定义指标即可。假设我们要接入一条包含数字的数据,我们现在只需要定义一个 float 类型的普通维度。
2、支持多种类型:
不同于原生的 Druid 只支持 string 类型维度的情况,我们改进后的版本可以支持 string, int, long, float、时间等多种维度类型。在原生的 Druid 中,如果我们需要一个数值型的维度,那么我们只能通过 string 来实现,这样会带来一个很大的问题,即基于范围的过滤不能利用有序的倒排表,只能通过逐个比较来实现(因为我们不能把字符串大小当成数值大小,这样会导致这样的结果‘12’ < ’2’),从而性能会非常差,因为数值类型维度很容易出现高基维。对于改进后的版本,这样的问题就简单多了,将维度定义为对应的类型即可。
3、实现数据动态加载:
原有的 Druid 线上的数据,需要在启动时,全部加载才可以提供查询服务。我们通过改造,实现了 LRU 策略,启动的时候只需要加载段的元数据信息和少量的段信息即可。一方面提升了服务的启动时间,另外一方面,由于索引文件的读取基本都是 MMap,当有大量数据段需要加载,在内存不足的情况,会直接使用磁盘 swap Cache 换页,严重影响查询性能。数据动态加载的很好的避免了使用磁盘 swap Cache 换页,查询都尽量使用内存,可以通过配置,最大限度的通过硬件环境提供最好的查询环境。
HDFS,大数据发展这么多年,HDFS 已经成为 PB 级、ZB 级甚至更多数据的分布式存储标准,很成熟了,所以数果也选用 HDFS,不必重新造轮子。Tindex 与 HDFS 可以完美结合,可以作为一个高压缩、自带索引的文件格式,兼容 Hive,Spark 的所有操作。
Kafka/MetaQ, 消息队列,目前 Tindex 支持 kafka、MetaQ 等消息队列,由于 Tindex 对外扩展接口都是基于 SPI 机制实现,所以如有需要也可以扩展支持更多的消息队列。
Ecosystem Tools, 负责 Tindex 的生态工具支持,目前主要支持 Spark、Hive,计划扩展支持 Impala、Drill 等大数据查询引擎。
支持冷数据下线,通过离线方式(spark/Hive)查询,对于时序数据库普遍存在的一个问题是,对于失去时效性的数据,我们往往不希望它们继续占据宝贵的查询资源。然后我们往往需要在某些时候对他们查询。对于 Tindex 而言,可以通过将超过一定时间的数据定义为冷数据,这样对应的索引数据会从查询节点下线。当我们需要再次查询时,只需要调用对应的离线接口进行查询即可。
SQL Engine, 负责 SQL 语义转换及表达式定义。
Zookeeper, 负责集群状态管理。
未来还会持续优化改造后的 Lucene 索引,来得到更高的查询性能。优化指标聚合方式,包括:小批量的处理数据,充分利用 CPU 向量化并行计算的能力;利用 code compile 避免聚合虚函数频繁调用;与大数据生态对接的持续完善等等。
后续笔者还会深入讲解每一部分的详细实现原理及实践经验,敬请关注!如有凝问,可以加笔者微信 happyjim2010,一起交流!
作者简介
王劲 ,数果智能,创始人 &CEO。曾任酷狗音乐大数据技术负责人、大数据架构师,负责酷狗大数据技术规划、建设、应用。
13 年 IT 从业经验,2 年分布式应用开发,1 年移动互联网广告系统架构设计,5 年大数据技术实践经验,多年的团队管理经验,主要研究方向流式计算、大数据存储计算、分布式存储系统、NoSQL、搜索引擎等。