各位大佬,我这边公司的需求是把原来的项目转移到ray框架上运行,所以需要对ray不同物理结点上的actor进行互相的数据发送测试,这里一个actor不断产生数据,一个不断接受数据但不保存,测试最大的数据传输量,直到阻塞为止。我不确定自己代码写对没有,也不确定哪些资源影响这方面性能,从而可以控制变量进行压测。
import ray
ray.shutdown()
ray.init(address=‘auto’)
import time
@ray.remote
class MyActor:
def init(self, actor_id):
self.actor_id = actor_id
def send_message(self, message, target_actor):
# 将消息打包成一个对象并发送给目标actor
message_id = message
# 这里应该是把message变成object id然后发送过去
beg_time=time.time()
return target_actor.receive_message.remote(message, self.actor_id,beg_time)
def receive_message(self, message, sender_id,beg_time):
# 从对象ID中获取消息对象并进行处理
message=0
t=time.time()
t=t-beg_time
print("收到来自"+str(sender_id)+"耗时:"+str(t))
return "收到来自"+str(sender_id)+"耗时:"+str(t)
创建两个actor并相互发送消息,一个在head节点,一个输入其他节点的ID
actor1 = MyActor.options(scheduling_strategy= ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().node_id,
soft=False,) ).remote(1)
actor2 = MyActor.options(scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id= ‘dbfd236931ba221ba41440bab3eb608ca6d3908af2d3ca5efed4ac75’,
soft=False,)).remote(2)
data = b"0" * 1024 * 1024 10244
print(ray.get(ray.get(actor1.send_message.remote(data, actor2))))