About Tasks being executed repeatedly in a Cluster

[Ray usage environment] Testing
[Ray version and libraries] 2.44.1

The cluster is started with 1 head node and 2 worker nodes:

ray start --head --port=6380 --dashboard-port=8265 --num-cpus=0 --num-gpus=0 --disable-usage-stats --block
ray start --address='127.0.0.1:6380' --num-cpus=8 --num-gpus=0 --resources='{"fake_num_gpus": 8}' --block
ray start --address='127.0.0.1:6380' --num-cpus=8 --num-gpus=0 --resources='{"fake_num_gpus": 8}' --block

Task code:

import time
import ray


class RayTaskScheduler:

    def __init__(self, address="auto"):
        # Initialize Ray cluster
        if not ray.is_initialized():
            ray.init(address=address)

        # Create placement group for Task A and Task B
        self.placement_group = ray.util.placement_group(
            bundles=[{"CPU": 1, "fake_num_gpus": 1}],
            strategy="STRICT_PACK",
        )

    @staticmethod
    @ray.remote(num_cpus=1)
    def task_a():
        """IO + CPU intensive task"""
        node_id = ray.get_runtime_context().get_node_id()
        print(f"[Task A] Running on node {node_id[:6]} (CPU intensive)")
        data = [i for i in range(100000)]  # Simulate data processing
        time.sleep(2)  # Simulate IO operation
        return data

    @staticmethod
    @ray.remote(num_cpus=1, resources={"fake_num_gpus": 1})
    def task_b(task_a_data):
        """GPU intensive task"""
        node_id = ray.get_runtime_context().get_node_id()
        print(f"[Task B] Running on node {node_id[:6]} (GPU intensive)")
        time.sleep(2)  # Simulate GPU processing
        return f"Processed {len(task_a_data)} items"

    @staticmethod
    @ray.remote(num_cpus=1)
    def task_c(task_b_result):
        """IO intensive task"""
        node_id = ray.get_runtime_context().get_node_id()
        print(f"[Task C] Running on node {node_id[:6]} (IO intensive)")
        time.sleep(2)  # Simulate IO operation
        return "Final result: " + task_b_result

    def execute_pipeline(self):
        """Execute tasks with dependencies"""
        print("\nStarting task pipeline...\n")

        # Submit tasks with chained dependencies
        future_a = self.task_a.options(placement_group=self.placement_group).remote()

        future_b = self.task_b.options(placement_group=self.placement_group).remote(future_a)

        future_c = self.task_c.remote(future_b)  # No placement group constraint

        # Wait for final result
        result = ray.get(future_c)
        print("\nPipeline execution completed!\n")
        return result

    def cleanup(self):
        """Release resources"""
        ray.util.remove_placement_group(self.placement_group)
        print("Released placement group resources")


if __name__ == "__main__":
    scheduler = RayTaskScheduler(address="auto")

    try:
        final_result = scheduler.execute_pipeline()
        print(f"\nFinal output: {final_result}")
    finally:
        scheduler.cleanup()
        ray.shutdown()

Console output:

Starting task pipeline...

(task_a pid=1326770) [Task A] Running on node f98b1a (CPU intensive)
(task_a pid=1326770) [Task A] Running on node f98b1a (CPU intensive)
(task_a pid=1326770) [Task A] Running on node f98b1a (CPU intensive)
(task_b pid=1326770) [Task B] Running on node f98b1a (GPU intensive)
(task_b pid=1326770) [Task B] Running on node f98b1a (GPU intensive)
(task_b pid=1326770) [Task B] Running on node f98b1a (GPU intensive)
(task_c pid=1326899) [Task C] Running on node a8caef (IO intensive)
(task_c pid=1326899) [Task C] Running on node a8caef (IO intensive)
(task_c pid=1326899) [Task C] Running on node a8caef (IO intensive)

Pipeline execution completed!

From the output, each task appears to be executed three times when 2 worker nodes are started, and twice when only 1 worker node is started.

Where is the problem?

This looks like the .out/.err logs were just collected by the driver 3 times, while the task actually executed only once. Try adding a nanosecond timestamp to each task's print statement to check.
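A minimal sketch of that check, changing only the print in task_a above (task_b and task_c would get the same change); if the repeated log lines all carry the same timestamp, the task body ran once and only the log forwarding to the driver was duplicated:

    @staticmethod
    @ray.remote(num_cpus=1)
    def task_a():
        """IO + CPU intensive task"""
        node_id = ray.get_runtime_context().get_node_id()
        # One timestamp per execution: identical values across repeated log
        # lines mean the duplication happened in log collection, not execution.
        print(f"[Task A] ts={time.time_ns()} on node {node_id[:6]} (CPU intensive)")
        data = [i for i in range(100000)]  # Simulate data processing
        time.sleep(2)  # Simulate IO operation
        return data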


You are right. Those are duplicate logs that the workers sent to the driver, or rather, the driver collected the workers' logs more than once. The code was only executed once.
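As an extra cross-check that does not rely on driver-side log output at all, executions can be counted inside the cluster with a small named actor. A sketch, assuming it is added to the script above; the ExecutionCounter class and the "exec_counter" name are illustrative, not part of the original code:

import ray


@ray.remote
class ExecutionCounter:
    """Counts how many times each task body actually runs."""

    def __init__(self):
        self.counts = {}

    def record(self, task_name):
        self.counts[task_name] = self.counts.get(task_name, 0) + 1

    def get_counts(self):
        return self.counts


# Driver side: create the named actor before submitting the pipeline.
counter = ExecutionCounter.options(name="exec_counter").remote()

# Inside each task body, record one execution, e.g. at the top of task_a():
#     ray.get_actor("exec_counter").record.remote("task_a")

# After the pipeline finishes, every count should be 1 if each task ran once,
# regardless of how many times its log line appeared at the driver:
# print(ray.get(counter.get_counts.remote()))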
