working workers in browser

This commit is contained in:
Barak Michener 2020-12-04 20:45:19 -08:00
parent 56116aea2a
commit 715f79c9c0
13 changed files with 343 additions and 28 deletions

1
go.mod
View file

@ -8,6 +8,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.3 // indirect
github.com/google/uuid v1.1.2
github.com/gorilla/websocket v1.4.2
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect

2
go.sum
View file

@ -48,6 +48,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=

95
http.go
View file

@ -2,20 +2,29 @@ package main
import (
"context"
"errors"
"fmt"
"net/http"
"path/filepath"
"github.com/barakmich/tinkerbell/ray_rpc"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)
func runHttp(addr, rootPath string, raylet *Raylet) error {
r := gin.Default()
h := rayletHandler{raylet}
r.Static("/", rootPath)
api := r.Group("/api")
api.POST("/get", h.GetObject)
api.POST("/put", h.PutObject)
api.POST("/schedule", h.Schedule)
api.GET("/ws", h.WorkerWS)
r.Static("/web", rootPath)
r.StaticFile("console.html", filepath.Join(rootPath, "console.html"))
r.StaticFile("index.html", filepath.Join(rootPath, "index.html"))
r.StaticFile("/", filepath.Join(rootPath, "index.html"))
err := r.Run(addr)
return err
}
@ -56,3 +65,87 @@ func (h *rayletHandler) Schedule(c *gin.Context) {
}
c.JSON(http.StatusOK, res)
}
func (h *rayletHandler) WorkerWS(c *gin.Context) {
conn, err := wsupgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
zap.S().Error("Failed to set websocket upgrade: %v\n", err)
return
}
worker := &WSWorker{
conn: conn,
workChan: make(chan *ray_rpc.Work),
h: h,
}
h.raylet.Workers.Register(worker)
err = worker.Run()
if err != nil {
zap.S().Info("Closing WS worker:", err)
}
h.raylet.Workers.Deregister(worker)
}
var wsupgrader = websocket.Upgrader{
ReadBufferSize: 10240,
WriteBufferSize: 10240,
}
type WSWorker struct {
conn *websocket.Conn
workChan chan *ray_rpc.Work
h *rayletHandler
sched bool
}
func (ws *WSWorker) AssignWork(work *ray_rpc.Work) error {
ws.workChan <- work
return nil
}
func (ws *WSWorker) Close() error {
close(ws.workChan)
return nil
}
func (ws *WSWorker) Schedulable() bool {
return ws.sched
}
func (ws *WSWorker) Run() error {
var sentinel ray_rpc.WorkStatus
err := ws.conn.ReadJSON(&sentinel)
if err != nil {
return err
}
if sentinel.Status != ray_rpc.READY {
return errors.New("Sent wrong sentinel? Closing...")
}
ws.sched = true
zap.S().Info("New worker:", sentinel.ErrorMsg)
go func() {
for work := range ws.workChan {
fmt.Println("sending work")
ws.sched = false
err = ws.conn.WriteJSON(work)
if err != nil {
zap.S().Error("Error sending:", err)
return
}
}
}()
for {
var result ray_rpc.WorkStatus
err := ws.conn.ReadJSON(&result)
if err != nil {
zap.S().Error("Error on channel:", err)
return err
}
ws.sched = true
err = ws.h.raylet.Workers.Finish(&result)
if err != nil {
zap.S().Error("Error finishing:", err)
return err
}
}
return nil
}

View file

