【干货】Ray2.0架构-中文翻译版(Part2)

Object 管理

通常,小的对象存储在所有者的进程内存储中,而大的对象存储在分布式对象存储中。为了减少每个对象的内存占用和解析时间。注意,在后一种情况下,占位对象(placeholder object)存储在进程内存储中,以表明该对象实际上存储在分布式对象存储中。

进程内存储中的对象可以通过内存复制快速解析,但由于额外的复制,当许多进程引用时,可能会占用更大的内存。单个工作程序的进程内存储的容量也受限于该机器的内存容量,从而限制了在任何给定时间可以引用的此类对象的总数。对于多次引用的对象,吞吐量也可能受到所有者进程处理能力的限制。

相比之下,解析分布式对象存储中的对象需要至少一个从工作程序到工作程序的本地共享内存存储的 RPC。如果工作进程的本地共享内存存储尚未包含对象的副本,则可能需要其它 RPC 连接。另一方面,因为共享内存存储是用共享内存实现的,所以同一节点上的多个工作人员可以引用对象的同一副本。如果对象可以,这可以减少总内存占用。分布式内存的使用还允许进程引用对象,而不需要对象本地,这意味着 zero-copy 反序列化可以使用超过单个机器的内存总容量限制。吞吐量可以随分布式对象存储中的节点数量而变化,因为对象的多个副本可能存储在不同的节点上。

Object 转换(Object resolution)

Object resolution 是将 ObjectRef 转换为普通值的过程,例如在使用 get 或者作为任务参数传递时会自动转换。

ObjectRef 包含两个字段:

通过直接从所有者的进程内的存储复制小对象来解析小对象。例如,如果所有者调用“ray.get”,系统将查找并反序列化本地进程内存储中的值。如果所有者提交了一个含有依赖任务,它将通过将值直接复制到 task specification 中来内联对象。类似地,如果借用者试图解析值,则对象值将直接从所有者处复制。

如果是个大对象会经过上面图中的流程进行解析。

对象 x 是在 Node 2 创建的,当 Owner 调用这个对象时会先查找对象的位置,并发出副本请求从 Node 2 中复制,然后 Node 1 接收这个对象。

大型对象存储在分布式对象存储中,必须使用分布式协议进行解析。如果对象已经存储在引用持有者的本地共享内存存储中,则引用持有者可以通过 IPC 检索对象。这将返回一个指向共享内存的指针,该内存可能同时被同一节点上的其它 worker 引用。

如果该对象在本地共享内存存储中不可用,则引用持有者会通知其本地 raylet,然后它会尝试从远程 raylet 获取副本。raylet 从对象目录中查找位置,并从这些 raylet 之一请求传输。自 Ray v1.3+ 起,对象目录存储在所有者处(以前存储在GCS中)。

Memory management

对于远程任务,对象值由正在执行的工作程序计算。如果值很小,worker 将直接向所有者回复值,并将其复制到所有者的进程内存储中。一旦所有引用超出范围,此值将被删除。

主副本与可收回副本。主副本(节点2)不符合逐出资格。但是,节点1(通过“ray.get”创建)和节点3(通过任务提交创建)上的副本可以在内存不够时被逐出。

如果该值较大,则执行工作程序将该值存储在其本地共享内存存储中。共享内存对象的初始副本称为主副本。主副本是唯一的。

因为只要范围中有引用,它就不会被释放。raylet 通过保存对存储对象的物理共享内存缓冲区的引用来“锁定”主副本,从而防止对象存储区将其逐出。相反,如果在本地内存不够时,对象的其它副本可能会被 LRU 逐出,除非开发人员在使用这个对象。

在大多数情况下,主副本是要创建的对象的第一个副本。如果初始副本因故障而丢失,所有者将尝试根据对象的可用位置指定新的主副本。

一旦对象引用计数变为 0,对象的所有副本最终都会被自动垃圾收集。所有者会立即从进程中存储中删除小对象。raylets 将从分布式对象存储中异步删除大型对象。

raylets 还管理分布式对象传输,该传输基于对象当前需要的位置创建对象的其他副本,例如,如果依赖于对象的任务被调度到远程节点。

由于以下任何原因,对象可能存储在节点的共享内存对象存储中:

  • 它是由本地工作进程通过 “ray.get” 或 “ray.wait” 请求的。一旦工作进程完成“ray.get” 请求,就可以释放这些资源。注意,对于可以是 zero-copy,从 “ray.get” 返回的 Python 的值直接引用共享内存缓冲区,因此对象将被“固定”,直到该Python值超出范围。
  • 它由在该节点上执行的前一个任务返回。一旦没有对对象的更多引用,或者一旦对象被引用,这些对象就可以被释放。
  • 它是由该节点上的本地工作进程通过 “ray.put” 创建的,一旦不再引用对象(上图中节点1上的对象A、B 和 C),就可以释放这些对象。
  • 它是在该节点上排队或执行的任务的参数。一旦任务完成或不再排队,就可以释放这些资源。节点 2 上的对象 B 和 C 都是这样的例子,因为它们的下游任务 g 和 h 尚未完成。
  • 此节点以前需要它,例如,已完成的任务需要它。节点 2 上的对象 A 就是一个例子,因为 f 已经完成了执行。如果内存不足,这些对象可能会基于本地 LRU 被逐出。当ObjectRef 超出范围时,它们也会被快速释放(例如,在 f 完成并调用 “del A” 之后,A 从节点 2 中删除)。

