在 Uber,实时数据(乘车请求数、可用司机数、天气、游戏等)可以让运营团队作出明智的决定,例如动态定价、最大调度预计到达时间计算以及对我们服务的供求情况进行预测,从而改善 Uber 平台上的用户体验。尽管通过确定中长期趋势,批量数据可以提供强大的洞察力,但 Uber 服务可以将流式数据与实时处理结合起来,以每分钟一次的方式创建可操作的洞察力。
Gairos 是 Uber 的实时数据处理、存储和查询平台,旨在推动大规模、高效率的数据探索。通过数据智能,团队可以更好地理解 Uber 市场并提高其效率。应用实例包括动态定价、最大调度预计到达时间计算和供求预测。
为保证 Gairos 可以继续优化它在不断扩大的用例组合中的性能,我们重新构建了该平台,以实现更好的扩展性、稳定性和可持续性。在这两种最优策略中,影响最大的是数据驱动分片、查询路由和智能缓存。
该平台采用数据驱动的分片和查询路由技术,可支持 4 倍于以往解决方案的并发查询。有些关键的集群甚至已经从每月一次宕机,稳定为每月零宕机。自从 2018 年 12 月发布以来,该平台通过智能缓存技术,其规模已超过 10 倍,缓存命中率超过 80%。
为什么是 Gairos?
在 Uber 生态系统中,每个团队都有自己的数据管道和用于自己用例的查询服务,他们必须对此保持关注(监督监控、预警、维护解决方案的流处理框架等),而不是专注于系统优化。Gairos 的出现为实时数据处理、存储、查询建立了统一的平台,让团队可以将更多精力放在系统优化上。与实时数据系统的通用任务相比,用户可以专注于定制系统的业务逻辑。Gairos 的作用如下:
Gairos 概述
如图 1 所示,Gairos 从不同的 Apache Kafka 主题中获取数据,然后将数据写到不同的 Elasticsearch 集群。
图 1:Gairos 的简化架构展示了该平台中的主要组件
在这些 Elasticsearch 集群中, Gairos 查询服务是查询数据的网关。Gairos 客户端将查询发送到 Gairos 查询服务,实时获取数据。为了满足 Apache Hive 和 Presto 长期分析的需要,数据还被持久存储在 HDFS 中。
在 Gairos 中有几个系统:Apache Kafka、Gairos 摄取管道、 Elasticsearch 集群,Gairos 查询服务等。这些系统中的任何一个出了问题,客户都会受到影响。Gairos 的数据管道数量随着 Uber 市场业务规模的增长而增长。需要向 Gairos 添加越来越多的数据源来支持新的业务用例。
Uber 用例
通过使用 Gairos,我们得到了很多见解——在 Uber 收集用例,包括:
从用户打开 Uber 应用开始,我们把每个行程的基本数据称为会话。这一行为引发了一系列的数据事件,从司机实际接受乘车到行程结束。考虑到系统的复杂性和规模,这些数据会分布在许多不同的事件流中。
举例来说,当司机打开 Uber 应用时,它会触发司机的事件流。这个应用程序将显示该地区提供的行程(uberPOOL、uberX、UberBLACK 等)和每一条行程的价格,这是由我们的动态定价系统产生的,并且每一条行程价格将作为单独的事件出现在印象事件流中。司机接受了行程后,这一请求就被送到我们的调度系统,它把乘客和司机配对,并把他们的车辆分配到该行程。在司机搭乘乘客时,应用程序会向调度系统发送一个“接车完成”事件,该事件将有效地启动行程。当司机到达目的地,它将发送一个 “行程结束”事件,并在应用程序中表明乘客已下车。
一个典型的行程生命周期可以跨越六个不同的事件流,这些事件是由乘客应用、司机应用和 Uber 的后台调度服务器生成的。这些不同的事件流将串联到一个 Uber 行程中。
要让我们的服务能够根据数据洞察力迅速行动,实时地处理这些不同的数据流并进行查询,这是一个挑战。
司机的状态转换
下图(图 2)显示在用户定义的时间窗内旧金山司机的状态转换汇总。这是单个查询在一秒钟内返回的结果。
图 2:调度查询服务从 Gairos 获取并显示旧金山司机的状态转换数据
单个司机的状态转换
下图(图 3)显示了旧金山的单个司机应用在用户定义的时间窗口中的所有状态转换。这个查询与前面的查询相同,只是多了一个过滤器来匹配给定的司机应用 UUID。
图 3:调度服务为给定司机获取司机程序状态数据并显示
按地理位置划分的司机使用情况
下图(图 4)显示了按地理位置划分的司机使用情况。
图 4:按地理位置划分的司机使用情况
最后,让我们通过 Gairos 的数据来了解一下动态定价的原理。
有时,许多人要求乘车,以致路上没有足够的车运送他们。举例来说,坏天气,高峰时间和特殊情况,都能让异常多的人在同一时间想乘坐 Uber。当需求很大时,可以提高票价,以帮助确保需要搭车的人也能搭到车,这就是所谓的动态定价。
要计算由(Uber的六角分层空间指数)定义的六边形的动态倍数,需要从 Gairos 查询请求数量(需求)和可用的司机数量(供给)来获得最新数据。将这些数据输入定价模型,定价模型将生成该位置的动态倍数。图 5 显示了奥克兰体育场周围不同的六边形区域的动态倍数,当时正在进行比赛。
图 5:当有赛事时,奥克兰体育场不同六边形的动态倍数
可扩展性 / 可靠性方面的挑战
在 Gairos 的最初实现中,我们遇到了一些技术挑战和无法预料的问题。
由于使用 Gairos 的用例增加,实时数据流也随之增加。为了方便起见, Gairos 提供了 1500+ TB 的可查询数据总量,并提供了 30 个以上的生产线。总共有 4.5 万亿条记录,集群有 20 多个。在 Gairos 每秒发生的事件超过一百万次。越来越重要的是,要让系统更加稳定、可扩展和可持续,以便为越来越多的用例提供动力。
接下来,我们将重点介绍在开始扩展 Gairos 之后出现的一些技术挑战:
但我们第一次迭代 Gairos 的主要问题是,Gairos 的数据是如何使用的,而不会返回到 Gairos 以指导系统的优化和持续改进。Gairos 没有主动检查数据是否按规定使用,是否能够根据变化进行调整 (流量模式、查询模式等等)。对于 Gairos 自优化项目,我们闭环(图 6),让用户查询驱动优化,使 Gairos 更稳定、更可扩展、更可持续。
图 6:新的架构用红色箭头指示的新数据流为 Gairos 闭环
要让 Gairos 平台更稳定,更可扩展,更低的维护成本,就必须让系统更高效、更智能。
图 7:高层架构展示了平台中的数据流。红色箭头代表新的数据流,浅绿色组件代表两个新的优化:Giaros 查询分析器和优化引擎
经修订的高层结构如上图 7 所示。该系统的主要组成部分如下:
下面,我们详细介绍一下这些组件在整个 Gairos 生态系统中各自负责的内容。
客户端
客户端可以是服务,也可以是数据分析师等非服务用户。
服务客户端包括所有依赖 Gairos 为用户提供服务的实时服务,包括我们的动态定价和行程预测服务。这些服务会将一些事件发送到 Apache Kafka,供下游服务和管道处理。在为请求提供服务时,它们可以从 Gairos 中查询一些数据,然后作出决策。例如,预测服务可能需要查询来改进预测,以预测高流量事件中司机伙伴的需求和供应,或者我们的动态定价服务可能会利用 Gairos 根据需求、供应和一些预测输入来确定动态倍数。
Apache Kafka
Apache Kafka 是一种分布式流媒体平台,允许客户端发布 / 订阅事件流。全部实时服务都可以向其发送重要事件,以供下游服务 / 管道使用。RT-Gairos 还使用它来收集运行在 Gairos 中的所有查询。
Gairos-Ingestion(加工层)
Gairos-ingestion 是一种摄取框架,用于处理来自不同数据源的数据,并将其发布到 Gairos。有些数据源使用 Apache Spark 流。
Elasticsearch(Gairos 存储层)
Gairos 存储层 Elasticsearch 对 Gairos-Ingestion 使用的 30 多个不同数据源的数据进行索引,并为 Gairos 客户的查询做了准备。
RT-Gairos(查询层)
RT-Gairos 作为 Gairos 的网关。在到达 Gairos 存储层之前,所有的查询都会经过它。实时 Gairos 会强制执行访问控制、提供路由,并缓存一些查询结果。RT-Gairos 会收集所有到 Gairos 的查询,并推送到 Apache Kafka 主题。
查询分析器
查询分析器对从 RT-Gairos 收集到的查询进行分析,并生成用于 Gairos 优化引擎输入的见解。首先,利用简单的技术(过滤指标、聚合、时间范围、分片数、索引数)来生成查询模式。
优化引擎
Gairos 优化引擎根据系统统计数据和从 Query Analyzer 获得的查询信息,推荐使用其生命周期知识库进行一些优化。这会更新 Gairos 的设置:Ingestion-path、RT-Gairos 和 Elasticsearch。
有些设置更改可能需要进行基准测试,以了解在应用给定的更改之前 KPI 是否会得到改进。举例来说,对于一个数据源,最佳的分片数量是多少?这就是索引基准测试服务的作用。
索引基准服务
要优化 Gairos 的设置,我们需要使用一个基准测试工具来比较基于已定义 KPI 的不同设置(读 / 写吞吐量、延迟、内存使用等等)。
如图 8 所示,我们概述了 Gairos 基准测试服务的不同组成部分。
图 8:索引基准服务将进行测试并保存测试结果
这些组件包括:
Gairos 基准测试服务将接受 Gairos 优化引擎的请求,并进行基准测试。基准测试服务将复制单个索引,而不是从生产到暂存的整个历史记录,从而提高性能并减少资源使用。如果各个索引的性能都有提高,那么这个数据源的总体性能也会提高,因为它可以独立执行针对不同索引的查询。在评估测试结果之后,优化引擎可以决定是否更改生产环境中的索引设置。
如图 7 所示,整个系统涉及不少步骤。这些步骤包括:
优化策略
我们应用了一些优化策略,其他组织也可以用来优化他们的实时智能平台。
我们将逐一详细阐述。
分片和查询路由
分片就是通过一些键对数据进行分区,这样就可以将键相同的数据放入一个分片中。当写入 Elasticsearch 索引时,必须提供键才能将文档放到正确的分片中。在查询数据时,如果在查询中指定了键,则可以向特定的分片发送查询,而不是向所有分片发送查询。这样减少查询所需的节点数量,可以提高延迟,提高弹性(如果单个节点宕机,但查询不需要,也没关系)。
假设我们想给旧金山的所有司机发送一个促销优惠,我们需要司机列表。在下图 9 中,我们查询的是旧金山的所有司机。在顶部的数据没有按照城市进行分片,查询必须在所有四个分片中进行,以检查是否有司机可用。在底部的数据是按照城市进行分片的。查询只需从包含旧金山的司机的分片中检索数据即可。可以看到,进行查询的次数从 4 次减少到 1 次。
图 9:旧金山中的司机查询需要查询所有分片而不需要进行分片,而它只查询一个带分片的分片
分片的一个常见问题是热点问题(某些分片需要处理比其他分片高得多的写入 / 查询流量)。例如,如果我们按城市 ID 来分发聚合的匿名司机伙伴数据,有些城市(包括旧金山)的规模远比小城市大,从而导致了特定分片或节点负担过重。为了帮助分配决策和负载分配,要使分片的大小和效用大致相等,这一点非常重要。
在进行分片时需要考虑的因素如下:
根据 Write/Read QPS 和分片大小计算分片的数量。下面是寻找分片键的过程(图 10)。一旦确定了分片键,我们就使用历史数据来检查分片分布是否在 Gairos 给定的阈值内。
下图 11 所示是一个简化的分片示例。对于本例,假设每个节点可以处理 3000 个 Write QPS,并且最多可以存储 60 GB 数据。仅考虑数据大小和峰值 Write QPS。
图 11:基于给定的约束条件,将四个具有不同数据量(即我们平台上的用户量较大)和不同 QPS 的城市划分为四个分片。
分片必须满足以下约束:
这样做的目的是将数据尽可能均匀地分布在这些分片上。
根据每个城市的数据大小,我们可以估算出分片的数量:
Shard # based on>
根据峰值 QPS,我们可以得到另一个估计的分片数量:
Shard # based on peak QPS (2k + 3k + 5k + 1k)/3k = 4
求出这两个估计值的最大值:
**Shard #**max(3, 4) = 4
这四座城市将被放在四个分片中。旧金山和南达科他州可以放在同一个分片里。洛杉矶可以放在一个分片里。纽约可以分成两个分片。这样数据就会更均匀地分布在不同的分片中,同时每个节点都可以容纳数据并处理峰值 QPS。
如果要查询旧金山的司机,可以直接转到 1 号分片。而查询纽约的司机,则需要同时指向 3 号和 4 号分片。
为了缓解倾斜的分片和热点问题,我们为 Gairos 开发了一种定制分片算法,下表 12 是默认分片(之前)和我们的分片算法(之后)的最大 / 最小文档数。
每个分片的最大文档数 | 每个分片的最小文档数 | 最大 / 最小 |
---|---|---|
之前 | 4700 万 | 1700 万 |
之后 | 3000 万 | 2300 万 |
表 12:我们的分片算法生成的分片在文档数量上差异较小,文档在分片上分布更均匀
可以看出,文档在这些分片中的分布得更均匀。在 Gairos 的默认分片算法中,每个分片的最大和最小文档数比是 2.76;而我们的定制分片算法是 1.3。
为了检查它们可以支持的延迟和并发用户,我们做了一些基准测试。需求数据源的结果如下。
图 13 显示了不同数量客户端下的延迟。可见,有分片的数据延迟比没有分片的低。客户端数量越多,差异就越大。
图 13:对于需求数据来说,使用分片的延迟要低得多,并且随着客户端数量的增加,这种差异将会越来越大,这在本用例中已经有描述
在不同数量的客户端中,图 14 显示了它能够支持的并发用户数,可以看到,有分片的 QPS 的最高数量大约是没有分片的 4 倍左右。
图 14:有分片的 QPS 是没有分片的 QPS 的 4 倍
下面我们分享一下第二个数据源
supply_geodriver
的一些优化结果。相对于需求数据源(存储乘客请求),文档数量更多,数据大小更大。
图 15:使用分片的延迟较高,并且差异随着客户数量的增加而增加
如图 15 所示,使用分片后,平均延迟较差。就其能够支持的并发用户数量而言,延迟是未使用分片时间的 4 倍,见下图 16。
图 16:有分片的 QPS 是没有分片的 QPS 的 4 倍
第三个数据源是
supply_status
。
图 17:当客户端数量较少时,使用分片的延迟较高,当客户端数量超过 200 时,延迟就会降低
图 18:有分片的 QPS 是没有分片的 QPS 的 4 倍
图 17 表明,当客户端数量较少时,使用分片的平均延迟较高。当客户端数量超过 200 的情况下,平均延迟会降低。从图 18 中 可以看出,使用分片时,它可以支持的并发用户最多,大约是没有分片时的 4 倍。
综上所述,延迟对于某些大型数据源来说可能更糟糕,而且它能够支持的并发用户数量总是 4 倍于不分片的数量。要想在某些大型数据源中获得延迟和可扩展性,我们可以为每一个分区调整分区大小。
图 19:动态定价集群的 CPU 负载呈现出每日模式,并在一天中随着时间的推移而增加。峰值 CPU 负载从 60 降低到 10,每个节点的负载在一天中略有变化
作为分片策略的副产品,我们能够稳定定价集群,如图 19 所示。由于所有索引都是日索引,所以我们的定价集群中的节点的 CPU 负载都呈现每日模式。在一天的时间里,我们可以看到 CPU 负载随着时间的推移而增加。将分片策略应用到定价集群中的所有数据源上,CPU 负载稳定。
基于查询模式和签名的缓存
最简单的缓存方法是缓存所有查询结果。但由于我们的数据非常大,这些结果的总大小会比原始数据大。
此外,有些查询的执行频率不高,其缓存命中率也比较低。为了让缓存更节省资源,我们又引入了两个概念:查询签名和查询模式。我们先通过一个 Gairos 查询的例子,来了解 Gairos 查询是什么样子的。
Gairos 查询是一个 JSON 对象,它可能包含下列字段:
data source
、
granularity
、、、
aggregations
、、、、等。在定义签名时,只使用以下字段:
datasource
、
granularity
、、、
aggregations
、、、。查询签名由这些字段生成,并对每个字段进行排序。
查询模式的定义与字段集相同。惟一的区别是查询模式只考虑所使用的列,而忽略了过滤器中的操作符和值。对于 Gairos 查询,可以使用查询模式和签名进行更有效的分析。
根据查询模式,我们可以定义 RT-Gairos 的缓存规则,以便 RT-Gairos 能够对常用的查询结果进行缓存。举例来说,客户端以固定的时间间隔(1 分钟、5 分钟、1 小时等)来拉取最近两周的数据。若能按天缓存数据,则索引命中率将大大提高,缓存可用于改进搜索性能。可以对范围重叠的重复查询以及基于查询模式的时间粒度应用相似的策略。为提高缓存命中率,需要对查询进行分片,在此过程中,如果查询是可分片的,每一个查询都会根据查询的时间范围分片成多个小查询。有些聚合不能从单一子查询结果中获取聚合结果。这些查询将存储在 Elasticsearch 集群而非缓存中。
下面我们重点介绍一下缓存
rider_sessions
(样本数据集)的一些基准测试结果:
图 20:使用缓存的延迟要低得多,而且随着客户端数量的增加,差异也会显著增加。
图 21:有缓存的 QPS 是没有缓存的 QPS 的 10 倍
如图 20 所示,当我们将缓存应用于这些查询时,平均延迟会大大降低。从图 21 可以看出,它能 够支持的并发用户数量非常大。因为大部分针对
rider_sessions
的查询都很麻烦,所以我们将在其他数据源上进行更多的测试,以验证我们得到的结果。
supply_status
的缓存统计如图 22 所示。可以看出,
supply_status
的命中率在 80% 以上。命中率 QPS 在 50 左右,而设置 QPS 在 10 左右。
图 22:
supply_status
的缓存命中率很高,命中 QPS 在 50 左右,而设置 QPS 在 10 左右
另一个数据源
demand_jobs
如图 23 所示。命中率为 80%。
图 23:
demand_jobs
的缓存命中率在 80% 左右,命中有一些峰值
图 24:
supply_geodriver
的缓存命中率在 30% 左右
图 25:对于需求,根本没有缓存命中
最后,从图 25 可以看出,需求的缓存命中率为 0。可见,对于不同的数据源,使用缓存的效果有很大差异。为了提高缓存命中率,我们计划做更多的调整。
合并索引
Elasticsearch 是使用倒置索引来使搜索速度更快。当删除一个文档时,该文档将被标记为已删除,它仍然存在于倒置索引中。已删除文档将从搜索结果中排除。若删除的文档数量较多,则索引大小较大。
删除这些文件还会影响搜索性能。例如,在图 26 中,司机 D1、D2、D3 都更新了多次。可以看到,有 8 个文档,而司机只有 3 个。在合并索引之后,将清除这些删除的文档,并且使索引的大小减小。
图 25:对于需求,根本没有缓存命中
另一个提高索引性能的重要因素是索引中段的数量。我们会做一些基准测试,以决定应该使用什么时候才能合并索引。该基准将使用的关键指标是索引大小(存储索引的存储空间有多大)和搜索延迟(查询数据所需的时间)。收集到的来自实时系统的查询将用于搜索性能基准测试。
在确定了每个数据源的合并索引标准之后,优化引擎可以执行一些索引优化任务。集群将限制合并索引任务的数量,以便重新索引对于集群性能没有太大影响。为了防止性能的显著下降,任何时候最多只能运行一个合并索引任务。若发现有任何明显的影响,所有强制合并的任务将被中止。
处理繁重的查询
某些重度查询可能会影响整个集群的性能。
为了使集群更加稳定,可采取下列策略:
索引模板优化
从运行的查询中,可以得到每个数据源中每个字段的以下信息,这样我们就可以为每个字段确定索引设置。
每个数据源都必须回答以下问题:用户是否需要拉取原始数据。基于这些输入,可以为每个数据源获得最佳索引设置。可以将优化引擎更新为数据源存储的模板,以便我们能够获得更好的磁盘空间或搜索性能。某些设置(例如禁用源文件)是不向后兼容的,在执行之前需要经过一些批准。注意,禁用源文件会导致无法进行更新和重新编制索引。不应该在业务逻辑需要更新文档时禁用源代码。
因为数据是自动持久化的,并且很容易重放,所以发布的 Apache Kafka 主题在源被禁用前会转变为热管道主题,这样就可以通过重放 Apache Kafka 持久主题中的事件来迁移数据。
图 27 显示了确定每个字段设置的详细工作流程。
图 27:根据使用情况,确认各字段的设置
分片优化
每一个数据源复制一个到暂存集群的索引,然后使用索引工具将复制的索引重新索引到不同数量的分片。对于已复制和已重新索引的数据,基准测试将收集性能数据。所用查询将来自用户以前收集的查询。在为每个数据源确定了最佳分片后,查询优化可以在新索引中设置新分片编号。
界定索引范围
据观察,许多非常小的索引是在一些集群中生成的。这类索引有大量的分片。这在集群中引起了分片分配问题。一些节点可能有许多未使用的分片,而一些节点可能有许多繁忙分片,这会导致节点之间的负载不平衡和资源利用率低。
由于时间戳出界,通常会出现这些小的索引。在写入 Elasticsearch 集群时,根据每个数据源的数据保留和数据预测对数据进行过滤,从而避免创建这些接近空的索引,减少分片的数量。这里有一个群集示例。下面的图 28 显示在我们的一个集群中,经过清理这些小索引之后,分片数量从 4 万左右下降到 2 万。
图 28:清理完这些小索引后,分片数量从 4 万左右下降到 2 万
清除未使用的数据
所收集的查询可以确定最近 X 天内是否使用了数据源。基于此信息,Gairos 优化引擎可以执行各种数据清理任务,比如触发通知,删除数据存储中数据源的索引等。
未来工作
这些优化策略已经被应用于一些主要的数据源中。我们正计划将优化范围扩展到所有数据源,尤其是那些用于分片和缓存的数据源。
整个过程不是自动化的。一旦我们从对这些数据源的优化中积累了足够的领域知识,并应用了这些优化,我们就会投入更多的精力来实现整个过程的自动化。
一些机器学习/深度学习方法也可用于查询分析器,我们将在未来的迭代中对此进行探索。
作者介绍:
Gang Zhao,在 Uber 领导 Gairos 优化,同时专注于存储层(Elasticsearch)和查询层的优化。
Wenrui Meng,Uber 高级软件工程师,实力与实时数据指标优化。
Qing Xu,Uber 市场情报部工程经理。
Yanjun Huang,Uber 核心基础设施团队高级软件工程师,也是 Elasticsearch 专家。
原文链接:
中国顶尖技术团队访谈录(2021年第一季)