【Ray usage environment】Testing
【Ray version and libraries】2.44.1
The cluster is started with 1 head node and 2 worker nodes:
ray start --head --port=6380 --dashboard-port=8265 --num-cpus=0 --num-gpus=0 --disable-usage-stats --block
ray start --address='127.0.0.1:6380' --num-cpus=8 --num-gpus=0 --resources='{"fake_num_gpus": 8}' --block
ray start --address='127.0.0.1:6380' --num-cpus=8 --num-gpus=0 --resources='{"fake_num_gpus": 8}' --block
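For reference, the registered resources can be verified with a small script like the one below (a minimal sketch that connects to the head node started above and uses the standard ray.cluster_resources() / ray.nodes() APIs):

import ray

# Connect to the existing cluster (address matches the head node started with --port=6380)
ray.init(address="127.0.0.1:6380")

# Aggregate view: with both workers joined, expect CPU == 16 and fake_num_gpus == 16
print(ray.cluster_resources())

# Per-node view: each worker node should report 8 CPU and 8 fake_num_gpus
for node in ray.nodes():
    if node["Alive"]:
        print(node["NodeID"][:6], node["Resources"])

ray.shutdown()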
Task code:
import time

import ray


class RayTaskScheduler:
    def __init__(self, address="auto"):
        # Initialize Ray cluster
        if not ray.is_initialized():
            ray.init(address=address)
        # Create placement group for Task A and Task B
        self.placement_group = ray.util.placement_group(
            bundles=[{"CPU": 1, "fake_num_gpus": 1}],
            strategy="STRICT_PACK",
        )

    @staticmethod
    @ray.remote(num_cpus=1)
    def task_a():
        """IO + CPU intensive task"""
        node_id = ray.get_runtime_context().get_node_id()
        print(f"[Task A] Running on node {node_id[:6]} (CPU intensive)")
        data = [i for i in range(100000)]  # Simulate data processing
        time.sleep(2)  # Simulate IO operation
        return data

    @staticmethod
    @ray.remote(num_cpus=1, resources={"fake_num_gpus": 1})
    def task_b(task_a_data):
        """GPU intensive task"""
        node_id = ray.get_runtime_context().get_node_id()
        print(f"[Task B] Running on node {node_id[:6]} (GPU intensive)")
        time.sleep(2)  # Simulate GPU processing
        return f"Processed {len(task_a_data)} items"

    @staticmethod
    @ray.remote(num_cpus=1)
    def task_c(task_b_result):
        """IO intensive task"""
        node_id = ray.get_runtime_context().get_node_id()
        print(f"[Task C] Running on node {node_id[:6]} (IO intensive)")
        time.sleep(2)  # Simulate IO operation
        return "Final result: " + task_b_result

    def execute_pipeline(self):
        """Execute tasks with chained dependencies"""
        print("\nStarting task pipeline...\n")
        # Submit tasks with chained dependencies
        future_a = self.task_a.options(placement_group=self.placement_group).remote()
        future_b = self.task_b.options(placement_group=self.placement_group).remote(future_a)
        future_c = self.task_c.remote(future_b)  # No placement group constraint
        # Wait for final result
        result = ray.get(future_c)
        print("\nPipeline execution completed!\n")
        return result

    def cleanup(self):
        """Release resources"""
        ray.util.remove_placement_group(self.placement_group)
        print("Released placement group resources")


if __name__ == "__main__":
    scheduler = RayTaskScheduler(address="auto")
    try:
        final_result = scheduler.execute_pipeline()
        print(f"\nFinal output: {final_result}")
    finally:
        scheduler.cleanup()
        ray.shutdown()
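As a side note on the placement group usage above: ray.util.placement_group() returns immediately, and tasks submitted with options(placement_group=...) simply queue until the bundle has actually been reserved. Whether or not that matters here, readiness can be checked explicitly, e.g. (a minimal sketch using the standard pg.ready() and placement_group_table() APIs):

import ray
from ray.util import placement_group, placement_group_table

ray.init(address="auto")
pg = placement_group(bundles=[{"CPU": 1, "fake_num_gpus": 1}], strategy="STRICT_PACK")

# pg.ready() returns an ObjectRef that resolves once the bundle is reserved;
# a timeout here would mean the cluster cannot satisfy the request.
ray.get(pg.ready(), timeout=10)
print(placement_group_table(pg))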
Console output:
Starting task pipeline...
(task_a pid=1326770) [Task A] Running on node f98b1a (CPU intensive)
(task_a pid=1326770) [Task A] Running on node f98b1a (CPU intensive)
(task_a pid=1326770) [Task A] Running on node f98b1a (CPU intensive)
(task_b pid=1326770) [Task B] Running on node f98b1a (GPU intensive)
(task_b pid=1326770) [Task B] Running on node f98b1a (GPU intensive)
(task_b pid=1326770) [Task B] Running on node f98b1a (GPU intensive)
(task_c pid=1326899) [Task C] Running on node a8caef (IO intensive)
(task_c pid=1326899) [Task C] Running on node a8caef (IO intensive)
(task_c pid=1326899) [Task C] Running on node a8caef (IO intensive)
Pipeline execution completed!
From the output, with 2 worker nodes started each task appears to be executed three times, and with only 1 worker node started each task appears to be executed twice.
Where is the problem?
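One way to narrow down whether the tasks really run multiple times, or whether only the log lines are echoed more than once, would be a diagnostic like the following (a minimal sketch; the ExecutionCounter actor is made up purely for illustration, while get_task_id() is a standard runtime-context call):

import ray

ray.init(address="auto")

# Hypothetical counter actor: counts real executions independently of
# how worker stdout is forwarded to the driver.
@ray.remote
class ExecutionCounter:
    def __init__(self):
        self.count = 0

    def record(self, task_id):
        self.count += 1
        return self.count

    def total(self):
        return self.count

counter = ExecutionCounter.remote()

@ray.remote(num_cpus=1)
def task_a(counter):
    # get_task_id() identifies this specific task execution
    task_id = ray.get_runtime_context().get_task_id()
    n = ray.get(counter.record.remote(task_id))
    print(f"[Task A] task_id={task_id} execution #{n}")
    return n

ray.get(task_a.remote(counter))
# If the print line shows up several times but total() is 1, the duplication
# is in log forwarding rather than in task execution.
print("executions recorded:", ray.get(counter.total.remote()))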