为什么已有Elasticsearch 我们还要重造实时分析引擎AresDB (为什么已有联系人存了名字 来电却没有显示)

为什么已有Elasticsearch 我们还要重造实时分析引擎AresDB (为什么已有联系人存了名字 来电却没有显示)

作者 |,,,, andSteven Chen

在 Uber,我们能够利用实时分析技术获得商业洞察力,提高运营效率,而且能基于数据驱动做决策,以改善 Uber 平台上的体验。例如,我们的运营团队依靠数据来监控市场健康状况,从而发现平台上潜在的问题;基于机器学习模型的软件则利用数据来预测乘客需求和司机数量;数据科学家利用数据来改进机器学习模型以便更好地进行预测。

过去,我们采用过许多第三方数据库作为我们的实时分析解决方案,但没有一款数据库既能够实现所有功能,同时又满足我们对于可伸缩性、性能、成本和运营的要求。

在以下章节中,我们将详述 AresDB 的设计,以及它如何使我们更有效地统一、简化和改进 Uber 的实时分析数据库解决方案。在阅读这篇文章后,我们希望你能够在自己的项目中尝试一下 AresDB,并根据你自己的分析需求发掘这个工具的价值!

首先送上 AresDB 开源项目地址:top="1381">Uber 的实时分析应用

数据分析对 Uber 业务的成功起到至关重要的作用,在其他功能中,这些分析被用于:

我们可以将这些功能按照不同需求归纳如下:

仪表盘 决策系统 即席查询
查询模式 固定 固定 随意
查询QPS
查询延迟
数据集 子集 子集 所有数据

仪表盘和决策系统利用实时分析系统,在较高 QPS 和较低延迟下,基于相对较小但非常有价值的数据子集(具有最大的数据时效性)构建相似查询。

为什么我们需要再造分析引擎?

在 Uber,实时分析最常被用来计算时间序列聚合,通过这些计算,能让我们洞察用户体验从而相应地改进我们的服务,我们可以在一个时间范围内针对任意过滤或连接(Join)后的数据请求特定维度(如按天、按小时、按城市 ID 和行程状态)的指标。多年来,Uber 已经部署过多种解决方案,通过不同的方式来解决这个问题。

我们尝试过的第三方解决方案包括:

虽然这些技术各具优势,但都缺少关键功能来满足我们的使用场景。我们需要跳出思维定式,更确切地说是要基于 GPU 来实现一个统一又精简的解决方案。

利用 GPU 进行实时分析

为了以较高帧率将视图渲染为逼真的图像,GPU 会高速并行处理大量的几何图形和像素。虽然在过去几年里,处理单元的时钟频率增长已趋近平稳,但是根据摩尔定律,这只不过是单纯增加了芯片上的晶体管数量。因此,每秒以十亿次(GFLOP/s)计的 GPU 计算速度正在迅速增加,下方图 1 比较了在过去几年中 NVIDIA 的 GPU 和 Intel 的 CPU 的理论计算速度。

图 1:近年来 CPU 和 GPU 单精度浮点性能的比较,图片来自 NVIDIA 的CUDA C编程指南

在我们设计实时分析查询引擎时,集成 GPU 处理便是我们的最佳选择。在 Uber,最常见的实时分析查询场景需要处理数天的数据和数百万到数十亿条记录,然后在短时间内过滤和聚合这些数据,通用 GPU 的并行处理模型完全适用于这种类型的计算任务,因为他们能够做到:

当我们决定使用基于 GPU 的分析数据库之后,我们又评估了现有的一些能满足我们需求的解决方案:

总的来说,这些引擎证明了通过 GPU 技术处理数据具有巨大的优势和潜力,同时也激励我们自研基于 GPU 的实时分析解决方案来满足 Uber 的需求,有了这些构思,我们构建了开源的 AresDB。

AresDB 架构概述

抽象来看,AresDB 将大部分数据存储在主机内存中,内存连接到 CPU,由 CPU 处理数据输入过程,如遇异常可通过磁盘恢复数据。查询时,AresDB 将数据从主机内存传输到 GPU 内存,以便在 GPU 上并行处理。如下方图 2 所示,AresDB 由一个内存存储器、一个元数据存储器和一个磁盘存储器组成:

图2:AresDB单实例架构包含内存和磁盘存储以及元存储

