tinkerbell/web_python/ray/web.py
2020-12-04 23:40:23 -08:00

95 lines
2.5 KiB
Python

from js import XMLHttpRequest, Blob
import js
import json
import base64
import cloudpickle
import ray.webpb as webpb
def b64dumps(b):
    """Return the standard base64 encoding of the bytes *b* as a ``str``."""
    encoded = base64.b64encode(b)
    return encoded.decode()
def b64loads(s):
    """Decode the standard base64 text (or bytes) *s* and return raw bytes."""
    raw = base64.b64decode(s)
    return raw
def get(get_id):
    """Look up an object on the server with a blocking POST to ``/api/get``.

    Args:
        get_id: Raw id bytes; sent base64-encoded under the ``"id"`` key of
            the JSON request body.

    Returns:
        The JSON-decoded response body. Callers in this file read its
        ``"data"`` entry.
    """
    enc = base64.standard_b64encode(get_id).decode()
    body = json.dumps({"id": enc})
    req = XMLHttpRequest.new()
    # Third argument False selects a synchronous request, so req.response is
    # available as soon as send() returns.
    req.open("POST", "/api/get", False)
    # The options key must be the *string* "type". The bare name `type` is
    # Python's builtin class, which never yields a JS `type` property.
    # NOTE(review): newer Pyodide releases may require an explicit
    # dict -> JS object conversion for the options argument -- confirm.
    blob = Blob.new([body], {"type": "application/json"})
    req.send(blob)
    return json.loads(req.response)
def put(obj):
    """Store *obj* on the server with a blocking POST to ``/api/put``.

    The object is serialized with ``cloudpickle`` and sent base64-encoded
    under the ``"data"`` key of the JSON request body.

    Args:
        obj: Any object ``cloudpickle.dumps`` can serialize.

    Returns:
        The id bytes obtained by base64-decoding the ``"id"`` entry of the
        JSON response.
    """
    data = cloudpickle.dumps(obj)
    enc = base64.standard_b64encode(data).decode()
    body = json.dumps({"data": enc})
    req = XMLHttpRequest.new()
    # Third argument False selects a synchronous request, so req.response is
    # available as soon as send() returns.
    req.open("POST", "/api/put", False)
    # The options key must be the *string* "type". The bare name `type` is
    # Python's builtin class, which never yields a JS `type` property.
    # NOTE(review): newer Pyodide releases may require an explicit
    # dict -> JS object conversion for the options argument -- confirm.
    blob = Blob.new([body], {"type": "application/json"})
    req.send(blob)
    out = json.loads(req.response)
    return base64.standard_b64decode(out["id"])
def schedule(task):
    """Submit *task* to the server with a blocking POST to ``/api/schedule``.

    Args:
        task: A JSON-serializable task description; sent as the request body
            exactly as ``json.dumps`` renders it.

    Returns:
        The id bytes obtained by base64-decoding the ``"return_id"`` entry
        of the JSON response.
    """
    body = json.dumps(task)
    req = XMLHttpRequest.new()
    # Third argument False selects a synchronous request, so req.response is
    # available as soon as send() returns.
    req.open("POST", "/api/schedule", False)
    # The options key must be the *string* "type". The bare name `type` is
    # Python's builtin class, which never yields a JS `type` property.
    # NOTE(review): newer Pyodide releases may require an explicit
    # dict -> JS object conversion for the options argument -- confirm.
    blob = Blob.new([body], {"type": "application/json"})
    req.send(blob)
    out = json.loads(req.response)
    return base64.standard_b64decode(out["return_id"])
# Status codes placed in the "status" field of the JSON string returned by
# exec_work.
WORK_COMPLETE = 0  # returned together with the pickled result of the task
WORK_ERROR = 1  # returned with an "error_msg", e.g. for an unsupported task type
WORK_READY = 2  # not referenced in the visible code -- meaning to be confirmed
# Task-type value that exec_work accepts; any other task["type"] is rejected.
FUNCTION = webpb.ClientTaskType.FUNCTION.value
def exec_work(work_blob):
    """Run one unit of work described by a JSON string and report the outcome.

    The decoded work item must carry a ``"task"`` entry and, on the success
    path, a ``"ticket"`` entry. The task's callable is fetched by its
    ``"payload_id"``, invoked with the decoded arguments, and its result is
    pickled with ``cloudpickle`` and base64-encoded into the reply.

    Args:
        work_blob: JSON text describing the work item.

    Returns:
        A JSON string. On success it holds ``"status"`` (WORK_COMPLETE),
        ``"complete_data"`` and ``"finished_ticket"``; for a task whose
        ``"type"`` is present and differs from FUNCTION it holds ``"status"``
        (WORK_ERROR) and ``"error_msg"``.
    """
    request = json.loads(work_blob)
    js.console.log("Got work")
    spec = request["task"]

    # A task with no "type" entry is handled the same way as a FUNCTION task.
    is_unsupported = "type" in spec and spec["type"] != FUNCTION
    if is_unsupported:
        failure = {
            "status": WORK_ERROR,
            "error_msg": "Unsupported",
        }
        return json.dumps(failure)

    call_args = decode_json_args(spec)
    payload = get(b64loads(spec["payload_id"]))
    # NOTE(review): the callable comes from server-supplied data and is run
    # as-is; presumably webpb.loads deserializes a pickled function, so the
    # server must be trusted -- confirm.
    target = webpb.loads(payload["data"])
    result_bytes = cloudpickle.dumps(target(*call_args))
    return json.dumps({
        "status": WORK_COMPLETE,
        "complete_data": b64dumps(result_bytes),
        "finished_ticket": request["ticket"],
    })
def decode_json_args(task):
    """Turn the ``"args"`` entries of *task* into a list of decoded values.

    Each argument is handled according to its ``"local"`` entry:

    * equal to ``webpb.Locality.REFERENCE.value``: the value is fetched via
      ``get`` using the argument's ``"reference_id"`` and decoded from the
      response's ``"data"``;
    * missing, or equal to ``webpb.Locality.INTERNED.value``: the value is
      decoded from the argument's own ``"data"``.

    Args:
        task: Mapping that may contain an ``"args"`` sequence.

    Returns:
        The decoded arguments in order; an empty list when *task* has no
        ``"args"`` entry.

    Raises:
        Exception: If an argument carries any other ``"local"`` value.
    """
    raw_args = task["args"] if "args" in task else []
    decoded = []
    for arg in raw_args:
        has_locality = "local" in arg
        if has_locality and arg["local"] == webpb.Locality.REFERENCE.value:
            fetched = get(b64loads(arg["reference_id"]))
            decoded.append(webpb.loads(fetched["data"]))
        elif not has_locality or arg["local"] == webpb.Locality.INTERNED.value:
            decoded.append(webpb.loads(arg["data"]))
        else:
            raise Exception("convert_from_arg: Uncovered locality enum")
    return decoded