From 0d0df2629ab74336ce1b4e8ba01ac97a58240b7e Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Thu, 15 Oct 2020 19:18:06 +0000 Subject: [PATCH] wip --- python/example_test.py | 112 ++++++++++++++++++++++++------------------------- python/ray.py | 16 +++++-- 2 files changed, 69 insertions(+), 59 deletions(-) diff --git a/python/example_test.py b/python/example_test.py index 6b58def..2139e12 100644 --- a/python/example_test.py +++ b/python/example_test.py @@ -3,11 +3,11 @@ import numpy as np import time import dumb_raylet -#ray.connect("localhost:50051") -object_store = dumb_raylet.ObjectStore() -executor = dumb_raylet.SimpleExecutor(object_store) -task_servicer = dumb_raylet.TaskServicer(object_store, executor) -executor.set_task_servicer(task_servicer) +ray.connect("localhost:50051") +# object_store = dumb_raylet.ObjectStore() +# executor = dumb_raylet.SimpleExecutor(object_store) +# task_servicer = dumb_raylet.TaskServicer(object_store, executor) +# executor.set_task_servicer(task_servicer) def test_timing(): @@ -21,70 +21,70 @@ def test_timing(): # Measure the time required to submit a remote task to the scheduler. elapsed_times = [] - for _ in range(1000): + for _ in range(10000): start_time = time.time() empty_function.remote() end_time = time.time() elapsed_times.append(end_time - start_time) elapsed_times = np.sort(elapsed_times) - average_elapsed_time = sum(elapsed_times) / 1000 + average_elapsed_time = sum(elapsed_times) / 10000 print("Time required to submit an empty function call:") print(" Average: {}".format(average_elapsed_time)) - print(" 90th percentile: {}".format(elapsed_times[900])) - print(" 99th percentile: {}".format(elapsed_times[990])) - print(" worst: {}".format(elapsed_times[999])) + print(" 90th percentile: {}".format(elapsed_times[9000])) + print(" 99th percentile: {}".format(elapsed_times[9900])) + print(" worst: {}".format(elapsed_times[9999])) # average_elapsed_time should be about 0.00038. # Measure the time required to submit a remote task to the scheduler # (where the remote task returns one value). - elapsed_times = [] - for _ in range(1000): - start_time = time.time() - trivial_function.remote() - end_time = time.time() - elapsed_times.append(end_time - start_time) - elapsed_times = np.sort(elapsed_times) - average_elapsed_time = sum(elapsed_times) / 1000 - print("Time required to submit a trivial function call:") - print(" Average: {}".format(average_elapsed_time)) - print(" 90th percentile: {}".format(elapsed_times[900])) - print(" 99th percentile: {}".format(elapsed_times[990])) - print(" worst: {}".format(elapsed_times[999])) + # elapsed_times = [] + # for _ in range(1000): + # start_time = time.time() + # trivial_function.remote() + # end_time = time.time() + # elapsed_times.append(end_time - start_time) + # elapsed_times = np.sort(elapsed_times) + # average_elapsed_time = sum(elapsed_times) / 1000 + # print("Time required to submit a trivial function call:") + # print(" Average: {}".format(average_elapsed_time)) + # print(" 90th percentile: {}".format(elapsed_times[900])) + # print(" 99th percentile: {}".format(elapsed_times[990])) + # print(" worst: {}".format(elapsed_times[999])) - # Measure the time required to submit a remote task to the scheduler - # and get the result. - elapsed_times = [] - for _ in range(1000): - start_time = time.time() - x = trivial_function.remote() - ray.get(x) - end_time = time.time() - elapsed_times.append(end_time - start_time) - elapsed_times = np.sort(elapsed_times) - average_elapsed_time = sum(elapsed_times) / 1000 - print("Time required to submit a trivial function call and get the " - "result:") - print(" Average: {}".format(average_elapsed_time)) - print(" 90th percentile: {}".format(elapsed_times[900])) - print(" 99th percentile: {}".format(elapsed_times[990])) - print(" worst: {}".format(elapsed_times[999])) - # average_elapsed_time should be about 0.0013. + # # Measure the time required to submit a remote task to the scheduler + # # and get the result. + # elapsed_times = [] + # for _ in range(1000): + # start_time = time.time() + # x = trivial_function.remote() + # ray.get(x) + # end_time = time.time() + # elapsed_times.append(end_time - start_time) + # elapsed_times = np.sort(elapsed_times) + # average_elapsed_time = sum(elapsed_times) / 1000 + # print("Time required to submit a trivial function call and get the " + # "result:") + # print(" Average: {}".format(average_elapsed_time)) + # print(" 90th percentile: {}".format(elapsed_times[900])) + # print(" 99th percentile: {}".format(elapsed_times[990])) + # print(" worst: {}".format(elapsed_times[999])) + # # average_elapsed_time should be about 0.0013. - # Measure the time required to do do a put. - elapsed_times = [] - for _ in range(1000): - start_time = time.time() - ray.put(1) - end_time = time.time() - elapsed_times.append(end_time - start_time) - elapsed_times = np.sort(elapsed_times) - average_elapsed_time = sum(elapsed_times) / 1000 - print("Time required to put an int:") - print(" Average: {}".format(average_elapsed_time)) - print(" 90th percentile: {}".format(elapsed_times[900])) - print(" 99th percentile: {}".format(elapsed_times[990])) - print(" worst: {}".format(elapsed_times[999])) - # average_elapsed_time should be about 0.00087. + # # Measure the time required to do do a put. + # elapsed_times = [] + # for _ in range(1000): + # start_time = time.time() + # ray.put(1) + # end_time = time.time() + # elapsed_times.append(end_time - start_time) + # elapsed_times = np.sort(elapsed_times) + # average_elapsed_time = sum(elapsed_times) / 1000 + # print("Time required to put an int:") + # print(" Average: {}".format(average_elapsed_time)) + # print(" 90th percentile: {}".format(elapsed_times[900])) + # print(" 99th percentile: {}".format(elapsed_times[990])) + # print(" worst: {}".format(elapsed_times[999])) + # # average_elapsed_time should be about 0.00087. diff --git a/python/ray.py b/python/ray.py index 862efa9..341bb21 100644 --- a/python/ray.py +++ b/python/ray.py @@ -6,12 +6,22 @@ import uuid class ObjectID: - def __init__(self, id): + def __init__(self, id, future=None): self.id = id + self.future = future def __repr__(self): return "ObjectID(%s)" % self.id.decode() + def get_id(self): + if self.future is not None: + return self.id + r = self.future.result() + self.id = r.return_id + self.future = None + return self.id + + worker_registry = {} @@ -89,7 +99,7 @@ class Worker: return RemoteFunc(self, func) def schedule(self, task): - return self.server.Schedule(task) + return self.server.Schedule.future(task) def close(self): self.channel.close() @@ -122,7 +132,7 @@ class RemoteFunc: t.args.append(arg) worker = get_worker_registry(self._worker_id) ticket = worker.schedule(t) - return ObjectID(ticket.return_id) + return ObjectID(None, future=ticket) def _push_func(self): worker = get_worker_registry(self._worker_id)