Add some better balancing, reconnection

This commit is contained in:
Barak Michener 2020-12-04 22:40:50 -08:00
parent 715f79c9c0
commit 324ba0d160
6 changed files with 77 additions and 29 deletions

View file

@ -20,10 +20,12 @@ type SimpleWorker struct {
workChan chan *ray_rpc.Work
clientConn WorkstreamConnection
pool WorkerPool
max int
curr int
}
func (s *SimpleWorker) Schedulable() bool {
return true
return s.curr < s.max
}
func (s *SimpleWorker) AssignWork(work *ray_rpc.Work) error {
@ -48,6 +50,7 @@ func (w *SimpleWorker) Run() error {
go func() {
for work := range w.workChan {
zap.S().Debug("Sending work")
w.curr++
err = w.clientConn.Send(work)
if err != nil {
zap.S().Error("Error sending:", err)
@ -61,6 +64,7 @@ func (w *SimpleWorker) Run() error {
zap.S().Error("Error on channel:", err)
return err
}
w.curr--
err = w.pool.Finish(result)
if err != nil {
zap.S().Error("Error finishing:", err)