package main import ( "errors" "fmt" "sync" "github.com/barakmich/go_raylet/ray_rpc" ) type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer type WorkerPool interface { ray_rpc.RayletWorkerConnectionServer Schedule(*ray_rpc.Work) error Close() error Finish(*ray_rpc.WorkStatus) error Deregister(interface{}) error } type SimpleRRWorkerPool struct { sync.Mutex workers []*SimpleWorker store ObjectStore offset int } type SimpleWorker struct { workChan chan *ray_rpc.Work clientConn WorkstreamConnection pool WorkerPool } func NewRoundRobinWorkerPool(obj ObjectStore) *SimpleRRWorkerPool { return &SimpleRRWorkerPool{ store: obj, } } func (wp *SimpleRRWorkerPool) Workstream(conn WorkstreamConnection) error { wp.Lock() defer wp.Unlock() worker := &SimpleWorker{ workChan: make(chan *ray_rpc.Work, 10), clientConn: conn, pool: wp, } go worker.Main() wp.workers = append(wp.workers, worker) return nil } func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error { wp.Lock() defer wp.Unlock() if len(wp.workers) == 0 { return errors.New("No workers available, try again later") } wp.workers[wp.offset].workChan <- work wp.offset++ if wp.offset == len(wp.workers) { wp.offset = 0 } return nil } func (wp *SimpleRRWorkerPool) Finish(status *ray_rpc.WorkStatus) error { if status.Status != ray_rpc.COMPLETE { panic("todo: Only call Finish on successfully completed work") } id := deserializeObjectID(status.FinishedTicket.ReturnId) return wp.store.PutObject(&Object{id, status.CompleteData}) } func (wp *SimpleRRWorkerPool) Close() error { wp.Lock() defer wp.Unlock() for _, w := range wp.workers { close(w.workChan) } return nil } func (wp *SimpleRRWorkerPool) Deregister(ptr interface{}) error { wp.Lock() defer wp.Unlock() worker := ptr.(*SimpleWorker) found := false for i, w := range wp.workers { if w == worker { wp.workers = append(wp.workers[:i], wp.workers[i+1:]...) if wp.offset == len(wp.workers) { wp.offset = 0 } found = true } } if !found { panic("Trying to deregister a worker that was never created") } return nil } func (w *SimpleWorker) Main() { sentinel, err := w.clientConn.Recv() if err != nil { fmt.Println(err) w.pool.Deregister(w) return } if sentinel.Status != ray_rpc.READY { fmt.Println("Sent wrong sentinel? Closing...") w.pool.Deregister(w) return } for { work, ok := <-w.workChan if !ok { break } err = w.clientConn.Send(work) if err != nil { fmt.Println("Error sending: %s", err) break } result, err := w.clientConn.Recv() err = w.pool.Finish(result) if err != nil { fmt.Println("Error finishing: %s", err) break } } w.pool.Deregister(w) }