内存不足的情况

对于小对象,Ray 当前不会对每个工作进程的进程存储施加内存限制。你需要确保小对象不会太多,导致所有者进程因内存不足而被终止。

Ray 对共享内存对象施加限制由 raylet 负责强制执行此限制。下面是可以存储在节点上的不同类型的共享内存对象的可视化,具有基本的优先级。

对象创建请求由 raylet 排队,并在(6)中有足够的内存用于创建对象时提供服务。如果需要更多内存,raylet将选择要从(3)-(5)中逐出的对象以腾出空间。即使在所有这些对象被逐出后,raylet 也可能没有空间用于新对象。如果应用程序所需的总内存大于集群的内存容量,就会发生这种情况。

如果驱逐后需要更多的空间,raylet 首先会在整个集群中的每个 worker 处触发特定于语言的垃圾收集。在语言前端看到的 ObjectRef 似乎很小,因此不太可能触发通常的特定语言垃圾回收机制。然而,ObjectRef 的实际内存占用可能非常大,因为物理值存储在 Ray 的对象存储中的其它位置,并且可能存储在与语言级别 ObjectRef 不同的节点上。因此,当任何 Ray 对象存储达到容量时,我们会在所有工作线程上触发语言级别的垃圾收集,这将清除所有不需要的 ObjectRef,并允许从对象存储中释放物理值。

raylet 会在触发溢出之前,让 worker 有时间异步垃圾收集ObjectRef。溢出允许从对象存储中释放(2)中的主副本,即使对象仍然可以被引用。如果禁用溢出,则应用程序将在可配置超时后接收 ObjectStoreFullError。溢出可能代价很高,并且增加任务执行的长时间;因此,一旦对象存储达到可配置阈值(默认为80%),Ray 也会急快速溢出对象,以确保可用空间。

注意,即使启用了对象溢出,对象存储仍可能耗尽内存。如果同时使用的对象太多(1),则会发生这种情况。为了减轻这种情况,raylet 限制了正在执行的任务的参数的总大小,因为在任务完成之前无法释放参数。默认上限为对象存储内存的 70%。这确保了只要没有其他对象因 “ray.get” 请求而被活动锁定,任务就可以创建一个对象存储容量的 30%。

目前,raylet 没有为 worker 的“ray.get”请求的对象实现类似的上限,因为这样做可能会导致任务之间的死锁。因此,如果对大型对象有过多的并发 “ray.get” 请求,raylet 仍可能耗尽共享内存。发生这种情况时,raylet 会将对象分配为本地磁盘上的内存映射文件(默认情况下为/tmp)。由于I/O开销,这样分配的对象性能较差,但即使对象存储已满,它也允许应用程序继续运行。如果本地磁盘已满,则分配将失败,之后应用程序将收到 OutOfDiskError。

对象溢出(Object spilling)

Ray 默认支持在对象存储的容量用完后将对象溢出到外部存储。

外部存储通过可插拔接口实现。默认情况下支持两种类型的外部存储:

本地存储。默认情况下选择本地磁盘,这样 Ray 用户就可以使用对象溢出功能,而无需任何额外配置。

分布式存储(实验性,目前提供 Amazon S3)。访问速度可能较慢,但这可以提供更好的容错性,因为数据可以在工作节点故障后存活。

对象溢出由几个部分构成:

raylet 内

  • 本地对象管理器:跟踪对象元数据,例如外部存储中的位置,并协调 IO woker 和与其它 raylet 的通信。
  • 共享内存对象存储

IO workers

用于溢出和恢复对象的 python 进程。

外部存储

用于存放无法放入共享内存对象存储的对象。

raylet 管理一个 I/O worker 池。I/O worker 从本地共享内存对象存储和外部存储进行读/写。

当 Ray 没有足够的内存容量来创建对象时,它会引发对象溢出。请注意,Ray 只溢出对象的主副本:这是通过执行任务或通过 “Ray.put” 创建的对象的初始副本。非主副本可以立即被逐出,这种设计确保了集群中每个对象最多有一个溢出的副本。只有在对象溢出后,或者应用程序中没有更多引用时,主副本才可收回。

