基于 Ray 打造高可用、易扩展的在线应用

Ant Ray Serving 是蚂蚁集团开发的基于 Ray 的在线服务框架,为用户提供 Serverless 的平台,将Java/Python 代码发布为在线服务。平台为用户提供部署、伸缩、流量路由、监控等基础能力,让用户可以专注于自己的业务逻辑和利用 Ray 的分布式能力开发在线应用。

Ray 提供了简洁易用的分布式API,灵活的调度等能力,使 Ray Serving 和它的用户在此基础上能够很快开发一个分布式应用,并且提供高可靠的服务。截止目前,在蚂蚁和阿里内部支持了5种不同的使用场景,Ant Ray Serving 达到了6万核,5千个节点的规模,并且参与了多年的双十一、双十二、新春五福等大促场景。

本文我们会介绍蚂蚁的 Ray 服务团队如何设计这个框架,做了哪些工作使它高可用又容易扩展,以及和 Ray Serve 的关系。

第一个在线的分布式计算场景

首先介绍一下我们遇到的第一个问题,在18年左右,蚂蚁支付部门的同事碰到了一些计算上的问题。他们开发了一个系统来计算支付机构的指标和策略,这些指标和策略需要以在线服务的形式提供给支付链路用于银行卡支付时的交易链路选择。当接入的支付机构和银行越来越多时,如何用这些日益增长的数据来选择一个最优的策略成为一个技术问题,他们发现单台服务器的性能已经不能及时的完成所有的计算任务。策略逻辑本身逻辑和依赖比较复杂,做分布式改造时,面临的重构成本也非常大。

以下是一个比较典型的策略简化后的伪代码,通常计算分为三层,每一层都有一个 for loop 来将计算任务拆分为更小的粒度,最后汇总拆分后的任务,再做一些结果的加工。

public class Strategy {

  public void calc(long time, Map banksAndIndicator){
    for(Entry e : banksAndIndicator.entrySet()){
       String bank = e.getKey();
       for(String indicator : e.getValues()){
         calcIndicator(time, bank, indicator);
       }
    }
    // do all banks & indicators' data calculation
  }

  public void calcIndicator(long time,
                           String bank,
                           List indicators){
    for(String indicator : indicators){
      calcBankIndicators(time, bank, indicator);
    }
    // do indicators' data calculation
  }

  public void calcBankIndicators(long time,
                                    String bank,
                                    String
                                    indicator){
    // do bank data calculation
  }
}

我们希望能帮助支付部门的同事,设计一个新的系统,能够以比较低的成本,将这些计算任务分布式化,解决单机瓶颈。当我们发现 Ray 的时候,我们认为 Ray 是一个非常适合解决这个问题的计算框架。

为什么用 Ray

借助 Ray,计算任务可以比较轻松从原来的单机串行执行扩展为分布式异步执行,使用 Ray 的 Remote API 将任务提交到计算集群,动态的拆分成更细粒度的计算任务,并行的执行和异步收集计算结果。得益于 Ray 非常简单易用的 API,使用户只需要做一些相当简单的改动就可以对他的代码进行分布式的重构。


Distributed-Task-Process-With-Ray
在 Ray 上去打造这样的一个在线的分布式系统,还有不少其他工作需要做。首先在每个用户启动的 Ray Job 中,我们会启动一个 AppMaster Actor。他会负责所有控制面的工作,比如创建和管理所有计算的 Actor,保存一些作业内部元数据,和所有 Actor Handler 等等。

虽然这个场景的计算任务本身没有什么状态,但因为需要提供计算服务给到上游系统,我们所有的 任务都采用常驻的 Actor 来进行计算,以此来节省进程和依赖初始化的时间。计算的 Actor 分为两类,分别为 Trigger Actor 和 Worker Actor,前者负责在 Actor 内启动一个 RPC 服务,接受外部的计算任务请求,后者负责承担分布式的计算任务。

当用户初始的 Trigger Actor 和 Worker Actor 都启动好以后,Trigger 的 RPC 服务对外接受请求。当计算任务触达 Trigger 后,Trigger 会进行第一层的计算任务拆分,并且交给一个 Dispatcher 角色挑选下一层的合适的 Worker Actor,通过 Ray call 来派发计算任务。下一层 Worker Actor 的工作机制可以以此类推。