图 2:AresDB 单实例架构包含内存和磁盘存储以及元存储

与大多数关系型数据库管理系统(RDBMS)不同,AresDB 中没有数据库作用域或 Schema 作用域,所有表都同属于一个 AresDB 集群或实例并具有相同作用域,以使用户能够直接引用的。用户可以将自己的数据存储为事实表(Fact Table)和维度表(Dimension Table)。

事实表(Fact Table)

事实表存储一个无限的时间序列事件流,用户用事实表存储实时发生的事件(Event)或事实(Fact),每个事件都关联一个事件时间,该表通常通过事件时间来查询。为了说明用事实表存储的信息类型,我们举一个 Uber 打车的例子,其中每一次行程都是一个事件,叫车时间通常被定义为事件时间,如果一个事件关联了多个时间戳,那么只有其中一个可以被指定为事实表中显示的事件时间。

维度表(Dimension Table)

维度表存储实体(包括城市、客户端和驱动程序等)的当前属性。例如,用户可以在维度表中存储城市信息,如城市名称、时区和国家。与事实表相比,维度表不会随时间无限增长,大小总是有限的(例如,对 Uber 来说,城市表的上限是世界上实际的城市数量)。维度表不需要一个特别的时间列。

数据类型

下方的表格详述了 AresDB 目前支持的数据类型:

数据类型 容量(字节) 详细信息
布尔型数据,存储为一个二进制位
Int8, Uint8 整数类型,用户可以根据字段的基数(Cardinality)和内存开销来选择
Int16, Uint16
Int32, Uint32
字符串会被自动翻译成枚举类型,SmallEnum类型可以保存基数不高于256的字符串
与SmallEnum相似,基数上限是65535(译注:2^16-1)
浮点数,我们支持Float32,未来若有需求再支持Float64
通用唯一识别码
地理点
可变长度 多边形或多个多边形

用 AresDB 存储字符串会自动转换为枚举类型(enum),这样做可以提高存储和查询的效率,支持大小写敏感的等式检查,但不支持高级操作,例如:字符串拼接、字符串子串、glob 匹配和正则表达式匹配,我们打算日后增加完整的字符串支持。

关键特性

AresDB 的架构支持以下特性:

列式存储(Columnar Storage)

向量(Vector)

AresDB 按照列式格式存储所有数据,每列的值存储为列式值向量,每列值的有效性或无效性用一个二进制位表示,存储在单独的空向量中。

实时存储(Live Store)

AresDB 将未压缩和未排序的列式数据(实时向量)储存在实时存储中,实时存储中记录的数据被实时分发到多个 Batch 的配置容量中,导入时创建新的 Batch,记录归档后清除旧的 Batch。我们用主键索引来定位重复和更新的记录,下面的图 3 展示了我们组织实时记录并通过主键值定位这些记录的过程:

图3:我们使用主键值来定位每个记录的Batch和Batch的位置

图 3:我们使用主键值来定位每个记录的 Batch 和 Batch 的位置

每个 Batch 中每列的值都以列向量的形式存储,每个值向量中值的有效性或无效性被存储为一个独立的 null 向量,每个值的有效性以一个二进制位表示。在下面图 4 的示例中,city_id 列包含五个值:

图4:我们在数据表中存储未压缩列的值(实际值)和null向量(有效性)

图 4:我们在数据表中存储未压缩列的值(实际值)和 null 向量(有效性)

归档存储(Archive Store)

每条记录都按照用户配置的列顺序排序,如图 5 所示,我们先按 city_id 列排序,然后再按 status 列排。

图5:我们先根据city_id列将所有行排序,然后按status列排序,最后使用游程编码压缩每一列。每一列在被排序和压缩后都会有一个计数向量。

图 5:我们先根据 city_id 列将所有行排序,然后按 status 列排序,最后使用游程编码压缩每一列。每一列在被排序和压缩后都会有一个计数向量。

设置按用户配置的列排序顺序的目的是:

某一列只有在用户配置的排序顺序中出现才会被压缩,我们不会尝试压缩高基数列,因为压缩高基数列所节省的存储量可以忽略不计。

在排序之后,会用各种游程编码压缩每个合格列的数据,除了值向量和 null 向量之外,我们还引入了计数向量来表示重复的相同值。

