如何对ray集群的不同节点数据传输进行压力测试呢?

各位大佬,我这边公司的需求是把原来的项目转移到ray框架上运行,所以需要对ray不同物理结点上的actor进行互相的数据发送测试,这里一个actor不断产生数据,一个不断接受数据但不保存,测试最大的数据传输量,直到阻塞为止。我不确定自己代码写对没有,也不确定哪些资源影响这方面性能,从而可以控制变量进行压测。

import ray
ray.shutdown()
ray.init(address=‘auto’)
import time
@ray.remote
class MyActor:
def init(self, actor_id):
self.actor_id = actor_id

def send_message(self, message, target_actor):
    # 将消息打包成一个对象并发送给目标actor
    message_id = message
    # 这里应该是把message变成object id然后发送过去
    beg_time=time.time()
    return target_actor.receive_message.remote(message, self.actor_id,beg_time)

def receive_message(self, message, sender_id,beg_time):
    # 从对象ID中获取消息对象并进行处理
    message=0
    t=time.time()
    t=t-beg_time
    print("收到来自"+str(sender_id)+"耗时:"+str(t))
    return "收到来自"+str(sender_id)+"耗时:"+str(t)

创建两个actor并相互发送消息,一个在head节点,一个输入其他节点的ID

actor1 = MyActor.options(scheduling_strategy= ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().node_id,
soft=False,) ).remote(1)
actor2 = MyActor.options(scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id= ‘dbfd236931ba221ba41440bab3eb608ca6d3908af2d3ca5efed4ac75’,
soft=False,)).remote(2)

data = b"0" * 1024 * 1024 10244
print(ray.get(ray.get(actor1.send_message.remote(data, actor2))))

你们业务的细节我也不是很清楚,站在我的角度,我建议你多依赖 object store,他可以把数据映射/spill到磁盘上,对 worker 堆内存的影响少,而且比较方便的去更具具体业务实现你们的生产/消费模型

  • MyActor
def send_message(self, refs, target_actor):

    beg_time=time.time()
    return target_actor.receive_message.remote(refs, self.actor_id,beg_time)

def receive_message(self,refs, sender_id,beg_time):
    # 从对象ID中获取消息对象并进行处理
    # 如果要使用的话,ray.get 出来就好了
    self._refs.extend(refs)
    t=time.time()
    t=t-beg_time
    print("收到来自"+str(sender_id)+"耗时:"+str(t))
    return "收到来自"+str(sender_id)+"耗时:"+str(t)
  • driver:
for i in range(20):
    data = xxxx
    refs = [ray.put(data)]
    actor1.send_message.remote(refs, actor2)

这样在不同 actor 之间传递的就是 object ref 了,这个比较小,传输起来会很快,第一个 actor 的 raylet 不需要 get 数据本身,你的流量也会小很多,第二个 actor 才是使用数据的,到时候再 get 就好了