Ray Plasma: Put operation amplifies memory traffic by more than 10x. Why?

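A question for everyone: when I run Put on data of a dozen or so megabytes in Ray Plasma and monitor memory bandwidth, the memory traffic turns out to be amplified by more than ten times. Does anyone know why? Or could it be a configuration problem?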

I know this area fairly well. Could you share code that reproduces it and explain how you are monitoring memory?

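Steps to reproduce:
Use a single Ray actor process to read a feather file with 3 million rows, split it into Arrow chunks of 100,000 rows each, and write each chunk into plasma with ray.put, while watching memory bandwidth in real time with the pcm-memory.x tool.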

Output of the memory bandwidth tool:
|-- System Memory Throughput(MB/s): 3021.37 --|
|-- System Memory Throughput(MB/s): 3390.82 --|
|-- System Memory Throughput(MB/s): 2976.58 --|
|-- System Memory Throughput(MB/s): 3078.50 --|
|-- System Memory Throughput(MB/s): 3558.88 --|
|-- System Memory Throughput(MB/s): 2962.87 --|
|-- System Memory Throughput(MB/s): 3345.91 --|
|-- System Memory Throughput(MB/s): 2987.54 --|
|-- System Memory Throughput(MB/s): 2964.75 --|
|-- System Memory Throughput(MB/s): 3337.53 --|
|-- System Memory Throughput(MB/s): 2977.92 --|
|-- System Memory Throughput(MB/s): 2960.92 --|
|-- System Memory Throughput(MB/s): 3370.28 --|
|-- System Memory Throughput(MB/s): 2968.78 --|
|-- System Memory Throughput(MB/s): 3758.08 --|
|-- System Memory Throughput(MB/s): 3170.96 --|
|-- System Memory Throughput(MB/s): 3064.41 --|
|-- System Memory Throughput(MB/s): 3958.74 --|
|-- System Memory Throughput(MB/s): 3045.37 --|
|-- System Memory Throughput(MB/s): 2973.61 --|
|-- System Memory Throughput(MB/s): 3339.06 --|
|-- System Memory Throughput(MB/s): 3079.25 --|
|-- System Memory Throughput(MB/s): 3411.84 --|
|-- System Memory Throughput(MB/s): 4236.84 --|
|-- System Memory Throughput(MB/s): 3030.43 --|
|-- System Memory Throughput(MB/s): 4229.07 --|
|-- System Memory Throughput(MB/s): 3125.15 --|
|-- System Memory Throughput(MB/s): 3016.82 --|
|-- System Memory Throughput(MB/s): 4898.78 --|
|-- System Memory Throughput(MB/s): 2934.06 --|
|-- System Memory Throughput(MB/s): 3016.42 --|
|-- System Memory Throughput(MB/s): 4124.66 --|
|-- System Memory Throughput(MB/s): 2970.09 --|
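
To compare runs without reading the numbers by eye, the samples above can be summarized with a few lines of Python. This is only a sketch: it assumes the tool output has been captured as text in the line format shown above, and the names `THROUGHPUT_RE`, `parse_throughput`, and `sample_output` are made up for illustration. The four sample lines are copied from the output above.

```python
import re
from statistics import mean
from typing import List

# Matches lines such as "|-- System Memory Throughput(MB/s): 3021.37 --|"
THROUGHPUT_RE = re.compile(r"System Memory Throughput\(MB/s\):\s*([0-9.]+)")


def parse_throughput(text: str) -> List[float]:
    """Return every system-throughput sample (MB/s) found in the text."""
    return [float(m.group(1)) for m in THROUGHPUT_RE.finditer(text)]


# A few lines copied from the captured output; a full capture works the same way
sample_output = """\
|-- System Memory Throughput(MB/s): 3021.37 --|
|-- System Memory Throughput(MB/s): 3390.82 --|
|-- System Memory Throughput(MB/s): 2976.58 --|
|-- System Memory Throughput(MB/s): 4898.78 --|
"""

samples = parse_throughput(sample_output)
print(f"n={len(samples)} min={min(samples):.2f} "
      f"mean={mean(samples):.2f} max={max(samples):.2f}")
```

Running the same summary on the idle capture and on the capture taken during the test gives a baseline to subtract before estimating how much traffic the writes add.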

Test code:
def data_slice(self, param: Param):
data_list = []
columns_list = ['l_quantity', 'l_extendedprice', 'l_discount', 'l_tax', 'l_returnflag', 'l_linestatus']
offset = 0
arrow_table = pf.read_table("lineitem.tbl.1.feather", columns=columns_list)
while offset < arrow_table.num_rows:
batch_table = arrow_table.slice(offset=offset, length=param.batch_size)
offset += param.batch_size
data_list.append(batch_table)
return data_list

def write_data(self, param: Param):
data_list = self.data_slice(param)
obj_id_list = []
for i in range(200):
batch_cnt = 0
for data_chunk in data_list:
objid = ray.put(data_chunk)
obj_id_list.append(objid)

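Problem description:
Watching pcm-memory.x in real time, peak memory bandwidth fluctuates between 3000 MB/s and 4000 MB/s. Each chunk written to plasma is about 7.4 MB and the average write latency is 10 ms, so the expected bandwidth is 740 MB/s, yet the measured bandwidth is about 5 times higher than expected.
I would like to understand why writes to plasma are amplified this much.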
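
The arithmetic behind the expected figure and the amplification factor can be written out as a small calculation. All inputs are the numbers quoted in this post (7.4 MB per chunk, 10 ms per put, 3000 to 4000 MB/s observed); the variable names are illustrative only, and the idle-machine baseline is not subtracted here.

```python
chunk_mb = 7.4          # size of each chunk written to plasma, from the post
put_latency_s = 0.010   # average latency of one write, from the post

# One pass over the chunk per put, spread evenly over the put latency
expected_mb_s = chunk_mb / put_latency_s

# Range of the observed peak bandwidth, from the post
observed_low_mb_s, observed_high_mb_s = 3000.0, 4000.0
amp_low = observed_low_mb_s / expected_mb_s
amp_high = observed_high_mb_s / expected_mb_s

print(f"expected: {expected_mb_s:.0f} MB/s")
print(f"amplification: {amp_low:.1f}x to {amp_high:.1f}x")
```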

The indentation in your code is broken. Please take a look at the markdown syntax...

# Imports the snippet appears to rely on (pf is presumably pyarrow.feather)
import pyarrow.feather as pf
import ray

# Both functions are methods of the Ray actor class (class body not shown)
def data_slice(self, param: Param):
    data_list = []
    columns_list = ['l_quantity', 'l_extendedprice', 'l_discount', 'l_tax', 'l_returnflag', 'l_linestatus']
    offset = 0
    arrow_table = pf.read_table("lineitem.tbl.1.feather", columns=columns_list)
    # Cut the table into chunks of param.batch_size rows each
    while offset < arrow_table.num_rows:
        batch_table = arrow_table.slice(offset=offset, length=param.batch_size)
        offset += param.batch_size
        data_list.append(batch_table)
    return data_list

def write_data(self, param: Param):
    data_list = self.data_slice(param)
    obj_id_list = []
    # Write every chunk into plasma, repeated for 200 rounds
    for i in range(200):
        batch_cnt = 0
        for data_chunk in data_list:
            objid = ray.put(data_chunk)
            obj_id_list.append(objid)

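As I understand it, that is the latency of a single ray.put, right? That call involves the following steps:

  • Data serialization: my guess is that this is also around 7.9 MB
  • Full copy of the data: this should be the same size as above

In short: if the memory peak comes from ray.put, the expected value should be 1480 MB/s.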
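
One way to check the size guess is to measure the serialized size directly and redo the estimate from it. The sketch below uses only the standard library: a 7.4 MB `bytes` object stands in for one Arrow chunk (7.4 MB per step is what the 1480 MB/s figure implies), and `serialized_size_mb` is a hypothetical helper, not a Ray API. In the real test the argument would be one of the Arrow slices, and the result could differ from this stand-in.

```python
import pickle


def serialized_size_mb(obj) -> float:
    """Size of the in-band pickle stream for obj, in MB (10**6 bytes)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) / 1e6


# Stand-in payload of 7.4 MB; in the real test this would be one Arrow chunk
payload = bytes(7_400_000)
size_mb = serialized_size_mb(payload)

put_latency_s = 0.010  # average ray.put latency from the original post

# Serialization output plus one full copy of it, as in the estimate above
estimate_mb_s = (size_mb + size_mb) / put_latency_s
print(f"serialized: {size_mb:.2f} MB, estimate: {estimate_mb_s:.0f} MB/s")
```

If the measured size of a real chunk came out far larger than 7.4 MB, the estimate would grow with it, so this is a cheap number to collect before looking further.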

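As I read it, the code reads first and then writes. Reading involves memory copies too, right? arrow_table.slice may involve a memory copy as well. Since ray.put involves some inter-process communication, I would expect it to be quite a bit slower than the Arrow operations.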

In short: I think the bandwidth peak is probably in the Arrow operations. Could you run a quick check by simply commenting out ray.put?
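
Instead of commenting the call out by hand, the same experiment can be made switchable. The sketch below is an assumption-laden stand-in that runs without Ray: `run_loop` and `fake_put` are hypothetical names, the chunks are plain byte strings, and `fake_put` just forces one full copy per call. In the real test the chunks would be the Arrow slices and `put` would be `ray.put`.

```python
import time


def run_loop(chunks, put=None, rounds=200):
    """Iterate over the chunks `rounds` times, calling put(chunk) if given.

    Returns (elapsed_seconds, number_of_puts).
    """
    puts = 0
    start = time.perf_counter()
    for _ in range(rounds):
        for chunk in chunks:
            if put is not None:
                put(chunk)
                puts += 1
    return time.perf_counter() - start, puts


# Stand-in data and put function so the sketch runs without Ray
chunks = [bytes(1_000_000) for _ in range(3)]
store = []


def fake_put(chunk):
    store.append(bytearray(chunk))  # one full copy of the chunk per call


baseline_s, baseline_puts = run_loop(chunks, put=None, rounds=5)
with_put_s, with_put_puts = run_loop(chunks, put=fake_put, rounds=5)
print(f"without put: {baseline_s:.4f}s, "
      f"with put: {with_put_s:.4f}s ({with_put_puts} puts)")
```

Capturing the bandwidth tool output during each of the two runs separates the traffic of the put path from the traffic of reading and slicing.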

I tested that earlier. With ray.put commented out, memory bandwidth dropped, which is why I suspect ray.put.

Did it drop to nearly 0?

With it commented out, memory bandwidth stays close to the machine's idle state, with no significant change.

=======Initial memory bandwidth
|-- System Memory Throughput(MB/s): 343.76 --|
|-- System Memory Throughput(MB/s): 140.31 --|
|-- System Memory Throughput(MB/s): 213.18 --|
|-- System Memory Throughput(MB/s): 227.50 --|
|-- System Memory Throughput(MB/s): 141.41 --|
|-- System Memory Throughput(MB/s): 83.39 --|
|-- System Memory Throughput(MB/s): 143.70 --|
|-- System Memory Throughput(MB/s): 146.77 --|
|-- System Memory Throughput(MB/s): 66.58 --|
|-- System Memory Throughput(MB/s): 173.62 --|
|-- System Memory Throughput(MB/s): 67.70 --|
|-- System Memory Throughput(MB/s): 74.32 --|
|-- System Memory Throughput(MB/s): 202.43 --|
|-- System Memory Throughput(MB/s): 78.89 --|
|-- System Memory Throughput(MB/s): 86.76 --|

===Test script running (memory bandwidth with ray.put commented out)
|-- System Memory Throughput(MB/s): 142.82 --|
|-- System Memory Throughput(MB/s): 284.95 --|
|-- System Memory Throughput(MB/s): 780.94 --|
|-- System Memory Throughput(MB/s): 770.09 --|
|-- System Memory Throughput(MB/s): 169.07 --|
|-- System Memory Throughput(MB/s): 87.63 --|
|-- System Memory Throughput(MB/s): 166.92 --|
|-- System Memory Throughput(MB/s): 129.08 --|
|-- System Memory Throughput(MB/s): 78.84 --|
|-- System Memory Throughput(MB/s): 227.01 --|
|-- System Memory Throughput(MB/s): 144.75 --|
|-- System Memory Throughput(MB/s): 112.83 --|
|-- System Memory Throughput(MB/s): 254.53 --|
|-- System Memory Throughput(MB/s): 82.84 --|
|-- System Memory Throughput(MB/s): 97.79 --|
|-- System Memory Throughput(MB/s): 176.70 --|
|-- System Memory Throughput(MB/s): 94.51 --|
|-- System Memory Throughput(MB/s): 242.83 --|
|-- System Memory Throughput(MB/s): 210.45 --|
|-- System Memory Throughput(MB/s): 120.37 --|
|-- System Memory Throughput(MB/s): 116.11 --|
|-- System Memory Throughput(MB/s): 151.83 --|
|-- System Memory Throughput(MB/s): 186.46 --|
|-- System Memory Throughput(MB/s): 99.10 --|
|-- System Memory Throughput(MB/s): 188.73 --|
|-- System Memory Throughput(MB/s): 146.50 --|
|-- System Memory Throughput(MB/s): 80.66 --|
|-- System Memory Throughput(MB/s): 201.81 --|
|-- System Memory Throughput(MB/s): 97.06 --|
|-- System Memory Throughput(MB/s): 73.04 --|
|-- System Memory Throughput(MB/s): 189.91 --|
|-- System Memory Throughput(MB/s): 97.46 --|
|-- System Memory Throughput(MB/s): 142.29 --|

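I figured it out. Here is what actually happens. ray.put consists of three main steps:

  1. Serialization. We use pickle, which is well known to be slow.
  2. Requesting a contiguous block of memory of a fixed size from plasma. This step also takes a long time: it involves inter-process communication, and there are even some retries on the plasma side.
  3. Copying the data to the pointer obtained in step 2. A copy from contiguous memory to contiguous memory should be very fast.

In short: the memory peak should occur in step 3, and that step most likely takes far less than 10 ms, so the 740 MB/s average understates the bandwidth during the copy itself.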
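
The three steps can be timed separately in a rough standard-library sketch to see how the phases compare. This is not Ray's implementation: a `bytearray` stands in for the chunk, pickle protocol 5 with out-of-band buffers stands in for the serializer, and an anonymous `mmap` region stands in for memory obtained from plasma, so the inter-process communication and retries of step 2 are absent here. It needs Python 3.8 or later.

```python
import mmap
import pickle
import time

payload = bytearray(7_400_000)  # stand-in for one 7.4 MB chunk

# Step 1: serialize. With protocol 5, large buffers can be handed over
# out-of-band, so the pickle stream itself stays small.
t0 = time.perf_counter()
buffers = []
header = pickle.dumps(pickle.PickleBuffer(payload), protocol=5,
                      buffer_callback=buffers.append)
t1 = time.perf_counter()

# Step 2: allocate one contiguous region of the required size.
# An anonymous mmap is only a local stand-in for the object store.
total = sum(b.raw().nbytes for b in buffers)
region = mmap.mmap(-1, total)
t2 = time.perf_counter()

# Step 3: copy the buffers into the region, contiguous to contiguous.
offset = 0
for b in buffers:
    view = b.raw()
    region[offset:offset + view.nbytes] = view
    offset += view.nbytes
t3 = time.perf_counter()

# Verify the copy before releasing the region
copied_ok = region[:total] == bytes(payload)
region.close()

copy_s = max(t3 - t2, 1e-9)  # guard against a zero reading on coarse timers
print(f"serialize {t1 - t0:.6f}s, allocate {t2 - t1:.6f}s, copy {copy_s:.6f}s")
print(f"rate during the copy phase: {total / 1e6 / copy_s:.0f} MB/s")
```

If the copy phase is a small fraction of the total, the rate during that phase is correspondingly higher than the chunk size divided by the whole put latency.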

OK, thank you very much!
I will dig into the ray.put code some more.