"""Browser-side client helpers for the web API.

Talks to the ``/api/get``, ``/api/put`` and ``/api/schedule`` endpoints with
synchronous ``XMLHttpRequest`` POSTs carrying JSON bodies, and executes work
items handed to :func:`exec_work`.
"""
import base64
import json

import cloudpickle
import js
from js import Blob, XMLHttpRequest

import ray.webpb as webpb


def b64dumps(b):
    """Return the standard base64 encoding of bytes *b* as a ``str``."""
    return base64.standard_b64encode(b).decode()


def b64loads(s):
    """Return the bytes decoded from the standard base64 input *s*."""
    return base64.standard_b64decode(s)


def _post_json(path, payload):
    """POST *payload* as JSON to *path* synchronously; return the parsed reply.

    Args:
        path: URL path of the endpoint, e.g. ``"/api/get"``.
        payload: JSON-serialisable object sent as the request body.

    Returns:
        The response body parsed with ``json.loads``.
    """
    body = json.dumps(payload)
    req = XMLHttpRequest.new()
    # Third argument False makes the request synchronous, so ``req.response``
    # is populated as soon as ``send`` returns.
    req.open("POST", path, False)
    # The options key must be the *string* "type"; a bare ``type`` would be
    # Python's builtin class rather than the Blob ``type`` option.
    # NOTE(review): whether list/dict arguments are converted to JS values
    # implicitly depends on the Pyodide version in use - confirm.
    blob = Blob.new([body], {"type": "application/json"})
    req.send(blob)
    return json.loads(req.response)


def get(get_id):
    """Fetch the object identified by *get_id* from ``/api/get``.

    Args:
        get_id: Raw id bytes; sent base64-encoded under the ``"id"`` key.

    Returns:
        The decoded JSON response (callers in this module read its
        ``"data"`` entry).
    """
    return _post_json("/api/get", {"id": b64dumps(get_id)})


def put(obj):
    """Serialise *obj* with cloudpickle and upload it via ``/api/put``.

    Returns:
        The id bytes decoded from the base64 ``"id"`` field of the response.
    """
    data = cloudpickle.dumps(obj)
    out = _post_json("/api/put", {"data": b64dumps(data)})
    return b64loads(out["id"])


def schedule(task):
    """Submit the JSON-serialisable *task* to ``/api/schedule``.

    Returns:
        The id bytes decoded from the base64 ``"return_id"`` response field.
    """
    out = _post_json("/api/schedule", task)
    return b64loads(out["return_id"])


# Status codes placed in the "status" field of the reply from exec_work.
WORK_COMPLETE = 0
WORK_ERROR = 1
WORK_READY = 2

FUNCTION = webpb.ClientTaskType.FUNCTION.value


def exec_work(work_blob):
    """Run one work item and return its JSON-encoded status.

    Args:
        work_blob: JSON text containing a ``"task"`` object and a
            ``"ticket"`` value.

    Returns:
        A JSON string. For tasks whose ``"type"`` is present and is not
        ``FUNCTION`` it carries ``WORK_ERROR`` and an error message; otherwise
        it carries ``WORK_COMPLETE``, the base64 cloudpickled result and the
        ticket being finished.

    Exceptions raised while fetching, decoding or calling the function are
    not caught here and propagate to the caller.
    """
    work = json.loads(work_blob)
    js.console.log("Got work")
    task = work["task"]
    # A task without an explicit "type" is treated as a function task.
    if "type" in task and task["type"] != FUNCTION:
        return json.dumps({
            "status": WORK_ERROR,
            "error_msg": "Unsupported",
        })

    args = decode_json_args(task)
    getresp = get(b64loads(task["payload_id"]))
    # NOTE(review): webpb.loads presumably deserialises executable code
    # received from the server - confirm the endpoint is trusted.
    func = webpb.loads(getresp["data"])
    res = func(*args)
    out_data = cloudpickle.dumps(res)
    workstatus = {
        "status": WORK_COMPLETE,
        "complete_data": b64dumps(out_data),
        "finished_ticket": work["ticket"],
    }
    return json.dumps(workstatus)


def decode_json_args(task):
    """Materialise the positional arguments listed in ``task["args"]``.

    Each argument is either a reference (fetched through :func:`get` using
    its ``"reference_id"``) or interned (decoded from its inline ``"data"``).
    An argument with no ``"local"`` key is treated as interned.

    Returns:
        A list of decoded argument values, in order.

    Raises:
        Exception: If an argument's ``"local"`` value is neither the
            REFERENCE nor the INTERNED locality.
    """
    out = []
    for arg in task["args"]:
        if "local" in arg and arg["local"] == webpb.Locality.REFERENCE.value:
            getresp = get(b64loads(arg["reference_id"]))
            data = webpb.loads(getresp["data"])
            out.append(data)
        elif "local" not in arg or arg["local"] == webpb.Locality.INTERNED.value:
            out.append(webpb.loads(arg["data"]))
        else:
            raise Exception("convert_from_arg: Uncovered locality enum")
    return out