最终我们设计的系统如下图所示。


New-Design-Of-The-Distributed-System
这个基于 Ray 的在线分布式计算系统将一个任务的计算耗时降低到了200ms以下的级别。加上在 Ray 上做了其他一些工作,支付部门的这个监控决策场景从以前的分钟级时延降低到秒级,取得非常不错的效果。

Ray 的其他在线场景

以上是我们在 Ray 上碰到的第一个在线计算问题,接下来我们不断的接触到其他更多的相似的在线场景。

首先是模型服务,蚂蚁集团的在线学习系统是基于 Ray 搭建的。训练模块会持续的迭代导出模型,一般每隔10-30分钟就会导出一次,我们需要尽快的将模型部署为服务。并且除了在线学习以外,模型服务还有异构、多模联合推理等诉求。这些通过 Ray 的调度和 Remote call 都可以设计出不错的端到端方案。


Inference-With-Ray-Serving
另外还有我们从20 年开始探索的 FaaS 场景:用一个类似 Knative Eventing 的系统将消息消费转换为 RPC 调用,触发 Ray Actor 内的用户 Function 进行数据处理,并利用 Ray 的扩展能力来实现 Function 的 serverless。未来我们可能还会在Ray 探索一些分布式 Function 的能力。

Faas-With-Ray-Serving

通用的在线框架设计目标

从以上这些在线的场景里面,我们可以看到一个通用的服务框架是必要的。而在 Ray 上打造这样一个系统,有着以下几个挑战:

一、实现3个9的高可用性:众所周知,Ray 是一个有状态的分布式系统,本身的运维就比无状态系统更加复杂。同时 Ray 的 Head 节点仍然是一个单点,还没有对 Ray 作业无感升级的能力。

二、实现高效的更新能力:当用户的模型和代码发生变化时,服务实例需要能尽量快的将新的模型和代码更新到线上,同时不打破对用户承诺的 SLA,比如10分钟内更新500个实例。

三、易于扩展,能迅速适应流量的变化(自动扩缩容)。

整体架构

先看一下蚂蚁内的 Ray Serving 的整体架构,使我们对角色和概念有一个整体的认知。


Ray-Serving-Architecture

  • Serving Client:该 SDK 封装了针对部署/更新这类控制面需求,以及服务调用的数据面需求的 API,并且提供了用户实现自己的计算逻辑的关键接口。
  • AppMaster:如第一章节所介绍的那样,AppMaster 是一个用户 Serving 作业的中心管控;它负责作业内的所有控制面工作,比如创建和管理所有计算的 Actor,保存一些作业内部元数据,和所有 Actor Handler 等等。
  • Backend:负责执行用户实现的计算逻辑。
  • Proxy:负责发布服务并挂载到服务发现的模块,负责将外部请求路由到对应的 Backend 处理。
  • Serving Keeper:负责将用户服务的多集群编排、流量控制、协调等工作。

部分关键设计

跨集群服务

在高可用这一块,比较自然的,我们首先采用了多集群部署,多活容灾的方案。为了增加多集群支持的同时,对用户不增加太多复杂度,我们引入了 Serving Keeper 这个角色。

Serving Keeper 的定位是支持跨 Ray 集群的服务发布,来支持集群容灾和集群更新时仍然保持高可用的在线服务 SLA。它可以作为一个外部服务运行在 K8S 集群上,也可以作为一个元 Serving 作业运行在 Ray 集群中。Serving Keeper 将状态全部保存在外部存储(如 MySQL)中,因此本身可以通过多实例部署来保证高可用。

在 Serving 作业提交时,只需要增加一个多集群部署的配置,Serving Client 就会将提交请求路由给 Serving Keeper,由它根据配置自动选择多个 Ray 集群来启动 Serving 作业。当每个独立的 Serving 作业启动后,一个 AppMaster Actor 会被创建出来,发布 RPC 服务并将自己注册到 Serving Keeper 来接受控制指令。后续所有针对这个跨集群作业的控制指令,都将由 Serving Keeper 分发到相应的 Serving AppMaster,以及协调各作业完成如滚动升级等复杂的工作流程。

