ray的task2依赖 task1,现在需要获取1000个task2的计算结果,采用ray.wait的方式逐个获取结果并处理,结果发现,只有1000个task1结果全部执行了,才会执行task2,测试过程中猜测,提交的task2中task1任务一直满足调度条件,但是task2只有task1执行完毕了才会执行,而我想要的效果是task2只要有满足执行条件的就优先执行task2,这个应该怎么控制呢?
import time
import ray
import numpy as np
# 初始化Ray集群环境,并计时打印
startTime = time.time()
ray.init()
stopTime = time.time()
spendTime = stopTime - startTime
print("ray.init spend %f seconds." % spendTime)
@ray.remote
def longTimeCompute1(i):
time.sleep(0.5)
result = {}
result["B1"] = np.random.randn(256, 256)
result["B2"] = np.random.randn(256, 256)
return result
# 模拟耗时的计算服务-分布式
@ray.remote
def longTimeCompute2(i,image1):
time.sleep(0.4)
b1 = image1["B1"]
b2 = image1["B2"]
result = {}
result["NDVI1"] = (b1-b2)/(b1+b2)
return result
def execute(i):
image1 = longTimeCompute1.remote(i)
image2 = longTimeCompute2.remote(i,image1)
return image2
def wait(remote_objects: list, batch_size: int = 10,func=None):
final_result = []
unfinished = []
index1 = 1
for item in remote_objects:
index1 += 1
if len(unfinished) >= batch_size:
finished, unfinished = ray.wait(unfinished, num_returns=1, timeout=1800)
results = ray.get(finished)
if func:
for result in results:
bLastOne = False
if unfinished is None:
bLastOne = True
func(result, bLastOne)
final_result.append(None)
else:
final_result.extend(results)
if not item is None:
unfinished.append(item)
while unfinished:
finished, unfinished = ray.wait(unfinished, num_returns=1, timeout=1800)
results = ray.get(finished)
if func:
for result in results:
bLastOne = False
if unfinished is None:
bLastOne = True
func(result, bLastOne)
final_result.append(None)
else:
final_result.extend(results)
return final_result
def print_info():
index = 1
def inner_func(result,bLastOne=False):
nonlocal index
print(f"完成了第{index}个...")
if bLastOne:
print(f"总共{index}个,已全部计算完成。")
index += 1
return inner_func
# 分布式异步计算,并打印
startTime = time.time()
result_RayFutures = [execute(i) for i in range(1000)]
func = print_info()
result = wait(result_RayFutures,25,func)
print(result)
stopTime = time.time()
spendTime = stopTime - startTime
print("computer with ray spend %f seconds." % spendTime)