From efd3d65269dd1d7a8a209b870568ab5eefc1eb82 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Fri, 4 Dec 2020 12:34:41 -0800 Subject: [PATCH] abstract workers --- go.mod | 2 +- main.go | 2 +- raylet_grpc.go | 14 ++++++++---- worker.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++ worker_pool.go | 71 +++++++++------------------------------------------------- 5 files changed, 88 insertions(+), 67 deletions(-) create mode 100644 worker.go diff --git a/go.mod b/go.mod index cc38f4d..325a064 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/barakmich/go_raylet +module github.com/barakmich/tinkerbell go 1.15 diff --git a/main.go b/main.go index 45bc7a2..a1dca92 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,7 @@ import ( "net" "os" - "github.com/barakmich/go_raylet/ray_rpc" + "github.com/barakmich/tinkerbell/ray_rpc" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" diff --git a/raylet_grpc.go b/raylet_grpc.go index 37c2d05..d0a71a1 100644 --- a/raylet_grpc.go +++ b/raylet_grpc.go @@ -3,7 +3,7 @@ package main import ( "context" - "github.com/barakmich/go_raylet/ray_rpc" + "github.com/barakmich/tinkerbell/ray_rpc" "go.uber.org/zap" ) @@ -55,11 +55,17 @@ func (r *Raylet) Schedule(_ context.Context, task *ray_rpc.ClientTask) (*ray_rpc } func (r *Raylet) Workstream(conn WorkstreamConnection) error { - return r.Workers.Workstream(conn) + worker := &SimpleWorker{ + workChan: make(chan *ray_rpc.Work), + clientConn: conn, + pool: wp, + } + r.Workers.Register(worker) + err := worker.Run() + r.Workers.Deregister(worker) + return err } -//func (r *Raylet) Workstream() - func NewMemRaylet() *Raylet { store := NewMemObjectStore() return &Raylet{ diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..139ad37 --- /dev/null +++ b/worker.go @@ -0,0 +1,66 @@ +package main + +import ( + "errors" + "fmt" + + "github.com/barakmich/tinkerbell/ray_rpc" +) + +type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer + +type Worker interface { + AssignWork(work *ray_rpc.Work) error + Run() error + Close() error +} + +type SimpleWorker struct { + workChan chan *ray_rpc.Work + clientConn WorkstreamConnection + pool WorkerPool +} + +func (s *SimpleWorker) AssignWork(work *ray_rpc.Work) error { + s.workChan <- work + return nil +} + +func (s *SimpleWorker) Close() error { + close(w.workChan) + return nil +} + +func (w *SimpleWorker) Run() error { + sentinel, err := w.clientConn.Recv() + if err != nil { + return err + } + if sentinel.Status != ray_rpc.READY { + return errors.New("Sent wrong sentinel? Closing...") + } + fmt.Println("New worker:", sentinel.ErrorMsg) + go func() { + for work := range w.workChan { + fmt.Println("sending work") + err = w.clientConn.Send(work) + if err != nil { + fmt.Println("Error sending:", err) + return + } + } + }() + for { + result, err := w.clientConn.Recv() + if err != nil { + fmt.Println("Error on channel:", err) + return err + } + err = w.pool.Finish(result) + if err != nil { + fmt.Println("Error finishing:", err) + return err + } + } + return nil +} diff --git a/worker_pool.go b/worker_pool.go index 4283705..1dcb568 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -5,51 +5,35 @@ import ( "fmt" "sync" - "github.com/barakmich/go_raylet/ray_rpc" + "github.com/barakmich/tinkerbell/ray_rpc" "go.uber.org/zap" ) -type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer - type WorkerPool interface { - ray_rpc.RayletWorkerConnectionServer Schedule(*ray_rpc.Work) error Close() error Finish(*ray_rpc.WorkStatus) error - Deregister(interface{}) error + Register(worker Worker) error + Deregister(worker Worker) error } type SimpleRRWorkerPool struct { sync.Mutex - workers []*SimpleWorker + workers []Worker store ObjectStore offset int } -type SimpleWorker struct { - workChan chan *ray_rpc.Work - clientConn WorkstreamConnection - pool WorkerPool -} - func NewRoundRobinWorkerPool(obj ObjectStore) *SimpleRRWorkerPool { return &SimpleRRWorkerPool{ store: obj, } } -func (wp *SimpleRRWorkerPool) Workstream(conn WorkstreamConnection) error { +func (wp *SimpleRRWorkerPool) Register(worker Worker) error { wp.Lock() - worker := &SimpleWorker{ - workChan: make(chan *ray_rpc.Work), - clientConn: conn, - pool: wp, - } + defer wp.Unlock() wp.workers = append(wp.workers, worker) - wp.Unlock() - err := worker.Main() - wp.Deregister(worker) - return err } func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error { @@ -59,7 +43,7 @@ func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error { return errors.New("No workers available, try again later") } zap.S().Info("Sending work to worker", wp.offset) - wp.workers[wp.offset].workChan <- work + wp.workers[wp.offset].AssignWork(work) wp.offset++ if wp.offset == len(wp.workers) { wp.offset = 0 @@ -79,16 +63,15 @@ func (wp *SimpleRRWorkerPool) Close() error { wp.Lock() defer wp.Unlock() for _, w := range wp.workers { - close(w.workChan) + w.Close() } return nil } -func (wp *SimpleRRWorkerPool) Deregister(ptr interface{}) error { +func (wp *SimpleRRWorkerPool) Deregister(worker Worker) error { wp.Lock() defer wp.Unlock() fmt.Println("Deregistering worker") - worker := ptr.(*SimpleWorker) found := false for i, w := range wp.workers { if w == worker { @@ -96,7 +79,7 @@ func (wp *SimpleRRWorkerPool) Deregister(ptr interface{}) error { if wp.offset == len(wp.workers) { wp.offset = 0 } - close(worker.workChan) + worker.Close() found = true } } @@ -105,37 +88,3 @@ func (wp *SimpleRRWorkerPool) Deregister(ptr interface{}) error { } return nil } - -func (w *SimpleWorker) Main() error { - sentinel, err := w.clientConn.Recv() - if err != nil { - return err - } - if sentinel.Status != ray_rpc.READY { - return errors.New("Sent wrong sentinel? Closing...") - } - fmt.Println("New worker:", sentinel.ErrorMsg) - go func() { - for work := range w.workChan { - fmt.Println("sending work") - err = w.clientConn.Send(work) - if err != nil { - fmt.Println("Error sending:", err) - return - } - } - }() - for { - result, err := w.clientConn.Recv() - if err != nil { - fmt.Println("Error on channel:", err) - return err - } - err = w.pool.Finish(result) - if err != nil { - fmt.Println("Error finishing:", err) - return err - } - } - return nil -}