假设我们正在运行一个基于 Web 的服务。请求处理变慢最终将会导致服务不可用。实际上,并不是所有的请求都需要立即处理。有些请求只要确认已收到即可。你有没有问过自己这样的问题:“我是否能够从异步请求处理中获益?如果确实如此的话,我该如何在一个实时的、大规模的关键任务系统中做出这种转变?”
接下来,我将要讨论我们是如何将一个面向用户的系统从基于请求-响应的同步系统迁移为一个异步系统的。我将会介绍是什么原因促使我们开始了这样的旅程,我们做出了哪些系统设计变更,这一过程中的挑战是什么,以及我们做出了哪些设计选择和权衡。最后,我会介绍在上线新系统时我们所使用的验证过程。
原始架构
是一个流视频服务,全球有超过两亿的会员在使用该服务。会员可以在众多支持的设备上观看电视节目、纪录片和电影。在访问 Netflix 的时候,借助我们的个性化推荐系统,他们能够看到各种选择。用户只需要点击播放按钮,就可以坐下来享受观看电影所带来的乐趣了。
我们会在这个电影放映过程中收集很多数据,用于运维和用例分析。其中有些数据会用来支撑我们的产品特性,比如继续观看,该特性允许会员中途停止观看电影,稍后等他回来时,在任何其他设备上都能从这个暂停点继续观看。这些数据还用于个性化和推荐引擎,以及核心业务分析。
我将会介绍迁移某个产品特性的经验,也就是浏览历史,该特性允许会员看到他们过去的浏览活动历史并且可以选择隐藏它。我们先看一下迁移之前的现有系统。在较高的层次上来讲,我们有支持不同设备的 Netflix 客户端,比如手机、台式机、笔记本电脑以及电视,它们在放映过程中会向 Netflix 云发送消息。
首先,数据会到达 Gateway 服务。在这里,它会被发送至 Playback API,该 API 会管理放映会话的生命周期。此外,它还会将放映数据发送至 Request Processor 层。在 Request Processor 中,除了其他的任务之外,它会将长期和短期的观看数据存储到持久层中,对我们来说,也就是Apache Cassandra以及用作缓存层的,后者能够让我们实现快速查找。
回压
大多数时候,该系统都能运行得非常好。在极少数情况下,有可能因为网络突发事件,或者某个 Cassandra 节点暂时运行缓慢,导致正在处理的某个请求被延迟。当这种情况发生时,鉴于此处是同步处理,请求处理线程不得不等待。这反过来又会使上游的 Playback API 服务变慢,进而使得 Gateway 网关服务变慢。
除了云中的重试策略之外,这种速度变慢还会响应到运行在用户设备上的 Netflix 客户端。有时候,这种现象被称为回压。回压可以表现为系统中的不可用,并且会建立一个客户端必须要进行重试的队列。在这里面,有些数据对我们的工作至关重要,所以我们希望避免数据丢失,例如,如果客户端填充其本地队列时,该队列的容量是有限的,那么就可能会造成数据丢失。
我们对该问题的解决方案是在系统中引入异步处理。在 Playback API 服务和 Request Processor 之间,我们引入了一个可持久化的队列。现在,当请求传入的时候,它会被放到一个持久化队列中,并且立即确认。这样,就没有必要等待该请求处理完毕了。
事实证明,Apache Kafka非常适合这样的场景。Kafka 提供了一个日志抽象,像 Playback API 这样的生产者可以追加日志,然后多个消费者都可以基于偏移量按照自己的节奏从 Kafka 日志中读取数据。
这听上去很简单。但是,如果只是在两个处理层之间引入 Apache Kafka,我们就能宣告大功告成吗?并非如此。Netflix 的运营规模达到了每秒钟 100 万个事件。在这种规模下,我们遇到了异步处理的一些挑战,包括数据丢失、处理延迟、乱序和重复记录,以及偶然性的处理失败。另外,还有涉及 Kafka 消费者平台的选择以及跨区域方面的设计决策。
挑战:数据丢失
关于数据丢失,有两个潜在的诱因。首先,如果 Kafka 集群本身不可用,毫无疑问,我们将会丢失数据。解决这个问题的简单方法就是添加一个额外的备用集群。如果主集群由于难以预见的原因而导致不可用的话,发布者(本例中,也就是 Playback API)可以将请求发布至备用集群。消费者的请求处理器可以连接至两个 Kafka 集群,因此不会丢失任何数据。
显然,这里付出的代价就是额外的成本。对于特定类型的数据来讲,这种行为是可行的。但是,是否所有的数据都需要这样做呢?幸运的是,答案是否定的。我们有两种类型的放映数据。关键数据将会按照这种方式来处理,这证明备用集群的额外成本是值得的。其他不太重要的数据则会只使用一个普通的 Kafka 集群。由于 Kafka 本身采用了很多的策略来提升高可用性,所以这足以满足我们的要求。
数据丢失的另外一个诱因是发布时间。Kafka 具有多个分区来增加可扩展性。每个分区都是由一组叫做 broker 的服务器来提供服务。其中,有一台服务器会被选为首领(leader)。当发布消息到某个分区时,会将数据发送至首领 broker。我们可以仅等待首领确认条目已经持久化到它的存储中,也可以等待跟随者(follower)broker 都确认它们也已写入到了持久化存储中。如果我们处理的是关键数据的话,等待分区所有 broker 的确认是合理的做法。在大规模的场景中,这不仅仅会涉及到多次写入的成本问题。
如果丢失了对首领 broker 的连接,会发生什么呢?在部署新架构仅仅几个月后,我们就遇到了这样的情况。如果在等待某个 broker 的确认信息时,该 broker 突然不可用,这显然会降低处理速度。这种速度减慢会导致回压和不可用,这正是我们所要避免的。
如果我们只是从首领 broker 中获取确认信息,将会遇到一个非常有意思的故障场景。如果我们在成功发布后又丢失了对首领 broker 的连接,会发生什么呢?首领选举会产生一个不同的首领。但是,如果原来首领已确认的条目并没有完全复制到其他 broker 上,那么这样的首领选举就会造成数据丢失,而这正是我们想要避免的。这叫做不完整的(unclean)broker 首领选举。
我们是如何处理这种情况的呢?同样,这里需要进行权衡。我们有一个生产者库,它是基于 Kafka 生产者客户端的一个包装器。这里有两个相关的优化。首先,因为我们使用了 non-keyed 分区,所以库能够选择它写入的分区。如果某个分区因为首领 broker 不可用而无法使用它的话,我们的库会写入到不同的分区中。同时,因为分区是在一个 under-replicated 的 broker 集合上,也就是首领 broker 具有的条目比跟随者 broker 更多,复制还没有全部完成,那么我们的库会选择一个复制情况更好的分区。
通过这些策略,我们最终选择了以异步模式写入,即发布者将消息写入内存队列,并异步发送至 Kafka。这有助于扩展性能,但是我们希望在同时遇到多个错误的时候,能够有一个最坏情况的数据损失上限。基于内存队列的大小和避免 under-replicated 分区策略,我们配置了令自己满意的上限。
我们监控了数据的持久性,从中可以持续得到 4 到 5 个 9 的可靠性,对于我们来讲,这是可以接受的。如果你的应用不允许丢失任何数据,那么可以选择在接受到所有 broker 的确认之后,再将该条目视为已处理。
挑战:处理延迟和自动扩展
在我们的系统中引入 Kafka 会有一个无法避免的副作用,那就是请求处理会有额外的延迟,这包括 Playback API 发布消息到 Kafka 以及 Request Processor 消费消息所需的时间。
还有就是数据在 Kafka 队列中等待的时间。这被称为滞后(lag),它是由消费者工作者节点和流量组成的一个函数。对于给定数量的节点,随着流量的增加,滞后也会随之增加。
如果你对峰值流量有一个明确的预估,那么就可以计算出系统中所需的处理节点的数量,以达到可接受的滞后。这样,我们就可以简单配置系统来管理峰值流量,只需“简单地设置即可,无需过多关注”。
对于我们来讲,流量在每天的不同时段和每周的不同日子都会有所变化。我们发现峰值的流量是低谷期的 5 倍。鉴于如此巨大的流量变化,我们希望能够更有效地利用资源,所以选择了自动伸缩。具体来讲,我们会基于流量添加或移除特定数量的消费者处理节点。
每当我们改变 Kafka 主题的消费者数量时,该主题的所有分区会在新的消费者集合之间重新平衡。这里的权衡是资源使用效率与重新平衡的成本。重新平衡会以不同的方式影响到我们。
如果你的处理是有状态的,那么必须要做一些复杂的事情。比如,消费者必须要暂停处理,然后获取内存状态,并根据 Kafka 中的偏移量进行已处理到何处的检查点判断。在分区重平衡之后,消费者会重新加载检查点之后的数据,然后基于检查点的偏移重新开始处理。
如果你的处理稍微简单一些,或者以外部方式存储状态,那么你可能会在重平衡进行的时候,继续正常进行处理。这样的话,当重平衡开始时,某些条目正在处理中,它们还没有发送确认消息到 Kafka,这些条目将会出现在另外一个处理节点中,因为该节点在重平衡后对应到了这个分区。在最糟糕的场景中,我们会重复处理某些条目。如果你的处理是幂等的或者有其他方式处理重复的项目,那么这并不是什么问题。
下一个问题就是,何时以及按照多大的幅度来进行自动扩展?有人可能认为滞后是触发自动扩展的一个好指标。但问题在于,我们无法很容易地通过这个指标进行收缩。当滞后是零的时候,我们如何得知该收缩 1 个、10 个还是 50 个处理节点呢?一次性移除太多的节点会导致“摇摆(flapping)”,也就是在短时间内一次又一次地移除和重复添加节点。
在实践中,很多开发人员会使用某个代理指标,比如 CPU 利用率。对于我们来讲,事实证明,每秒记录数(records-per-second,RPS)是一个很好的触发器,能够用来实现自动扩展。当系统处于稳定状态时,我们会测量 RPS 并建立一个基线。然后,当吞吐量相对于该基线发生变化时,我们就可以添加或移除节点了。
我们也有不同的模式实现扩展和收缩。我们希望避免在扩展过程中进行重平衡,因为此时已经有大量输入的数据,重平衡会暂时减缓消费者的速度,所以我们希望快速扩展。而收缩可以逐渐完成,因为此时支持的吞吐量要高于实际的值,我们可以接受再平衡带来的减速。
挑战:乱序和重复记录
在分布式系统中,会出现乱序和重复记录的情况。如何解决这个问题取决于应用的具体情况。在我们的场景中,我们使用了会话机制(sessionization),它会收集一个视频放映会话内的事件,该会话有一个明确的开始和结束事件。因此,我们会收集这个边界内该会话的所有事件。
对于给定会话内的多个事件,基于数据内的特定属性,我们会对其进行排序并去重。例如,每个事件会有一个递增的 ID 或来自客户端的时间戳。对于写入操作来说,当事件抵达服务器时,我们能够通过时间戳进行去重。
挑战:偶然性的处理失败
在消费者端,我们还要解决偶尔处理失败的问题。通常情况下,我们不想因为一个失败的条目而中断对整个队列的处理,有时候这种做法叫做队头(head-of-line)阻塞。相反,我们想要把失败的条目放到一边,继续处理队列中的其他内容,稍后再来单独处理它。
我们希望系统具备这样的特点,那就是再次尝试之前应该有一个有限的等待时间,没有必要立即进行尝试。我们可以为这些失败的条目使用一个单独的队列,也叫做延迟队列(delay queue)。有多种方式可以实现这一点。我们可以将其写入到另外一个 Kafka 队列中,然后构建一个延迟执行的处理器。
对于我们来讲,使用Amazon Simple Queue Service(SQS)很容易实现这一点,因为我们已经在Amazon Elastic Compute Cloud(EC2)上运行了。我们会将失败的条目提交到 SQS 队列中,该队列有一项特殊的功能,里面的条目在消费之前,可以指定一个间隔时间。
消费者平台
我们可以使用多种平台来消费和处理来自 Kafka 的条目。在 Netflix,我们使用了三种不同的平台。Apache Flink是一个流行的流处理系统。是 Netflix 几年前开源的一个流处理系统。最后,Kafka 有一个嵌入式的消费者客户端,借助它我们可以编写微服务,直接处理 Kafka 中的条目。
我们首先面临的问题是,哪种平台最好用?最终,我们意识到这个问题本身就是错误的。相反,问题应该是,哪种处理平台在哪种使用场景下最为有利?这三种方式各有优缺点,在不同的场景下,这三种平台我们都用到了。
如果要进行复杂的流处理,那么 Mantis 和 Apache Flink 非常合适。Apache Flink 还内置了对有状态流处理的支持,其中每个节点都可以存储本地状态,例如可以用于会话机制。微服务是很有吸引力的方案,至少对我们来讲是这样的,因为 Netflix 工程师对微服务生态有良好的支持,涵盖生成或初始化整洁的代码库,到 CI/CD 流水线和监控。
跨区域的问题
跨区域(cross-region)问题是很重要的,因为 Netflix 要在多个区域运营。由于我们正在运维的是一个大型的分布式系统,某个区域可能偶尔会出现不可用的情况。我们每年都会进行多次演练,关闭某个区域,确保能够实现跨区域的流量转发。
乍看上去,这没有什么问题,如果某个条目本来要发往另外一个区域,我们可以借助跨区域的隧道,将其远程发布到 Kafka 主题中。通常情况下,这是可行的,但是当我们遇到该区域真正无法使用时,远程发布方式就无法奏效了。
我们做了一个简单但微妙的修改,那就是始终希望在本地发布。我们发布到另外一个 Kafka 主题,并使用区域路由器异步将其发送至另外一个区域中。通过这种方式,某个放映会话的所有事件就可以一起处理了。
测试、验证和上线
现在我们已经解决了挑战,并做了权衡,那么该如何测试和上线的呢?在这个过程中,可以使用影子测试(Shadow testing)技术。在你的环境中,可能已经使用过类似的策略了。对我们来讲,这包括让 Playback API 双重写入到现有的同步系统和 Apache Kafka 中,异步请求处理器会基于 Apache Kafka 进行消费。然后,我们会有一个验证程序,它能校验正在处理中的请求是一致的。
下一步就是确保存储的制品是相同的。为此,我们创建了一个影子 Cassandra 集群。在这里,我们使用成本来换取足够的信心。如果你能有一个环境,可以相对容易地在短时间内获得额外的资源,那么在上线之前,它可以给你带来额外的信心,对于我们这样的云环境来讲,这一点是很容易实现的。
最终,我们上线了这个方案,按 userId 分割流量,也就是说,给定 userId 的所有流量被持续写入新系统或旧系统。我们从 1%的用户数据写入新系统开始,然后逐步增加比例,一直到 100%的用户。这使我们的迁移非常顺利,没有对上游或下游系统造成影响。
下图显示了我们现在的情况和下一步的发展方向。其中,蓝色的内容,即 Playback API、Viewing History Processor 和 Bookmark Processor 以及 Kafka,现在已经投入生产。我们还有处理额外数据的其他组成部分,包括 Attributes processor 和 Session Logs 服务,它们将会很有趣,因为数据量非常大,比你通常写进 Kafka 的数据要大得多。我们将有一些新的挑战需要解决。
结论
我们已经看到,异步处理是如何为我们提高可用性和数据质量的,以及在我们的环境中,是如何进行设计选择和权衡的。在实施之后,影子测试和增量上线确保我们能够进行自信和顺利地部署。有了这些信息,请思考如何将这些经验应用到你的环境中,以及在类似的旅程中可能会做出哪些其他的权衡。
作者简介
Sharma Podila 是一位软件工程领导、系统建设者、合作者和导师。他在云资源管理、分布式系统、数据基础设施方面有很深的造诣,并在交付跨功能的大规模分布式系统方面有着丰富的成功经历。
原文链接:
Microservices to Async Processing Migration at Scale
相关阅读:
Netflix基于Redis、Kafka和Elasticsearch构建高吞吐优先队列Timesone
破天荒!Netflix 为软件工程师引入细分职级,已有不少老员工因对新职级不满离职
Netflix工作10年,我收获的一些关键经验