package main import ( "context" "github.com/barakmich/tinkerbell/ray_rpc" "go.uber.org/zap" ) type Raylet struct { Objects ObjectStore Workers WorkerPool } func (r *Raylet) GetObject(_ context.Context, req *ray_rpc.GetRequest) (*ray_rpc.GetResponse, error) { zap.S().Debug("GetObject") data, err := GetObject(r.Objects, deserializeObjectID(req.Id)) if err != nil { return nil, err } return &ray_rpc.GetResponse{ Valid: true, Data: data, }, nil } func (r *Raylet) PutObject(_ context.Context, req *ray_rpc.PutRequest) (*ray_rpc.PutResponse, error) { zap.S().Debug("PutObject") id := r.Objects.MakeID() err := r.Objects.PutObject(&Object{id, req.Data}) if err != nil { return nil, err } return &ray_rpc.PutResponse{ Id: serializeObjectID(id), }, nil } func (r *Raylet) WaitObject(_ context.Context, _ *ray_rpc.WaitRequest) (*ray_rpc.WaitResponse, error) { panic("not implemented") // TODO: Implement } func (r *Raylet) Schedule(_ context.Context, task *ray_rpc.ClientTask) (*ray_rpc.ClientTaskTicket, error) { zap.S().Debug("Schedule:", task.Type) id := r.Objects.MakeID() ticket := &ray_rpc.ClientTaskTicket{serializeObjectID(id)} work := &ray_rpc.Work{} work.Task = task work.Ticket = ticket err := r.Workers.Schedule(work) if err != nil { return nil, err } return ticket, nil } func (r *Raylet) Workstream(conn WorkstreamConnection) error { worker := &SimpleWorker{ workChan: make(chan *ray_rpc.Work), clientConn: conn, pool: r.Workers, max: 3, } r.Workers.Register(worker) err := worker.Run() r.Workers.Deregister(worker) return err } func NewMemRaylet() *Raylet { store := NewMemObjectStore() return &Raylet{ Objects: store, Workers: NewRoundRobinWorkerPool(store), } }