分布式计算框架Ray介绍
首届Ray Summit将于2020年9月30日在线上举办,扫描图中二维码即可免费注册
Ray能解决什么问题
当我们要构建一个涉及大规模数据处理或者复杂计算的应用,传统的方式是使用现成的大数据框架,例如 Apache Flink 和 Apache Spark。这些系统提供的API通常基于某种特定的计算范式(例如DataStream、DataSet),要求用户基于这些特定的计算范式实现应用逻辑。对于传统的数据清洗、数据分析等应用,这种用法能够很好地适用。但是,随着分布式应用的逻辑越来越复杂(例如分布式机器学习应用),许多应用的逻辑并不能直接套用现有的计算范式。在这种情况下,开发者如果想要细粒度地控制系统中的任务流,就需要自己从头编写一个分布式应用。
但是现实中,开发一个分布式应用并不简单。除了应用本身的代码逻辑,我们还需要处理许多分布式系统中常见的难题,例如:分布式组件通信、服务部署、服务发现、监控、异常恢复等。处理这些问题,通常需要开发者在分布式系统领域有较深的经验,否则很难保证系统的性能和健壮性。
为了简化分布式编程,Ray提供了一套简单、通用的分布式编程API,屏蔽了分布式系统中的这些常见的难题,让开发者能够使用像开发单机程序一样简单的方式,开发分布式系统。Ray的API基于两个核心的概念:Task和Actor。
Ray API介绍
Ray 1.0版本目前支持Python和Java两种编程语言。接下来,让我们看看如何使用Ray的Python API和Java API来构建分布式应用。
注:Java API是Ray 1.0的新feature,由蚂蚁集团开发并贡献给开源社区。
本文示例代码GitHub仓库:https://github.com/raulchen/ray-intro-cn。
Task
Ray Task可以类比于单机程序中的函数。在Ray中,Task表示一个无状态的计算单元。如下面两个代码片段所示,Ray可以把一个任意的Python函数或Java静态方法转换成一个Task。在这个过程中,Ray会在一个远程进程中执行这个函数。并且这个过程是异步的,这意味着我们可以并行地执行多个Task。
Ray Task Python示例:
# `square`是一个普通的Python函数,`@ray.remote`装饰器表示我们可以
# 把这个函数转化成Ray task.
@ray.remote
def square(x):
return x * x
obj_refs = []
# `square.remote` 会异步地远程执行`square`函数。
# 通过下面两行代码,我们并发地执行了5个Ray task。
# `square.remote`的返回值是一个`ObjectRef`对象,
# 表示Task执行结果的引用。
for i in range(5):
obj_refs.append(square.remote(i))
# 实际的task执行结果存放在Ray的分布式object store里,
# 我们可以通过`ray.get`接口,同步地获取这些数据。
assert ray.get(obj_refs) == [0, 1, 4, 9, 16]
Ray Task Java示例:
public class RayDemo {
public static int square(int x) {
return x * x;
}
}
List<ObjectRef<Integer>> objectRefs = new ArrayList<>();
// 通过`Ray.task(...).remote()`,我们可以把任意一个Java静态函数转化成Ray task,
// 异步地远程执行这个函数。通过下面两行代码,我们并发地执行了5个Ray task。
// `remote()`的返回值是一个`ObjectRef`对象,表示Task执行结果的引用。
for (int i = 0; i < 5; i++) {
objectRefs.add(Ray.task(RayDemo::square, i).remote());
}
// 实际的task执行结果存放在Ray的分布式object store里,
// 我们可以通过`Ray.get`接口,同步地获取这些数据。
Assert.assertEquals(Ray.get(objectRefs), Arrays.asList(0, 1, 4, 9, 16));
Actor
Actor可以类比于单机程序中的类。Ray使用Actor来表示一个有状态的计算单元。在Ray中,我们可以基于任意一个Python class或Java class创建Actor对象。这个Actor对象运行在一个远程的Python或者Java进程中。同时,我们可以通过ActorHandle远程调用这个Actor的任意一个方法(每次调用称为一个Actor Task),多个Actor Task在Actor进程中会顺序执行,并共享Actor的状态。
Ray Actor Python示例:
# `Counter`是一个普通的Python类,`@ray.remote`装饰器表示我们可以
# 把这个类转化成Ray actor.
@ray.remote
class Counter(object):
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
def get_value(self):
return self.value
# `Counter.remote`会基于`Counter`类创建一个actor对象,
# 这个actor对象会运行在一个远程的Python进程中。
counter = Counter.remote()
# `Counter.remote`的返回值是一个`ActorHandle`对象。
# 通过`ActorHandle`,我们可以远程调用Actor的任意一个方法(actor task)。
[counter.increment.remote() for _ in range(5)]
# Actor task的返回值也是一个`ObjectRef`对象。
# 同样地,我们通过`ray.get`获取实际的数据。
assert ray.get(counter.get_value.remote()) == 5
Ray Actor Java示例:
public static class Counter {
private int value = 0;
public void increment() {
this.value += 1;
}
public int getValue() {
return this.value;
}
}
// 通过`Ray.actor(...).remote`接口,我们可以基于任意一个Java class创建一个Ray actor.
// 这个actor对象会运行在一个远程的Java进程中。
// 通过这个接口,我们得到一个`ActorHandle`对象。
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();
// 通过`ActorHandle`,我们可以远程调用Actor的任意一个方法(actor task)。
for (int i = 0; i < 5; i++) {
counter.task(Counter::increment).remote();
}
// Actor task的返回值也是一个`ObjectRef`对象。
// 我们可以通过`ObjectRef::get`获取单个object的实际的数据。
ObjectRef<Integer> objectRef = counter.task(Counter::getValue).remote();
Assert.assertEquals((int) objectRef.get(), 5);
Object Store
Obect Store是Ray架构中的一个关键组件,Task计算的中间结果会存放在分布式Object Store中。除此之外,我们也可以使用put接口,显式地把一个Python或Java对象存放到Object Store中。
Ray Object Store Python示例:
# 显式地把一个对象放入object store。
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1
Ray Object Store Java示例:
// 显式地把一个对象放入object store。
ObjectRef<Integer> objectRef = Ray.put(1);
Assert.assertEquals((int) objectRef.get(), 1);
注:Ray的分布式Object Store包含多级缓存,临时对象只会存在于worker进程内的缓存中,从而优化性能。
构建复杂的分布式任务流
在Ray中,我们可以把一个Task输出的ObjectRef传递给另一个Task(包括Actor task)。在这种情况下,Ray会等待第一个Task执行结束之后,再开始执行第二个Task。同时,我们也可以把一个ActorHandle传递给一个Task,从而实现在多个远程Worker中同时远程调用一个Actor。通过这些方式,我们可以动态地、灵活地构建各种各样复杂的分布式任务流。
分布式任务流Python示例:
# 通过把一个task输出的`ObjectRef`传递给另一个task,
# 我们定义了两个task的依赖关系。
# Ray会等待第一个task执行结束之后,再开始执行第二个task。
obj1 = square.remote(2)
obj2 = square.remote(obj1)
assert ray.get(obj2) == 16
# 我们也可以把一个`ActorHandle`传递给一个task,
# 从而实现在多个远程worker中同时远程调用一个actor。
@ray.remote
def call_actor_in_worker(counter):
counter.increment.remote()
counter = Counter.remote()
# 创建5个task,同时调用counter actor的increment方法,
# 并等待这五个task执行结束。
ray.get([call_actor_in_worker.remote(counter) for _ in range(5)])
assert ray.get(counter.get_value.remote()) == 5
分布式任务流Java示例:
// 通过把一个task输出的`ObjectRef`传递给另一个task,我们定义了两个task的依赖关系。
// Ray会等待第一个task执行结束之后,再开始执行第二个task。
ObjectRef<Integer> objRef1 = Ray.task(RayDemo::square, 2).remote();
ObjectRef<Integer> objRef2 = Ray.task(RayDemo::square, objRef1).remote();
Assert.assertEquals((int) objRef2.get(), 16);
public class RayDemo {
public static int callActorInWorker(ActorHandle<Counter> counter) {
counter.task(Counter::increment).remote();
return 1;
}
}
// 我们也可以把一个`ActorHandle`传递给一个task,
// 从而实现在多个远程worker中同时远程调用一个actor。
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();
List<ObjectRef<Integer>> objRefs = new ArrayList<>();
// 创建5个task,同时调用counter actor的increment方法。
for (int i = 0; i < 5; i++) {
objRefs.add(Ray.task(RayDemo::callActorInWorker, counter).remote());
}
// 等待这五个task执行结束。
Ray.get(objRefs);
Assert.assertEquals((int) counter.task(Counter::getValue).remote().get(), 5);
更多高级功能
除了Task和Actor两个基本概念,Ray还提供了一系列高级功能,来帮助开发者更容易地开发分布式应用。这些高级功能包括但不限于:设置Task和Actor所需的资源、Actor生命周期管理、Actor自动故障恢复、自定义调度策略、Python/Java跨语言编程。我们将在之后的文章中详细介绍这些功能。
Ray的开源生态
在Ray的Python package中,除了Ray框架本身,目前还包括多个基于Ray开发的Python机器学习库:
- Tune:分布式超参搜索;
- RLlib:分布式强化学习;
- RaySGD:分布式模型训练;
- Ray Serve:分布式模型服务;
除此之外,还有多个第三方开源框架也集成到了Ray的生态中,例如Modin、Dask、Mars等,详细列表请查看Ray的官方文档:https://docs.ray.io/en/master/ray-libraries.html。
总结
本文介绍了Ray如何通过Task和Actor两个基础核心概念,给开发者提供一套简单、通用的分布式编程API,从而屏蔽分布式系统中常见的难题,让开发者能够使用像开发单机程序一样简单的方式,开发分布式应用。
如果想了解更多关于Ray的信息,请访问Ray官方GitHub:https://github.com/ray-project/ray。
原文由 陈昊于2021年10月12日发表于Ray中文社区公众号,查看原文