金山云创立于 2012 年,是中国前三的互联网云服务商,2020 年 5 月在美国纳斯达克上市,业务范围遍及全球多个国家和地区。成立 8 年以来,金山云始终坚持以客户为中心的服务理念,提供安全、可靠、稳定、高品质的云计算服务。
金山云日志服务是针对日志类数据处理的一站式服务系统,提供从日志采集、日志存储到日志检索分析、实时消费、日志投递等多项服务,支撑着多个业务线的日志查询和监控业务,提升金山云各个产品线的运维、运营效率,目前每天数据量级在 200 TB。
作为针对日志类数据处理的一站式服务系统,金山云日志服务需要具备以下特性:
为什么选择 Pulsar
在调研过程中,我们从基础功能和可靠性两方面对比了 RocketMQ、Kafka 和 Pulsar,并总结了三者的优缺点(对比结果见下表)。
我们发现 Pulsar 非常适合应用于日志流处理。从 BookKeeper 层面来讲,Pulsar 就是日志流存储的组件。Pulsar 采用云原生架构,日志流服务同样采用云原生、无服务模式,所有服务都在云上实现。Pulsar 拥有诸多灵活的企业级特性,比如支持多租户、支持租户存储配额、数据 ETL、整体数据负载均衡策略等;支持传输大量数据;针对消息队列的监控比较完善等。下面我来详细介绍下我们选择 Pulsar 的一些特性。
计算与存储分离
Pulsar 的 producer 和 consumer 都与 broker 相连接,broker 作为无状态服务,可以横向扩缩容,扩缩容时不会影响数据的整体生产和消费;broker 不存储数据,数据存储在 broker 的下一层(即 bookie )中,实现了计算与存储的分离。
弹性水平扩缩容
对于云端产品而言,Pulsar 无需重平衡即可实现 broker 扩缩容。相比之下,Kafka 扩缩容前需要先进行重平衡操作,可能会导致集群负载较高,也会对整体服务产生影响。
其次,Pulsar topic 分区也可以实现无限扩容,扩容之后,通过负载均衡策略自动平衡整个分片和流量。
Pulsar 多租户
Pulsar 原生支持多租户。在日志服务中也有租户的概念,每一条产品线(即每一个项目)属于一个租户,实现了产品线之间的数据隔离。Pulsar 集群支持数百万个 topic(在雅虎已有实践),整个 topic 也通过租户实现了隔离,在租户级别,Pulsar 实现了存储配额、消息过期删除、隔离策略等优秀特性,且支持单独的认证和授权机制。
负载均衡策略
Pulsar 在命名空间级别有 bundle 的概念,如果当前 broker 负载较高,bundle 会通过管理 topic 分区策略进行 bundle split 操作,自动将子分区均衡到其他负载较低的 broker 上。在创建 topic 时,Pulsar 也会自动把 topic 优先分配到当前负载较低的 broker 上。
Pulsar IO 模型
写入操作中,broker 并发向 BookKeeper 写入数据;当 bookie 向 broker 反馈数据写入成功时,在 broker 层面,内部只维护一个队列。如果当前的消费模式是实时消费,则可以直接从 broker 获取数据,无需经过 bookie 查询,从而提升消息吞吐量。在追赶读场景中,查询历史数据才需要查询 bookie;追赶读还支持数据卸载功能,即将数据卸载到其他存储介质中(比如对象存储或 HDFS ),实现冷存历史数据。
Topic 创建、生产与消费
在控制台创建 topic 后,将 topic 信息和租户信息记录到 etcd 和 MySQL 中,然后图示右侧的两类服务会监听 etcd,一类是 producer 类服务,监听创建或删除 topic 后的内部操作。另一类是 consumer 类服务,当监听到创建新 topic 操作后,对应的服务会连接到 Pulsar topic,实时消费 topic 上的数据。然后 producer 开始接收数据,并判断应该向哪个 topic 写入数据,consumer 消费数据并在判断后写入,或转存再写入到其他 ES 或其他存储中。
图 1. Topic 创建、生产、消费流程
Topic 逻辑抽象
Pulsar 中有三个级别:topic、命名空间和租户。因为 Pulsar 目前不支持命名空间级别的正则消费模式,所以我们需要把整体概念往上提一层,减少后台 Flink 的任务量,实现整个项目级别的消费。也就是说,在日志服务中,topic 对应 Pulsar 逻辑上的分片,命名空间对应 Pulsar 逻辑上的 topic。通过这一改动,我们实现了两个功能,一是动态增加和减少分片数量,二是在后台启动的 Flink 任务可以消费单个项目级别的数据。
图 2. Pulsar topic 逻辑抽象图
消息订阅模型
Pulsar 提供四种消息订阅模型:
Broker 故障恢复
由于 broker 无状态,所以某一个 broker 宕机对整体的生产和消费没有任何影响,同时会有一个新 broker 担任 owner 角色,从 ZooKeeper 中获取 topic 元数据,并自动演进到新 owner 中,数据的存储层也不会发生变化。此外,无需拷贝 topic 内的数据,避免数据冗余。
Bookie 故障恢复
Bookie 层使用分片存储信息。由于 bookie 本身有多副本机制,当某个 bookie 出现故障时,系统会从其他 bookie 读取对应分片的信息,并进行重平衡,因此整个 bookie 的写入不会受到影响,保证整个 topic 的可用性。
Pulsar 在日志服务中的应用
日志服务系统的最底层是数据采集工具,我们基于开源的数据采集工具(如 Logstash、Flume、Beats)进行了定制化开发。数据存储中日志池是一个逻辑概念,对应于 Pulsar 中的 topic。日志服务系统的上层为查询分析和业务应用,查询分析指在日志服务的工作台进行检索分析,或通过 SQL 语法进行查询;业务应用指在控制台定制仪表盘和图表,实现实时告警等。查询分析和业务应用都支持数据转存,即把日志数据转存到存储介质或价格较低的存储设备中,如基于 KS3 的对象存储、ElasticSearch 集群或 Kafka。日志服务的产品功能概况如下图。
图 3. 日志服务的产品功能概况图
日志服务架构设计
我们根据日志服务的产品功能设计了日志服务的分层架构(如下图)。最下层为数据采集端,负责采集日志类文本数据、TP/TCP 协议数据、MySQL 中的日志数据等,我们自研的采集端开发工作仍在进行中。采集到的数据通过日志服务数据入口发送到对应的 Pulsar topic 中。我们将 Pulsar 集群应用于三大板块,一是通过 Flink-on-Pulsar 框架实现多维统计分析场景,因为有些业务线需要通过日志数据做多维度的聚合统计,产生指标结果类数据,再转存给业务线。二是将 Pulsar 集群应用于 LogHub(微服务化服务),主要消费 Pulsar topic 的数据,将数据直接写入到 ES,通过控制台即可查询整个日志流的数据,也可以做检索分析。三是在控制台上使用 Pulsar Functions 设置一些算子或 ETL 逻辑,后台通过 Pulsar Functions 模块做数据 ETL。我们采用 EalsticSearch 集群存储数据检索分析结果,KS3、KMR、KES 对应于我们内部的一些云端产品线,用于存储和计算。上层的数据输出部分可以分为两大模块,一是 Search API 模块,负责对外提供 API,通过调用 API 在控制台进行一些和日志紧密耦合的动作;二是 Control API 模块,支持在控制台进行管理类操作,比如创建 topic、调整 topic 的分区数量、检索告警等。
图 4. 日志服务的分层架构设计图
日志服务的通信设计
从日志服务的产品架构来讲,整个服务采用无状态运行模式,所以各类服务(特别是 producer 和 consumer 服务)通过 etcd 方式实现数据共享。也就是说,在控制台执行创建、更新、删除操作后,producer 和 consumer 就会感知到这些动作,从而进行相应的变化。此外,由于 producer 和 consumer 完全在容器中运行,且服务本身无状态,因此可以进行动态扩缩容。日志服务的通信设计图如下。
图 5. 日志服务的通信设计图
日志流处理架构
根据日志流处理的需求,我们设计了如下架构图。左边为数据采集端,采集端把数据发送给数据接收端(PutDatas),接收端再把数据发送给对应的 Pulsar topic。我们将 Pulsar 主要应用于三个场景。
图 6. 日志流处理架构图
未来规划
在 Pulsar 的支撑下,金山云日志服务一直运行良好。我们期待日志服务可以支持更多功能,实现更多需求。在日志服务方面,我们的规划如下:
目前,日志服务支撑金山云内部产品线约 15 条(如下图),单条线上数据传输大概为 200 TB/天,topic 数量已超过 3 万个。当 AI 业务接入 Pulsar 以后,整体数据量和 topic 数量都会有大幅度提升。
图 7. 使用日志服务的产品线
在测试和使用过程中,我们对 Pulsar 有了更全面的了解,期待 Pulsar 可以支持更多特性,比如:
结语
作为下一代云原生分布式消息流平台,Apache Pulsar 有其独特的优势,非常适合我们的日志流处理场景。Pulsar 社区非常活跃,有问必答。我们在前期调研、后续测试和正式上线过程中,StreamNative 的小伙伴们给予了极大支持,帮助我们快速推进业务上线。
目前,金山云日志服务中有 3 万多个 Pulsar topic,每天处理约 200 TB 数据,支撑 15 条产品线。自上线以来,Pulsar 运行状态稳定,大大节省了我们的开发和运维成本。我们期待尽快实现 Pulsar 集群的容器化部署,也期待 Pulsar 可以去除对 ZooKeeper 的依赖,支持自动扩缩容分区。我们愿意和 Pulsar 社区的小伙伴们一起开发新功能,进一步加快 Pulsar 的发展。
作者简介
刘彬,金山云大数据高级开发工程师。