协议如下所示,重复执行,直到留出足够的空间来创建任何需要的对象:

  • Raylet(本地对象管理器)查找本地对象存储中的所有主副本。
  • Raylet将这些对象的溢出请求发送给 IO worker。
  • IO worker 将对象值及其元数据写入外部存储。
  • 一旦主副本溢出到外部存储,raylet 将使用溢出对象的位置更新对象目录。
  • 对象存储区收回主副本。
  • 一旦对象的引用计数变为0,所有者就会通知 raylet 可以删除该对象。raylet 向 IO worker 发送请求,以从外部存储中删除对象。

溢出的对象将根据需要恢复。当请求对象时,Raylet 要么通过向本地 IO worker 发送恢复请求从外部存储恢复对象,要么从不同节点上的 Raylet 获取副本。远程 Raylet 可能会将对象溢出到本地存储(例如,本地SSD)上。在这种情况下,远程 raylet 直接从本地存储读取对象并将其发送到网络。

由于 IO 开销,每个文件一个对象溢出许多小对象是低效的。对于本地存储,操作系统将很快耗尽 inode。如果对象小于 100MB,Ray 会将对象融合到单个文件中以避免此问题。

Ray 还支持多目录溢出,这意味着它使用安装在不同位置的多个文件系统。当多个本地磁盘连接到同一台机器时,这有助于提高溢出带宽和最大外部存储容量。

目前存在的限制:

  • 使用本地文件存储时,如果存储溢出对象的节点丢失,则溢出对象将丢失。在这种情况下,Ray 将尝试恢复对象,就像它从共享内存中丢失一样。
  • 如果所有者丢失,则无法访问溢出的对象,因为所有者存储对象的位置。
  • 应用程序当前正在使用的对象被 “pinned”。例如,如果 Python 的 Driver 有一个指向ray.get 获得的对象的原始指针(例如,共享内存上的 numpy 数组),则该对象将被固定。在应用程序释放这些对象之前,它们是不可使用溢出机制的。正在运行的任务的参数也固定在任务的持续运行的时间内,运行结束后才可以。

引用计数

每个 worker 存储其所拥有的每个对象的引用计数。所有者的本地引用计数包括本地Python 引用计数和作为所有者提交的任务所依赖的对象。当 Python 的 “ObjectRef” 被释放时,前者将递减。当依赖于对象的任务成功完成时(注意,以应用程序级异常结束的任务视为成功),后者将递减。

ObjectRef 也可以通过将它们复制到另一个进程。接收 “ObjectRef” 副本的过程称为借用者。例如:

@ray.remotedef temp_borrow(obj_refs): # Can use obj_refs temporarily as if I am the owner. x = ray.get(obj_refs[0])@ray.remoteclass Borrower: def borrow(self, obj_refs): # We save the ObjectRef in local state, so we are still borrowing the object once this task finishes. self.x = obj_refs[0]x_ref = foo.remote()temp_borrow.remote([x_ref]) # Passing x_ref in a list will allow borrow to run before the value is ready.b = Borrower.remote()b.borrow.remote([x_ref]) # x_ref can also be borrowed permanently by an actor.

通过跟踪这些引用。简言之,每当引用“逃离”本地作用域时,所有者就会添加到本地引用计数中。例如,在上面的代码中,当调用 “temp_borrow.remote” 和“b.borrow.remoto” 时,所有者会增加 x_ref 的挂起任务计数。一旦任务完成,它会向所有者回复一个仍在借用的引用列表。例如,在上述代码中,“temp_borrow” 的 worker 会回答说,它不再借用 “x_ref”,而 “Borrower” 的 worker 会回答说它仍在借用 “x_ref”。

如果 worker 仍在借用任何对象,所有者会将 worker 的 ID 添加到本地的 borrowers 列表中。borrowers 保持第二个本地参考计数,与所有者类似,一旦 borrowers 的本地参考计数变为 0,所有者要求 borrowers 回复。此时,所有者可以将 worker 从 borrowers 列表中删除并收集对象。在上述示例中,“borrowers” 的 worker 正在永久借用引用,因此所有者在 “borrowers” 自身超出范围或死亡之前不会释放对象。

borrowers 也可以递归地添加到所有者列表中。如果 borrowers 本身将 “ObjectRef” 传递给另一个进程,就会发生这种情况。在这种情况下,当 borrowers 响应所有者其本地引用计数为 0 时,它还包括其创建的任何新 borrowers 。所有者反过来使用相同的协议联系这些新的 borrowers。

根据上述一共包含下面不同的引用计数:

本地 python 引用计数

等于 worker 进程中 python 的引用技术,在取消或者分配 python 中的 ObjectRef 递增或递减。

提交任务计数

依赖于尚未完成执行的对象的任务数。当 worker 提交任务时递增。当任务完成时递减。如果对象足够小,可以存储在进程内存储中,则在将对象复制到 Task specification 中时,此计数会提前递减。

借用者(Borrowers)

