Ray在蚂蚁集团的一路向前
合作的机缘
时间回到2018年,当时蚂蚁集团的计算系统只有从阿里集团引进的 ODPS和Flink 两个垂直类的大数据计算引擎。随着蚂蚁集团业务的规模化和多元化发展,大量的计算诉求涌现出来(比如AI、图、运筹、金融场景的大规模计算等),而两大垂类的计算引擎很难满足业务对于性能和灵活性的要求,影响了业务的快速发展。蚂蚁集团技术团队遂开始加大对计算基础设施的研发投入,既要满足蚂蚁日益复杂、灵活多变的计算场景需求,也要补齐与国内外顶级互联网公司之间技术深度上的差距。
蚂蚁集团对计算基础设施的整体建设思路是既要灵活的支持大数据时代各种可能的高性能计算场景,同时要与国际领先的技术理念和发展趋势相结合。技术圈内一直有这样一句话:“整个大数据时代基本是追着Google和UC Berkeley RISELab跑”,蚂蚁集团要实学术界与工业界的共赢,也积极的尝试与RISELab实验室进行交流共建。在2018年,蚂蚁集团技术团队与Ion Stoica教授领衔的RISELab实验室多次交流讨论,结合实验室最新的研究方向和蚂蚁集团对于计算基础设施的能力诉求,蚂蚁集团与 RISELab 实验室的 Ray 项目进行开源共建合作。
历经四年的不断共建打磨,Ray已经发展成为蚂蚁集团的计算基础设施底盘(作为高性能计算的分布式研发框架),Ray的强大内核能力使得蚂蚁在新的计算引擎的构建变得非常简单,大量的基础能力能够被复用,分布式调用,服务发现,故障恢复,自动扩缩容,无感迁移、云原生等能力,使得上游计算引擎开发者只需要关注计算pattern内的事情,专注做核心的价值。
今天在蚂蚁以Ray为内核的计算规模已有了 60w+ core ,基于Ray内核支持了8+计算框架引擎,蚂蚁集团同时培养了30+人规模的计算引擎团队,这在逐利商业化的今天,尤为难得。
蚂蚁对Ray社区的贡献
Ray项目从2017年启动以来发展势头良好。Ray的系统的能力和规模发展迅速,作为分布式底盘支撑了多个平台和应用,并取得了不俗的业务效果。在开源社区,蚂蚁诞出了8位committer,拥有26位contributor,整体代码贡献量稳居社区第二,为Ray社区贡献了大量的核心功能和架构改进,并积极推动Ray社区的发展。
蚂蚁对Ray社区的的贡献主要围绕功能和架构两个方面:
功能方面:
- New Dashboard – 负责系统运维功能,提供作业管理、集群状态监控、日志采集等功能。蚂蚁很早就将Ray应用于生产环境,早期集群/作业问题排查是通过登录机器grep日志的方式排查,随着业务的增多,这种较为原始的排查方式再无法应对,所以我们开发了Dashboard将机器各种信息及异常状态透出,极大的解放了生产力。
- Java/C++ Worker – 社区的场景主要围绕AI领域,最早也只支持Python Worker,蚂蚁拥有丰富多样的业务场景,很多业务都围绕Java生态建设,因此我们最早贡献了Java Worker,后来又围绕编解码、算法等C++相关的业务领域推出了C++Worker,从语言方面上丰富了Ray的生态。
- 多租户 – 社区早期是没有作业租户概念,一个集群只执行一个作业的任务,但随着业务场景的丰富,这种模式的各种弊端逐渐显露出来。比如多个作业间无法通过Ray的API直接进行数据交互,需要通过对外暴露一些公共的RPC服务或者第三方介质来进行数据传输,为此我们从数据、任务及结构等多方面对Ray进行了重构,支持了多租户模式。
- 跨语言调用 – Ray提供的分布式原语足够抽象,能很容易的将多个割裂的系统融合在一起,早期的Ray仅支持Python生态,在蚂蚁贡献了Java/C++ Worker后,我们发现不同语言生态的系统可以在同一个作业中运行,但不同语言的系统之间的数据传递仍需要借助暴露公共服务或者第三方介质,为此我们贡献了跨语言调用的API,可以很容易的在C++/Java/Python三者之间进行跨语言的调用。
- Actor Task 流控 – Task 流控主要是应对高qps/tps情况下的业务场景,早期社区的任务主要集中在Machine Learning,高qps/tps的的场景较少,蚂蚁内部一开始应对的就是流图场景,对这种流控机制的需求也较为迫切,为此我们贡献了Task流控机制。
架构方面:
- Core Worker – 多语言Worker的公共抽象层。随着多语言Worker的支持,我们发现每种语言的Worker都在上层做了不少共性的事情,为此我们尝试把一些共性的东西下沉形成了一个基于C++的CoreWorker,Python/Java/C++分别通过Cython/JNI/C++与之交互,简化了多种语言Worker的冗余开发。
- Actor Direct Call – Actor之间高效直连的任务传输模式。社区早期Actor与Actor之间的交互是通过每个节点的Raylet间接代理的,当Actor之间的Task较多开销会急剧增大,为此我们抽象了Worker与Worker之间的交互,提供了Worker与Worker直连的交互方式,极大优化了Worker之间的Task吞吐。
- GCS Service – 提供一些元数据(Node/Actor/Job/NodeResource等)的处理及一些调度协调能力。GCS Service的引入抽象了元数据backend存储,简化了Actor的调度、管理及其一致性问题,也为后续集群状态透出及管理提供了坚实的技术基础。
- GCS Based Actor Scheduler – 一种中心化的Actor调度器。社区的调度之前仅有分布式调度,但当任务量较大时,分布式节点间资源视图同步不及时会导致大量的调度冲突,蚂蚁内部综合考虑了调度对象Normal Task和Actor的差异,前者具有短周期高频特性,后者具有长周期低频特性,因此设计了基于GCS的中心化的Actor调度,联合分布式调度一起提供服务,在不失Normal Task调度性能基础上,提高了Actor调度效率。
差异化的发展
Ray的技术特点
- Ray拥有一套灵活易用的API,是一个能快速构建分布式系统的编程框架,能够快速地Build各种分布式系统且不绑定计算范式,同时屏蔽分布式系统的底层细节。
- 多语言及跨语言调用的支持,使得Ray不仅能支撑各种具有语言特色的业务(python科学计算,C++音视频处理,Java图计算等),也能将很多之前割裂的系统用同一套接口融合在一起来,能减少多个系统对接维护的成本开销(如在线学习python+java)。
- 分布式共享内存ObjectStore,使得Ray任务间能够高效地共享数据,同时能够实现跨系统的纯内存数据流转,也为Ray的Distributed Futures设计理念提供了坚实的技术支撑。
- 高吞吐细粒度的轻量Task调度以及Data Locality、Task&Node affinity(PlacementGroup)各种亲和性调度支持使得Ray不仅能满足用户对轻量任务高性能的需求,也能支撑各种不同业务场景对任务调度定制化的诉求。
- 运行时环境插件化定制(RuntimeEnv),使得Ray能细化到针对每一个Task定制不同的环境依赖及初始化(pip, conda,container, jar),极大提升了任务执行所需的环境依赖准备方面的易用性和灵活性。
定位差异
Ray最早是在Spark无法处理计算模型比大数据场景复杂得多的嵌套并行方面问题背景下提出的,最开始的定位局限在强化学习领域,将其本身定位成一个高性能的执行框架。
Ray is a flexible, high-performance distributed execution framework.
随着Ray的发展,逐渐发现Ray本身所提供的分布式原语及API非常灵活易用,定位被放大成为一个快速构建一个分布式应用的编程框架。
Ray is a fast and simple framework for building and running distributed applications.
现在,出于一些商业考虑,Ray的定位再次聚焦于AI及Python生态。
Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a toolkit of libraries (Ray AIR) for accelerating ML workloads.
而蚂蚁早在2017年底开始与Ray合作并于2018年初落地了基于Ray的流图一体化的计算框架,之后的几年在Ray上探索并衍生出了在线学习、在线服务、运筹优化、科学计算、隐私计算等各种场景的计算平台,我们坚信Ray有很大的潜力,除了AI它能在更多场景下沉淀出各种新的计算平台。
产品差异
在社区,Ray主要的定位还局限于单个Ray集群仅服务于单个Application,大部分的场景是集群随着作业而拉起,随着作业下线而销毁。这导致了用户历史作业日志及信息难以追溯、作业间数据不便于共享、集群资源无法有效利用、集群历史版本无法及时升级等。
蚂蚁内部早期采用的也是单集群单作业模式,但随着业务量的增加各种弊端逐渐显露出来。比如每个作业无论大小都需要冗余一份head及redis元数据缓存资源;多个作业依赖同一个服务时无法直接使用ray的API来交互,必须要引入一个公共的服务并采用RPC方式来完成;成百上千的作业版本冗杂,维护比较困难。面对这些问题,我们推出了大集群多租户的模式,用户的多个作业可以共用同一个Ray集群,不同业务可以采用框架集群进行物理隔离,同一个框架集群内的作业还可以根据作业场景配置成混布或者隔离模式。
功能差异
除了产品上的差异外,功能上内部也新增了一些Feature(部分正在逐步推入社区)。
真实资源调度
Ray的任务需要显式的指定资源需求(如: actor.options(num_cpus=1, memory=1024 ** 3).remote()),而后引擎会根据用户demand的资源进行调度,用户指定的资源是静态的逻辑资源,用户很难准确的估算自己的某个任务需要多少cpu和memory,为了使得作业能顺利执行完,往往会把值估的比较大,这样就造成了资源浪费;而有时候用户对自己的任务需要多少资源可能根本没有预判(比如:数据分片不均衡导致部分task的内存过高)导致OOM,为了解决这类问题,我们动态的收集了每个任务的真实资源用量,采用动态的资源来不断地更新节点的资源使用,再结合节点预留公共资源的方式有效的缓解了资源浪费与任务流量突增带来的不稳定问题。
亲和性调度
社区的PlacementGroup接口目前具备亲和性和反亲性和能力(这套API及实现也是早期由蚂蚁推入社区),但这套也存在一些弊端:
- PlacementGroup接口同时具备亲和/反亲和、预留资源以及gang scheduling能力,足够灵活,但缺乏易用性。
- PlacementGroup的Bundles采用串行调度,存在一些性能问题。
- PlacementGroup采用两阶段提交来实现gang scheduling,复杂度高。
根据实际业务场景调研发现,绝大部分用户仅有亲和性/反亲和性调度需求。比如单纯指定某一类actor聚合在一起或者某几类actor分散开来,无需资源预留及gang scheduling。结合实际业务场景,我们设计了一套轻量级的Actor亲和性方案,性能和稳定性上都得到了很好的提升。
无感迁移
目前混布场景下线上容器故障率较高,在一些大促场景下也会出现pod腾挪碎片整理的需求,大规模集群下很容易出现一些故障/腾挪节点;此外,混布场景下调度往往也很容易会出现一些热点机器,不管是故障机还是热点机器都需要通过一种有效的手段来将其剔除/屏蔽并通知作业相关Actor迁移,避免故障/热点机器对线上作业造成持续性影响。
通过SystemJob代理通知相关作业进行迁移,System Job内置的System Actor及其交互的接口属于Ray引擎的CommonLib,SystemJob伴随着Ray集群一起启动,订阅GCS Server中Actor的变化以及Migration信息,节点的迁移事件会通过SystemActor代理至用户作业的Master中进行处理。
Common Libs
Common libs是利用Ray自身的能力抽象出一些共性的Lib&Service并以SystemJob的形式提供服务,目前蚂蚁内部已知的CommonLib有:
- actor-observer – 订阅集群内所有Actor的状态变化,并给集群内其他业务提供Actor状态变化订阅服务。
- actor-optimizes – 负责收集集群内所有作业元信息透出至Cougar,并将Cougar根据监控数据及元数据计算出的画像数据代理至业务的JobMaster。
- actor-migration – 订阅集群内节点无感迁移事件,并给集群内其他业务提供迁移事件订阅服务。
丰富的应用场景
作为统一的分布式计算底盘,Ray在蚂蚁内部衍生出了多种计算平台并服务了多种业务,如:
- 图计算(GeaFlow):安全、数金 、图谱、社交、IOT。
- 在线学习(Realtime):智能营销、首页Feed流、Tab3。
- 在线服务(Ray Serving):在线推理引擎,EventBus,金融核心。
- 金融计算(Mars):罗马、灯火。
- 运筹优化(RayOR):支付宝资金调度、欧拉。
- 隐私计算(SecurityFlow):微贷、阿里云医疗、国际。
目前线上按业务和部署单元划分有多个Ray集群,总核数约60w+,目前单个集群摸高到3000+节点/1k+作业/5w+ Actor,实际线上单个集群控制在1500+节点/300+作业/1w+ Actor规模。
RayaG – 图计算
RayaG是蚂蚁自研的实时图计算系统,以Ray为底座,在业界首次 构建了流图融合 的动态 计算能力,通过流图融合动态的特性,RayaG支持了多种实时图计算能力。
RayaG 以 Task-Based 的运行时模型为基础,故不同于传统的流计算系统和图计算系统。其支持了动态的DAG,通过动态的 DAG 支持在任务的运行过程中动态执行一个子任务,并基于子任务构建子的分布式 DAG。在弹性、动态的运行时之上,RayaG 构建了实时图计算的核心概念和能力。
Realtime – 在线学习
Realtime是基于Ray研发的一套面向在线、融合以及AI场景的实时计算引擎,目前已开源至社区。从18年诞生以来,长期支持了Ray上在线学习、在线计算等业务场景。Realtime充分利用Ray提供的各种基础分布式计算能力,比如Actor创建和指定策略调度、direct call、plasma、Actor fo等,构建上层实时计算框架所需的基础能力如分布式调度组网、上下游数据分发和基本的节点故障恢复重新拉起等。
在线学习业务场景中,Realtime负责用一个Java/Python跨语言作业串联起样本处理、模型训练、和模型评估等多个阶段。能够在运行时感知业务数据量和样本的变化,进行自适应扩缩容削峰填谷。下图是在线学习的一条完整链路,除去Serving以外目前已全部运行在Realtime上。
RayServing – 在线服务
蚂蚁集团以Ray为底盘,利用Ray在调度、FO和多语言等方面的灵活性,构建了在线服务平台Ray Serving。Ray Serving可以将用户的业务代码(Java/Python)部署为分布式的在线服务,并提供LDC部署、LDC容灾、流量路由、HPA和监控等能力。所有的操作都可以通过简单易用的产品平台或API进行。Ray Serving的二级调度可以在多机房/多集群之间进行流量路由管控,从而应对机房容灾、异地容灾等FO场景。
Ray Serving对用户屏蔽了分布式在线服务底层的复杂度,并且不对代码做范式要求,使得用户可以专注于自己的业务逻辑灵活编写代码,所部署的服务既轻量又符合蚂蚁LDC架构。其提供的HPA能力,在保证资源利用率的同时,减少了用户根据流量、资源水位进行运维的成本。通过友好的平台产品和API,用户可以很容易使用或对接Ray Serving。
Ray Serving的架构和定位,与开源Ray Serve是比较接近的。我们与Anyscale Ray Serve团队保持密切联系,并且正在进行两者的架构融合。目前已经贡献了Ray Serve的Java语言支持。Ray Serve现在还推出了DAG API,为在线服务带来了多服务节点的编排能力。部署后的Serve DAG,在收到请求后,会按照编排进行级联处理,比较适合多模推理场景。
Mars – 科学计算
Mars是阿里巴巴开源的分布式科学计算框架,支持将numpy、pandas、scikit-learn以及python function进行分布式化。
在AI领域,训练技术和特征工程技术长期独立发展。对于大规模样本的AI作业研发,在特征工程环节需要使用Hadoop、Spark等大数据处理框架,而后在训练环节则依赖于TensorFlow、PyTorch、XGBoost、LightGBM等训练框架。这两类框架设计哲学和运行环境的不一致,导致了以下两个问题:
- AI算法同学即使研发一条简单的AI端到端链路,也需要理解掌握多套框架平台;
- 各平台间需进行频繁的数据交换,造成了大量数据序列化、格式转换、合并拆分等开销,某些情况下数据交换方面的开销甚至在整体开销中达到最大占比,造成严重的资源浪费和研发低效;
因此,蚂蚁新计算引擎团队利用Ray作为通用分布式计算底盘,研发了Mars on Ray分布式科学计算框架,并对XGBoost on Ray进行无缝适配。实现了端到端AI链路运行在同一个作业之中,一个Python脚本研发整套大规模数据的AI工程。
AntOpt – 运筹优化
AntOpt是基于Ray搭建的一套运筹优化解决方案,涵盖了离线大规模求解,近线优化求解,实时在线决策等多种运筹优化求解能力,Ray解决了大规模多任务高并发以及异构环境中的求解问题。
SecretFlow – 隐私计算
SecretFlow 是蚂蚁隐私计算团队开源的基于Ray的隐私保护数据分析和机器学习的统一框架。它提供了:
- 设备抽象,将多方安全计算(MPC)、同态加密(HE)、可信执行环境(TEE)等隐私计算技术抽象为密文设备,将明文计算抽象为明文设备。
- 基于抽象设备的计算图,使数据分析和机器学习工作流程能够表示为计算图。
- 基于计算图的机器学习/数据分析能力,支持数据水平/垂直/混合分割等场景。
Ray 在中国技术圈子的蓬勃发展
蚂蚁集团在国内技术圈子一直承担着Ray应用推广的布道师和领头羊的角色,从18年至21年,蚂蚁每年都举办一场针对国内Ray爱好者的Meetup分享来提高Ray的影响力。今年虽然受到疫情的影响,但我们第一次采用双城线上+线下同步的模式,开放式的邀请国内相关友商线下齐聚就应用场景和未来发展进行深入的交流讨论。我们列举部分的外部应用场景分享给大家。
大疆创新
项目介绍:深度学习模型训练周期长,消耗计算资源多。为了提高分布式训练的稳定性,我们基于ray actor实现了GPU状态监控和训练过程管理,自动伸缩训练任务至健康的计算节点,提高了训练的容错性;为了降低频繁的数据访问带来的网络IO开销,我们利用ray object store缓存训练所需的数据集,实现了在计算节点之间shuffle和交换数据的功能,获得了与本地ssd数据加载一致的性能。
Kyligence/Byzer
Byzer-lang 是一门面向大数据和AI 的类SQL云原生语言。 Byzer 增强了SQL诸多特性,让SQL可以充分支持代码复用,IDE工具,包管理,版本管理等诸多编程领域成熟的工业特性,将Python作为第二等公民,打通了SQL 和 Python数据互通,完全分布式,能够支持AI 的完整流程,非常适合作为大数据和AI的入口,实现语言和平台的统一。 在Byzer-lang中,Ray被作为Python语言的分布式解释器去使用,和 Spark 一起做为 Byzer 语言解释器的Hybrid Runtime,两者能够美妙高效的融合,并且有效的支持了Byzer的各种能力而兴奋。
白海IDP
IDP(Intelligent Development Platform)是基于Ray的AI开发生产平台,核心组件包括易用的AI IDE - IDP Studiio,以及高性能分布式调度引擎-IDP Engine。IDP 以Ray的分布式计算架构为底座,构建精细化的资源调度体系,解耦“开发”和“训练及推理”的计算资源分配,让用户不为“用不到的资源”付费,并最大化提升资源利用效率。
华为
Fathom带来现代企业转型中需要Data + AI的智能化计算能力,通过使用分布式Ray底座无缝兼容开源生态与华为云服务,帮助用户快速实现基于海量数据的现代企业智能业务流程。
Intel
Oscar奖项
由中国信通院主办的开源技术创新奖项中,蚂蚁的Ray团队凭借自身的持续投入、创新应用和行业的引领地位,获得了二次开发类目的尖峰创新奖,也是国内开源体系对蚂蚁和对Ray的整体认可。
未来展望
利用率方面,截止到目前,Ray上衍生的计算引擎所承载的业务资源用量已达60w+ core,接下来会在资源利用率上加大投入,为响应低碳环保践行绿色生活做出一些努力。
规模化方面,目前Ray按业务、部署单元以及环境划分有多个Ray集群(200+),大量的集群会导致新的feature和一些bug修复不能及时给到用户升级,同时也给日常运维带来了很大的困扰,接下来会优化集群规模化后带来的潜在稳定性问题,提升集群升级体验,逐步收缩集群。
开源方面,蚂蚁有8位committer,拥有26位contributor,整体代码贡献量稳居世界第二,为Ray社区贡献了大量的核心功能和架构改进,并积极推动Ray社区的发展。目前,Ray项目的GitHub star数已经达到22k,并且得到了越来越多的关注。同时,也有越来越多开源项目已经集成进了Ray的生态。我们坚信Ray的技术价值及生态,会持续投入开源共建。
商业化方面,随着菜鸟、达摩院、网商、南方电网、中信银行等外部合作的推进,给我们商业化输出也带来了更多的信心,同时也给Ray在各种复杂环境的部署带来了一定的挑战,后续我们会沉淀出一套针对各种场景的统一部署方案,为商业化输出奠定坚实基础。同时蚂蚁也会坚持做好国内技术圈子的布道师和领头羊的角色,明年会加大在Ray中文社区的体系化建设和运营上的投入,将蚂蚁成功的合作案例进行系统性的宣传推广。