@ -42,7 +42,7 @@ func main() {
server := NewMemRaylet()
ray_rpc.RegisterRayletDriverServer(s, server)
ray_rpc.RegisterRayletWorkerConnectionServer(s, server)
go runHttp(fmt.Sprintf(":%d", *httpPort), *webRoot, server)
go runHttp(fmt.Sprintf("0.0.0.0:%d", *httpPort), *webRoot, server)
// Serve gRPC Server
fmt.Println("Serving gRPC on https://", addr)

View file

@ -3,7 +3,6 @@ package main
import (
"encoding/binary"
"errors"
"fmt"
"sync"
"time"
@ -63,7 +62,6 @@ func (mem *MemObjectStore) AwaitObjects(ids []ObjectID, c chan ObjectResult, tim
v, ok := mem.db[id]
if !ok {
waitingFor[id] = true
fmt.Println("want", id)
} else {
found++
c <- ObjectResult{&Object{ID: id, Data: v}, nil}

View file

@ -2,7 +2,6 @@ package main
import (
"context"
"fmt"
"github.com/barakmich/tinkerbell/ray_rpc"
"go.uber.org/zap"
@ -42,7 +41,6 @@ func (r *Raylet) WaitObject(_ context.Context, _ *ray_rpc.WaitRequest) (*ray_rpc
}
func (r *Raylet) Schedule(_ context.Context, task *ray_rpc.ClientTask) (*ray_rpc.ClientTaskTicket, error) {
fmt.Println(task)
zap.S().Debug("Schedule:", task.Type)
id := r.Objects.MakeID()
ticket := &ray_rpc.ClientTaskTicket{serializeObjectID(id)}

View file

@ -5,8 +5,12 @@
<script src="https://code.jquery.com/jquery-latest.js"></script>
<script src="https://cdn.jsdelivr.net/npm/jquery.terminal/js/jquery.terminal.min.js"></script>
<link href="https://cdn.jsdelivr.net/npm/jquery.terminal/css/jquery.terminal.min.css" rel="stylesheet"/>
<link href="renderedhtml.css" rel="stylesheet"/>
<script src="./pyodide_dev.js"></script>
<link href="web/renderedhtml.css" rel="stylesheet"/>
<script type="text/javascript">
// set the pyodide files URL (packages.json, pyodide.asm.data etc)
window.languagePluginUrl = 'web/';
</script>
<script src="./web/pyodide_dev.js"></script>
<!--<script src="./raylet.js"></script>-->
</head>
<body>
@ -80,7 +84,7 @@
pyodide.runPythonAsync(`
import micropip
import js
wheel_path = js.window.location.protocol + "//" + js.window.location.host + "/ray_web-0.0.1-py3-none-any.whl"
wheel_path = js.window.location.protocol + "//" + js.window.location.host + "/web/ray_web-0.0.1-py3-none-any.whl"
micropip.install(wheel_path)
`).then( () => {
term.echo("from ray import ray")

View file

@ -3,9 +3,21 @@
<head>
<meta charset="UTF-8">
<script src="https://code.jquery.com/jquery-latest.js"></script>
<script src="./raylet.js"></script>
<script type="text/javascript">
// set the pyodide files URL (packages.json, pyodide.asm.data etc)
window.languagePluginUrl = 'web/';
</script>
<script src="./web/pyodide_dev.js"></script>
<script src="./web/workterms.js"></script>
<script src="./web/raylet.js"></script>
</head>
<body>
<h1>Clap if you believe in fairies...</h1>
<h2 id="status">Status: starting...</h2>
<div id="output">
</div>
</body>
<script>
</script>
</html>

View file

@ -1,11 +1,41 @@
languagePluginLoader.then(() => {
pyodide.loadPackage(["micropip", "cloudpickle"]).then( () =>
pyodide.runPython(`
pyodide.loadPackage(["micropip", "cloudpickle"]).then( () => {
pyodide.runPythonAsync(`
import micropip
import js
wheel_path = js.window.location.protocol + "//" + js.window.location.host + "/ray_web-0.0.1-py3-none-any.whl"
micropip.install(wheel_path)
wheel_path = js.window.location.protocol + "//" + js.window.location.host + "/web/ray_web-0.0.1-py3-none-any.whl"
micropip.install(wheel_path)`)
.then(() => { pyodide.runPython(`
from ray import ray
ray.connect()
from ray.web import exec_work
`)
)
})
var wsprotocol = "ws:"
if (window.location.protocol == "https:") {
wsprotocol = "wss:"
}
var wspath = wsprotocol + "//" + window.location.host + "/api/ws"
var c = new WebSocket(wspath)
c.onmessage = function(msg) {
var workText = workTerms[Math.floor(Math.random() * workTerms.length)];
$("#output").append("<p>" + workText + "...</p>")
pyodide.globals.torun = msg.data
pyodide.runPythonAsync("exec_work(torun)").then((res) => {
$("#output").append("<p>Did work! 👏</p>")
c.send(res)
})
}
c.onopen = function() {
$("#status").text("Status: connected!")
c.send(JSON.stringify({
status: 2,
error_msg: "WebsocketWorker"
}))
}
c.onclose = function() {
$("#status").text("Status: disconnected")
}
})
})

108
web/workterms.js Normal file
View file

@ -0,0 +1,108 @@
workTerms = [
"Adding Hidden Agendas",
"Adjusting Bell Curves",
"Aesthesizing Industrial Areas",
"Aligning Covariance Matrices",
"Applying Feng Shui Shaders",
"Applying Theatre Soda Layer",
"Asserting Packed Exemplars",
"Attempting to Lock Back-Buffer",
"Binding Sapling Root System",
"Breeding Fauna",
"Building Data Trees",
"Bureacritizing Bureaucracies",
"Calculating Inverse Probability Matrices",
"Calculating Llama Expectoration Trajectory",
"Calibrating Blue Skies",
"Charging Ozone Layer",
"Coalescing Cloud Formations",
"Cohorting Exemplars",
"Collecting Meteor Particles",
"Compounding Inert Tessellations",
"Compressing Fish Files",
"Computing Optimal Bin Packing",
"Concatenating Sub-Contractors",
"Containing Existential Buffer",
"Debarking Ark Ramp",
"Debunching Unionized Commercial Services",
"Deciding What Message to Display Next",
"Decomposing Singular Values",
"Decrementing Tectonic Plates",
"Deleting Ferry Routes",
"Depixelating Inner Mountain Surface Back Faces",
"Depositing Slush Funds",
"Destabilizing Economic Indicators",
"Determining Width of Blast Fronts",
"Deunionizing Bulldozers",
"Dicing Models",
"Diluting Livestock Nutrition Variables",
"Downloading Satellite Terrain Data",
"Exposing Flash Variables to Streak System",
"Extracting Resources",
"Factoring Pay Scale",
"Fixing Election Outcome Matrix",
"Flood-Filling Ground Water",
"Flushing Pipe Network",
"Gathering Particle Sources",
"Generating Jobs",
"Gesticulating Mimes",
"Graphing Whale Migration",
"Hiding Willio Webnet Mask",
"Implementing Impeachment Routine",
"Increasing Accuracy of RCI Simulators",
"Increasing Magmafacation",
"Initializing My Sim Tracking Mechanism",
"Initializing Rhinoceros Breeding Timetable",
"Initializing Robotic Click-Path AI",
"Inserting Sublimated Messages",
"Integrating Curves",
"Integrating Illumination Form Factors",
"Integrating Population Graphs",
"Iterating Cellular Automata",
"Lecturing Errant Subsystems",
"Mixing Genetic Pool",
"Modeling Object Components",
"Mopping Occupant Leaks",
"Normalizing Power",
"Obfuscating Quigley Matrix",
"Overconstraining Dirty Industry Calculations",
"Partitioning City Grid Singularities",
"Perturbing Matrices",
"Pixalating Nude Patch",
"Polishing Water Highlights",
"Populating Lot Templates",
"Preparing Sprites for Random Walks",
"Prioritizing Landmarks",
"Projecting Law Enforcement Pastry Intake",
"Realigning Alternate Time Frames",
"Reconfiguring User Mental Processes",
"Relaxing Splines",
"Removing Road Network Speed Bumps",
"Removing Texture Gradients",
"Removing Vehicle Avoidance Behavior",
"Resolving GUID Conflict",
"Reticulating Splines",
"Retracting Phong Shader",
"Retrieving from Back Store",
"Reverse Engineering Image Consultant",
"Routing Neural Network Infanstructure",
"Scattering Rhino Food Sources",
"Scrubbing Terrain",
"Searching for Llamas",
"Seeding Architecture Simulation Parameters",
"Sequencing Particles",
"Setting Advisor Moods",
"Setting Inner Deity Indicators",
"Setting Universal Physical Constants",
"Sonically Enhancing Occupant-Free Timber",
"Speculating Stock Market Indices",
"Splatting Transforms",
"Stratifying Ground Layers",
"Sub-Sampling Water Data",
"Synthesizing Gravity",
"Synthesizing Wavelets",
"Time-Compressing Simulator Clock",
"Unable to Reveal Current Activity",
"Weathering Buildings",
"Zeroing Crime Network"
]

View file

@ -1,8 +1,18 @@
from js import XMLHttpRequest, Blob
import js
import json
import base64
import cloudpickle
import ray.webpb as webpb
def b64dumps(b):
return base64.standard_b64encode(b).decode()
def b64loads(s):
return base64.standard_b64decode(s)
def get(get_id):
@ -38,3 +48,46 @@ def schedule(task):
out = json.loads(req.response)
out_id = base64.standard_b64decode(out["return_id"])
return out_id
WORK_COMPLETE = 0
WORK_ERROR = 1
WORK_READY = 2
FUNCTION = webpb.ClientTaskType.FUNCTION.value
def exec_work(work_blob):
work = json.loads(work_blob)
js.console.log("Got work")
task = work["task"]
if "type" in task and task["type"] != FUNCTION:
return json.dumps({
"status": WORK_ERROR,
"error_msg": "Unsupported",
})
args = decode_json_args(task)
getresp = get(b64loads(task["payload_id"]))
func = webpb.loads(getresp["data"])
res = func(*args)
out_data = cloudpickle.dumps(res)
workstatus = {
"status": WORK_COMPLETE,
"complete_data": b64dumps(out_data),
"finished_ticket": work["ticket"],
}
return json.dumps(workstatus)
def decode_json_args(task):
out = []
for arg in task["args"]:
if "local" in arg and arg["local"] == webpb.Locality.REFERENCE.value:
getresp = get(b64loads(arg["reference_id"]))
data = webpb.loads(getresp["data"])
out.append(data)
elif "local" not in arg or arg["local"] == webpb.Locality.INTERNED.value:
out.append(webpb.loads(arg["data"]))
else:
raise Exception("convert_from_arg: Uncovered locality enum")
return out

View file

@ -2,9 +2,9 @@ package main
import (
"errors"
"fmt"
"github.com/barakmich/tinkerbell/ray_rpc"
"go.uber.org/zap"
)
type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer
@ -13,6 +13,7 @@ type Worker interface {
AssignWork(work *ray_rpc.Work) error
Run() error
Close() error
Schedulable() bool
}
type SimpleWorker struct {
@ -21,6 +22,10 @@ type SimpleWorker struct {
pool WorkerPool
}
func (s *SimpleWorker) Schedulable() bool {
return true
}
func (s *SimpleWorker) AssignWork(work *ray_rpc.Work) error {
s.workChan <- work
return nil
@ -39,13 +44,13 @@ func (w *SimpleWorker) Run() error {
if sentinel.Status != ray_rpc.READY {
return errors.New("Sent wrong sentinel? Closing...")
}
fmt.Println("New worker:", sentinel.ErrorMsg)
zap.S().Info("New worker:", sentinel.ErrorMsg)
go func() {
for work := range w.workChan {
fmt.Println("sending work")
zap.S().Debug("Sending work")
err = w.clientConn.Send(work)
if err != nil {
fmt.Println("Error sending:", err)
zap.S().Error("Error sending:", err)
return
}
}
@ -53,12 +58,12 @@ func (w *SimpleWorker) Run() error {
for {
result, err := w.clientConn.Recv()
if err != nil {
fmt.Println("Error on channel:", err)
zap.S().Error("Error on channel:", err)
return err
}
err = w.pool.Finish(result)
if err != nil {
fmt.Println("Error finishing:", err)
zap.S().Error("Error finishing:", err)
return err
}
}

View file

@ -2,7 +2,6 @@ package main
import (
"errors"
"fmt"
"sync"
"github.com/barakmich/tinkerbell/ray_rpc"
@ -43,11 +42,23 @@ func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error {
if len(wp.workers) == 0 {
return errors.New("No workers available, try again later")
}
zap.S().Info("Sending work to worker", wp.offset)
wp.workers[wp.offset].AssignWork(work)
wp.offset++
if wp.offset == len(wp.workers) {
wp.offset = 0
origOffset := wp.offset
done := false
for !done {
worker := wp.workers[wp.offset]
if worker.Schedulable() {
zap.S().Info("Sending work to worker", wp.offset)
wp.workers[wp.offset].AssignWork(work)
done = true
}
wp.offset++
if wp.offset == len(wp.workers) {
wp.offset = 0
}
if wp.offset == origOffset && !done {
return errors.New("No workers schedulable")
}
}
return nil
}
@ -72,7 +83,6 @@ func (wp *SimpleRRWorkerPool) Close() error {
func (wp *SimpleRRWorkerPool) Deregister(worker Worker) error {
wp.Lock()
defer wp.Unlock()
fmt.Println("Deregistering worker")
found := false
for i, w := range wp.workers {
if w == worker {
@ -82,6 +92,7 @@ func (wp *SimpleRRWorkerPool) Deregister(worker Worker) error {
}
worker.Close()
found = true
zap.S().Info("Deregistering worker", i)
}
}
if !found {