diff --git a/go.mod b/go.mod index c819882..651e528 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.4.3 // indirect github.com/google/uuid v1.1.2 + github.com/gorilla/websocket v1.4.2 github.com/json-iterator/go v1.1.10 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect diff --git a/go.sum b/go.sum index 26bcdce..4e7734b 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= diff --git a/http.go b/http.go index 3fa3aa1..5db9228 100644 --- a/http.go +++ b/http.go @@ -2,20 +2,29 @@ package main import ( "context" + "errors" + "fmt" "net/http" + "path/filepath" "github.com/barakmich/tinkerbell/ray_rpc" "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "go.uber.org/zap" ) func runHttp(addr, rootPath string, raylet *Raylet) error { r := gin.Default() h := rayletHandler{raylet} - r.Static("/", rootPath) api := r.Group("/api") api.POST("/get", h.GetObject) api.POST("/put", h.PutObject) api.POST("/schedule", h.Schedule) + api.GET("/ws", h.WorkerWS) + r.Static("/web", rootPath) + r.StaticFile("console.html", filepath.Join(rootPath, "console.html")) + r.StaticFile("index.html", filepath.Join(rootPath, "index.html")) + r.StaticFile("/", filepath.Join(rootPath, "index.html")) err := r.Run(addr) return err } @@ -56,3 +65,87 @@ func (h *rayletHandler) Schedule(c *gin.Context) { } c.JSON(http.StatusOK, res) } + +func (h *rayletHandler) WorkerWS(c *gin.Context) { + conn, err := wsupgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + zap.S().Error("Failed to set websocket upgrade: %v\n", err) + return + } + worker := &WSWorker{ + conn: conn, + workChan: make(chan *ray_rpc.Work), + h: h, + } + h.raylet.Workers.Register(worker) + err = worker.Run() + if err != nil { + zap.S().Info("Closing WS worker:", err) + } + h.raylet.Workers.Deregister(worker) +} + +var wsupgrader = websocket.Upgrader{ + ReadBufferSize: 10240, + WriteBufferSize: 10240, +} + +type WSWorker struct { + conn *websocket.Conn + workChan chan *ray_rpc.Work + h *rayletHandler + sched bool +} + +func (ws *WSWorker) AssignWork(work *ray_rpc.Work) error { + ws.workChan <- work + return nil +} + +func (ws *WSWorker) Close() error { + close(ws.workChan) + return nil +} + +func (ws *WSWorker) Schedulable() bool { + return ws.sched +} + +func (ws *WSWorker) Run() error { + var sentinel ray_rpc.WorkStatus + err := ws.conn.ReadJSON(&sentinel) + if err != nil { + return err + } + if sentinel.Status != ray_rpc.READY { + return errors.New("Sent wrong sentinel? Closing...") + } + ws.sched = true + zap.S().Info("New worker:", sentinel.ErrorMsg) + go func() { + for work := range ws.workChan { + fmt.Println("sending work") + ws.sched = false + err = ws.conn.WriteJSON(work) + if err != nil { + zap.S().Error("Error sending:", err) + return + } + } + }() + for { + var result ray_rpc.WorkStatus + err := ws.conn.ReadJSON(&result) + if err != nil { + zap.S().Error("Error on channel:", err) + return err + } + ws.sched = true + err = ws.h.raylet.Workers.Finish(&result) + if err != nil { + zap.S().Error("Error finishing:", err) + return err + } + } + return nil +} diff --git a/main.go b/main.go index 0fa2d29..c5f1ae5 100644 --- a/main.go +++ b/main.go @@ -42,7 +42,7 @@ func main() { server := NewMemRaylet() ray_rpc.RegisterRayletDriverServer(s, server) ray_rpc.RegisterRayletWorkerConnectionServer(s, server) - go runHttp(fmt.Sprintf(":%d", *httpPort), *webRoot, server) + go runHttp(fmt.Sprintf("0.0.0.0:%d", *httpPort), *webRoot, server) // Serve gRPC Server fmt.Println("Serving gRPC on https://", addr) diff --git a/object.go b/object.go index 65b3fed..3caafaf 100644 --- a/object.go +++ b/object.go @@ -3,7 +3,6 @@ package main import ( "encoding/binary" "errors" - "fmt" "sync" "time" @@ -63,7 +62,6 @@ func (mem *MemObjectStore) AwaitObjects(ids []ObjectID, c chan ObjectResult, tim v, ok := mem.db[id] if !ok { waitingFor[id] = true - fmt.Println("want", id) } else { found++ c <- ObjectResult{&Object{ID: id, Data: v}, nil} diff --git a/raylet_grpc.go b/raylet_grpc.go index 40a7433..a4e0b20 100644 --- a/raylet_grpc.go +++ b/raylet_grpc.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "github.com/barakmich/tinkerbell/ray_rpc" "go.uber.org/zap" @@ -42,7 +41,6 @@ func (r *Raylet) WaitObject(_ context.Context, _ *ray_rpc.WaitRequest) (*ray_rpc } func (r *Raylet) Schedule(_ context.Context, task *ray_rpc.ClientTask) (*ray_rpc.ClientTaskTicket, error) { - fmt.Println(task) zap.S().Debug("Schedule:", task.Type) id := r.Objects.MakeID() ticket := &ray_rpc.ClientTaskTicket{serializeObjectID(id)} diff --git a/web/console.html b/web/console.html index 1580838..b769e6c 100644 --- a/web/console.html +++ b/web/console.html @@ -5,8 +5,12 @@ - - + + +
@@ -80,7 +84,7 @@ pyodide.runPythonAsync(` import micropip import js - wheel_path = js.window.location.protocol + "//" + js.window.location.host + "/ray_web-0.0.1-py3-none-any.whl" + wheel_path = js.window.location.protocol + "//" + js.window.location.host + "/web/ray_web-0.0.1-py3-none-any.whl" micropip.install(wheel_path) `).then( () => { term.echo("from ray import ray") diff --git a/web/index.html b/web/index.html index fa1dcba..f9e4392 100644 --- a/web/index.html +++ b/web/index.html @@ -3,9 +3,21 @@ - + + + + +" + workText + "...
") + pyodide.globals.torun = msg.data + pyodide.runPythonAsync("exec_work(torun)").then((res) => { + $("#output").append("Did work! 👏
") + c.send(res) + }) + } + c.onopen = function() { + $("#status").text("Status: connected!") + c.send(JSON.stringify({ + status: 2, + error_msg: "WebsocketWorker" + })) + } + c.onclose = function() { + $("#status").text("Status: disconnected") + } +}) }) diff --git a/web/workterms.js b/web/workterms.js new file mode 100644 index 0000000..165531a --- /dev/null +++ b/web/workterms.js @@ -0,0 +1,108 @@ +workTerms = [ +"Adding Hidden Agendas", +"Adjusting Bell Curves", +"Aesthesizing Industrial Areas", +"Aligning Covariance Matrices", +"Applying Feng Shui Shaders", +"Applying Theatre Soda Layer", +"Asserting Packed Exemplars", +"Attempting to Lock Back-Buffer", +"Binding Sapling Root System", +"Breeding Fauna", +"Building Data Trees", +"Bureacritizing Bureaucracies", +"Calculating Inverse Probability Matrices", +"Calculating Llama Expectoration Trajectory", +"Calibrating Blue Skies", +"Charging Ozone Layer", +"Coalescing Cloud Formations", +"Cohorting Exemplars", +"Collecting Meteor Particles", +"Compounding Inert Tessellations", +"Compressing Fish Files", +"Computing Optimal Bin Packing", +"Concatenating Sub-Contractors", +"Containing Existential Buffer", +"Debarking Ark Ramp", +"Debunching Unionized Commercial Services", +"Deciding What Message to Display Next", +"Decomposing Singular Values", +"Decrementing Tectonic Plates", +"Deleting Ferry Routes", +"Depixelating Inner Mountain Surface Back Faces", +"Depositing Slush Funds", +"Destabilizing Economic Indicators", +"Determining Width of Blast Fronts", +"Deunionizing Bulldozers", +"Dicing Models", +"Diluting Livestock Nutrition Variables", +"Downloading Satellite Terrain Data", +"Exposing Flash Variables to Streak System", +"Extracting Resources", +"Factoring Pay Scale", +"Fixing Election Outcome Matrix", +"Flood-Filling Ground Water", +"Flushing Pipe Network", +"Gathering Particle Sources", +"Generating Jobs", +"Gesticulating Mimes", +"Graphing Whale Migration", +"Hiding Willio Webnet Mask", +"Implementing Impeachment Routine", +"Increasing Accuracy of RCI Simulators", +"Increasing Magmafacation", +"Initializing My Sim Tracking Mechanism", +"Initializing Rhinoceros Breeding Timetable", +"Initializing Robotic Click-Path AI", +"Inserting Sublimated Messages", +"Integrating Curves", +"Integrating Illumination Form Factors", +"Integrating Population Graphs", +"Iterating Cellular Automata", +"Lecturing Errant Subsystems", +"Mixing Genetic Pool", +"Modeling Object Components", +"Mopping Occupant Leaks", +"Normalizing Power", +"Obfuscating Quigley Matrix", +"Overconstraining Dirty Industry Calculations", +"Partitioning City Grid Singularities", +"Perturbing Matrices", +"Pixalating Nude Patch", +"Polishing Water Highlights", +"Populating Lot Templates", +"Preparing Sprites for Random Walks", +"Prioritizing Landmarks", +"Projecting Law Enforcement Pastry Intake", +"Realigning Alternate Time Frames", +"Reconfiguring User Mental Processes", +"Relaxing Splines", +"Removing Road Network Speed Bumps", +"Removing Texture Gradients", +"Removing Vehicle Avoidance Behavior", +"Resolving GUID Conflict", +"Reticulating Splines", +"Retracting Phong Shader", +"Retrieving from Back Store", +"Reverse Engineering Image Consultant", +"Routing Neural Network Infanstructure", +"Scattering Rhino Food Sources", +"Scrubbing Terrain", +"Searching for Llamas", +"Seeding Architecture Simulation Parameters", +"Sequencing Particles", +"Setting Advisor Moods", +"Setting Inner Deity Indicators", +"Setting Universal Physical Constants", +"Sonically Enhancing Occupant-Free Timber", +"Speculating Stock Market Indices", +"Splatting Transforms", +"Stratifying Ground Layers", +"Sub-Sampling Water Data", +"Synthesizing Gravity", +"Synthesizing Wavelets", +"Time-Compressing Simulator Clock", +"Unable to Reveal Current Activity", +"Weathering Buildings", +"Zeroing Crime Network" +] diff --git a/web_python/ray/web.py b/web_python/ray/web.py index 2326817..6718079 100644 --- a/web_python/ray/web.py +++ b/web_python/ray/web.py @@ -1,8 +1,18 @@ from js import XMLHttpRequest, Blob +import js import json import base64 import cloudpickle +import ray.webpb as webpb + + +def b64dumps(b): + return base64.standard_b64encode(b).decode() + + +def b64loads(s): + return base64.standard_b64decode(s) def get(get_id): @@ -38,3 +48,46 @@ def schedule(task): out = json.loads(req.response) out_id = base64.standard_b64decode(out["return_id"]) return out_id + + +WORK_COMPLETE = 0 +WORK_ERROR = 1 +WORK_READY = 2 + +FUNCTION = webpb.ClientTaskType.FUNCTION.value + + +def exec_work(work_blob): + work = json.loads(work_blob) + js.console.log("Got work") + task = work["task"] + if "type" in task and task["type"] != FUNCTION: + return json.dumps({ + "status": WORK_ERROR, + "error_msg": "Unsupported", + }) + args = decode_json_args(task) + getresp = get(b64loads(task["payload_id"])) + func = webpb.loads(getresp["data"]) + res = func(*args) + out_data = cloudpickle.dumps(res) + workstatus = { + "status": WORK_COMPLETE, + "complete_data": b64dumps(out_data), + "finished_ticket": work["ticket"], + } + return json.dumps(workstatus) + + +def decode_json_args(task): + out = [] + for arg in task["args"]: + if "local" in arg and arg["local"] == webpb.Locality.REFERENCE.value: + getresp = get(b64loads(arg["reference_id"])) + data = webpb.loads(getresp["data"]) + out.append(data) + elif "local" not in arg or arg["local"] == webpb.Locality.INTERNED.value: + out.append(webpb.loads(arg["data"])) + else: + raise Exception("convert_from_arg: Uncovered locality enum") + return out diff --git a/worker.go b/worker.go index b1af58a..6c90d5f 100644 --- a/worker.go +++ b/worker.go @@ -2,9 +2,9 @@ package main import ( "errors" - "fmt" "github.com/barakmich/tinkerbell/ray_rpc" + "go.uber.org/zap" ) type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer @@ -13,6 +13,7 @@ type Worker interface { AssignWork(work *ray_rpc.Work) error Run() error Close() error + Schedulable() bool } type SimpleWorker struct { @@ -21,6 +22,10 @@ type SimpleWorker struct { pool WorkerPool } +func (s *SimpleWorker) Schedulable() bool { + return true +} + func (s *SimpleWorker) AssignWork(work *ray_rpc.Work) error { s.workChan <- work return nil @@ -39,13 +44,13 @@ func (w *SimpleWorker) Run() error { if sentinel.Status != ray_rpc.READY { return errors.New("Sent wrong sentinel? Closing...") } - fmt.Println("New worker:", sentinel.ErrorMsg) + zap.S().Info("New worker:", sentinel.ErrorMsg) go func() { for work := range w.workChan { - fmt.Println("sending work") + zap.S().Debug("Sending work") err = w.clientConn.Send(work) if err != nil { - fmt.Println("Error sending:", err) + zap.S().Error("Error sending:", err) return } } @@ -53,12 +58,12 @@ func (w *SimpleWorker) Run() error { for { result, err := w.clientConn.Recv() if err != nil { - fmt.Println("Error on channel:", err) + zap.S().Error("Error on channel:", err) return err } err = w.pool.Finish(result) if err != nil { - fmt.Println("Error finishing:", err) + zap.S().Error("Error finishing:", err) return err } } diff --git a/worker_pool.go b/worker_pool.go index ed1da4a..71cb137 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -2,7 +2,6 @@ package main import ( "errors" - "fmt" "sync" "github.com/barakmich/tinkerbell/ray_rpc" @@ -43,11 +42,23 @@ func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error { if len(wp.workers) == 0 { return errors.New("No workers available, try again later") } - zap.S().Info("Sending work to worker", wp.offset) - wp.workers[wp.offset].AssignWork(work) - wp.offset++ - if wp.offset == len(wp.workers) { - wp.offset = 0 + origOffset := wp.offset + + done := false + for !done { + worker := wp.workers[wp.offset] + if worker.Schedulable() { + zap.S().Info("Sending work to worker", wp.offset) + wp.workers[wp.offset].AssignWork(work) + done = true + } + wp.offset++ + if wp.offset == len(wp.workers) { + wp.offset = 0 + } + if wp.offset == origOffset && !done { + return errors.New("No workers schedulable") + } } return nil } @@ -72,7 +83,6 @@ func (wp *SimpleRRWorkerPool) Close() error { func (wp *SimpleRRWorkerPool) Deregister(worker Worker) error { wp.Lock() defer wp.Unlock() - fmt.Println("Deregistering worker") found := false for i, w := range wp.workers { if w == worker { @@ -82,6 +92,7 @@ func (wp *SimpleRRWorkerPool) Deregister(worker Worker) error { } worker.Close() found = true + zap.S().Info("Deregistering worker", i) } } if !found {