支持 Upsert 操作的实时导入

客户端向 AresDB 导入数据,需要通过 Ingestion HTTP API 发起一个 UpsertBatch 的 Post 请求,UpsertBatch 是一种自定义的序列化二进制格式,这种格式可以保证数据是随机访问的,同时也能最大限度地减少空间开销。

当 AresDB 接收到一个导入数据的 UpsertBatch 请求时,首先会将它写入重做日志(Redo Log)的末尾,用于从异常中恢复。然后,AresDB 会识别出事实表中的延迟记录(late records),每当向实时存储导入数据时都会跳过这些数据。如果记录的事件时间早于归档的截止事件时间,则被认为是“延迟”记录。AresDB 使用主键索引来定位 Batch 应该被放到哪个实时存储中,如下面图 6 所示,全新的记录(记录的主键值从未出现过)将被存入空白空间,而已有的记录将直接更新:

图6:在导入过程中,追加UpsertBatch到重做日志后,将“延迟”记录追加到回填队列中,而在实时存储中存入其他记录。

图 6:在导入过程中,追加 UpsertBatch 到重做日志后,将“延迟”记录追加到回填队列中,而在实时存储中存入其他记录。

归档(Archiving)

在导入数据时,要么将新记录追加到实时存储中,要么更新实时存储中已有的记录,抑或将数据追加到等待放入归档存储的回填队列中。

我们会定期运行一个名为“归档”的计划过程,将实时存储中的新记录(以前从未归档的记录)合并到归档存储中,归档只处理实时存储中的记录,其事件时间将位于旧的截止时间(上次归档过程的截止时间)和新的截止时间(基于表 Schema 中的归档延迟设置项中的新的截止时间)。

我们每天批量处理归档数据的时候,用记录的事件时间来确定记录应该被合并到哪个归档 Batch。在合并过程中,归档不需要给主键值索引去重,因为只有在新旧截止时间范围内的记录会被归档。

接下来的图 7 展示了一条基于给定记录时间的时间线:

图7:通过事件时间和截止时间来确定哪些记录是新的(实时的),哪些记录是旧的(事件时间早归档截止时间)。

图 7:通过事件时间和截止时间来确定哪些记录是新的(实时的),哪些记录是旧的(事件时间早归档截止时间)。

在这种情况下,两次归档操作运行的间隔时间被称作归档间隔(Archiving Interval),而在事件发生后直到可被归档前的这段持续时间被称作归档延迟(Archiving Delay),二者都可以在 AresDB 的表 Schema 配置中定义。

回填(Backfill)

如图 7 所示,事实表中的旧记录(事件时间早于归档截止时间)被追加到回填队列中,最终由回填进程处理。每当回填队列的时间或大小达到阈值,就会触发这个进程。与实时存储的导入过程相比,回填操作是异步进行的,CPU 和内存资源开销相对更大,回填在以下场景中使用:

与归档不同的是,回填是幂等的,并且需要基于主键值进行去重,被回填的数据最终对查询可见。

回填队列一直保留在内存中,并且预先配置了一定的空间。在大量回填载入期间,直到回填操作运行后清除了队列才会解除客户端阻止运行的状态。

查询处理(Query Processing)

在当前的实现下,用户需要使用 Uber 创建的 Ares 查询语言(AQL)来对 AresDB 进行查询。AQL 是一种有效的时间序列分析查询语言,不像其他类 SQL 语言一样遵循 SELECT FROM WHERE GROUP BY 这种标准的 SQL 语法。相反,AQL 是在结构化字段中指定的,可以与 JSON、YAML 和 Go 对象一起使用。例如,与

SELECT count(*) FROM trips GROUP BY city_id WHERE status = ‘completed’ AND request_at >= 1512000000
复制代码

