From 1158ac4760bb6a08bf8539bfdb2c853ba9069ebc Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Fri, 30 Mar 2018 11:39:11 -0700 Subject: [PATCH] first cross-websocket ls --- client/connect.go | 91 ++++++++++++++++++++++++++++++++++++++++++++++++ fs.go | 14 ++++++++ fs/local.go | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 206 insertions(+) create mode 100644 client/connect.go create mode 100644 fs.go create mode 100644 fs/local.go diff --git a/client/connect.go b/client/connect.go new file mode 100644 index 0000000..1f43c48 --- /dev/null +++ b/client/connect.go @@ -0,0 +1,91 @@ +package client + +import ( + "fmt" + "net/url" + "os" + "os/signal" + "time" + + "github.com/barakmich/kubelwagen" + "github.com/barakmich/kubelwagen/fs" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +func Connect(addr string, dir string) { + closer := make(chan bool) + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + + go func() { + for range signalChan { + fmt.Println("\nReceived an interrupt, disconnecting...") + close(closer) + } + }() + + handler := fs.NewLocalFs(dir) + c := &client{ + handler: handler, + closer: closer, + } + + err := c.DialAndServe(addr) + if err != nil { + logrus.Fatalf("Error dialing %s: %s\n", addr, err) + } + <-closer + time.Sleep(500 * time.Millisecond) +} + +type client struct { + handler kubelwagen.FS + closer chan bool + txchan chan *kubelwagen.Response + conn *websocket.Conn +} + +func (c *client) DialAndServe(addr string) error { + var err error + c.txchan = make(chan *kubelwagen.Response, 20) + u := url.URL{Scheme: "ws", Host: addr, Path: "/"} + logrus.Printf("connecting to %s", u.String()) + + c.conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + return err + } + defer c.conn.Close() + go c.tx() + go c.rx() + <-c.closer + return nil +} + +func (c *client) tx() { + for x := range c.txchan { + err := c.conn.WriteJSON(x) + if err != nil { + logrus.Errorf("tx error: %s\n", err) + } + } +} + +func (c *client) rx() { + defer close(c.closer) + defer close(c.txchan) + for { + req := &kubelwagen.Request{} + err := c.conn.ReadJSON(req) + if err != nil { + logrus.Errorf("rx error: %s\n", err) + return + } + go func(req *kubelwagen.Request) { + resp := c.handler.Handle(req) + c.txchan <- resp + }(req) + } +} diff --git a/fs.go b/fs.go new file mode 100644 index 0000000..6c8a9e2 --- /dev/null +++ b/fs.go @@ -0,0 +1,14 @@ +package kubelwagen + +import "github.com/hanwen/go-fuse/fuse" + +type FS interface { + Handle(req *Request) *Response +} + +func ErrorResp(r *Request, err error) *Response { + return &Response{ + ID: r.ID, + Code: fuse.ToStatus(err), + } +} diff --git a/fs/local.go b/fs/local.go new file mode 100644 index 0000000..3cbf76b --- /dev/null +++ b/fs/local.go @@ -0,0 +1,101 @@ +package fs + +import ( + "io" + "os" + "path/filepath" + + "github.com/barakmich/kubelwagen" + "github.com/hanwen/go-fuse/fuse" + "github.com/sirupsen/logrus" +) + +type LocalFs struct { + base string + open map[int]*os.File +} + +func NewLocalFs(path string) *LocalFs { + return &LocalFs{ + base: path, + open: make(map[int]*os.File), + } +} + +func (fs *LocalFs) Handle(r *kubelwagen.Request) *kubelwagen.Response { + switch r.Method { + case kubelwagen.MethodOpenDir: + return fs.openDir(r) + case kubelwagen.MethodGetAttr: + return fs.getAttr(r) + } + return &kubelwagen.Response{ + ID: r.ID, + Code: fuse.ENOSYS, + } +} + +func (fs *LocalFs) getPath(r *kubelwagen.Request) string { + return filepath.Join(fs.base, r.Path) +} + +func (fs *LocalFs) openDir(r *kubelwagen.Request) *kubelwagen.Response { + out := &kubelwagen.Response{ + ID: r.ID, + Code: fuse.OK, + } + // returns dirents and status + + f, err := os.Open(fs.getPath(r)) + if err != nil { + logrus.Errorf("LocalFS openDir (%s): %s\n", r.Path, err) + return kubelwagen.ErrorResp(r, err) + } + want := 100 + output := make([]fuse.DirEntry, 0, want) + for { + infos, err := f.Readdir(want) + for i := range infos { + // workaround for https://code.google.com/p/go/issues/detail?id=5960 + if infos[i] == nil { + continue + } + n := infos[i].Name() + d := fuse.DirEntry{ + Name: n, + } + if s := fuse.ToStatT(infos[i]); s != nil { + d.Mode = uint32(s.Mode) + d.Ino = s.Ino + } else { + logrus.Printf("ReadDir entry %q for %q has no stat info", n, r.Path) + } + output = append(output, d) + } + if len(infos) < want || err == io.EOF { + break + } + if err != nil { + logrus.Errorln("Readdir() returned err:", err) + break + } + } + f.Close() + + out.Dirents = output + return out +} + +func (fs *LocalFs) getAttr(r *kubelwagen.Request) *kubelwagen.Response { + out := &kubelwagen.Response{ + ID: r.ID, + Code: fuse.OK, + } + fi, err := os.Stat(fs.getPath(r)) + if err != nil { + logrus.Errorf("LocalFS getAttr (%s): %s\n", r.Path, err) + return kubelwagen.ErrorResp(r, err) + } + out.Stat = fuse.ToAttr(fi) + return out +}