This commit is contained in:
Barak Michener 2020-10-15 19:18:06 +00:00
parent e02e80708a
commit 0d0df2629a
2 changed files with 69 additions and 59 deletions

View file

@ -3,11 +3,11 @@ import numpy as np
import time
import dumb_raylet
#ray.connect("localhost:50051")
object_store = dumb_raylet.ObjectStore()
executor = dumb_raylet.SimpleExecutor(object_store)
task_servicer = dumb_raylet.TaskServicer(object_store, executor)
executor.set_task_servicer(task_servicer)
ray.connect("localhost:50051")
# object_store = dumb_raylet.ObjectStore()
# executor = dumb_raylet.SimpleExecutor(object_store)
# task_servicer = dumb_raylet.TaskServicer(object_store, executor)
# executor.set_task_servicer(task_servicer)
def test_timing():
@ -21,70 +21,70 @@ def test_timing():
# Measure the time required to submit a remote task to the scheduler.
elapsed_times = []
for _ in range(1000):
for _ in range(10000):
start_time = time.time()
empty_function.remote()
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
average_elapsed_time = sum(elapsed_times) / 10000
print("Time required to submit an empty function call:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
print(" 90th percentile: {}".format(elapsed_times[9000]))
print(" 99th percentile: {}".format(elapsed_times[9900]))
print(" worst: {}".format(elapsed_times[9999]))
# average_elapsed_time should be about 0.00038.
# Measure the time required to submit a remote task to the scheduler
# (where the remote task returns one value).
elapsed_times = []
for _ in range(1000):
start_time = time.time()
trivial_function.remote()
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to submit a trivial function call:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# elapsed_times = []
# for _ in range(1000):
# start_time = time.time()
# trivial_function.remote()
# end_time = time.time()
# elapsed_times.append(end_time - start_time)
# elapsed_times = np.sort(elapsed_times)
# average_elapsed_time = sum(elapsed_times) / 1000
# print("Time required to submit a trivial function call:")
# print(" Average: {}".format(average_elapsed_time))
# print(" 90th percentile: {}".format(elapsed_times[900]))
# print(" 99th percentile: {}".format(elapsed_times[990]))
# print(" worst: {}".format(elapsed_times[999]))
# Measure the time required to submit a remote task to the scheduler
# and get the result.
elapsed_times = []
for _ in range(1000):
start_time = time.time()
x = trivial_function.remote()
ray.get(x)
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to submit a trivial function call and get the "
"result:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.0013.
# # Measure the time required to submit a remote task to the scheduler
# # and get the result.
# elapsed_times = []
# for _ in range(1000):
# start_time = time.time()
# x = trivial_function.remote()
# ray.get(x)
# end_time = time.time()
# elapsed_times.append(end_time - start_time)
# elapsed_times = np.sort(elapsed_times)
# average_elapsed_time = sum(elapsed_times) / 1000
# print("Time required to submit a trivial function call and get the "
# "result:")
# print(" Average: {}".format(average_elapsed_time))
# print(" 90th percentile: {}".format(elapsed_times[900]))
# print(" 99th percentile: {}".format(elapsed_times[990]))
# print(" worst: {}".format(elapsed_times[999]))
# # average_elapsed_time should be about 0.0013.
# Measure the time required to do do a put.
elapsed_times = []
for _ in range(1000):
start_time = time.time()
ray.put(1)
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to put an int:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.00087.
# # Measure the time required to do do a put.
# elapsed_times = []
# for _ in range(1000):
# start_time = time.time()
# ray.put(1)
# end_time = time.time()
# elapsed_times.append(end_time - start_time)
# elapsed_times = np.sort(elapsed_times)
# average_elapsed_time = sum(elapsed_times) / 1000
# print("Time required to put an int:")
# print(" Average: {}".format(average_elapsed_time))
# print(" 90th percentile: {}".format(elapsed_times[900]))
# print(" 99th percentile: {}".format(elapsed_times[990]))
# print(" worst: {}".format(elapsed_times[999]))
# # average_elapsed_time should be about 0.00087.

View file

@ -6,12 +6,22 @@ import uuid
class ObjectID:
def __init__(self, id):
def __init__(self, id, future=None):
self.id = id
self.future = future
def __repr__(self):
return "ObjectID(%s)" % self.id.decode()
def get_id(self):
if self.future is not None:
return self.id
r = self.future.result()
self.id = r.return_id
self.future = None
return self.id
worker_registry = {}
@ -89,7 +99,7 @@ class Worker:
return RemoteFunc(self, func)
def schedule(self, task):
return self.server.Schedule(task)
return self.server.Schedule.future(task)
def close(self):
self.channel.close()
@ -122,7 +132,7 @@ class RemoteFunc:
t.args.append(arg)
worker = get_worker_registry(self._worker_id)
ticket = worker.schedule(t)
return ObjectID(ticket.return_id)
return ObjectID(None, future=ticket)
def _push_func(self):
worker = get_worker_registry(self._worker_id)