等效的 JSON 格式的 AQL 写法如下:

 “table”: “trips”, “dimensions”: [{“sqlExpression”: “city_id”} “measures”: [{“sqlExpression”: “count(*)”};”>“rowFilters”: [“status = ‘completed'” “timeFilter”: {“column”: “request_at”,“from”: “2 days ago”
复制代码

AQL 采用 JSON 格式,为仪表盘和决策系统开发者提供比 SQL 更好的程序化查询体验,因为开发者可以用代码轻松地编写和操作查询语句,并且无须担心诸如 SQL 注入这样的问题。JSON 格式是 Web 浏览器、前端服务器和后端服务器这种经典架构的通用查询格式,一直到数据库(AresDB)都支持。此外,AQL 还为时间过滤和桶化(Bucketization)提供了方便的语法糖,并提供原生的时区支持。这门语言还支持隐式子查询等功能,以避免常见的查询错误并使后台开发者可以方便地进行查询分析和重写。

尽管 AQL 提供了各种好处,但我们充分意识到大多数工程师更熟悉 SQL,所以接下来,我们的研究方向就包括为查询操作提供 SQL 接口来增强 AresDB 的用户体验。

如下图 8 描绘了 AQL 查询的执行流程:

图8:AresDB的查询执行流程利用了我们自主开发的AQL查询语言,可以快速高效地处理并检索数据。

图 8:AresDB 的查询执行流程利用了我们自主开发的 AQL 查询语言,可以快速高效地处理并检索数据。

查询编译(Query Compilation)

AQL 查询会被编译成内部的查询上下文,过滤器、Dimension 和 Measurement(译注:相当于关系型数据库中的 Table)中的表达式被解析成抽象语法树(AST),以便后续通过 GPU 进行处理。

数据馈送(Data Feeding)

AresDB 将归档数据发送至 GPU 进行并行处理前,会对数据进行预过滤以降低性能开销。由于存档数据是按照预先配置的列顺序排列的,所以一些过滤器可能可以通过二分搜索定位相应的匹配范围来利用此排序。特别是,所有首个 X 排序列上的相等过滤器和 X+1 排序列上的范围过滤器(可选)都可以作为预过滤处理,如图 9 所示:

图9:AresDB预过滤列数据,然后将其发送到GPU进行处理。

图 9:AresDB 预过滤列数据,然后将其发送到 GPU 进行处理。

在预过滤后,只需将绿底色的值(满足过滤条件的值)推送到 GPU 以进行并行处理,输入数据被馈送到 GPU,每次执行一个 Batch(包括实时 Batch 和归档 Batch)。

AresDB 利用CUDA流(译注:CUDA 流表示 GPU 中的操作队列)来馈送并执行流水数据。每次查询交替使用两个流,以便在处理过程中同步传输数据。下方图 10 的时间线描绘了这一过程:

图10:在AresDB中,两个CUDA流交替传输和处理数据

图 10:在 AresDB 中,两个 CUDA 流交替传输和处理数据

查询执行(Query Execution)

为简单起见,AresDB 利用Thrust库来实现查询执行过程,该程序提供了经过微调的并行算法构建块,以便在当前的查询引擎中快速实现。

Thrust 使用随机访问迭代器来访问输入和输出向量数据,每个 GPU 线程将输入迭代器取到其工作负载位置,读取值并执行计算,然后将结果写入到输出迭代器上的对应位置。

AresDB 遵循 OOPK 模型(每个内核一个操作符)来计算表达式。

下面的图 11 是通过查询编译阶段的维度表达式 request_at – request_at % 86400 生成的示例 AST:

图11:AresDB利用OOPK模型对表达式求值进行建模

图 11:AresDB 利用 OOPK 模型对表达式求值进行建模

在 OOPK 模型中,AresDB 查询引擎遍历 AST 树的每个叶子节点,并为其父节点返回一个迭代器。在根节点也是叶子节点的情况下,直接在输入迭代器上执行根操作。

在每个非根非叶节点(本示例中的取模运算),分配一个临时的暂存空间向量来存储 request_at % 86400 表达式产生的中间结果,利用 Thrust 在 GPU 上启动一个内核函数来计算这个运算符的输出。结果存储在暂存空间迭代器中。

在根节点上,启动内核函数的方式与非根非叶结点相同,并根据表达式的类型采取不同的输出操作,具体如下:

在表达式求值后,执行排序和归约操作以备最终聚合,在排序和归约操作中,我们使用维度向量的值作为排序和归约的关键值,并使用度量向量的值作为要聚合的值。通过这种方式,具有相同维度值的行将组合在一起并进行聚合。下面的图 12 详述了这个排序和归约的过程:

图12:在表达式求值之后,AresDB根据维度(键值)和度量(值)向量上的关键值对数据进行排序和归约。

图 12:在表达式求值之后,AresDB 根据维度(键值)和度量(值)向量上的关键值对数据进行排序和归约。

AresDB 还支持以下高级查询功能:

资源管理

AresDB 作为基于内存的数据库,需要管理以下类型的内存使用:

分配者 管理模式
实时存储向量(实时存储列数据) 跟踪式
归档存储向量(归档存储列数据) 管理式(加载和释放)
主键索引(利用散列表删除重复记录) 跟踪式
回填队列(存储等待回填的“延迟”到达数据) 跟踪式
归档/回填过程临时存储(归档和回填过程汇总分配的临时内存) 跟踪式
导入/查询临时存储;进程开销;分配碎片化; Golang和C 静态配置预估

当 AresDB 投入生产时,它会利用配置的总内存运算。此预算由所有六种内存类型共享,并且还应为操作系统和其他进程留出充足的空间。此预算还包括静态配置的开销预估、服务器监控的实时数据存储以及服务器可根据剩余内存预算决定加载和释放的归档数据。

下面的图 13 描绘了 AresDB 主机的内存模型:

图13:AresDB管理自己的内存使用情况,使其不超过配置的进程总预算。

图 13:AresDB 管理自己的内存使用情况,使其不超过配置的进程总预算。

AresDB 允许用户为事实表在列级别配置预加载的日期和优先级,并且仅在预加载日期内预加载归档数据。非预加载的数据按需从磁盘中加载到内存。一旦内存占满,AresDB 也可以将归档数据从主机内存中清除。AresDB 的释放策略基于预加载天数、列优先级、Batch 日期和列的大小。

AresDB 还管理多个 GPU 设备,建立设备资源模型(如 GPU 线程和设备内存),当处理查询的时候跟踪 GPU 内存的使用情况。AresDB 通过设备管理器管理 GPU 设备,并将 GPU 设备资源划分为两个维度:GPU 线程和设备内存,当处理查询时能够跟踪使用情况。在查询编译之后,AresDB 允许用户估计执行查询所需的资源量。无论在哪台设备上,只有满足内存要求的设备才能够启动查询,如果设备没有足够的内存,则查询必须等待资源满足后执行。目前,AresDB 可以在同一 GPU 设备上同时运行一或多个查询,只要该设备满足所有资源要求。

在目前的实现中,AresDB 不会在设备内存中缓存输入数据,如果缓存下来,则可以在多个查询中重用这些数据。AresDB 的目标是支持针对不断实时更新且难以正确缓存的数据集的查询,我们打算在未来的迭代中实现具有数据缓存功能的 GPU 内存,这一步将有助于优化查询性能。

使用案例:Uber 的摘要仪表盘

在 Uber,我们用 AresDB 构建仪表盘来实时提取业务洞察。AresDB 扮演着存储持续更新的新鲜原始数据和用 GPU 低功耗能力在几秒钟内计算针对这些数据的关键指标,从而用户可以交互地使用仪表盘。例如,在数据存储中具有较长寿命的匿名行程数据由多种服务进行更新,包括我们的调度、支付和评级系统。为了有效利用形成数据,用户将数据切分并划分为不同的维度,从而为实时决策获得洞察。

用 AresDB 打造的 Uber 摘要仪表盘是一个广泛使用的分析仪表盘,公司的各个团队会用它来检索相关的产品指标,并实时响应来提高用户体验。

图14:Uber摘要仪表盘按小时呈现的视图用AresDB查看特定时间段的实时数据分析。

图 14:Uber 摘要仪表盘按小时呈现的视图用 AresDB 查看特定时间段的实时数据分析。

为了构建上面的模拟仪表盘,我们对以下的表进行建模:

Trips(事实表)

request_at
1542058870
1541977200

Cities(维度表)

San Francisco America/Los_Angeles
America/New_York

AresDB 中的表 Schema

要创建上述两个建模表,我们首先需要在 AresDB 中按照以下 Schema 创建表:

{“name”: “trips”,“columns”: [{“name”: “request_at”,“type”: “Uint32”,},{“name”: “trip_id”,“type”: “UUID”},{“name”: “city_id”,“type”: “Uint16”,},{“name”: “status”,“type”: “SmallEnum”,},{“name”: “driver_id”,“type”: “UUID”},{“name”: “fare”,“type”: “Float32”,}],“primaryKeyColumns”: [1],“isFactTable”: true,“config”: {“batchSize”: 2097152,“archivingDelayMinutes”: 1440,“archivingIntervalMinutes”: 180,“recordRetentionInDays”: 30},“archivingSortColumns”: [2,3]} {“name”: “cities”,“columns”: [{“name”: “city_id”,“type”: “Uint16”,},{“name”: “city_name”,“type”: “SmallEnum”},{“name”: “timezone”,“type”: “SmallEnum”,}],“primaryKeyColumns”: [0],“isFactTable”: false,“config”: {“batchSize”: 2097152}}

如 Schema 中所述,Trips 表被创建为事实表,表示事实发生的行程事件,而 Cities 表被创建为维度表,存储有关城市的信息。

创建表之后,用户可以利用AresDB客户端库从事件总线(如 Apache)或流式或 Batch 平台(如 Apache或 Apache)中获取数据。

针对 AresDB 的示例查询

在模拟仪表盘中,我们选择两个指标作为示例:行程总价和活跃司机。在仪表盘中,用户可以按城市指标过滤数据,例如:旧金山(San Francisco)。为了在仪表盘上显示过去 24 小时这两个指标的时间序列,我们可以在 AQL 中运行以下查询:

过去24小时中旧金山总行程费用(按小时) 过去24小时中旧金山活跃司机数(按小时)
{“table”: “trips”,“joins”: [{“alias”: “cities”,“name”: “cities”,“conditions”: [“= trips.city_id”]}],“dimensions”: [{“sqlExpression”: “request_at”,“timeBucketizer”: “hour”}],“measures”: [{“sqlExpression”: “sum(fare)”}],“rowFilters”: [“status = ‘completed’”,“cities.city_name = ‘San Francisco’”],“timeFilter”: {“column”: “request_at”,“from”: “24 hours ago”},“timezone”: “America/Los_Angeles”} {“table”: “trips”,“joins”: [{“alias”: “cities”,“name”: “cities”,“conditions”: [“= trips.city_id”]}],“dimensions”: [{“sqlExpression”: “request_at”,“timeBucketizer”: “hour”}],“measures”: [{“sqlExpression”: “countDistinctHLL(driver_id)”}],“rowFilters”: [“status = ‘completed’”,“cities.city_name = ‘San Francisco’”],“timeFilter”: {“column”: “request_at”,“from”: “24 hours ago”},“timezone”: “America/Los_Angeles”}

查询的示例结果:

上述模拟查询将在以下时间序列结果中产生结果,这些结果可以很容易地绘制成时间序列图,如下所示:

过去24小时中旧金山总行程费用(按小时) 过去24小时中旧金山活跃司机数(按小时)
{“results”: [{“1547060400”: 1000.0,“1547064000”: 1000.0,“1547067600”: 1000.0,“1547071200”: 1000.0,“1547074800”: 1000.0,…}]} {“results”: [{“1547060400”: 100,“1547064000”: 100,“1547067600”: 100,“1547071200”: 100,“1547074800”: 100,… }]}

在上面的示例中,我们演示了如何利用 AresDB 在几秒钟内实时导入实时发生的原始事件,并立即针对数据发起任意的用户查询,从而在亚秒内计算指标。AresDB 帮助工程师轻松构建数据产品来提取对企业至关重要的指标,员工或机器可以通过这些指标做出具有实时洞察能力的决策。

后续步骤

AresDB 在 Uber 被广泛使用,为我们的实时数据分析仪表盘提供支持,使我们能够针对业务的各个方面大规模制定数据驱动的决策。通过开源这个工具,我们希望社区中的其他人可以利用 AresDB 分析自己的数据。

未来,我们打算通过以下功能来赋能这个项目:

的开源协议是 Apache,我们鼓励你试用 AresDB 并加入我们的社区。

如果你对建立大规模实时数据分析技术感兴趣,欢迎申请加入我们的团队。

AresDB 开源项目地址:top="16534">鸣谢

特别感谢 Kate Zhang、Jennifer Anderson、Nikhil Joshi、Abhi Khune、Shengyue Ji、Chinmay Soman、Xiang Fu、David Chen 还有 Li Ning,是你们让这个项目取得了巨大的成功!

查看英文原文:

更多内容,请关注 AI 前线

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。