为了支持用户的服务可以做跨集群部署,我们同样也需要支持跨集群的服务发现。在当前的架构中,Proxy 将在发布服务后,主动将自己的服务地址注册到多集群共享的服务发现组件。

目前我们是用了蚂蚁和阿里巴巴内部开发的跨集群服务组件,它提供两级的服务发现能力,第二级通常需要承担2-4个Ray集群的服务发现。用户应用在客户端可以查询到所有的可调用地址,并优先选择网络拓扑离自己更近的地址发起 RPC 调用。


Cross-Cluster Service Discovery
目前 Serving Keeper 支持了如下功能:

  • 多集群作业的自动编排(单元化部署架构)

  • 代理多集群的控制指令(扩缩容、更新)

  • 根据集群、容量事件协调集群间流量比例和容量

  • 支持多集群的蓝绿发布、金丝雀发布

状态持久化

在 Serving 作业中,每个 AppMaster Actor 都要负责所在作业的管理,它的内部保存不少当前作业的状态,比如服务元数据, Proxy/Backend Actor Handler, 副本数和状态等等。当作业所在的 Ray 集群发生宕机或者重启时,这些元数据都会从 AppMaster 内部状态中丢失。我们希望当 Serving 作业重新在集群中被启动时,这些数据以及整个作业的状态都能够被自动恢复。

因此我们在 AppMaster 中设计了一个状态管理模块,以上提到的数据都会作为状态被这个模块接管。当状态中某些数据发生变化时,会立即被状态管理模块持久化到外部存储。

如果 AppMaster 只是自己发生了 failover,他只需要读取外部存储恢复内存中的状态。然而如果是 Ray 作业被重启,那么类似 Actor Handler 这样的信息将不再有效,可以直接重置。AppMaster 会进入恢复状态,根据配置重建 Proxy/Backend,并在重建完成后恢复服务状态。

持续更新

在蚂蚁目前的 Ray Serving 集群中,每天都会触发上千次的更新操作,大部分更新都会在几分钟到十几分钟内完成,并且几乎对 SLA 没有影响。在我们的实践中,发现有几个比较重要点能很大的提高更新速度和稳定性:

  1. 更新时选择合适的更新步长,在更新前首先摘除流量;
  2. 尽量使用原地更新,避免需要创建新的 Actor。

扩展性

Ray 本身在扩展性方面就非常优秀。当跑在云上,K8s 或者 Yarn 上,开源的版本都可以基于 Autoscaler 来自动伸缩。在蚂蚁内部,有一个内部的自动扩缩容组件来完成集群的 autoscale。

然而目前我们的服务的扩缩容更多是通过提供API,交由用户来完成的。根据流量和负载来完成服务的自动扩缩容,实际上是一件非常有挑战的工作,其中主要的难点包括如何保证策略足够准确,以及扩容足够快速。

Ant Ray Serving 目前完成了自动扩缩容系统链路的开发,但是自动扩缩容的策略和算法还在进一步实验和验证中。我们将使用但不限于基于时序的预测以及机器学习模型等。

总结

总结一下,我们引入了跨集群架构、服务发现,状态持久化,原地更新等等来把 Ant Ray Serving 打造成一个健壮又灵活的在线系统。它在蚂蚁已经发展了接近3年多的时间,逐步增加到目前5千个节点的规模,支持了上述蚂蚁和阿里内部超过5个不同的使用场景。

未来计划

未来,我们计划会提高更新、服务等各方面的性能,在生产环境应用自动扩缩容的策略,并且探索如多模联合推理、分布式计算等更复杂的场景。

另外我们正在和 Anyscale Ray Serve 团队合作,在 Ray Serve 中支持 Java,打造具备灵活可扩展性的组件能力,并贡献我们内部的其他实用的功能。我们计划在 2021 年将 Ant Ray Serving 和开源 Ray Serve 完成架构的合并。

如果有任何关于 Ant Ray Serving 的问题,可以随时在 Ray 的 Slack 上联系我或者发送邮件到 tengweicai@gmail.com,谢谢大家。

原文由蔡腾纬、刘洋、罗澄曦于 2021-09-10发表于Ray中文社区公众号,查看原文