过去一年 Spotify最大一次数据流优化实践 (过去一年收获满满词语)

过去一年 Spotify最大一次数据流优化实践 (过去一年收获满满词语)

本文,我们将讨论 Spotify 如何使用 Sort Merge Bucket(排序合并桶,SMB)联接技术优化和加速 target="_blank">Wrapped 2019中的元素,以用于Wrapped 2020。本文介绍了 SMB 的设计和实现,以及如何将 SMB 集成到数据管道中。

引言

Shuffle 是许多大型数据转换,例如 join、GroupByKey 或其他 reduce 操作的核心构件。遗憾的是,这也是许多管道中最昂贵的一步。SortMergeBucket 是一种通过在生产者端做预先工作来减少 Shuffle 的优化。直观地说,对于经常添加到已知 key 上的常见数据集,例如,用户 ID 上包含元数据的用户事件,我们可以将其写到 bucket 文件中,并根据该 key 记录进行分桶和排序。shuffle 知道哪些文件包含 key 的子集和次序,就可以从匹配的 bucket 文件中合并排序值,从而完全消除移动 Key-value 对代价高昂的磁盘和网络 I/O。Andrea Nardelli 在2018 年的硕士论文中进行了关于 Sort Merge Bucket 的最初研究,之后我们开始研究将这个想法推广为Scio 模块。

设计与实现

Spotify 的大部分数据管道都是用编写的,Scio 是Apache Beam的 Scala API,它在Google Cloud target="_blank">与 Beam 社区一起编写并合作了一份设计文档),和其他许多 I/O 一样, Scio 也提供了语法糖 Scala。 这个设计被模块化到以下所列的主要组件中——我们将从两个顶级 SMBPTransform开始: SortedBucketSink 和 SortedBucketSource 的写和读操作。

SortedBucketSink

这个转换以 SMB 格式写入一个 PCollection<T>(其中 T 有一个对应的 FileOperations<T> 实例)。它首先使用BucketMetadata提供的逻辑提取 Key 和分配 bucket ID,按 ID 对 key-values 进行分组,对所有 values 进行排序,然后使用 FileOperations 实例将它们写入与 bucket ID 相对应的文件中。

除了 bucket 文件之外,还将一个 JSON 文件写入输出目录,这代表从 BucketMetadata 读取源所需的信息:bucket 的数量、散列方案以及从每条记录中提取 key 的指令(例如,对于 Avro 记录,我们可以用包含 key 的 GenericRecord 字段的名称来对该指令进行编码)。

SortedBucketSource

该转换是从一个或多个 SMB 格式的源读取,使用相同的 key 和散列方案编写。它打开了来自每个源的相应 bucket 的文件句柄(使用该输入类型的 FileOperations<T>),并在保持排序的同时合并它们。这些结果作为 CoGbkResult 对象在每个 key 组中发出,就像 Beam 用于常规 Cogroup 操作的类一样,这样用户就可以使用正确的参数来提取每个源的结果。

FileOperations

FileOperations 抽象化了单个 bucket 文件的读写。因为我们需要精确地控制每个文件中的元素及其顺序,因此我们无法利用现有 Beam 文件 I/O,因为它们是在 PCollection 层上操作的,元素的位置和顺序也被抽象出来。取而代之的是, SMB 文件操作是在 BoundedSource(输入)和 ParDo(输出)的 底层进行的。当前, Avro、 BigQuery TableRow JSON 和 TensorFlow TFRecord/Example 记录得到了支持。我们还打算添加其他格式,如 Parquet。

BucketMetadata

该类对元素的 keying 和 bucketing 进行了抽象,包含诸如 key 字段、类、 bucket 的数量,碎片和散列函数。在写入时,元数据与数据文件一起序列化为 JSON 文件,用于在读取 SMB 源代码时检查兼容性。

优化和变体

在过去一年半的时间里,我们在 Spotify 的各种用例中采用了 SMB,并且在处理数据管道的规模和复杂性方面做了很多改进。

采用核心数据生产者

