引言
随着互联网数据的爆发式增长,互联网业务的发展速度已经明显高于计算机硬件的发展速度。在此背景下,我们可以看到仅靠单机系统能解决的问题越来越少,而越来越多的领域和应用场景需要构建分布式系统。C++作为native的编程语言,由于其高性能、轻量级的特点广泛应用于现代分布式系统中,如Tensorflow、Caffe、XGboost、Redis等都选择C/C++作为主要的编程语言。
相比单机应用,想要构建一个功能完善、高可用、可应用于生产环境的C++分布式系统并没有那么简单。通常我们需要做以下考虑:
- 解决通信问题。我们通过protobuf等序列化工具定义组件间的通信协议,然后通过RPC框架(或者socket)实现通信。这里面还需要考虑服务发现、同/异步IO和多路复用等问题,整个实现较为繁琐。
- 解决部署问题。我们需要找到满足特定资源规格的服务器,对不同组件的进程进行部署。这个过程可能需要对接不同的云平台实现资源调度。
- 解决故障恢复问题。监控任意一个节点的故障事件,重启并恢复系统状态。
看起来,整个实现中需要考虑的问题非常多,完成所有这些事情并不容易。那么有没有一个系统,能够帮你解决以上所有的分布式问题,让你能够专注在系统本身的逻辑上呢?随着Ray的到来,理想即将成为现实。
Ray是什么
整体介绍
Ray(https://github.com/ray-project/ray)是一个简单、通用的分布式计算框架。项目最初由加州大学伯克利分校RISELab发起并开源。在过去的几年中,Ray项目发展迅速,在蚂蚁集团、Intel、微软、AWS、Uber等公司被广泛应用于构建各种AI、大数据系统,并且已经形成了较为完善的生态圈(Tune、Rlib、Serve、分布式Scikit-learn、XGboost on Ray等)。相比现有的大数据计算系统(Spark、Flink等),Ray最本质的特点和不同点在于,Ray的设计没有基于某种特定的计算范式(例如DataStream、DataSet),而是回归编程本身,通过抽象编程中最基本的Function和Class等概念,构建了一套简单易用的分布式编程API。所以从系统层次的角度看,Ray的API更加底层,更加灵活。Ray不会限制你的应用场景,无论是批处理、流计算、图计算,还是机器学习、科学计算等,只要你的系统具有分布式特性,需要利用多机协同完成一个特定的任务,你就可以选择Ray帮你完成分布式系统的构建。
C++ API
Ray在创建初期只支持Python API,2018年中旬蚂蚁集团开源了Java API。本文介绍的C++ API是Ray上用户接口的第三种语言实现。有人会问,已经有了Python语言和Java语言的支持,为什么还要开发C++版本?我们开发C++ API的主要原因是,在某些高性能场景,Java和Python在系统调优之后仍然无法满足业务需求。除此之外,Ray底层内核和组件本身就是纯C++实现,使用C++ API会让用户层和内核层无缝衔接,整个系统无语言间调用开销。
下面简要介绍一下Ray C++ API中几个核心概念和使用方式。
Task
Ray中的Task对应单机编程中的function。通过Ray的Task API,我们可以很容易地把任意一个C++函数放到分布式集群中异步执行,大幅提高执行效率。
假设我们有一个耗时的heavy_compute函数,如果在单机环境中串行执行10000次,整体耗时很长:
int heavy_compute(int value) {
return value;
}
std::vector<int> results;
for(int i = 0; i < 10000; i++) {
results.push_back(heavy_compute(i));
}
利用Ray将单机的heavy_compute改造成分布式的heavy_compute:
// 声明heavy_compute为remote function
RAY_REMOTE(heavy_compute);
std::vector<ray::ObjectRef<int>> results;
for(int i = 0; i < 10000; i++) {
// 利用ray::Task调用远程函数,它们被Ray自动调度到集群的节点上实现分布式计算
results.push_back(ray::Task(heavy_compute).Remote(i));
}
// 获取分布式计算的结果
for(auto result : ray::Get(results)) {
std::cout<< *result << std::endl;
}
Actor
普通的Task是一种无状态的计算,如果想实现有状态的计算,需要使用Actor。
Actor对应单机编程中的Class。基于Ray强大的分布式能力,我们可以将以下的单机Counter改造成部署在远程节点的分布式Counter。
class Counter {
int count;
public:
Counter(int init) { count = init; }
int Add(int x) { return x + 1; }
};
Counter *CreateCounter(int init) {
return new Counter(init);
}
RAY_REMOTE(CreateCounter, &Counter::Add);
// 创建一个Counter actor
ActorHandle<Counter> actor = ray::Actor(CreateCounter).Remote(0);
// 调用actor的远程函数
auto result = actor.Task(&Counter::Add).Remote(1);
EXPECT_EQ(1, *(ray::Get(result)));
Object
在以上Task和Actor的例子中,我们注意到,最后都是使用“ray::Get”获取计算结果。这里绕不开的一个概念就是Object。每次调用“Remote”方法返回的是一个Object引用(ObjectRef),每个ObjectRef指向一个集群内唯一的远程Object。Ray的Object类似异步编程中常见的future概念。在Ray中,Object会存储在底层的分布式Object Store中(基于shared memory)。当你调用“ray::Get”方法获取一个Object时,会从远程节点拉取数据到当前节点,经过反序列化返回到你的程序中。
除了存储应用的中间计算结果,你也可以通过“ray::Put”创建一个Object。除此之外,你也可以通过“ray::Wait”接口等待一组Object的结果。
// 把一个object Put到object store中
auto obj_ref1 = ray::Put(100);
// 从object store获取数据
auto res1 = obj_ref1.Get();
//或者调用ray::Get(obj_ref1)
EXPECT_EQ(100, *res1);
// 等待多个object ready
auto obj_ref2 = ray::Put(200);
ray::Wait({obj_ref1, obj_ref2}, /*num_results=*/1, /*timeout_ms=*/1000);
从以上基础API可以看出,Ray已经解决了分布式系统组件间通信、存储和传输等问题。Ray还有一些高级功能用来解决分布式系统中的其他问题,例如调度、故障恢复、部署运维等,具体的将在下面的例子中进行介绍。
用Ray C++实现分布式存储系统
简单了解了Ray的定位和Ray C++ API后,让我们用一个实际的例子看一下如何利用Ray C++ API,构建一个简单的KV存储系统。
demo说明
在这个简单的KV存储系统中,有一个main server和一个backup server,仅main server提供服务,backup server则只用于备份数据,不提供服务。同时要求系统具备自动故障恢复能力,即任意一个server重启后数据不会丢失,能继续提供服务。
注 :这只是一个demo,不专注存储本身的逻辑和优化,目的是使用尽可能简单的代码来展示如何用Ray来快速开发一个分布式存储系统。完整代码请在文末点击“阅读原文”查看。
Server实现
用C++ API实现一个分布式存储比较简单,用户可以先按照单机版的KV store的思路去写server,稍后再利用Ray的分布式部署和调度能力将单机版的KV store变成一个分布式的KV store。
main server
class MainServer {
public:
MainServer();
std::pair<bool, std::string> Get(const std::string &key);
void Put(const std::string &key, const std::string &val);
private:
std::unordered_map<std::string, std::string> data_;
};
std::pair<bool, std::string> MainServer::Get(const std::string &key) {
auto it = data_.find(key);
if (it == data_.end()) {
return std::pair<bool, std::string>{};
}
return {true, it->second};
}
void MainServer::Put(const std::string &key, const std::string &val) {
// 先将数据同步到backup server
...
// 再更新本地kv
data_[key] = val;
}
可以看到分布式KV store的读数据(MainServer::Get)的实现和单机版的实现相比没有任何差异,用户不必关心分布式的细节,关注业务逻辑本身即可。Put时要注意先将数据同步写到backup server中再写本地,确保数据的一致性。
backup server
class BackupServer {
public:
BackupServer();
// 当main server重启时会调用GetAllData做数据恢复
std::unordered_map<std::string, std::string> GetAllData() {
return data_;
}
// 当main server写数据时会调用SyncData将数据先同步到buckup server
void SyncData(const std::string &key, const std::string &val) {
data_[key] = val;
}
private:
std::unordered_map<std::string, std::string> data_;
};
部署
集群部署
部署应用之前需要首先部署一个Ray集群。目前Ray已经支持在多个主流云平台进行一键部署,如AWS、Azure、GCP、Aliyun和Kubernetes环境等。如果你已经拥有了一个配置文件,可以在安装Ray之后通过命令行进行一键部署:
ray up -y config.yaml
具体如何配置可以参考官方文档。
另外一种选择,如果你拥有正在运行的服务器,也可以通过在各服务器上执行start命令手动组建Ray集群:
- 选择一台服务器作为主节点:
ray start --head
- 在其他服务器加入主节点进行组网:
ray start --address=${HRAD_ADDRESS}
actor部署
Ray集群部署之后,我们需要将前面创建的MainServer和BackupServer这两个actor实例部署到集群中,以提供分布式存储服务。用Ray创建Actor的API就可以很简单的实现actor部署。
static MainServer *CreateMainServer() { return new MainServer(); }
static BackupServer *CreateBackupServer() { return new BackupServer(); }
// 声明remote function
RAY_REMOTE(CreateMainServer, CreateBackupServer);
const std::string MAIN_SERVER_NAME = "main_actor";
const std::string BACKUP_SERVER_NAME = "backup_actor";
// 通过ray::Actor将actor的实例部署到Ray集群中
void StartServer() {
ray::Actor(CreateMainServer)
.SetName(MAIN_SERVER_NAME)
.Remote();
ray::Actor(CreateBackupServer)
.SetName(BACKUP_SERVER_NAME)
.Remote();
}
调度
设置资源
如果你对actor运行环境的硬件有特殊要求,还可以通过API设置actor所需要的资源,比如CPU,内存等资源。
// 所需资源cpu:1, 内存1G
const std::unordered_map<std::string, double> RESOUECES{
{"CPU", 1.0}, {"memory", 1024.0 * 1024.0 * 1024.0}};
// 通过ray::Actor将actor的实例部署到Ray集群中并设置资源需求
void StartServer() {
ray::Actor(CreateMainServer)
.SetName(MAIN_SERVER_NAME)
.SetResources(RESOUECES) //设置资源
.Remote();
ray::Actor(CreateBackupServer)
.SetName(BACKUP_SERVER_NAME)
.SetResources(RESOUECES) //设置资源
.Remote();
}
设置调度策略
我们希望将main server和backup server两个actor调度到不同的节点上,以保证某一个节点挂掉不会同时影响两个server。可以利用Ray的Placement Group实现这种特殊的调度功能。
Placement Group允许用户从集群中预置一部分资源供Task和Actor调度。
ray::PlacementGroup CreateSimplePlacementGroup(const std::string &name) {
// 设置Placement Group的资源
std::vector<std::unordered_map<std::string, double>> bundles{RESOUECES, RESOUECES};
// 创建Placement Group并设置调度策略为平铺调度(SPREAD)
ray::PlacementGroupCreationOptions options{
false, name, bundles, ray::PlacementStrategy::SPREAD};
return ray::CreatePlacementGroup(options);
}
auto placement_group = CreateSimplePlacementGroup("my_placement_group");
assert(placement_group.Wait(10));
上面的代码创建了一个Placement Group,调度策略为平铺(SPREAD), 平铺调度的含义是将actor以平铺的方式调度到不同的节点上。更多的调度策略可以参考Ray的官方文档 https://docs.ray.io/en/master/placement-group.html。
接下来就可以通过Placement Group将actor调度到不同的节点上了。
// 通过ray::Actor将actor的实例调度到Ray集群不同的节点上
void StartServer() {
// 调度main server
ray::Actor(CreateMainServer)
.SetName(MAIN_SERVER_NAME)
.SetResources(RESOUECES)
.SetPlacementGroup(placement_group, 0) //调度到某个节点上
.Remote();
// 调度backup server
ray::Actor(CreateBackupServer)
.SetName(BACKUP_SERVER_NAME)
.SetResources(RESOUECES)
.SetPlacementGroup(placement_group, 1) //调度到某个节点上
.Remote();
}
服务发现和组件通信
现在我们已经把main server和backup server两个actor实例部署到Ray集群中的两个节点上了,接下来需要解决main server的服务发现问题和client-server的通信问题。
Ray的named actor可以很方便的实现服务发现,我们在创建actor的时候设置了actor的名字,后续就可以通过ray::GetActor(name)来发现之前创建的actor了。
Ray Task则可以解决client-server之间的通信问题,就像调用本地函数一样实现远程函数调用,用户无需关心数据通信的细节(如传输协议、网络通信等)。
class Client {
public:
Client() {
// main server服务发现
main_actor_ = ray::GetActor<MainServer>(MAIN_SERVER_NAME);
}
bool Put(const std::string &key, const std::string &val) {
// 利用Ray Task调用远端main server的Put函数
(*main_actor_).Task(&MainServer::Put).Remote(key, val).Get();
return true;
}
std::pair<bool, std::string> Get(const std::string &key) {
// 利用Ray Task调用远端main server的Get函数
return *(*main_actor_).Task(&MainServer::Get).Remote(key).Get();
}
private:
boost::optional<ray::ActorHandle<MainServer>> main_actor_;
};
故障恢复
进程故障恢复
Ray提供了进程故障恢复的功能,比如actor进程挂掉之后Ray会自动将actor进程拉起来,并重新创建actor实例,只需要设置actor的最大重启次数即可。
// 通过ray::Actor将actor的实例调度到Ray集群不同的节点上并设置最大重启次数
void StartServer() {
ray::Actor(CreateMainServer)
.SetName("main_actor")
.SetResources(RESOUECES)
.SetPlacementGroup(placement_group, 0)
.SetMaxRestarts(1) //设置最大重启次数让Ray做自动故障恢复
.Remote();
ray::Actor(CreateBackupServer)
.SetName("backup_actor")
.SetResources(RESOUECES)
.SetPlacementGroup(placement_group, 1)
.SetMaxRestarts(1) //设置最大重启次数让Ray做自动故障恢复
.Remote();
}
状态恢复
虽然Ray会做进程故障恢复,重新创建actor实例,但是actor的运行状态需要用户去做状态恢复处理,比如main actor挂掉之后重新拉起来,之前在main actor内存中的数据都会丢失,需要做数据恢复。
MainServer::MainServer() {
// 如果当前实例是重启的则做故障处理
if (ray::WasCurrentActorRestarted()) {
HandleFailover();
}
}
void MainServer::HandleFailover() {
backup_actor_ = *ray::GetActor<BackupServer>(BACKUP_SERVER_NAME);
// 从backup server拉取所有数据
data_ = *backup_actor_.Task(&BackupServer::GetAllData).Remote().Get();
RAYLOG(INFO) << "MainServer get all data from BackupServer";
}
main server的failover是在构造函数里面做的,Ray提供了判断actor实例是否重启的API。在actor的构造函数中如果发现是重启的实例则做数据恢复的处理,具体方法就是从backup server拉取所有数据。backup server的failover处理也是类似的, 不再赘述。
运维与监控
Ray提供了一套简单的运维和监控系统(Dashboard)让我们可以实时查看系统的运行情况。
以上面的KV store为例,我们可以看到actor列表、node列表和运行日志等信息,并且可以捕捉到一些异常的events。
actor列表
node列表
运行日志
异常events透出
如何快速开始
Ray目前已经完成了1.7.0版本的release,C++ API作为其中一个highlight功能正式集成到wheel包发布,详见发布记录https://github.com/ray-project/ray/releases/tag/ray-1.7.0。
考虑到Ray内核的实现是多语言的,跑C++应用需要同时具备Python环境和C++环境,我们将Ray整体打包成wheel用pip进行管理。你可以通过以下方式快速获取一个Ray C++模版工程。
环境要求:Linux系统或macOS, Python 3.6-3.9版本,C++17环境,bazel 3.4以上版本(可选,模版工程基于bazel)。
- 安装最新版本的Ray:
pip install -U ray[cpp]
- 通过ray命令行工具生成C++模版工程:
mkdir ray-template && ray cpp --generate-bazel-project-template-to ray-template
- 进入模版工程,编译并运行:
cd ray-template && sh run.sh
以上运行方式会在跑example的过程中在本地拉起Ray集群,example运行结束后自动关闭Ray集群。如果你想让应用连接已有的Ray集群,可以按如下方式启动:
ray start --head
RAY_ADDRESS=127.0.0.1:6379 sh run.sh
测试结束后可以通过stop命令关闭ray集群,避免残留进程:
ray stop
现在,你可以开始基于模版工程开发自己的C++分布式系统了!
总结
本文通过一个存储系统的例子介绍了如何利用Ray C++ API构建分布式系统,整个demo代码不过200多行,却同时解决了部署、调度、通信、故障恢复和运维监控等问题。我们可以发现,Ray致力于解决分布式系统的通用问题,不限制你的计算范式和应用场景,无论是针对新建应用还是已有的应用,都可以通过Ray快速升级为强大的分布式系统。
联系我们
了解更多细节,请阅读 Ray官方文档 https://docs.ray.io/en/master/index.html。
贡献代码,可以直接提PR到 https://github.com/ray-project/ray,或者给我们提issue。
您也可以通过slack联系我们,加入Ray的channel。
微信公众号:搜索并关注“Ray中文社区”。
关于我们
我们是蚂蚁计算智能技术部团队,横跨美国硅谷、中国北京、上海、杭州和成都。我们追求的工程师文化是开放、简单、迭代、追求效率、用技术解决问题!热情相邀加入我们!!
请联系我们的邮箱:antcomputing@antgroup.com
原文由 宋顾杨 祁宇于2021年10月12日发表于Ray中文社区公众号,查看原文