BIGO 于 2014 年成立,是一家高速发展的科技公司。基于强大的音视频处理技术、全球音视频实时传输技术、人工智能技术、CDN 技术,BIGO 推出了一系列音视频类社交及内容产品,包括Bigo Live(直播)和Likee(短视频)等,在全球已拥有近 1 亿用户,产品及服务已覆盖超过 150 个国家和地区。
挑战
最初,BIGO 的消息流平台主要采用开源 Kafka 作为数据支撑。随着数据规模日益增长,产品不断迭代,BIGO 消息流平台承载的数据规模出现了成倍增长,下游的在线模型训练、在线推荐、实时数据分析、实时数仓等业务对消息流平台的实时性和稳定性提出了更高的要求。开源的 Kafka 集群难以支撑海量数据处理场景,我们需要投入更多的人力去维护多个 Kafka 集群,这样成本会越来越高,主要体现在以下几个方面:
如果继续使用 Kafka,成本会不断上升:扩缩容机器、增加运维人力。同时,随着业务规模增长,我们对消息系统有了更高的要求:系统要更稳定可靠、便于水平扩展、延迟低。为了提高消息队列的实时性、稳定性和可靠性,降低运维成本,我们开始考虑是否要基于开源 Kafka 做本地化二次开发,或者看看社区中有没有更好的解决方案,来解决我们在维护 Kafka 集群时遇到的问题。
为什么选择 Pulsar
2019 年 11 月,我们开始调研消息队列,对比当前主流消息流平台的优缺点,并跟我们的需求对接。在调研过程中,我们发现 Apache Pulsar 是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。Pulsar 能够无缝扩容、延迟低、吞吐高,支持多租户和跨地域复制。最重要的是,Pulsar 存储、计算分离的架构能够完美解决 Kafka 扩缩容的问题。Pulsar producer 把消息发送给 broker,broker 通过 bookie client 写到第二层的存储 BookKeeper 上。
Pulsar 采用存储、计算分离的分层架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性。
为了进一步加深对 Apache Pulsar 的理解,衡量 Pulsar 能否真正满足我们生产环境大规模消息 Pub-Sub 的需求,我们从 2019 年 12 月开始进行了一系列压测工作。由于我们使用的是机械硬盘,没有 SSD,在压测过程中遇到了一些性能问题,在 StreamNative 的协助下,我们分别对和BookKeeper进行了一系列的性能调优,Pulsar 的吞吐和稳定性均有所提高。
经过 3~4 个月的压测和调优,我们认为 Pulsar 完全能够解决我们使用 Kafka 时遇到的各种问题,并于 2020 年 4 月在测试环境上线 Pulsar。
Apache Pulsar at BIGO:Pub-Sub 消费模式
2020 年 5 月,我们正式在生产环境中使用 Pulsar 集群。Pulsar 在 BIGO 的场景主要是 Pub-Sub 的经典生产消费模式,前端有 Baina 服务(用 C++ 实现的数据接收服务),Kafka 的 Mirror Maker 和 Flink,以及其他语言如 Java、Python、C++ 等客户端的 producer 向 topic 写入数据。后端由 Flink 和 Flink SQL,以及其他语言的客户端的 consumer 消费数据。
在下游,我们对接的业务场景有实时数仓、实时 ETL(Extract-Transform-Load,将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程)、实时数据分析和实时推荐。大部分业务场景使用 Flink 消费 Pulsar topic 中的数据,并进行业务逻辑处理;其他业务场景消费使用的客户端语言主要分布在 C++、Go、Python 等。数据经过各自业务逻辑处理后,最终会写入 Hive、Pulsar topic 以及 ClickHouse、HDFS、Redis 等第三方存储服务。
Pulsar + Flink 实时流平台
在 BIGO,我们借助 Flink 和 Pulsar 打造了实时流平台。在介绍这个平台之前,我们先了解下 Pulsar Flink Connecto 的内部运行机理。在 Pulsar Flink Source/Sink API 中,上游有一个 Pulsar topic,中间是 Flink job,下游有一个 Pulsar topic。我们怎么消费这个 topic,又怎样处理数据并写入 Pulsar topic 呢?
按照上图左侧代码示例,初始化一个 StreamExecutionEnvironment,进行相关配置,比如修改 property、topic 值。然后创建一个 FlinkPulsarSource 对象,这个 Source 里面填上 serviceUrl(brokerlist)、adminUrl(admin 地址)以及 topic 数据的序列化方式,最终会把 property 传进去,这样就能够读取 Pulsar topic 中的数据。Sink 的使用方法非常简单,首先创建一个 FlinkPulsarSink,Sink 里面指定 target topic,再指定 TopicKeyExtractor 作为 key,并调用 addsink,把数据写入 Sink。这个生产消费模型很简单,和 Kafka 很像。
Pulsar topic 和 Flink 的消费如何联动呢?如下图所示,新建 FlinkPulsarSource 时,会为 topic 的每一个分区新创建一个 reader 对象。要注意的是 Pulsar Flink Connector 底层使用 reader API 消费,会先创建一个 reader,这个 reader 使用 Pulsar Non-Durable Cursor。Reader 消费的特点是读取一条数据后马上提交(commit),所以在监控上可能会看到 reader 对应的 subscription 没有 backlog 信息。
在 Pulsar 2.4.2 版本中,由 Non-Durable Cursor 订阅的 topic,在接收到 producer 写入的数据时,不会将数据保存在 broker 的 cache 中,导致大量数据读取请求落到 BookKeeper 中,降低数据读取效率。BIGO 在 Pulsar 2.5.1 版本中修正了这个问题。
Reader 订阅 Pulsar topic 后,消费 Pulsar topic 中的数据,Flink 如何保证 exactly-once 呢?Pulsar Flink Connector 使用另外一个独立的 subscription,这个 subscription 使用的是 Durable Cursor。当 Flink 触发 checkpoint,Pulsar Flink Connector 会把 reader 的状态(包括每个 Pulsar Topic Partition 的消费位置) checkpoint 到文件、内存或 RocksDB 中,当 checkpoint 完成后,会发布一次 Notify Checkpoint Complete 通知。Pulsar Flink Connector 收到 checkpoint 完成通知后,把当前所有 reader 的消费 Offset,即 message id 以独立的 SubscriptionName 提交给 Pulsar broker,此时才会把消费 Offset 信息真正记录下来。
Offset Commit 完成后,Pulsar broker 会将 Offset 信息(在 Pulsar 中以 Cursor 表示)存储到底层的分布式存储系统 BookKeeper 中,这样做的好处是当 Flink 任务重启后,会有两层恢复保障。第一种情况是从 checkpoint 恢复:可以直接从 checkpoint 里获得上一次消费的 message id,通过这个 message id 获取数据,这个数据流就能继续消费。如果没有从 checkpoint 恢复,Flink 任务重启后,会根据 SubscriptionName 从 Pulsar 中获取上一次 Commit 对应的 Offset 位置开始消费。这样就能有效防止 checkpoint 损坏导致整个 Flink 任务无法成功启动的问题。
先做 checkpoint N,完成后发布一次 notify Checkpoint Complete,等待一定时间间隔后,接下来做 checkpoint N+1,完成后也会进行一次 notify Checkpoint Complete 操作,此时把 Durable Cursor 进行一次 Commit,最终 Commit 到 Pulsar topic 的服务端上,这样能确保 checkpoint 的 exactly-once,也能根据自己设定的 subscription 保证 message “keep alive”。
Topic/Partition Discovery 要解决什么问题呢?当 Flink 任务消费 topic 时,如果 Topic 增加分区,Flink 任务需要能够自动发现分区。Pulsar Flink Connector 如何实现这一点呢?订阅 topic 分区的 reader 之间相互独立,每个 task manager 包含多个 reader thread,根据哈希函数把单个 task manager 中包含的 topic 分区映射过来,topic 中新增分区时,新加入的分区会映射到某个 task manager 上,task manager 发现新增分区后,会创建一个 reader,消费掉新数据。用户可以通过设置 `partition.discovery.interval-millis` 参数,调配检测频率。
为了降低 Flink 消费 Pulsar topic 的门槛,让 Pulsar Flink Connector 支持更加丰富的 Flink 新特性,BIGO 消息队列团队为 Pulsar Flink Connector 增加了 Pulsar Flink SQL DDL(Data Definition Language,数据定义语言) 和 Flink 1.11 支持。此前官方提供的 Pulsar Flink SQL 只支持 Catalog,要想通过 DDL 形式消费、处理 Pulsar topic 中的数据不太方便。在 BIGO 场景中,大部分 topic 数据都以 JSON 格式存储,而 JSON 的 schema 没有提前注册,所以只能在 Flink SQL 中指定 topic 的 DDL 后才可以消费。针对这种场景,BIGO 基于 Pulsar Flink Connector 做了二次开发,提供了通过 Pulsar Flink SQL DDL 形式消费、解析、处理 Pulsar topic 数据的代码框架(如下图所示)。
左边的代码中,第一步是配置 Pulsar topic 的消费,首先指定 topic 的 DDL 形式,比如 rip、rtime、uid 等,下面是消费 Pulsar topic 的基础配置,比如 topic 名称、service-url、admin-url 等。底层 reader 读到消息后,会根据 DDL 解出消息,将数据存储在 test_flink_sql 表中。第二步是常规逻辑处理(如对表进行字段抽取、做 join 等),得出相关统计信息或其他相关结果后,返回这些结果,写到 HDFS 或其他系统上等。第三步,提取相应字段,将其插入一张 hive 表。由于 Flink 1.11 对 hive 的写入支持比 1.9.1 更加优秀,所以 BIGO 又做了一次 API 兼容和版本升级,使 Pulsar Flink Connector 支持 Flink 1.11。
BIGO 基于 Pulsar 和 Flink 构建的实时流平台主要用于实时 ETL 处理场景和 AB-test 场景。
实时 ETL 处理场景
实时 ETL 处理场景主要运用 Pulsar Flink Source 及 Pulsar Flink Sink。这个场景中,Pulsar topic 实现几百甚至上千个 topic,每个 topic 都有独立的 schema。我们需要对成百上千个 topic 进行常规处理,如字段转换、容错处理、写入 HDFS 等。每个 topic 都对应 HDFS 上的一张表,成百上千个 topic 会在 HDFS 上映射成百上千张表,每张表的字段都不一样,这就是我们遇到的实时 ETL 场景。
这种场景的难点在于 topic 数量多。如果每个 topic 维护一个 Flink 任务,维护成本太高。之前我们想通过 HDFS Sink Connector 把 Pulsar topic 中的数据直接 sink 到 HDFS 上,但处理里面的逻辑却很麻烦。最终我们决定使用一个或多个 Flink 任务去消费成百上千个 topic,每个 topic 配自己的 schema,直接用 reader 来订阅所有 topic,进行 schema 解析后处理,将处理后的数据写到 HDFS 上。
随着程序运行,我们发现这种方案也存在问题:算子之间压力不均衡。因为有些 topic 流量大,有些流量小,如果完全通过随机哈希的方式映射到对应的 task manager 上去,有些 task manager 处理的流量会很高,而有些 task manager 处理的流量很低,导致有些 task 机器上积塞非常严重,拖慢 Flink 流的处理。所以我们引入了 slot group 概念,根据每个 topic 的流量情况进行分组,流量会映射到 topic 的分区数,在创建 topic 分区时也以流量为依据,如果流量很高,就多为 topic 创建分区,反之少一些。分组时,把流量小的 topic 分到一个 group 中,把流量大的 topic 单独放在一个 group 中,很好地隔离了资源,保证 task manager 总体上流量均衡。
AB-test 场景
实时数仓需要提供小时表或天表为数据分析师及推荐算法工程师提供数据查询服务,简单来讲就是 app 应用中会有很多打点,各种类型的打点会上报到服务端。如果直接暴露原始打点给业务方,不同的业务使用方就需要访问各种不同的原始表从不同维度进行数据抽取,并在表之间进行关联计算。频繁对底层基础表进行数据抽取和关联操作会严重浪费计算资源,所以我们提前从基础表中抽取用户关心的维度,将多个打点合并在一起,构成一张或多张宽表,覆盖上面推荐相关的或数据分析相关的 80% ~ 90% 场景任务。
在实时数仓场景下还需实时中间表,我们的解决方案是,针对 topic A 到 topic K ,我们使用 Pulsar Flink SQL 将消费到的数据解析成相应的表。通常情况下,将多张表聚合成一张表的常用做法是使用 join,如把表 A 到 K 按照 uid 进行 join 操作,形成非常宽的宽表;但在 Flink SQL 中 join 多张宽表效率较低。所以 BIGO 使用 union 来替代 join,做成很宽的视图,以小时为单位返回视图,写入 ClickHouse,提供给下游的业务方实时查询。使用 union 来替代 join 加速表的聚合,能够把小时级别的中间表产出控制在分钟级别。
输出天表可能还需要 join 存放在 hive 上的表或其他存储介质上的离线表,即流表和离线表之间 join 的问题。如果直接 join,checkpoint 中需要存储的中间状态会比较大,所以我们在另外一个维度上做了优化。
左侧部分类似于小时表,每个 topic 使用 Pulsar Flink SQL 消费并转换成对应的表,表之间进行 union 操作,将 union 得到的表以天为单位输入到 HBase(此处引入 HBase 是为了做替代它的 join)。
右侧需要 join 离线数据,使用 Spark 聚合离线的 Hive 表(如表 a1、a2、a3),聚合后的数据会通过精心设计的 row-key 写入 HBase 中。数据聚合后状态如下:假设左边数据的 key 填了宽表的前 80 列,后面 Spark 任务算出的数据对应同样一个 key,填上宽表的后 20 列,在 HBase 中组成一张很大的宽表,把最终数据再次从 HBase 抽出,写入 ClickHouse,供上层用户查询,这就是 AB-test 的主体架构。
业务收益
从 2020 年 5 月上线至今,Pulsar 运行稳定,日均处理消息数百亿,字节入流量为 2~3 GB/s。Apache Pulsar 提供的高吞吐、低延迟、高可靠性等特性极大提高了 BIGO 消息处理能力,降低了消息队列运维成本,节约了近 50% 的硬件成本。目前,我们在几十台物理主机上部署了上百个 Pulsar broker 和 bookie 进程,采用 bookie 和 broker 在同一个节点的混部模式,已经把 ETL 从 Kafka 迁移到 Pulsar,并逐步将生产环境中消费 Kafka 集群的业务(比如 Flink、Flink SQL、ClickHouse 等)迁移到 Pulsar 上。随着更多业务的迁移,Pulsar 上的流量会持续上涨。
我们的 ETL 任务有一万多个 topic,每个 topic 平均有 3 个分区,使用 3 副本的存储策略。之前使用 Kafka,随着分区数增加,磁盘由顺序读写逐渐退化为随机读写,读写性能退化严重。Apache Pulsar 的存储分层设计能够轻松支持百万 topic,为我们的 ETL 场景提供了优雅支持。
未来展望
BIGO 在 Pulsar broker 负载均衡、broker cache 命中率优化、broker 相关监控、BookKeeper 读写性能优、BookKeeper 磁盘 IO 性能优化、Pulsar 与 Flink、Pulsar 与 Flink SQL 结合等方面做了大量工作,提升了 Pulsar 的稳定性和吞吐,也降低了 Flink 与 Pulsar 结合的门槛,为 Pulsar 的推广打下了坚实基础。
未来,我们会增加 Pulsar 在 BIGO 的场景应用,帮助社区进一步优化、完善 Pulsar 功能,具体如下:
作者简介:
陈航,Apache Pulsar Committer,BIGO 大数据消息平台团队负责人,负责创建与开发承载大规模服务与应用的集中发布-订阅消息平台。他将 Apache Pulsar 引入到 BIGO 消息平台,并打通上下游系统,如 Flink、ClickHouse 和其他实时推荐与分析系统。他目前主要负责 Pulsar 性能调优、新功能开发及 Pulsar 生态集成。