背景
随着伴鱼课程业务需求和用户量的快速增长,涉及到实时和延时任务的场景也越来越多。例如课程录制、课程视频转码、课程视频上传以及相关的课程视频分析、老师学生行为分析、语音识别、情绪识别等算法离线预测任务等。这些任务都需要大量的计算、存储、网络等资源,而且不同的场景对任务的执行时间,调度策略又有不同的要求。如果由业务方来各自管理机器资源并且监控每个任务的状态,累积的维护成本会非常高,而且不方便统一管理。从整体来看,在资源有限的情况下,简单的调度逻辑已无法同时满足全部的任务需求。我们需要一个能进行任务调度、任务编排、异构资源管理、任务监控的分布式任务调度解决方案。但是如何在合理高效管理任务的前提下去做到节约资源成本,就成了我们需要面对的一个问题。
以录制任务为例,为了满足高峰期的需求,有二十多台 64Core 128G Memory 的物理机全天运转提供服务。然而实际上大部分时间机器资源都是闲置的,只有在用户上课时才会有任务执行,但为了用户体验和实时类课程录制,又不能临时减少机器数量。在类似业务的背景下,我们对系统功能要求进行了整理,并对业界开源项目和第三方产品进行了调研。
功能要求
产品调研
业界内关于任务调度的开源项目和第三方产品有很多,我们主要调研了其中几个产品,并进行了几项指标的对比。
基于我们自身的需求背景及产品调研结果,主要考虑到如下原因:
因此,我们选择自研一套能够完全满足内部需求的分布式任务调度系统,并取名为 Jarvis (钢铁侠中的智能管家)。在资源上,Jarvis 系统可以接管业务方指定的物理机,云主机,K8S 集群, ECI, EKS 等资源,不同类型的任务可以做到资源隔离和动态管理。除此之外,Jarvis 系统还借助了 ECI 和 EKS 这些弹性容器服务的能力,在物理资源不足时,可以将容器任务调度到上面进行执行。(ECI, EKS:可以理解为一个按使用量计费的,无限容量的 K8S 集群)。
架构设计
Jarvis 系统主要有四大模块:JobManager、Scheduler、ResourceManager、Worker,每个模块都以集群方式部署。
模块介绍
JobManager
负责管理任务的生命周期,维护任务的依赖关系(DAG 编排),支持定时任务,实时任务的创建和管理,监控任务状态,管理任务的生命周期,维护任务状态机。Job Manager 负责监控任务的运行状态、管理任务的生命周期,处理实时/定时/延时任务,另外 Job Manager 还负责监控超时任务,对任务查杀和强行释放资源。
负责对任务进行调度,通过给 ResourceManager 发送任务进行资源绑定,并将分配到资源的任务 dispatch 到指定位置。Jarvis 调度系统的大脑,它从 Job Manager 中获取需要执行的任务,根据任务的类型、等待时间、优先级等信息,按照多种调度算法,对任务进行调度并将任务分发给合理的 Worker 来执行任务。
ResourceManager
负责管理业务方所有可用资源,包括但不限于物理机,云主机,K8S 集群等,并将 Scheduler 推送过来的任务绑定最合适的资源。作为 Jarvis 调度系统的资源管理中心,它还负责将物理机、K8S 集群等资源注册到缓存和数据库,将这些资源统一管理,并监控资源的负载情况和资源使用信息。除此之外,ResourceManager 集成了资源的打分,分配,调度方案,可作为插拔式插件进行更新。
该模块部署在宿主机上,可以使用容器或二进制形式部署,负责向 ResourceManager 上报机器资源使用情况、健康状态、心跳检查等,并向 Job Manager 上报任务执行状态。最终都会通过 Worker 执行作业与任务。Jarvis 调度系统中的任务执行和分发者,接收并执行由 Scheduler 分发的任务、接收并汇报任务的运行结果。实时向 ResourceManager 回报资源的使用情况、健康状态、心跳等信息,确保物理机资源能够被 Jarvis 管理。
模块细节
JobManager 模块
Job Manager 并不是严格的去中心化设计,而是通过 Etcd 分布式锁选举出 Master 节点,Master 节点相比其它 Slave 节点多了一些全局的监控工作,但不会直接与其它节点存在关联。
任务模型
在 JobManager 中实现了 3 种任务模型:
支持的任务类型
目前我们支持以下任务类型:
对于脚本类的任务,需要提供具体的脚本内容。对于容器类型的任务,需要提供任务镜像,启动参数,环境变量等。因为容器可以方便地限制 CPU、Memory 等资源的使用,而且在 ECI 的助力下,很少会出现资源不足的情况。
Jarvis 系统接入的第一个任务是直播中台的课程录制业务,业务方将原有服务中的录制逻辑抽离出来进行了容器化,在 Jarvis 中以容器形式运行。在接入录制任务的过程中,业务方提出了一些新的需求,比如客户端需要切换录制 SDK,上游服务可调用 Jarvis 的指令发送接口给任务发送切换 SDK 指令,但是 Docker 本身是不支持的,我们最终通过 Docker Exec API + IPC 打通了物理机以及 ECI 上容器任务的通信。另外 Jarvis 还支持自定义 Processor(可以理解为任务插件),可以直接在机器上执行特定的任务。
通过 Job Manager 创建任务的时候,可以设置限制资源(CPU、Memory、GPU 等)的参数,对于容器任务,容器底层自身可以做到严格的资源限制,对于脚本类和自定义 Processor 任务,我们会使用 Linux Cgroup、Namespace 技术来实现资源隔离和限制。
核心接口
任务状态
任一时刻,Job 只会处于以下一种状态
Job 的状态变化主要来自 Scheduler 和 Worker 的上报以及 Job Manager 的监控,同一个 Job 多个状态的上报存在并发问题,可能会造成缓存与数据库的不一致。为此我们实现了支持自动续期的 Redis 分布式锁来保证状态变化的原子性。
任务执行流程
JobManager 收到客户端提交的请求后,通过分布式 ID 生成器生成 JobId ,将其放入 Redis Set 中,并将 Job 信息持久化到 DB。创建 Job 的参数如下:
type CreateJobReq struct {
TeamIdstring//业务方标识
AppNamestring//业务名称
Namestring//任务名称
Descriptionstring//任务描述
Creatorstring//创建者
Timezonestring//时区
Retriesint32//重试次数
RetryIntervalint32//重试间隔
Priorityint32//优先级
Concurrencybool//是否并发执行
Executorstring//http,docker、eci...
ExecutorModestring//执行模式
ExecutorConfig map[string]string //任务参数及配置
Cpufloat64//cpu需求
Memoryint32//memory需求
Gpuint32//gpu需求
Timeoutint32//超时时间
CallbackUrlstring//回调URL
复制代码
此时 Job 为 Schedulable 状态,等待被 Scheduler 调度。Scheduler 会定时批量从 Job Manager 维护的 Redis Set 中拉取任务。Job 被拉取后,状态变更为 Scheduling ,然后由 Scheduler 根据 Job 指定的调度策略向 Resource Manager 申请资源,成功申请到资源后,Scheduler 将 Job 指派到对应资源上的 Worker,此时 Job 状态变更为 Pending, 在 Worker 启动任务成功后,任务状态变更为 Running。
定时任务的实现
常见用来实现定时任务的 DelayQueue 和 Cron,底层都是最小堆,单次插入删除的平均时间复杂度是 O(log n), 如果堆的大小已经达到 100w,那么每次插入都需要将近 20 次操作(2^20 = 1048576)。Jarvis 的设计目标是能同时维护百万级的定时任务,在这种情况下,用常见方式去执行创建任务、停止任务等操作将会非常耗时,为此我们需要一种更高级的数据结构:时间轮,而其可以达到近乎 O(1) 的时间复杂度。在海量任务场景下(百万级别),每次插入新的任务,时间轮要比最小堆少 19 次操作。参考 Kafka 中时间轮算法的实现,我们基于 Golang 实现了高性能的层级时间轮并应用到 Job Manager 中。
最简单的时间轮就是一个固定大小的循环列表,其中每格代表一个时间间隔,包含一个双向链表用来维护某一时刻下的任务列表。很明显,这种单层时间轮无法表示较大的时间跨度,且在初始化后无法管理超过跨度的定时任务。层级时间轮通过按需创建多个时间轮,并对每层时间轮设置不同的时间跨度,有效地解决了单层时间轮的缺点。当定时任务超过层级时间轮当前最大时间跨度后,会创建 N 倍与当前跨度的高层时间轮,其中的 N 是上述提到的循环列表格数。随着时间的流逝,高层的时间轮中的任务会被逐步降级插入到下层时间轮中,直到达到最底层时间轮的当前时刻指针,任务被取出,移出时间轮。
对于单个时间轮来说,目前可以达到 ms 级的精度,在 tick = 1ms,时间轮的时间格个数 timewheelsize = 60 时,第一层时间轮的跨度为 60ms,第二层时间轮的跨度为 6060ms = 3.6s,第三层时间轮的跨度为 603.6s = 216s … 第七层时间轮的时间跨度为 88.7 year,仅需七层就足以满足业务上的需求。
在实践过程中,我们也发现了 Jarvis 直接使用时间轮算法的一些问题,例如没有做备份,当服务器宕机时会丢失所有任务。而每个 JobManager 节点中都运行着一个时间轮,我们需要保证这个“分布式时间轮”在服务重启或宕机时,任务能被及时地分发到其它节点。为了解决这个问题,我们在以下的任务监控场景引入了 Job Bucket 的新概念。
任务监控
Job Manager 集群管理着 Jarvis 系统中的所有 Job,为了便于每个节点都均衡地参与到 Job 的监控和管理工作中,上游负责负载均衡,但 Job Manager 还需要先对 Job 进行分配。
对此,我们引入了 Bucket 的概念,类似 Redis Cluster 中的哈希槽。创建 Job 时,会根据 CRC32(JobId) mod BucketCount ,计算出 Job 所映射的 Bucket。Job Manager 节点在启动时会去抢占 Bucket,抢占成功后才会提供监控等服务,否则就会一直尝试抢占。例如,现在有 10 个 bucket,部署了 12 个 Job Manager 节点,这样会有 10 个节点抢到了 Bucket,另外 2 个节点只会提供接口服务,并不实际维护任务。
抢到 Bucket 的节点会监控 Bucket 下绑定的所有 Job,当发现有 Job 在某个状态超时后,会主动执行 kill、释放资源、重新调度等操作。节点需要通过心跳监控对 bucket 续期,当某个节点出现故障时,该 Bucket 会被立即释放,之前未抢到的节点会及时接管该 Bucket。Bucket 的数量是可动态配置的,一般我们会设置节点数为 N。N 代表备用的监控节点,可以自己把控节点数量。除了监控 Job 的状态,Job Manager 中的 Master 节点还会监控 Bucket 的数量,当宕机的节点数超过 N 时,意味着出现了 Bucket 无节点接管的情况。
如果节点宕机,新的节点接管其 Bucket 后,会将该 Bucket 下的定时任务重新加入到自己的时间轮中,这样就保证了定时任务在节点宕机重启时也不会丢失。
DAG 任务编排
DAG 的每个子任务本质都是一个实时任务。在实现上,我们用有向无环 (DAG) 维护了任务间的依赖关系,当子任务执行结束时,通知 DAG 执行一次检查,如果已无可执行的子任务,则 DAG 执行结束。
任务失败策略
一般任务执行失败有以下几种情况:
接收到任务失败上报后,Job Manager 会根据创建时指定的 “失败重试次数” 参数,尝试重新调度任务,当重试次数用完后,Job 会被标记为最终态 Failed。
任务结果回调
Jarvis 系统并不关心任务具体业务逻辑的对错,我们只保证任务成功在资源上运行。如果业务方需要拿到任务执行完的结果,可以在业务逻辑中任务结束前调自己的接口。我们也提供了回调机制:在创建 Job 时可以指定 CallBack URL,任务执行结束后,Jarvis 会将任务的输出结果进行回调。
保证某些场景下的任务幂等性
在系统的基本逻辑基础上,任务 (Job) 本身是具有幂等性的,因为任务 (Job) 可以抽象成一个请求,但是因为存在重试机制和补偿机制的缘故,为了避免在这些机制下产生任务 (Job) 被重复执行(即需要保证一个任务只会被一台机器执行一次),Jarvis 在基于 Redis Check + 分布式 ID + CallBack 机制下去保证这些场景下的「任务幂等性」。
Scheduler 模块
Scheduler 的核心功能是对任务进行调度,负责任务在创建后的“绑定资源 -> 指派任务”的过程。
调度策略
Jarvis 使用的调度算法主要有:
最简单的一个调度算法:先来先服务。维护非抢占式的任务
多优先级级队列调度算法。「多级」表示有多个队列,每个队列优先级从高到低。维护抢占式任务。当抢占式任务因资源不足无法执行时,会对低优先级的任务进行抢占,被抢占的任务会走正常的重试逻辑,直到重试次数用完。
除了以上两种调度算法外,我们还基于 ECI 实现了一个对业务场景很实用的 Feature:容忍等待时间。
考虑如下场景:有一些任务的实时性要求不高,在 24 小时内执行完就可以,而我们只有有限的物理机资源。当瞬时创建大量这种任务时,就算全部资源满负荷运转,同时能跑的任务数量也很有限。Scheduler 会将这些任务积压在队列中,在这 24 小时里充分利用资源,如果达到任务要求的可容忍等待时间后,资源仍然不足,Scheduler 就会直接将其调度到 ECI 上执行。
与”分布式时间轮”问题类似,Scheduler 集群各个节点的会维护的队列,也没有备份。但是 Scheduler 中没有 Bucket 的概念,它是完全无状态去中心化的。为了解决这个问题,我们在监听到 Scheduler 退出信号后,会把队列中未处理的 Jobs 信息进行回调,再通过 Job Manager 转发到其它节点。
调度流程
ResourceManager 模块
调度系统现状
Google 的研究工作 1 表明,调度系统经历了从单层调度系统到双层调度系统再到共享状态调度系统的演变过程:
ResourceManager 处理流程
Jarvis 属于共享状态调度系统其资源管理模块 ResourceManager 负责系统中资源的统一管理和分配,它接受来自 Worker 的资源信息汇报,并把集群中资源按照一定的策略分配给各个任务。ResourceManager 是一个资源管理模块,并不参与任务的具体执行(启动、杀死、重启等),其主要工作包括:Allocate、Report、Release。
ResourceManager 状态机
|Report Exist|
+AllocateReport SuccessvExpire
Initial +--------> Assumed +------------+---> Used +--------> Expired
|||| Release
+----------------++---------> Deleted <---+
ExpireReport Fail
复制代码
一次资源分配处理
ResourceManager 为 Job 分配资源的处理步骤包括:资源数据同步、资源分配、提交资源分配结果
资源数据同步
ResourceManger 模块在接收到任务后会先读取机器最新资源数据,用于资源分配的决策。目前获取资源数据采用的是全量获取的方式,该方式在机器量较少的情况可以适用,但是随着机器资源的不断增多资源同步会因耗时较长而降低系统的调度性能。新版 ResourceManager 已准备采用增量更新的方式来进行资源数据同步,每个 ResourceManger 本地缓存全部资源数据,后续根据时间戳来同步需要更新的数据,该方式在机器资源规模较大时可以保证资源同步的效率从而提高系统的调度效率。
资源分配
资源分配阶段 ResourceManager 会根据同步到的资源数据为任务分配合适的资源,主要包括:筛选、排序、优选 3 个步骤。
finalScoreHostX = (weight1 * priorityStrategy1) + (weight2 * priorityStrategy2)
伴鱼的许多业务都具有较为明显的业务高峰时段,例如教室上课、教学音视频录制、音视频转码、实时性数据分析等。接入这些业务后势必会导致调度系统在某个时间段内调度并发数飙升,为保证调度系统较高的吞吐率 ResourceManager 会一次接收多个 Scheduler 投递的任务,并灵活的将同类型 Job 进行资源合并后再进行资源分配。例如:JobA 和 JobB 都需要 1Core 1G ,则先合并为一个 2 Core 2G 的 Job 再进行资源分配。
此外 Jarvis 是一个支持异构资源的调度系统,目前 ResourceManager 管理的机器资源包括物理机集群、K8S 集群、ECI、EKS 等。资源分配时会优先分配公司机器资源,公司机器资源不足时分配 ECI 等资源以保证可弹性扩容。
提交资源分配结果
通过上一步资源分配处理已经确定任务要被分配到哪台机器,ResourceManager 依靠事务来提交资源分配结果以保证资源的一致性:在选定的机器上模拟扣减资源,并再次检测任务类型的满足性、亲和性等。若所有条件满足则进行数据更新,不满足则发生资源分配冲突。为了减少资源分配冲突后二次调度带来的开销,在资源分配冲突时直接从备选机器中选择一台机器进行分配处理。提交事务进行资源扣减,若成功则调度成功,若失败则回滚。
缓存与数据库的数据同步方案:
其中修改机器资源的 lua 脚本片段如下:
local hostData = redis.call("HGET", hostDataKey, hostName)
if hostData == nil or type(hostData) ~= "string" then
return redisNil
local jsonHostData = cjson.decode(hostData)
local hostCpu = jsonHostData["hostcpu"]
local hostMem = jsonHostData["hostmem"]
local allocatedCpu = jsonHostData["allocatedcpu"]
local allocatedMem = jsonHostData["allocatedmem"]
if allocatedCpu + jobCpu > hostCpu*maxPercent then
return resourceInsufficient
if allocatedMem + jobMem > hostMem*maxPercent then
return resourceInsufficient
jsonHostData["allocatedcpu"] = allocatedCpu + jobCpu
jsonHostData["allocatedmem"] = allocatedMem + jobMem
jsonHostData["utime"] = tonumber(uTime)
local strHostData = cjson.encode(jsonHostData)
local b = redis.call("HSET", hostDataKey, hostName, strHostData)
if b ~= 0 then
local d = redis.call("HDEL", hostDataKey, hostName)
return redisFail
return success
复制代码
Worker 是部署在资源节点上的代理,其核心功能是资源负载上报和管理宿主机执行具体任务。Worker 启动后会将宿主机注册到 Resource Manager 中,并定时上报 CPU、Memory 等信息。Scheduler 会将绑定资源成功的任务 dispatch 到指定的 Worker,Worker 会在宿主机上启动执行对应的脚本或容器。任务结束后,Worker 需要将结果上报给 Job Manager。
使用场景
Jarvis 可以帮助业务方管理不同类型的资源,将任务与业务逻辑解耦,通过我们提供的接口就可以快速创建各种类型的任务。目前直播中台已有多个算法离线分析预测任务正式接入 Jarvis 系统,文章开始提到的录制任务也已经跑通了接入流程,正在逐步把线上流量慢慢迁移到 Jarvis 中。
展望
参考资料
作者:闫云龙、宋园园
原文:原文:伴鱼分布式调度系统 Jarvis 的设计与实现