abstract workers

This commit is contained in:
Barak Michener 2020-12-04 12:34:41 -08:00
parent 51f92278ac
commit efd3d65269
5 changed files with 88 additions and 67 deletions

2
go.mod
View file

@ -1,4 +1,4 @@
module github.com/barakmich/go_raylet module github.com/barakmich/tinkerbell
go 1.15 go 1.15

View file

@ -7,7 +7,7 @@ import (
"net" "net"
"os" "os"
"github.com/barakmich/go_raylet/ray_rpc" "github.com/barakmich/tinkerbell/ray_rpc"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"

View file

@ -3,7 +3,7 @@ package main
import ( import (
"context" "context"
"github.com/barakmich/go_raylet/ray_rpc" "github.com/barakmich/tinkerbell/ray_rpc"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -55,11 +55,17 @@ func (r *Raylet) Schedule(_ context.Context, task *ray_rpc.ClientTask) (*ray_rpc
} }
func (r *Raylet) Workstream(conn WorkstreamConnection) error { func (r *Raylet) Workstream(conn WorkstreamConnection) error {
return r.Workers.Workstream(conn) worker := &SimpleWorker{
workChan: make(chan *ray_rpc.Work),
clientConn: conn,
pool: wp,
}
r.Workers.Register(worker)
err := worker.Run()
r.Workers.Deregister(worker)
return err
} }
//func (r *Raylet) Workstream()
func NewMemRaylet() *Raylet { func NewMemRaylet() *Raylet {
store := NewMemObjectStore() store := NewMemObjectStore()
return &Raylet{ return &Raylet{

66
worker.go Normal file
View file

@ -0,0 +1,66 @@
package main
import (
"errors"
"fmt"
"github.com/barakmich/tinkerbell/ray_rpc"
)
type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer
type Worker interface {
AssignWork(work *ray_rpc.Work) error
Run() error
Close() error
}
type SimpleWorker struct {
workChan chan *ray_rpc.Work
clientConn WorkstreamConnection
pool WorkerPool
}
func (s *SimpleWorker) AssignWork(work *ray_rpc.Work) error {
s.workChan <- work
return nil
}
func (s *SimpleWorker) Close() error {
close(w.workChan)
return nil
}
func (w *SimpleWorker) Run() error {
sentinel, err := w.clientConn.Recv()
if err != nil {
return err
}
if sentinel.Status != ray_rpc.READY {
return errors.New("Sent wrong sentinel? Closing...")
}
fmt.Println("New worker:", sentinel.ErrorMsg)
go func() {
for work := range w.workChan {
fmt.Println("sending work")
err = w.clientConn.Send(work)
if err != nil {
fmt.Println("Error sending:", err)
return
}
}
}()
for {
result, err := w.clientConn.Recv()
if err != nil {
fmt.Println("Error on channel:", err)
return err
}
err = w.pool.Finish(result)
if err != nil {
fmt.Println("Error finishing:", err)
return err
}
}
return nil
}

View file

@ -5,51 +5,35 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/barakmich/go_raylet/ray_rpc" "github.com/barakmich/tinkerbell/ray_rpc"
"go.uber.org/zap" "go.uber.org/zap"
) )
type WorkstreamConnection = ray_rpc.RayletWorkerConnection_WorkstreamServer
type WorkerPool interface { type WorkerPool interface {
ray_rpc.RayletWorkerConnectionServer
Schedule(*ray_rpc.Work) error Schedule(*ray_rpc.Work) error
Close() error Close() error
Finish(*ray_rpc.WorkStatus) error Finish(*ray_rpc.WorkStatus) error
Deregister(interface{}) error Register(worker Worker) error
Deregister(worker Worker) error
} }
type SimpleRRWorkerPool struct { type SimpleRRWorkerPool struct {
sync.Mutex sync.Mutex
workers []*SimpleWorker workers []Worker
store ObjectStore store ObjectStore
offset int offset int
} }
type SimpleWorker struct {
workChan chan *ray_rpc.Work
clientConn WorkstreamConnection
pool WorkerPool
}
func NewRoundRobinWorkerPool(obj ObjectStore) *SimpleRRWorkerPool { func NewRoundRobinWorkerPool(obj ObjectStore) *SimpleRRWorkerPool {
return &SimpleRRWorkerPool{ return &SimpleRRWorkerPool{
store: obj, store: obj,
} }
} }
func (wp *SimpleRRWorkerPool) Workstream(conn WorkstreamConnection) error { func (wp *SimpleRRWorkerPool) Register(worker Worker) error {
wp.Lock() wp.Lock()
worker := &SimpleWorker{ defer wp.Unlock()
workChan: make(chan *ray_rpc.Work),
clientConn: conn,
pool: wp,
}
wp.workers = append(wp.workers, worker) wp.workers = append(wp.workers, worker)
wp.Unlock()
err := worker.Main()
wp.Deregister(worker)
return err
} }
func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error { func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error {
@ -59,7 +43,7 @@ func (wp *SimpleRRWorkerPool) Schedule(work *ray_rpc.Work) error {
return errors.New("No workers available, try again later") return errors.New("No workers available, try again later")
} }
zap.S().Info("Sending work to worker", wp.offset) zap.S().Info("Sending work to worker", wp.offset)
wp.workers[wp.offset].workChan <- work wp.workers[wp.offset].AssignWork(work)
wp.offset++ wp.offset++
if wp.offset == len(wp.workers) { if wp.offset == len(wp.workers) {
wp.offset = 0 wp.offset = 0
@ -79,16 +63,15 @@ func (wp *SimpleRRWorkerPool) Close() error {
wp.Lock() wp.Lock()
defer wp.Unlock() defer wp.Unlock()
for _, w := range wp.workers { for _, w := range wp.workers {
close(w.workChan) w.Close()
} }
return nil return nil
} }
func (wp *SimpleRRWorkerPool) Deregister(ptr interface{}) error { func (wp *SimpleRRWorkerPool) Deregister(worker Worker) error {
wp.Lock() wp.Lock()
defer wp.Unlock() defer wp.Unlock()
fmt.Println("Deregistering worker") fmt.Println("Deregistering worker")
worker := ptr.(*SimpleWorker)
found := false found := false
for i, w := range wp.workers { for i, w := range wp.workers {
if w == worker { if w == worker {
@ -96,7 +79,7 @@ func (wp *SimpleRRWorkerPool) Deregister(ptr interface{}) error {
if wp.offset == len(wp.workers) { if wp.offset == len(wp.workers) {
wp.offset = 0 wp.offset = 0
} }
close(worker.workChan) worker.Close()
found = true found = true
} }
} }
@ -105,37 +88,3 @@ func (wp *SimpleRRWorkerPool) Deregister(ptr interface{}) error {
} }
return nil return nil
} }
func (w *SimpleWorker) Main() error {
sentinel, err := w.clientConn.Recv()
if err != nil {
return err
}
if sentinel.Status != ray_rpc.READY {
return errors.New("Sent wrong sentinel? Closing...")
}
fmt.Println("New worker:", sentinel.ErrorMsg)
go func() {
for work := range w.workChan {
fmt.Println("sending work")
err = w.clientConn.Send(work)
if err != nil {
fmt.Println("Error sending:", err)
return
}
}
}()
for {
result, err := w.clientConn.Recv()
if err != nil {
fmt.Println("Error on channel:", err)
return err
}
err = w.pool.Finish(result)
if err != nil {
fmt.Println("Error finishing:", err)
return err
}
}
return nil
}