当前借用 “ObjectRef” 的进程的一组工作 ID。借用者是一个 worker 但不是所有者并且拥有 Python 本地实例的 ObjectRef,每个借用者还会维护一个本地的借用者列表,允许借用者将 “ObjectRef” 发送给另一借用者,而无需联系所有者 。当任务被传递一个 ObjectRef 并在任务结束后继续使用它时,该任务通知其调用方它正在借用该对象。然后,被调用的 worker 将任务 的 worker 的 ID 添加到此集合中。

当 ObjectRef 的引用计数为 0 时,如果是所有者本身会自动删除,所有者向每个### 借用者发送异步 RPC。借用者在收到后将其删除,如果无法联系到借用者,会从列表中删除。

如果借用者被移除,worker 会等待来自所有者的 rpc,一旦 worker 本地的引用计数为 0,worker 就会将借用者弹出并告知所有者。

嵌套计数(Nested count)

在作用域中且其值包含有 ObjectRef 的 ObjectRef 数。

谱系计数(Lineage count)

启用对象重建时使用。依赖于此 “ObjectRef” 且其值存储在分布式对象存储中(可能在失败时丢失)的任务数。在提交依赖于对象的任务时递增。如果任务返回的“ObjectRef”超出范围,或者任务完成并在进程内存储中返回值,则递减。

Corner cases

x_ref = foo.remote()@ray.remotedef capture(): ray.get(x_ref) # x_ref is captured. It will be pinned as long as the driver lives.

创建引用的常规做法是将 ObjectRef 作为任务参数直接传递给其它 worker,或者在数据结构(如列表)内部传递。也可以通过使用 “ray.cloudpickle” 对 “ObjectRef” 进行额外引用。在上面代码下,ray 无法跟踪对象的序列化副本或确定 ObjectRef 何时已反序列化(例如,如果 ObjectRef 由非 Ray 进程反序列化)。因此,将向对象的计数添加一个永久引用,以防止对象超出范围。

带 out-of-band 序列化的其它方法包括使用 “pickle” 或自定义序列化方法。与上述类似,Ray 无法跟踪这些引用。访问反序列化的 ObjectRef(即通过调用“ray.get”或作为任务参数传递)可能会导致引用计数异常。

Actor handles

用于跟踪(非分离)Actor 的生命周期。虚拟对象用于代表 Actor。此对象的 ID 是根据 Actor 创建任务的 ID 计算的。Actor 的创建者拥有虚拟对象。

当 Python 的 Actor handle 被释放时,这会减少虚拟对象的本地引用计数。当在 Actor handle 上提交任务时,这会增加虚拟对象的已提交任务计数。当一个 Actor handle 被传递给另一个进程时,接收进程被算作虚拟对象的借用者。一旦引用计数达到 0,所有者就通知 GCS 服务可以安全地销毁 Actor。

Actor 是不会被 Ray 自动回收的,需要显式删除。

与 Python GC

当对象是 Python 中引用循环的一部分时,Python 不能保证这些对象会被及时地垃圾回收。所有ObjectRef 可以在分布式对象存储中恶意地保持 Ray 对象的活动状态,当对象存储接近容量时,Ray 会周期性地在所有 Python worker 中触发“gc.collect()”。这确保 Python 引用循环不会导致虚假的对象存储已满状态。

Object Failure

在发生系统故障时,Ray 将尝试恢复任何丢失的对象,如果无法恢复,并且 worker 试图获取对象的值,则会引发应用程序级异常。

在更高层次上,Ray 保证如果所有者仍然活着,将尝试恢复对象。如果恢复失败,所有者将在异常中填写原因。如果对象的所有者已经死亡,任何试图获取该值的 worker 都会收到一个关于所有者死亡的异常,即使对象副本仍然存在于集群中。

小对象

小对象存储在所有者的进程中对象存储中,因此如果所有者死亡,小对象将丢失。任何试图在未来获取对象值的 worker 都将收到所有者已死亡的异常,并将错误存储在本地进程内对象存储中。

大对象和谱系重建

如果不存在其它副本,Ray 将尝试通过恢复对象。这是指通过重新执行创建对象的任务来恢复丢失的对象。如果任务的依赖关系也丢失,或者以前由于垃圾收集而被逐出,那么这些对象将被递归地重建。

谱系重建通过在每个对象旁边保持额外的“谱系引用计数”来工作。这是指依赖于对象本身可能被重新执行的任务数。如果任务或下游任务返回的任何对象仍在范围内,则可以重新执行任务。一旦谱系引用计数达到 0,Ray 将垃圾回收创建对象的 task specification。请注意,这是一个独立于对象值的垃圾收集机制:如果对象的直接引用计数达到 0,则即使其谱系计数保持在范围内,其值也将从 Ray 的对象存储中进行垃圾收集。

请注意,谱系重建可能会导致比通常更高的 Driver 内存使用率。如果总大小超过系统范围阈值(默认值为1GB),每个 Ray 工作程序将尝试回收其本地缓存的谱系计数。

