如何用ray进行数据分片

目前的问题是,搭建好集群后,我会把数据都拉到master节点上,再调用ray的方法,把数据切片,分发到worker上进行分布式训练。
那么是否有方法,让ray在每个节点机上读取一部分数据,这样每个worker会读取这部分数据,再进行计算。区别在于,一个是读取数据再切片,一个是切片后读取数据。
这个是否有什么办法实现呢?

你好,数据分片是分布式系统常见需求,可以试用Mars on Ray来解决此类问题。Mars提供了分布式Numpy和Pandas能力,从读数据开始,数据即是以分布式切片的方式存储在集群中,后续计算皆围绕分布式数据展开。

如果你的数据本身就使用Numpy NDArray或Pandas DataFrame格式,那么几乎不用改代码就可以将原本单机的代码以分布式的方式在Mars on Ray上运行;如果数据的处理是其他数据结构API,那么可以用Mars的apply(udf)或map_chunk(udf)算子来处理,apply()以每行或每列的方式处理数据,map_chunk()直接处理每个数据分片。

Mars以前是以Ray Actor运行Mars Worker的模式,并在2022年研发了直接以Ray Common Task进行调度执行的模式,在Ray集群中使用Mars只需一行mars.new_session()的代码,目前master分支已同时支持这两种模式,新模式的相关文档这几天也会更新到Mars社区,敬请期待。

这是之前的文档:mars/ray.rst at master · mars-project/mars · GitHub
这几天会更新成默认介绍Ray Common Task模式。

好的,感谢回复。我们现在的问题可能是数据存在hdfs或者es上,然后master上没有办法吃掉这全部的数据,现在是这个问题比较尴尬,所以尝试找到一个先分配任务再读取的方法。
您提供的这个框架我会阅读一下,希望能解决问题。
再次感谢。

Mars支持从hdfs读取文件,支持的格式包括parquet、csv等。切片逻辑是内置在读数据的算子里的,用户可以通过chunk_size指定切片大小。如果你用Ray Common Task来做的话,就需要master把切片信息作为入参传给每个Task,然后每个Task里面根据偏移量来读取文件的一部分;如果用Ray Actor来做的话,那每个Actor初始化的时候可以带上切片信息。Anyway,肯定不如用Mars直接一个算子搞定来得容易。 :smiley:

1 Like