diff --git a/web_python/make.sh b/web_python/make.sh
new file mode 100755
index 0000000..811a5fa
--- /dev/null
+++ b/web_python/make.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+# Build the sdist and wheel, then copy the wheel into ../web.
+# Stop at the first failing command so that a failed build cannot be
+# followed by copying a stale wheel left over from an earlier run.
+set -e
+python setup.py sdist bdist_wheel
+cp ./dist/ray_web-0.0.1-py3-none-any.whl ../web
diff --git a/web_python/ray/__init__.py b/web_python/ray/__init__.py
new file mode 100644
index 0000000..b71abca
--- /dev/null
+++ b/web_python/ray/__init__.py
@@ -0,0 +1,136 @@
+from ray.api import ClientAPI
+from ray.api import APIImpl
+from typing import Optional, List, Tuple
+from contextlib import contextmanager
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+# _client_api has to be external to the API stub, below.
+# Otherwise, ray.remote() that contains ray.remote()
+# contains a reference to the RayAPIStub, therefore a
+# reference to the _client_api, and then tries to pickle
+# the thing.
+_client_api: Optional[APIImpl] = None
+
+_server_api: Optional[APIImpl] = None
+
+# Selects which of the two slots above _get_client_api() hands out.
+_is_server: bool = False
+
+
+@contextmanager
+def stash_api_for_tests(in_test: bool):
+    """Temporarily route API lookups to the server-side implementation.
+
+    When `in_test` is true, `_is_server` is forced to True for the
+    duration of the `with` block and its previous value is restored
+    afterwards, even if the block raises. When `in_test` is false the
+    flag is left untouched.
+
+    Yields:
+        The current `_server_api` (may be None).
+    """
+    global _is_server
+    previous = _is_server
+    if in_test:
+        _is_server = True
+    try:
+        yield _server_api
+    finally:
+        # Restore on every exit path; without this an exception raised
+        # inside the `with` body would leave the module in server mode.
+        if in_test:
+            _is_server = previous
+
+
+def _set_client_api(val: Optional[APIImpl]):
+    """Install `val` as the client API and switch to client mode.
+
+    Raises:
+        Exception: if a client API is already installed.
+    """
+    global _client_api
+    global _is_server
+    if _client_api is not None:
+        raise Exception("Trying to set more than one client API")
+    _client_api = val
+    _is_server = False
+
+
+def _set_server_api(val: Optional[APIImpl]):
+    """Install `val` as the server API and switch to server mode.
+
+    Raises:
+        Exception: if a server API is already installed.
+    """
+    global _server_api
+    global _is_server
+    if _server_api is not None:
+        raise Exception("Trying to set more than one server API")
+    _server_api = val
+    _is_server = True
+
+
+def reset_api():
+    """Forget both installed APIs and return to client mode."""
+    global _client_api
+    global _server_api
+    global _is_server
+    _client_api = None
+    _server_api = None
+    _is_server = False
+
+
+def _get_client_api() -> APIImpl:
+    """Return the API implementation for the current mode.
+
+    Raises:
+        Exception: if no implementation is installed for that mode.
+    """
+    # The globals are only read here, so no `global` statement is needed.
+    api = _server_api if _is_server else _client_api
+    if api is None:
+        # We're inside a raylet worker
+        raise Exception("CoreRayAPI not supported")
+    return api
+
+
+class RayAPIStub:
+    """Facade whose unknown attributes are resolved on the active API.
+
+    `ray.get`, `ray.put`, ... are looked up on whatever
+    `_get_client_api()` returns at the time of the access.
+    """
+
+    def connect(self):
+        """Create a `Worker` and install a `ClientAPI` wrapping it."""
+        # Imported here rather than at module level: ray.worker imports
+        # ray.common, which in turn imports `ray` from this module.
+        from ray.worker import Worker
+        _client_worker = Worker()
+        _set_client_api(ClientAPI(_client_worker))
+
+    def disconnect(self):
+        """Close and uninstall the client API, if one is installed."""
+        global _client_api
+        if _client_api is not None:
+            _client_api.close()
+            _client_api = None
+
+    def __getattr__(self, key: str):
+        """Forward an attribute missing on the stub to the active API."""
+        return getattr(_get_client_api(), key)
+
+
+ray = RayAPIStub()
+
+# Someday we might add methods in this module so that someone who
+# tries to `import ray_client as ray` -- as a module, instead of
+# `from ray_client import ray` -- as the API stub
+# still gets expected functionality. This is the way the ray package
+# worked in the past.
+#
+# This really calls for PEP 562: https://www.python.org/dev/peps/pep-0562/
+# But until Python 3.6 is EOL, here we are.
diff --git a/web_python/ray/api.py b/web_python/ray/api.py
new file mode 100644
index 0000000..d5ade03
--- /dev/null
+++ b/web_python/ray/api.py
@@ -0,0 +1,77 @@
+# This file defines an interface and client-side API stub
+# for referring either to the core Ray API or the same interface
+# from the Ray client.
+#
+# In tandem with __init__.py, we want to expose an API that's
+# close to `python/ray/__init__.py` but with more than one implementation.
+# The stubs in __init__ should call into a well-defined interface.
+# Only the core Ray API implementation should actually `import ray`
+# (and thus import all the raylet worker C bindings and such).
+# But to make sure that we're matching these calls, we define this API.
+
+from abc import ABC
+from abc import abstractmethod
+
+
+class APIImpl(ABC):  # Interface that every Ray API backend implements.
+    @abstractmethod
+    def get(self, *args, **kwargs):
+        """Resolve object ref(s) to the value(s) they refer to."""
+
+    @abstractmethod
+    def put(self, *args, **kwargs):
+        """Store value(s) and return ref(s) to them."""
+
+    @abstractmethod
+    def wait(self, *args, **kwargs):
+        """Partition refs into a (ready, not ready) pair of lists."""
+
+    @abstractmethod
+    def remote(self, *args, **kwargs):
+        """Wrap a function or class so it can be run remotely."""
+
+    @abstractmethod
+    def call_remote(self, instance, kind: int, *args, **kwargs):
+        """Schedule `instance` as a task of the given `kind`."""
+
+    @abstractmethod
+    def get_actor_from_object(self, id):
+        """Return the actor handle identified by `id`."""
+
+    @abstractmethod
+    def close(self, *args, **kwargs):
+        """Shut the implementation down."""
+
+
+class ClientAPI(APIImpl):  # Delegates the API calls to a wrapped worker.
+    def __init__(self, worker):
+        self.worker = worker
+
+    def get(self, *args, **kwargs):
+        return self.worker.get(*args, **kwargs)
+
+    def put(self, *args, **kwargs):
+        return self.worker.put(*args, **kwargs)
+
+    def wait(self, *args, **kwargs):
+        return self.worker.wait(*args, **kwargs)
+
+    def remote(self, *args, **kwargs):
+        return self.worker.remote(*args, **kwargs)
+
+    def call_remote(self, f, kind, *args, **kwargs):
+        return self.worker.call_remote(f, kind, *args, **kwargs)
+
+    def get_actor_from_object(self, id):
+        raise Exception("Calling get_actor_from_object on the client side")
+
+    def close(self, *args, **kwargs):
+        return self.worker.close()
+
+    def __getattr__(self, key: str):
+        if not key.startswith("_"):
+            raise NotImplementedError(
+                "Not available in Ray client: `ray.{}`. This method is only "
+                "available within Ray remote functions and is not yet "
+                "implemented in the client API.".format(key))
+        return self.__getattribute__(key)  # re-raises AttributeError
diff --git a/web_python/ray/client_app.py b/web_python/ray/client_app.py
new file mode 100644
index 0000000..3b99f81
--- /dev/null
+++ b/web_python/ray/client_app.py
@@ -0,0 +1,90 @@
+from ray import ray
+from typing import Tuple
+
+ray.connect()  # RayAPIStub.connect() takes no address argument.
+
+
+@ray.remote
+class HelloActor:
+    def __init__(self):
+        self.count = 0
+
+    def say_hello(self, whom: str) -> Tuple[str, int]:
+        self.count += 1
+        return ("Hello " + whom, self.count)
+
+
+actor = HelloActor.remote()
+s, count = ray.get(actor.say_hello.remote("you"))
+print(s, count)
+assert s == "Hello you"
+assert count == 1
+s, count = ray.get(actor.say_hello.remote("world"))
+print(s, count)
+assert s == "Hello world"
+assert count == 2
+
+
+@ray.remote
+def plus2(x):
+    return x + 2
+
+
+@ray.remote
+def fact(x):
+    print(x, type(fact))
+    if x <= 0:
+        return 1
+    # This hits the "nested tasks" issue
+    # https://github.com/ray-project/ray/issues/3644
+    # So we're on the right track!
+    return ray.get(fact.remote(x - 1)) * x
+
+
+@ray.remote
+def get_nodes():
+    return ray.nodes()  # Can access the full Ray API in remote methods.
+
+
+print("Cluster nodes", ray.get(get_nodes.remote()))
+print(ray.nodes())
+
+objectref = ray.put("hello world")
+
+# `ClientObjectRef(...)`
+print(objectref)
+
+# `hello world`
+print(ray.get(objectref))
+
+ref2 = plus2.remote(234)
+# `ClientObjectRef(...)`
+print(ref2)
+# `236`
+print(ray.get(ref2))
+
+ref3 = fact.remote(20)
+# `ClientObjectRef(...)`
+print(ref3)
+# `2432902008176640000`
+print(ray.get(ref3))
+
+# Reuse the cached ClientRemoteFunc object
+ref4 = fact.remote(5)
+# `120`
+print(ray.get(ref4))
+
+ref5 = fact.remote(10)
+
+print([ref2, ref3, ref4, ref5])
+# should return ref2, ref3, ref4
+res = ray.wait([ref5, ref2, ref3, ref4], num_returns=3)
+print(res)
+assert [ref2, ref3, ref4] == res[0]
+assert [ref5] == res[1]
+
+# should return ref2, ref3, ref4, ref5
+res = ray.wait([ref2, ref3, ref4, ref5], num_returns=4)
+print(res)
+assert [ref2, ref3, ref4, ref5] == res[0]
+assert [] == res[1]
diff --git a/web_python/ray/common.py b/web_python/ray/common.py
new file mode 100644
index 0000000..f5051db
--- /dev/null
+++ b/web_python/ray/common.py
@@ -0,0 +1,245 @@
+import ray.webpb as webpb
+from ray import ray
+from typing import Dict
+import cloudpickle
+
+
+class ClientBaseRef:
+    """Handle wrapping the raw id of an object held by the server."""
+
+    def __init__(self, id):
+        self.id = id
+
+    def __repr__(self):
+        return "%s(%s)" % (
+            type(self).__name__,
+            self.id.hex(),
+        )
+
+    def __eq__(self, other):
+        # Still compares ids only, but defers to Python's default for
+        # unrelated types instead of failing on a missing `.id`.
+        if not isinstance(other, ClientBaseRef):
+            return NotImplemented
+        return self.id == other.id
+
+    def __hash__(self):
+        # A class that defines __eq__ without __hash__ is unhashable; hash
+        # the id so equal refs collapse in sets and dict keys.
+        return hash(self.id)
+
+    def binary(self):
+        """Return the raw id."""
+        return self.id
+
+
+class ClientObjectRef(ClientBaseRef):
+    """Reference to an object stored on the server."""
+
+
+class ClientActorNameRef(ClientBaseRef):
+    """Reference identifying an actor instance on the server."""
+
+
+class ClientRemoteFunc:
+    """Client-side stand-in for a function passed to `ray.remote`."""
+
+    def __init__(self, f):
+        self._func = f
+        self._name = f.__name__
+        self.id = None
+        # Ref returned by ray.put for the function; set on first use.
+        self._ref = None
+        self._raylet_remote = None
+
+    def __call__(self, *args, **kwargs):
+        # The placeholder must sit in an f-string literal: adjacent
+        # literals are concatenated, but each is formatted separately.
+        raise TypeError("Remote function cannot be called directly. "
+                        f"Use {self._name}.remote method instead")
+
+    def remote(self, *args, **kwargs):
+        """Schedule the function remotely and return a ref to its result."""
+        return ray.call_remote(self, webpb.ClientTaskType.FUNCTION, *args,
+                               **kwargs)
+
+    def _get_ray_remote_impl(self):
+        if self._raylet_remote is None:
+            self._raylet_remote = ray.remote(self._func)
+        return self._raylet_remote
+
+    def __repr__(self):
+        return "ClientRemoteFunc(%s, %s)" % (self._name, self.id)
+
+    def _prepare_client_task(self) -> webpb.ClientTask:
+        """Build the task message, uploading the function on first call."""
+        if self._ref is None:
+            self._ref = ray.put(self._func)
+        task = webpb.ClientTask()
+        task.type = webpb.ClientTaskType.FUNCTION
+        task.name = self._name
+        task.payload_id = self._ref.id
+        return task
+
+
+class ClientActorClass:
+    """Client-side stand-in for a class passed to `ray.remote`."""
+
+    def __init__(self, actor_cls):
+        self.actor_cls = actor_cls
+        self._name = actor_cls.__name__
+        self._ref = None
+        self._raylet_remote = None
+
+    def __call__(self, *args, **kwargs):
+        raise TypeError("Remote actor cannot be instantiated directly. "
+                        f"Use {self._name}.remote() instead")
+
+    def __getstate__(self) -> Dict:
+        state = {
+            "actor_cls": self.actor_cls,
+            "_name": self._name,
+            "_ref": self._ref,
+        }
+        return state
+
+    def __setstate__(self, state: Dict) -> None:
+        self.actor_cls = state["actor_cls"]
+        self._name = state["_name"]
+        self._ref = state["_ref"]
+
+    def remote(self, *args, **kwargs):
+        """Instantiate the actor remotely and return a handle to it."""
+        # Actually instantiate the actor
+        ref = ray.call_remote(self, webpb.ClientTaskType.ACTOR, *args,
+                              **kwargs)
+        return ClientActorHandle(ClientActorNameRef(ref.id), self)
+
+    def __repr__(self):
+        return "ClientRemoteActor(%s, %s)" % (self._name, self._ref)
+
+    def __getattr__(self, key):
+        raise NotImplementedError("static methods")
+
+    def _prepare_client_task(self):
+        """Build the task message, uploading the class on first call."""
+        if self._ref is None:
+            self._ref = ray.put(self.actor_cls)
+        task = webpb.ClientTask()
+        task.type = webpb.ClientTaskType.ACTOR
+        task.name = self._name
+        task.payload_id = self._ref.id
+        return task
+
+
+class ClientActorHandle:
+    """Handle to a remote actor; attribute access yields remote methods."""
+
+    def __init__(self, actor_id: ClientActorNameRef,
+                 actor_class: ClientActorClass):
+        self.actor_id = actor_id
+        self.actor_class = actor_class
+        self._real_actor_handle = None
+
+    def _get_ray_remote_impl(self):
+        if self._real_actor_handle is None:
+            self._real_actor_handle = ray.get_actor_from_object(self.actor_id)
+        return self._real_actor_handle
+
+    def __getstate__(self) -> Dict:
+        state = {
+            "actor_id": self.actor_id,
+            "actor_class": self.actor_class,
+            "_real_actor_handle": self._real_actor_handle,
+        }
+        return state
+
+    def __setstate__(self, state: Dict) -> None:
+        self.actor_id = state["actor_id"]
+        self.actor_class = state["actor_class"]
+        self._real_actor_handle = state["_real_actor_handle"]
+
+    def __getattr__(self, key):
+        # Dunder lookups are protocol probes (e.g. copy.deepcopy asks the
+        # instance for __deepcopy__); answering them with a remote method
+        # would make the probe succeed and then fail when it is called.
+        if key.startswith("__") and key.endswith("__"):
+            raise AttributeError(key)
+        return ClientRemoteMethod(self, key)
+
+    def __repr__(self):
+        return "ClientActorHandle(%s, %s, %s)" % (
+            self.actor_id, self.actor_class, self._real_actor_handle)
+
+
+class ClientRemoteMethod:
+    """A named method of a remote actor, callable only via `.remote()`."""
+
+    def __init__(self, actor_handle: ClientActorHandle, method_name: str):
+        self.actor_handle = actor_handle
+        self.method_name = method_name
+
+    def __call__(self, *args, **kwargs):
+        # This class has no `_name`; the method's name is `method_name`.
+        raise TypeError("Remote method cannot be called directly. "
+                        f"Use {self.method_name}.remote() instead")
+
+    def _get_ray_remote_impl(self):
+        return getattr(self.actor_handle._get_ray_remote_impl(),
+                       self.method_name)
+
+    def __getstate__(self) -> Dict:
+        state = {
+            "actor_handle": self.actor_handle,
+            "method_name": self.method_name,
+        }
+        return state
+
+    def __setstate__(self, state: Dict) -> None:
+        self.actor_handle = state["actor_handle"]
+        self.method_name = state["method_name"]
+
+    def remote(self, *args, **kwargs):
+        """Schedule the method call and return a ref to its result."""
+        return ray.call_remote(self, webpb.ClientTaskType.METHOD, *args,
+                               **kwargs)
+
+    def __repr__(self):
+        name = "%s.%s" % (self.actor_handle.actor_class._name,
+                          self.method_name)
+        return "ClientRemoteMethod(%s, %s)" % (name,
+                                               self.actor_handle.actor_id)
+
+    def _prepare_client_task(self):
+        """Build the task message addressed to this method's actor."""
+        task = webpb.ClientTask()
+        # The enum lives on ClientTaskType; ClientTask has no METHOD.
+        task.type = webpb.ClientTaskType.METHOD
+        task.name = self.method_name
+        task.payload_id = self.actor_handle.actor_id.id
+        return task
+
+
+# def convert_from_arg(pb) -> Any:
+#     if pb.local == webpb.Locality.REFERENCE:
+#         return ClientObjectRef(pb.reference_id)
+#     elif pb.local == webpb.Locality.INTERNED:
+#         return cloudpickle.loads(pb.data)
+#
+#     raise Exception("convert_from_arg: Uncovered locality enum")
+
+
+def convert_to_arg(val):
+    """Wrap `val` as a `webpb.Arg`.
+
+    A `ClientObjectRef` is sent by reference (its id); any other value is
+    pickled with cloudpickle and sent inline.
+    """
+    out = webpb.Arg()
+    if isinstance(val, ClientObjectRef):
+        out.local = webpb.Locality.REFERENCE
+        out.reference_id = val.id
+    else:
+        out.local = webpb.Locality.INTERNED
+        out.data = cloudpickle.dumps(val)
+    return out
diff --git a/web_python/ray/web.py b/web_python/ray/web.py
new file mode 100644
index 0000000..2326817
--- /dev/null
+++ b/web_python/ray/web.py
@@ -0,0 +1,40 @@
+from js import XMLHttpRequest, Blob
+
+import json
+import base64
+import cloudpickle
+
+
+def get(get_id):
+    """POST base64 `get_id` to /api/get and return the parsed JSON reply."""
+    enc = base64.standard_b64encode(get_id).decode()
+    body = json.dumps({"id": enc})
+    req = XMLHttpRequest.new()
+    req.open("POST", "/api/get", False)
+    req.setRequestHeader("Content-Type", "application/json")
+    req.send(body)
+    return json.loads(req.response)
+
+
+def put(obj):
+    """Pickle `obj`, POST it to /api/put and return the new object's id."""
+    data = cloudpickle.dumps(obj)
+    enc = base64.standard_b64encode(data).decode()
+    body = json.dumps({"data": enc})
+    req = XMLHttpRequest.new()
+    req.open("POST", "/api/put", False)
+    req.setRequestHeader("Content-Type", "application/json")
+    req.send(body)
+    out = json.loads(req.response)
+    return base64.standard_b64decode(out["id"])
+
+
+def schedule(task):
+    """POST the JSON-able `task` to /api/schedule; return the result id."""
+    body = json.dumps(task)
+    req = XMLHttpRequest.new()
+    req.open("POST", "/api/schedule", False)
+    req.setRequestHeader("Content-Type", "application/json")
+    req.send(body)
+    out = json.loads(req.response)
+    return base64.standard_b64decode(out["return_id"])
diff --git a/web_python/ray/webpb.py b/web_python/ray/webpb.py
new file mode 100644
index 0000000..a6eca67
--- /dev/null
+++ b/web_python/ray/webpb.py
@@ -0,0 +1,79 @@
+from enum import Enum
+import base64
+import cloudpickle
+
+
+class ClientTaskType(Enum):
+    """Kind of work a ClientTask asks the server to perform."""
+
+    FUNCTION = 0
+    ACTOR = 1
+    METHOD = 2
+    STATIC_METHOD = 3
+
+
+class ClientTask:
+    """Description of one task to schedule, convertible to JSON."""
+
+    def __init__(self):
+        # Must be an enum member: toJsonable() reads `self.type.value`,
+        # which the previous plain-int default of 0 does not have.
+        self.type = ClientTaskType.FUNCTION
+        self.name = ""
+        self.payload_id = b''
+        self.args = []
+
+    def __repr__(self):
+        return "ClientTask(type=%s, name=%r, nargs=%d)" % (
+            self.type, self.name, len(self.args))
+
+    def toJsonable(self):
+        """Return a dict of JSON-native values; byte ids become base64."""
+        return {
+            "type": self.type.value,
+            "name": self.name,
+            "payload_id": base64.standard_b64encode(self.payload_id).decode(),
+            "args": [x.toJsonable() for x in self.args]
+        }
+
+
+class Locality(Enum):
+    """How an argument travels: inline data or a reference to an object."""
+
+    INTERNED = 0
+    REFERENCE = 1
+
+
+class Arg:
+    """One task argument, convertible to JSON."""
+
+    def __init__(self):
+        self.local = Locality.INTERNED
+        self.reference_id = b''
+        self.data = b''
+        self.type = 0
+
+    def toJsonable(self):
+        """Return a dict of JSON-native values; byte fields become base64."""
+        return {
+            "local": self.local.value,
+            "reference_id": base64.standard_b64encode(self.reference_id).decode(),
+            "data": base64.standard_b64encode(self.data).decode(),
+            "type": self.type,
+        }
+
+
+def loads(b64):
+    """Decode base64 text and unpickle the result.
+
+    NOTE: unpickling executes arbitrary code; only call this on data that
+    comes from a trusted server.
+    """
+    data = base64.standard_b64decode(b64)
+    return cloudpickle.loads(data)
+
+
+def dumps(obj):
+    """Pickle `obj` with cloudpickle and return it as base64 text."""
+    data = cloudpickle.dumps(obj)
+    return base64.standard_b64encode(data).decode()
diff --git a/web_python/ray/worker.py b/web_python/ray/worker.py
new file mode 100644
index 0000000..639e74f
--- /dev/null
+++ b/web_python/ray/worker.py
@@ -0,0 +1,118 @@
+"""This file includes the Worker class which sits on the client side.
+It implements the Ray API functions, forwarding them to the server as
+HTTP calls made through ray.web.
+"""
+import inspect
+import logging
+from typing import List
+from typing import Tuple
+
+import cloudpickle
+
+import ray.webpb as webpb
+import ray.web as webcall
+from ray.common import convert_to_arg
+from ray.common import ClientObjectRef
+from ray.common import ClientActorClass
+from ray.common import ClientRemoteFunc
+
+logger = logging.getLogger(__name__)
+
+
+class Worker:
+    """Client-side implementation of the Ray calls on top of ray.web."""
+
+    def __init__(self):
+        pass
+
+    def get(self, ids):
+        """Fetch the value(s) behind `ids`.
+
+        Args:
+            ids: a single ClientObjectRef, or a list of them.
+
+        Returns:
+            One value for a single ref, otherwise a list of values in
+            the same order as `ids`.
+
+        Raises:
+            Exception: if `ids` is neither a list nor a ClientObjectRef,
+                or if the server reports an id as invalid.
+        """
+        single = isinstance(ids, ClientObjectRef)
+        if single:
+            to_get = [ids.id]
+        elif isinstance(ids, list):
+            to_get = [x.id for x in ids]
+        else:
+            raise Exception("Can't get something that's not a "
+                            "list of IDs or just an ID: %s" % type(ids))
+        out = [self._get(x) for x in to_get]
+        return out[0] if single else out
+
+    def _get(self, id: bytes):
+        """Fetch one object by raw id and unpickle it."""
+        data = webcall.get(id)
+        if not data["valid"]:
+            raise Exception(
+                "Client GetObject returned invalid data: id invalid?")
+        return webpb.loads(data["data"])
+
+    def put(self, vals):
+        """Store value(s) on the server and return ref(s) to them.
+
+        NOTE: a list is treated as several values and yields a list of
+        refs, one per element; any other value yields a single ref.
+        """
+        if isinstance(vals, list):
+            return [self._put(x) for x in vals]
+        return self._put(vals)
+
+    def _put(self, val):
+        id = webcall.put(val)
+        return ClientObjectRef(id)
+
+    def remote(self, function_or_class, *args, **kwargs):
+        """Wrap a function or class for remote use.
+
+        Raises:
+            TypeError: if the argument is neither a function nor a class.
+        """
+        # TODO(barakmich): Arguments to ray.remote
+        # get captured here.
+        if inspect.isfunction(function_or_class):
+            return ClientRemoteFunc(function_or_class)
+        elif inspect.isclass(function_or_class):
+            return ClientActorClass(function_or_class)
+        else:
+            raise TypeError("The @ray.remote decorator must be applied to "
+                            "either a function or to a class.")
+
+    def call_remote(self, instance, kind, *args, **kwargs):
+        """Schedule `instance` on the server with positional `args`.
+
+        Args:
+            instance: object exposing `_prepare_client_task()`.
+            kind: task kind; unused here, the prepared task carries it.
+            *args: positional arguments, each passed to convert_to_arg.
+            **kwargs: not supported.
+
+        Returns:
+            ClientObjectRef for the scheduled task's result.
+
+        Raises:
+            NotImplementedError: if keyword arguments are given.
+        """
+        if kwargs:
+            # The task message only carries a positional `args` list;
+            # silently dropping keywords would run the call without them.
+            raise NotImplementedError(
+                "Keyword arguments are not supported by remote calls yet")
+        task = instance._prepare_client_task()
+        task.args.extend(convert_to_arg(arg) for arg in args)
+        logger.debug("Scheduling %s", task)
+        return_id = webcall.schedule(task.toJsonable())
+        return ClientObjectRef(return_id)
+
+    def close(self):
+        pass
diff --git a/web_python/setup.py b/web_python/setup.py
new file mode 100644
index 0000000..fe27fa2
--- /dev/null
+++ b/web_python/setup.py
@@ -0,0 +1,19 @@
+import setuptools
+
+setuptools.setup(
+    name="ray-web",  # make.sh copies ray_web-0.0.1-*.whl; keep in sync
+    version="0.0.1",
+    author="Ray Authors",
+    author_email="ray@anyscale.com",
+    description="For use in pyodide",
+    long_description="A longer description",
+    long_description_content_type="text/markdown",
+    url="https://github.com/ray-project/ray-web",
+    packages=setuptools.find_packages(),
+    classifiers=[
+        "Programming Language :: Python :: 3",
+        "License :: OSI Approved :: MIT License",
+        "Operating System :: OS Independent",
+    ],
+    python_requires='>=3.6',
+)