谱系重建目前有以下限制。如果应用程序不满足这些要求,那么它将收到一个重建失败的异常:

  • 对象及其任何传递依赖项必须由任务(actor or non-actor)生成。这意味着 ray.put 创建的对象不可恢复。请注意,由 “ray.put” 创建的对象始终与其所有者存储在同一节点上,所有者将最终与该节点共享;因此,在 “ray.put” 对象的主副本丢失的情况下,应用程序将收到一个通用的 “OwnerDiedError”。
  • 任务被假定为确定性和幂等性的。因此,默认情况下,由 Actor 任务创建的对象是不可重构的。如果用户将参与者的 “max_task_retrys” 和 “max_restarts” 设置为非零值,则可以作为谱系的一部分重新执行参与者任务。
  • 任务将仅重新执行其最大重试次数。默认情况下非 Actor 最多重试 3 次,Actor 不能重试。你可以通过 “max_retrys” 和 “max_task_retry” 参数修改。
  • 对象的所有者必须仍然活着。

如果存储在分布式内存中的对象的所有者丢失:在对象解析期间,raylet 将尝试定位对象的副本。同时,raylet 将定期与所有者联系,以检查所有者是否还活着。如果所有者已死亡,raylet 将存储一个系统级错误,该错误将在对象解析期间抛出给引用持有者。

任务管理

任务执行

任务调用方在从分布式调度程序请求资源之前等待创建所有任务参数可用。在许多情况下,任务的调用者也是任务参数的所有者。任务的调用方可能借用了任务参数,即它从所有者处收到了参数 “ObjectRef” 的反序列化副本。在这种情况下,任务调用方必须通过与参数所有者执行协议来确定参数是否已创建。借用进程将在反序列化 “ObjectRef” 时与所有者联系。一旦创建了对象,所有者就会做出响应,借用者会将对象标记为就绪。如果所有者失败,借用者也会将该对象标记为已准备好,因为对象的命运与所有者共享。

任务可以有三种类型的参数:plain values, inlined objects, 和 non-inlined objects.

plain values 不需要依赖关系解析。

inlined objects 是足够小的对象,可以存储在进程内存储中(默认阈值为100KB)。调用者可以将这些直接复制到 task specification 中。

non-inlined objects 是存储在分布式对象存储中的对象。其中包括大对象和已被所有者以外的进程借用的对象。在这种情况下,调用者将要求 raylet 在调度决策期间说明这些依赖关系。raylet 将等待这些对象成为其节点的本地对象,然后再给予依赖这个任务的 worker 。这确保了正在执行的工作程序在接收任务时不会阻塞(等待对象变为本地对象)。

资源调度实现

任务调用程序通过首先向请求的首选 raylet 发送资源请求来调度任务。可选择以下任一项:

  • 按数据位置:如果任务的对象参数存储在共享内存中,则调用方选择本地对象参数最多的节点。该信息通过调用方的本地对象目录检索,可能是过时的(例如,如果同时发生对象传输或逐出)。
  • 按节点关联:如果目标 raylet 使用了 NodeAffinitySchedulengStrategy 指定。
  • 默认情况下使用本地的 raylet。

首选 raylet 对请求进行排队,如果它要给予资源,则使用当前租给调用者的本地 worker 的地址来响应调用者。只要主动请求方和租用的 worker 还活着,租约就保持活动状态,并且 raylet 确保在租约处于活动状态时,其他客户端都不能使用该 worker。为了确保公平性,如果没有剩余的任务或已经过了足够的时间(例如,几百毫秒),调用方将返回空闲的工作程序。

请求方可以将任意数量的任务调度到租用的 worker 上,只要这些任务与授权的资源请求兼容即可。因此,租用可以被认为是一种以避免与调度器进行类似调度请求的通信优化方案。调度请求可以重用租用的工作人员,如果它具有以下相同的条件:

  • 资源规模,如 CPU:1。
  • 共享内存任务参数,因为这些参数必须在任务执行之前在节点上设置为本地。注意,小任务参数不需要匹配,因为这些参数被内联到任务参数中。此外,在数据结构内部传递的 ObjectRef 不需要匹配,因为 Ray 不会在任务开始之前将这些 ObjectRef 设置为本地。
  • 运行时环境,因为租用的工作程序在此环境中启动。

调用方可以持有多个 worker 租约以提高并行性。worker 租约在多个任务之间缓存,可以以减少调度程序的负载。

如果首选 raylet 选择不在本地给予资源,它还可以使用远程 raylet 的地址来响应调用者,调用者应该在该地址重试资源请求。这就是所谓的溢出调度。远程 raylet 可以根据其本地资源的当前可用性同意或拒绝资源请求。如果资源请求被拒绝,则调用者将再次从首选 raylet 请求,并且重复相同的过程,直到某个 raylet 授予资源请求。

