分布式计算框架Ray介绍

分布式计算框架Ray介绍

Ray(https://github.com/ray-project/ray)是一个简单、通用的分布式计算框架。Ray项目最初由加州大学伯克利分校RISELab发起并开源。在过去的几年中,Ray项目发展迅速,在蚂蚁集团、Intel、微软、AWS等公司被应用于构建各种AI、大数据系统。本文基于即将发布的Ray1.0版本,介绍Ray的基础核心概念,以及如何使用Ray的Python和Java API来构建分布式应用。

首届Ray Summit将于2020年9月30日在线上举办,扫描图中二维码即可免费注册

Ray能解决什么问题

当我们要构建一个涉及大规模数据处理或者复杂计算的应用,传统的方式是使用现成的大数据框架,例如 Apache Flink 和 Apache Spark。这些系统提供的API通常基于某种特定的计算范式(例如DataStream、DataSet),要求用户基于这些特定的计算范式实现应用逻辑。对于传统的数据清洗、数据分析等应用,这种用法能够很好地适用。但是,随着分布式应用的逻辑越来越复杂(例如分布式机器学习应用),许多应用的逻辑并不能直接套用现有的计算范式。在这种情况下,开发者如果想要细粒度地控制系统中的任务流,就需要自己从头编写一个分布式应用。

但是现实中,开发一个分布式应用并不简单。除了应用本身的代码逻辑,我们还需要处理许多分布式系统中常见的难题,例如:分布式组件通信、服务部署、服务发现、监控、异常恢复等。处理这些问题,通常需要开发者在分布式系统领域有较深的经验,否则很难保证系统的性能和健壮性。

为了简化分布式编程,Ray提供了一套简单、通用的分布式编程API,屏蔽了分布式系统中的这些常见的难题,让开发者能够使用像开发单机程序一样简单的方式,开发分布式系统。Ray的API基于两个核心的概念:TaskActor

Ray API介绍

Ray 1.0版本目前支持Python和Java两种编程语言。接下来,让我们看看如何使用Ray的Python API和Java API来构建分布式应用。

注:Java API是Ray 1.0的新feature,由蚂蚁集团开发并贡献给开源社区。

本文示例代码GitHub仓库:https://github.com/raulchen/ray-intro-cn。

Task

Ray Task可以类比于单机程序中的函数。在Ray中,Task表示一个无状态的计算单元。如下面两个代码片段所示,Ray可以把一个任意的Python函数或Java静态方法转换成一个Task。在这个过程中,Ray会在一个远程进程中执行这个函数。并且这个过程是异步的,这意味着我们可以并行地执行多个Task。

Ray Task Python示例:

# `square`是一个普通的Python函数,`@ray.remote`装饰器表示我们可以

# 把这个函数转化成Ray task.

@ray.remote

def square(x):

    return x * x

obj_refs = []

# `square.remote` 会异步地远程执行`square`函数。

# 通过下面两行代码,我们并发地执行了5个Ray task。

# `square.remote`的返回值是一个`ObjectRef`对象,

# 表示Task执行结果的引用。

for i in range(5):

    obj_refs.append(square.remote(i))

# 实际的task执行结果存放在Ray的分布式object store里,

# 我们可以通过`ray.get`接口,同步地获取这些数据。

assert ray.get(obj_refs) == [0, 1, 4, 9, 16]

Ray Task Java示例:

public class RayDemo {

  public static int square(int x) {

    return x * x;

  }

}

List<ObjectRef<Integer>> objectRefs = new ArrayList<>();

// 通过`Ray.task(...).remote()`,我们可以把任意一个Java静态函数转化成Ray task,

// 异步地远程执行这个函数。通过下面两行代码,我们并发地执行了5个Ray task。

// `remote()`的返回值是一个`ObjectRef`对象,表示Task执行结果的引用。

for (int i = 0; i < 5; i++) {

  objectRefs.add(Ray.task(RayDemo::square, i).remote());

}

// 实际的task执行结果存放在Ray的分布式object store里,

// 我们可以通过`Ray.get`接口,同步地获取这些数据。

Assert.assertEquals(Ray.get(objectRefs), Arrays.asList(0, 1, 4, 9, 16));


Actor

Actor可以类比于单机程序中的类。Ray使用Actor来表示一个有状态的计算单元。在Ray中,我们可以基于任意一个Python class或Java class创建Actor对象。这个Actor对象运行在一个远程的Python或者Java进程中。同时,我们可以通过ActorHandle远程调用这个Actor的任意一个方法(每次调用称为一个Actor Task),多个Actor Task在Actor进程中会顺序执行,并共享Actor的状态。

Ray Actor Python示例:

# `Counter`是一个普通的Python类,`@ray.remote`装饰器表示我们可以

# 把这个类转化成Ray actor.

@ray.remote

class Counter(object):

    def __init__(self):

        self.value = 0

    def increment(self):

        self.value += 1

    def get_value(self):

        return self.value

# `Counter.remote`会基于`Counter`类创建一个actor对象,

# 这个actor对象会运行在一个远程的Python进程中。

counter = Counter.remote()

# `Counter.remote`的返回值是一个`ActorHandle`对象。

# 通过`ActorHandle`,我们可以远程调用Actor的任意一个方法(actor task)。

[counter.increment.remote() for _ in range(5)]

# Actor task的返回值也是一个`ObjectRef`对象。

# 同样地,我们通过`ray.get`获取实际的数据。

assert ray.get(counter.get_value.remote()) == 5

Ray Actor Java示例:

public static class Counter {

  private int value = 0;

  public void increment() {

    this.value += 1;

  }

  public int getValue() {

    return this.value;

  }

}


// 通过`Ray.actor(...).remote`接口,我们可以基于任意一个Java class创建一个Ray actor.

// 这个actor对象会运行在一个远程的Java进程中。

// 通过这个接口,我们得到一个`ActorHandle`对象。

ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();

// 通过`ActorHandle`,我们可以远程调用Actor的任意一个方法(actor task)。

for (int i = 0; i < 5; i++) {

    counter.task(Counter::increment).remote();

}

// Actor task的返回值也是一个`ObjectRef`对象。

// 我们可以通过`ObjectRef::get`获取单个object的实际的数据。

ObjectRef<Integer> objectRef = counter.task(Counter::getValue).remote();

Assert.assertEquals((int) objectRef.get(), 5);

Object Store

Obect Store是Ray架构中的一个关键组件,Task计算的中间结果会存放在分布式Object Store中。除此之外,我们也可以使用put接口,显式地把一个Python或Java对象存放到Object Store中。

Ray Object Store Python示例:

# 显式地把一个对象放入object store。

obj_ref = ray.put(1)

assert ray.get(obj_ref) == 1

Ray Object Store Java示例:

// 显式地把一个对象放入object store。
ObjectRef<Integer> objectRef = Ray.put(1);

Assert.assertEquals((int) objectRef.get(), 1);

注:Ray的分布式Object Store包含多级缓存,临时对象只会存在于worker进程内的缓存中,从而优化性能。

构建复杂的分布式任务流

在Ray中,我们可以把一个Task输出的ObjectRef传递给另一个Task(包括Actor task)。在这种情况下,Ray会等待第一个Task执行结束之后,再开始执行第二个Task。同时,我们也可以把一个ActorHandle传递给一个Task,从而实现在多个远程Worker中同时远程调用一个Actor。通过这些方式,我们可以动态地、灵活地构建各种各样复杂的分布式任务流。

分布式任务流Python示例:

# 通过把一个task输出的`ObjectRef`传递给另一个task,

# 我们定义了两个task的依赖关系。

# Ray会等待第一个task执行结束之后,再开始执行第二个task。

obj1 = square.remote(2)

obj2 = square.remote(obj1)

assert ray.get(obj2) == 16

# 我们也可以把一个`ActorHandle`传递给一个task,

# 从而实现在多个远程worker中同时远程调用一个actor。

@ray.remote

def call_actor_in_worker(counter):

    counter.increment.remote()

counter = Counter.remote()

# 创建5个task,同时调用counter actor的increment方法,

# 并等待这五个task执行结束。

ray.get([call_actor_in_worker.remote(counter) for _ in range(5)])

assert ray.get(counter.get_value.remote()) == 5

分布式任务流Java示例:

// 通过把一个task输出的`ObjectRef`传递给另一个task,我们定义了两个task的依赖关系。

// Ray会等待第一个task执行结束之后,再开始执行第二个task。

ObjectRef<Integer> objRef1 = Ray.task(RayDemo::square, 2).remote();
ObjectRef<Integer> objRef2 = Ray.task(RayDemo::square, objRef1).remote();

Assert.assertEquals((int) objRef2.get(), 16);

public class RayDemo {

  public static int callActorInWorker(ActorHandle<Counter> counter) {

    counter.task(Counter::increment).remote();

    return 1;

  }

}

// 我们也可以把一个`ActorHandle`传递给一个task,

// 从而实现在多个远程worker中同时远程调用一个actor。

ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();

List<ObjectRef<Integer>> objRefs = new ArrayList<>();

// 创建5个task,同时调用counter actor的increment方法。

for (int i = 0; i < 5; i++) {

    objRefs.add(Ray.task(RayDemo::callActorInWorker, counter).remote());

}

// 等待这五个task执行结束。

Ray.get(objRefs);

Assert.assertEquals((int) counter.task(Counter::getValue).remote().get(), 5);

更多高级功能

除了Task和Actor两个基本概念,Ray还提供了一系列高级功能,来帮助开发者更容易地开发分布式应用。这些高级功能包括但不限于:设置Task和Actor所需的资源、Actor生命周期管理、Actor自动故障恢复、自定义调度策略、Python/Java跨语言编程。我们将在之后的文章中详细介绍这些功能。

Ray的开源生态

在Ray的Python package中,除了Ray框架本身,目前还包括多个基于Ray开发的Python机器学习库:

  • Tune:分布式超参搜索;
  • RLlib:分布式强化学习;
  • RaySGD:分布式模型训练;
  • Ray Serve:分布式模型服务;

除此之外,还有多个第三方开源框架也集成到了Ray的生态中,例如Modin、Dask、Mars等,详细列表请查看Ray的官方文档:https://docs.ray.io/en/master/ray-libraries.html。

总结

本文介绍了Ray如何通过Task和Actor两个基础核心概念,给开发者提供一套简单、通用的分布式编程API,从而屏蔽分布式系统中常见的难题,让开发者能够使用像开发单机程序一样简单的方式,开发分布式应用。

如果想了解更多关于Ray的信息,请访问Ray官方GitHub:https://github.com/ray-project/ray。

原文由 陈昊于2021年10月12日发表于Ray中文社区公众号,查看原文

1 Like

您好,ray是否支持win11呢