package main import ( "errors" "fmt" "github.com/barakmich/tinkerbell/ray_rpc" ) type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer type Worker interface { AssignWork(work *ray_rpc.Work) error Run() error Close() error } type SimpleWorker struct { workChan chan *ray_rpc.Work clientConn WorkstreamConnection pool WorkerPool } func (s *SimpleWorker) AssignWork(work *ray_rpc.Work) error { s.workChan <- work return nil } func (s *SimpleWorker) Close() error { close(s.workChan) return nil } func (w *SimpleWorker) Run() error { sentinel, err := w.clientConn.Recv() if err != nil { return err } if sentinel.Status != ray_rpc.READY { return errors.New("Sent wrong sentinel? Closing...") } fmt.Println("New worker:", sentinel.ErrorMsg) go func() { for work := range w.workChan { fmt.Println("sending work") err = w.clientConn.Send(work) if err != nil { fmt.Println("Error sending:", err) return } } }() for { result, err := w.clientConn.Recv() if err != nil { fmt.Println("Error on channel:", err) return err } err = w.pool.Finish(result) if err != nil { fmt.Println("Error finishing:", err) return err } } return nil }