资源管理和调度

Ray 中的资源是一个键-值对,其中键表示资源名称,值是一个浮点数。为了方便起见,Ray 调度器具有对C PU、GPU 和内存资源类型默认支持。Ray 的资源是逻辑上的资源,不需要与物理资源进行 1 对 1 映射,默认情况下,Ray将每个节点上的逻辑资源数量设置为 Ray 自动检测到的物理数量。

用户还可以使用自定义资源需求,例如,指定资源需求{“custom_resource”:0.01}。可以在启动时向节点添加自定义资源。

分布式调度程序会尝试匹配集群中合适的资源,如一个任务要求 {“CPU”:1.0,“GPU”:1.0,那么此任务只能在 CPU >=1 和 GPU >=1 的节点上调度。默认每个@ray.remote 函数一定会需要 1 个 CPU 来运行。对于 actor 来说,默认是 0 个 CPU。这样一来,单个节点可以承载比其核心更多的 actor,从而将 CPU 尽可能留给操作系统。

有一些资源需要特殊处理:

  • CPU、GPU 和“内存”的数量在 Ray 启动期间自动检测。
  • 将 GPU 资源分配给任务将自动设置到 worker 的 CUDA_VISIBLE_DEVICES 系统变量,通过 ID 的方式限制使用的 GPU。

注意,因为资源请求是逻辑上的,所以 Ray 不会强制执行物理资源限制。用户可以指定准确的资源需求,例如,为具有 n 个线程的任务指定 “num_cpus=n” 。

分布式调度

资源统计

每个 raylet 跟踪所在节点的资源。当授予资源请求时,raylet 会相应地减少可用的本地资源。一旦资源被释放(或请求者死亡),raylet 就会相应地增加本地资源的可用性。因此,raylet 始终具有本地资源可用性的高度一致的视图。

每个 raylet 还从 GCS 接收关于集群中其他节点上资源可用性的信息。这用于分布式调度,例如,跨集群中的节点进行负载平衡。为了减少收集和传播的开销,这些信息是最终一致性的,可能会存在过时的问题。信息通过定期广播发送。GCS定期(默认情况下为 100ms)从每个 Ray 节点获取可用的资源,然后将其聚合并广播回每个 Ray 节点。

调度状态机

当 raylet 接收到资源请求(即 RequestWorkerLease PRC)时,它将通过上述状态机并以下面三种状态之一结束:

  • Granted:客户端现在可以使用 被授权的资源和 worker 来执行任务或 Actor。
  • Reschedule:根据当前节点对集群的观察,有一个比当前节点更好的节点。客户应重新安排请求。有两种可能性触发这个情况:
  • 如果当前节点是客户端的首选 raylet(即客户端联系的第一个 raylet),则这是一个 spillback 请求。客户端应在第一个 raylet 指定的 raylet 处重试请求。
  • 否则,当前节点是客户端首选 raylet 选择的节点。客户端应该再次在首选的 raylet 上重试请求。
  • Canceled:无法调度运行所需的资源。
  • 被选中的已经失效。
  • 无法为任务创建合适的运行环境,无法启动工作京城。

调度策略

Ray 有几个调度策略来控制在哪里运行任务或参与者。当提交任务或参与者时,用户可以选择指定要使用的调度策略/策略。

默认混合策略

当没有指定其它策略时,这是默认策略。此策略首先尝试将任务打包到本地节点,直到节点的关键资源利用率超过配置的阈值(默认情况下为 50%)。关键资源利用率是该节点上任何资源的最大利用率,例如,如果节点使用 8/10 个 CPU 和 70/100GB RAM,则其关键资源利用是 80%。

在本地节点上超过阈值后,策略将任务打包到第一个远程节点(按节点 id 排序),然后打包到第二个远程节点,依此类推,直到所有节点上的关键资源利用率超过阈值。之后,它将选择关键资源利用率最低的节点。

该策略的目的是在 bin-packing 和负载均衡之间实现平衡。当节点处于临界资源利用率时,策略倾向于 bin-packing 。按节点 ID 排序可确保所有节点在 bin-packing 时使用相同的顺序。当节点超过临界资源利用率时,策略支持负载平衡,选择负载最少的节点。

差价策略

此策略是循环在具有可用资源的节点之间分配任务。这个循环是本地的,并不是全局的。

节点相关性策略

使用此策略,用户可以明确指定任务或 Actor 应运行的目标节点。如果目标节点处于活动状态,则任务或 Actor 仅在那里运行。如果目标节点已死亡,则看是否能被调度到其它节点,不行的话就不会调度。

数据位置策略

Ray 通过让每个任务调用方基于调用方关于任务参数位置的本地信息选择首选 raylet。raylets 实现的单独的调度策略不考虑数据位置,这是为了避免向 raylet 添加用于发现哪些任务参数存储在哪些其他节点上的额外的 RPC 和复杂性。

预占用组策略

此策略将任务或 Actor 运行在指定的预占用组。

预占用组

Ray 支持预占用组这个功能,从多个节点中自动保留一组资源。它可以用于普通任务或者 Actor 的调度。通常用于 gang - scheduling actors

由于资源组可能涉及跨多个节点的资源,Ray 使用跨 raylets 来确保原子性。该协议由GCS协调。如果有任何 raylet 在协议执行过程中死亡,预占用组的创建将回滚,GCS 将再次对请求进行排队。如果 GCS 请求失效,且 GCS 容错功能启用,则重启后会 ping 所有参与者以重新启动协议。

与 Ray 中的其它东西不同,预占用组没有引用计数,由创建它们的 worker 和独立的 Actor 所有。在所有者死亡时自动销毁。你也可以显式销毁,销毁后使用这些保留的资源的任务和 Actor 都会被杀死,资源被释放。

当一个预占用组被创建时,它会请求保留了多个节点的资源。当其中一个节点故障时,确缺失的资源会重新被安排,优于其它还没分配完成的预占用组。在这些缺失的资源包被重新创建之前,该预占用组仍然处于部分分配状态。

Actor 管理

Actor 创建

当在 Python 中创建 Actor 时,创建工作人员首先向 GCS 注册 Actor 。对于独立 Actor ,注册是以同步的方式进行的,以避免同名 Actor 注册。对于非独立的 Actor(默认), 使用异步注册。

注册后,一旦解决了 Actor 创建任务的所有输入依赖关系,创建者就将 task specification 发送给 GCS 服务。然后,GCS 服务通过与正常任务相同的分布式调度协议来调度 Actor 创建任务,就好像 GCS 是 Actor 创建任务的调用者一样。

Actor handle 的原始创建者可以在 Actor handle 上提交任务,甚至在 GCS 安排 Actor 创建任务之前将其作为参数传递给其它任务/ Actor。

异步注册时,在 Actor 向 GCS 注册之前,创建者不会将 Actor handle 传递给其它任务/ Actor。这是为了防止创建者在注册完成之前死亡;通过阻止任务提交,我们可以确保引用 Actor 的其它 worker 可以发现注册失败。在这种情况下,任务提交仍然是异步的,因为创建者只是缓冲远程任务,直到 Actor 注册完成。

一旦创建 Actor 完成,GCS 将通过 pub-sub 通知任何拥有这个 Actor 的 handle 的 worker。每个 handle 缓存新创建的 Actor 的运行时元数据(例如 RPC 地址)。

然后,在 Actor handle 上提交的任何未处理的任务都可以发送给 Actor 执行。

与任务定义类似,Actor 定义通过 GCS 下载到 worker 上。

Actor 执行

Actor 可以有无限数量的调用者。一个 Actor handle 表示单个调用者,它包含其引用的Actor 的 RPC 地址。需要调用的 worker 将连接到这个地址并且提交任务。

一旦创建, Actor 任务就转换为对 Actor 进程的直接 gRPC 调用。一个 Actor 可以处理多个并发调用,尽管这里只显示了一个。

Actor 死亡

Actor 可以使独立或者非独立的,默认是非独立的。当所有 handle 超出范围或执行结束时,Ray 会自动垃圾回收它们。独立的 Actor 的生命周期与他们的原始创建者无关,一旦不再需要他们,应用程序必须手动删除他们。

对于非独立的 Actor,当 Actor 的所有等待的任务都已完成,所有的 Actor handle 都已超出范围(通过引用计数进行跟踪)时,Actor 的原始创建者通知 GCS 服务。GCS 服务然后向参与者发送 KillActor RPC 杀死 Actor。

如果 GCS 检测到创建者已退出(通过心跳),GCS 也会终止 Actor。在这个 Actor 上提交的所有未处理的任务和后续任务都将失败,并出现 RayActorError。

Actor 也可能在运行时意外崩溃,默认情况下,提交给崩溃了的 Actor 的所有任务都将失败,并出现 RayActorError,就像 Actor 正常退出一样。

Ray 还提供了一个(max_restarts)来自动重新启动 Actor,可以指定最多重启次数。如果启用了此选项,并且 Actor 的所有者仍然活着,GCS 服务将尝试通过重新提交其创建任务来重新启动崩溃的 Actor。所有具有 Actor handle 的客户端都会将任何未处理的任务缓存到 Actor,直到 Actor 重新启动。如果 Actor 无法重新启动或已达到最大重启次数,则客户端将使所有等待任务失败。

第二个选项(max_task_retrys)可用于在 Actor 重新启动后自动重试失败的 Actor 任务。这对于幂等任务和用户不需要自定义处理 RayActorError 的情况非常有用。

全局控制服务(Global Control Service,GCS)

全局控制服务,也称为GCS,是 Ray 的控制平台。它管理 Ray 集群,并充当协调 raylets 和发现其他节点进程的集中平台。GCS 还作为外部服务(如自动缩放器和仪表盘)与 Ray 集群通信的入口。GCS 目前是单线程的,心跳检查和资源轮询除外;

相关功能有:

  • 节点管理,管理集群节点的添加和删除,并广播给所有 raylet。
  • 资源管理,广播所有 raylet 并确认每个的资源可用性。
  • Actor 管理,处理 Actor 的创建和销毁请求、监测他们是否活跃,在出现问题时尝试重新创建。
  • 预占用组管理,处理 Ray 的预占用组的创建和删除。
  • 元数据存储,提供所有 worker 都能访问的键值存储,仅适用于小的元数据,任务和对象的元数据存储在拥有者的 worker 中。
  • worker 管理,处理 raylet 的故障报告。
  • 运行环境管理,用于管理运行环境包,包括包的使用次数和垃圾回收次数。

GCS 还提供了几个 gRPC 端点用来帮助获取当前 Ray 集群的状态,如 Actor
worker 、节点信息等。

GCS 默认使用简单的哈希 map 内存存储,可以将它放到 Redis 中。

节点管理

当 raylet 启动时会向 GCS 注册,GCS 会将 raylet 的信息存入存储中,注册成功后广播其它所有的 raylet 节点。

节点注册后,GCS 通过定期进行健康检查来监控 raylet 的活跃度。GCS 还获取 raylet 的资源情况,并将其广播给其它 raylet 节点。如果 raylet 检查失败,GCS 还会向集群广播 raylet 的死亡。一旦 raylet 接收到信息,它们就会清除相关的记录。

Raylets 还向 GCS 报告任何 worker 进程的死亡,以便将其广播给其它 Raylets。方便提前终止向该 worker 提交任务之类的场景。

资源管理

GCS 负责确保 raylet 拥有集群中资源使用情况的最新记录。如果记录不是最新的,raylet 可能会错误地将任务调度到另一个没有资源的节点运行任务。

默认情况下,GCS 将每 100ms 从注册的 raylet 中提取资源使用量。它还每100毫秒向所有 raylet 广播全局资源情况。

GCS 也是自动缩放器获取当前集群负载的入口点。自动缩放器使用它来分配或删除集群中的节点。

Actor 管理

GCS 在 Actor 管理中发挥着重要作用。所有 Actor 都需要先在 GCS 登记,然后才能调度。GCS 也是独立 Actor 的所有者。

预占用组管理

GCS 还管理预占用组的生命周期。GCS 通过两阶段提交协议来创建预占用组。

元数据存储

  • 集群仪表盘地址。
  • 远程功能的定义,在开发项目中定义远程运行的函数时,Ray 会检查有没有注册过,没有的话会添加。被指派的 worker 会从 GCS 中获取定义。
  • 运行环境的数据,默认情况下运行环境目录存储在 GCS 中,GCS 通过计算独立 Actor 和 job 使用情况来进行垃圾回收。
  • 一些 Ray 的其它组件的数据也会存储在这,如 Ray Serve 会将部署的元数据存储在这。

容错性

GCS 是 Ray 中非常关键的组件,故障的话整个集群都会故障。

在 2.0 中 GCS 在故障中会尝试恢复,恢复之前可能集群工作会不正常。

默认情况下,GCS 将所有数据存储到内存存储中,一旦发生故障,该存储将丢失。为了使 GCS 能够容错,它必须将数据写入持久存储。Ray 支持 Redis 作为外部存储系统。为了支持GCS容错,GCS 应该有一个高可用 Redis 实例作为支持。然后,当 GCS 重新启动时,它首先从 Redis 存储中加载信息,包括发生故障时集群中运行的所有 Raylet、actor 和预占用组。然后,GCS 将恢复健康检查和资源管理等常规功能。

GCS 故障时下列功能都无法使用:

  • Actor 的管理。
  • 预占用组的管理。
  • 资源管理。
  • raylet 管理。
  • worker 的管理。

由于这些组件不需要读取或写入GCS,因此任何正在运行的 Ray 的任务和 Actor 都将保持活动状态。同样,任何现有对象都将继续可用。

1 Like

Part1 见:【干货】Ray2.0架构-中文翻译版(Part1)

作者介绍:大家好!我是 Andy.Qin,一个想创造哆啦 A 梦的 Maker,连续创业者。我最熟悉的领域是分布式应用开发、高并发架构设计,其次是机器学习、自然语言处理和理解方向。我对开源社区和开源项目的建设也有极大的热忱,期望能与大家多多交流讨论!

本次利用业务时间,花了一个月翻译了将近60多页的Ray2.0架构白皮书,期望对大家能够有所帮助!大家有任何问题或者疑问,欢迎留言与我交流!
更多好文见: https://qin.news

目前也在招聘中:【可话】招聘高级后端 | 大数据工程师