因为 SMB 需要以一种特殊的方式对数据进行分类和排序,采用自然是从那个数据的生产者开始。大多数 Spotify 的数据处理依赖于几个核心数据集,它们是各个业务领域的真实数据源,例如流媒体活动、用户元数据和流上下文。为了把一年的数据转换成 SMB 格式,我们和数据集维护者一起工作。

由于 SortedBucketSink 主要通过一些额外的设置来替换 vanilla Avro sink,因此实现起来很简单。利用 Avrosink 的 sharding 选项可以控制输出文件的数量和大小。在迁移到 SMB 之后,我们没有注意到 vCPU、 vRAM 或实际时间的任何显著变化,因为分片需要一个完整的 shuffle,这与 SMB sink 的额外成本相似。我们随后还必须调整其他一些设置:

采用 Wrapped 2020

一旦核心数据集以 SMB 格式提供,我们就开始 Wrapped 2020,建立在 Wrapped 2019 活动留下的工作基础上。设计这个架构是为了可重用,这是好的开始。然而,数据源是一个大型、昂贵的 Bigtable 集群,必须进一步扩大规模以处理 Wrapped 作业的负载。通过将 Bigtable 迁移到 SMB 源,我们希望节省成本和时间。今年我们还需要处理过滤和聚合流的新的复杂需求。这样就需要在用户的监听历史中添加一个包含流上下文信息的大数据集。因为每一个联接都有很大的规模,所以这几乎是不可能的,或者至少是非常昂贵的。取而代之的是,我们试图使用 SMB 来完全消除此连接,并避免将 Bigtable 用作监听历史的源。

为了计算 Wrapped 2020,我们必须从三个主要数据源读取流媒体活动、用户元数据以及流媒体上下文。这三个源拥有我们需要的所有数据来生成每个人的 Wrapped,同时根据监听背景进行过滤。在此之前,Bigtable 中已经有 5 年的监听历史记录,并通过 user_id 进行了按 key 排序。现在,我们可以通过 SMB 从这三个源读取已经按 user_id key 进行排序的数据。然后,我们将每个 key 一年的数据进行汇总,计算出每个用户的 Wrapped。

由于 3 个主要源中有 1 个是按小时分区的,而另外 2 个是按日分区的,所以在一个作业中读一年的数据是有问题的,因为按小时分区的数据源有很多并行读取。取而代之的是,我们先运行一些较小的作业,这些作业会汇总出每个用户每周或每天的播放次数、 播放毫秒和其他信息。然后,我们再把所有这些更小的分区汇总成一个数据分区,这个数据分区将保存一年。

SMB 让这一切变得相对简单。利用 sortMergeTransform,我们可以将三个数据源结合起来,分别读取每个按 user_id key 排序的数据,并以 SMB 格式写入 Wrapped 输出(播放次数、播放毫秒、播放上下文等等)。

最后,我们运行聚合作业,使用 sortMergeGroupByKey 读取 SMB 的所有 Wrapped 周分区,合并一年的数据,然后将结果写到输出中,以便以后的作业计算剩余的 Wrapped。此处的灵活性的关键之处在于,聚合作业可以采用任何周和日分区的混合形式,这在运行它们时具有很大的逻辑意义。在实践中,最终结果如下:

在今年的 Wrapped 项目中,这最终为我们节省了很多成本。通过利用 SMB,我们可以在不使用传统的 shuffle 或 Bigtable 的情况下,成功连接了大概总共 1PB 的数据。据我们估计,与往年基于 Bigtable 的方法相比,今年的 top="5119">结语

通过采用 SMB,我们能够执行以前无法实现或成本高昂,或需要像 Bigtable 这样的定制工作方案的超大规模连接。在节省成本的同时,我们也开创了更多优化工作流程的方法。我们还需要做大量的工作。我们期望将更多的工作流迁移到 SMB,同时还可以处理更多的边缘情况,比如数据偏移、复合 key 以及更多的文件格式。

作者介绍:

本文作者为 Neville Li、Claire McGinty、Sahith Nallapareddy、Joel Östlund。

原文链接:

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。