package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"path/filepath"
	"sync"

	"github.com/barakmich/tinkerbell/ray_rpc"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"go.uber.org/zap"
)

// runHttp serves the raylet JSON API under /api together with the static web
// assets found in rootPath, listening on addr. It blocks for as long as the
// gin engine runs and returns whatever error Run reports.
func runHttp(addr, rootPath string, raylet *Raylet) error {
	r := gin.Default()
	h := rayletHandler{raylet}

	api := r.Group("/api")
	api.POST("/get", h.GetObject)
	api.POST("/put", h.PutObject)
	api.POST("/schedule", h.Schedule)
	api.GET("/ws", h.WorkerWS)

	r.Static("/web", rootPath)
	r.StaticFile("console.html", filepath.Join(rootPath, "console.html"))
	r.StaticFile("index.html", filepath.Join(rootPath, "index.html"))
	r.StaticFile("/", filepath.Join(rootPath, "index.html"))

	return r.Run(addr)
}

// rayletHandler exposes a Raylet through gin handler methods.
type rayletHandler struct {
	raylet *Raylet
}

// reply writes res as a 200 JSON body, or a 500 JSON object carrying the error
// text when err is non-nil.
func (h *rayletHandler) reply(c *gin.Context, res interface{}, err error) {
	if err != nil {
		// Send the message text: marshalling the error value itself
		// typically yields "{}" because most error types have no exported
		// fields.
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, res)
}

// GetObject decodes a ray_rpc.GetRequest from the JSON request body, passes it
// to the raylet and writes the result back as JSON.
func (h *rayletHandler) GetObject(c *gin.Context) {
	var req ray_rpc.GetRequest
	if err := c.BindJSON(&req); err != nil {
		// BindJSON has already aborted the request with a 400 status; do
		// not forward a half-decoded request to the raylet.
		return
	}
	res, err := h.raylet.GetObject(context.TODO(), &req)
	h.reply(c, res, err)
}

// PutObject decodes a ray_rpc.PutRequest from the JSON request body, passes it
// to the raylet and writes the result back as JSON.
func (h *rayletHandler) PutObject(c *gin.Context) {
	var req ray_rpc.PutRequest
	if err := c.BindJSON(&req); err != nil {
		// BindJSON has already aborted the request with a 400 status.
		return
	}
	res, err := h.raylet.PutObject(context.TODO(), &req)
	h.reply(c, res, err)
}

// Schedule decodes a ray_rpc.ClientTask from the JSON request body, passes it
// to the raylet and writes the result back as JSON.
func (h *rayletHandler) Schedule(c *gin.Context) {
	var req ray_rpc.ClientTask
	if err := c.BindJSON(&req); err != nil {
		// BindJSON has already aborted the request with a 400 status.
		return
	}
	res, err := h.raylet.Schedule(context.TODO(), &req)
	h.reply(c, res, err)
}

// WorkerWS upgrades the request to a websocket, registers the connection as a
// worker with the raylet and services it until Run returns. The worker is
// deregistered and the connection closed on the way out.
func (h *rayletHandler) WorkerWS(c *gin.Context) {
	conn, err := wsupgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		zap.S().Errorf("Failed to set websocket upgrade: %v", err)
		return
	}
	// Nothing else in this file closes the upgraded connection, so release
	// it here once the worker is done.
	defer conn.Close()

	worker := &WSWorker{
		conn:     conn,
		workChan: make(chan *ray_rpc.Work),
		h:        h,
	}
	h.raylet.Workers.Register(worker)
	// Deferred after conn.Close, so deregistration runs first.
	defer h.raylet.Workers.Deregister(worker)

	if err := worker.Run(); err != nil {
		zap.S().Info("Closing WS worker:", err)
	}
}

// wsupgrader upgrades HTTP requests to websocket connections using 10240-byte
// read and write buffers.
var wsupgrader = websocket.Upgrader{
	ReadBufferSize:  10240,
	WriteBufferSize: 10240,
}

// WSWorker is a worker reached over a websocket connection. Work handed to
// AssignWork is forwarded to the peer as JSON, and every ray_rpc.WorkStatus
// read back is reported to the raylet's worker pool.
type WSWorker struct {
	conn     *websocket.Conn
	workChan chan *ray_rpc.Work
	h        *rayletHandler

	// mu guards sched, which is written by both the sender goroutine and
	// the read loop in Run and read through Schedulable.
	mu    sync.Mutex
	sched bool
}

// AssignWork queues work for delivery to the peer. workChan is created
// unbuffered in WorkerWS, so the call blocks until the sender goroutine started
// by Run receives the item. It always returns nil.
func (ws *WSWorker) AssignWork(work *ray_rpc.Work) error {
	ws.workChan <- work
	return nil
}

// Close closes the work channel, which ends the sender goroutine started by
// Run. It must be called at most once, and AssignWork must not be called
// afterwards: both would panic on the closed channel. It always returns nil.
func (ws *WSWorker) Close() error {
	close(ws.workChan)
	return nil
}

// Schedulable reports whether the worker is currently marked as able to accept
// work.
func (ws *WSWorker) Schedulable() bool {
	ws.mu.Lock()
	defer ws.mu.Unlock()
	return ws.sched
}

// setSchedulable records the worker's availability under the mutex.
func (ws *WSWorker) setSchedulable(v bool) {
	ws.mu.Lock()
	ws.sched = v
	ws.mu.Unlock()
}

// Run drives the worker. It first expects a READY status message from the
// peer, then starts the sender goroutine and loops reading status messages,
// marking the worker schedulable again and handing each one to Workers.Finish.
// It returns the first read or Finish error, or an error if the initial
// message is not READY; it never returns nil.
func (ws *WSWorker) Run() error {
	var sentinel ray_rpc.WorkStatus
	if err := ws.conn.ReadJSON(&sentinel); err != nil {
		return err
	}
	if sentinel.Status != ray_rpc.READY {
		return errors.New("Sent wrong sentinel? Closing...")
	}
	ws.setSchedulable(true)
	zap.S().Info("New worker:", sentinel.ErrorMsg)

	go ws.sendLoop()

	for {
		var result ray_rpc.WorkStatus
		if err := ws.conn.ReadJSON(&result); err != nil {
			zap.S().Error("Error on channel:", err)
			return err
		}
		ws.setSchedulable(true)
		if err := ws.h.raylet.Workers.Finish(&result); err != nil {
			zap.S().Error("Error finishing:", err)
			return err
		}
	}
}

// sendLoop forwards queued work to the peer until the work channel is closed
// or a write fails.
func (ws *WSWorker) sendLoop() {
	for work := range ws.workChan {
		fmt.Println("sending work")
		ws.setSchedulable(false)
		if err := ws.conn.WriteJSON(work); err != nil {
			zap.S().Error("Error sending:", err)
			// Closing the connection makes the pending ReadJSON in Run
			// fail, so Run returns and the handler can deregister this
			// worker instead of leaving it registered with no sender.
			ws.conn.Close()
			return
		}
	}
}