diff --git a/client/connect.go b/client/connect.go index 1f43c48..f9bd218 100644 --- a/client/connect.go +++ b/client/connect.go @@ -66,6 +66,7 @@ func (c *client) DialAndServe(addr string) error { func (c *client) tx() { for x := range c.txchan { + logrus.Printf("tx: %#v\n", x) err := c.conn.WriteJSON(x) if err != nil { logrus.Errorf("tx error: %s\n", err) @@ -79,6 +80,7 @@ func (c *client) rx() { for { req := &kubelwagen.Request{} err := c.conn.ReadJSON(req) + logrus.Printf("rx: %#v\n", req) if err != nil { logrus.Errorf("rx error: %s\n", err) return diff --git a/filesystem.go b/filesystem.go index 43173d6..521750a 100644 --- a/filesystem.go +++ b/filesystem.go @@ -270,6 +270,7 @@ func (fs *WsFs) Open(name string, flags uint32, context *fuse.Context) (file nod r := Request{ Method: MethodOpen, Flags: flags, + Path: name, } resp, ok := fs.getResponse(&r) if !ok { @@ -286,6 +287,7 @@ func (fs *WsFs) Create(name string, flags uint32, mode uint32, context *fuse.Con Method: MethodCreate, Flags: flags, Mode: mode, + Path: name, } resp, ok := fs.getResponse(&r) if !ok { diff --git a/fs/local.go b/fs/local.go index 06ced99..ec715a9 100644 --- a/fs/local.go +++ b/fs/local.go @@ -4,6 +4,7 @@ import ( "io" "os" "path/filepath" + "sync" "syscall" "github.com/barakmich/kubelwagen" @@ -12,14 +13,17 @@ import ( ) type LocalFs struct { - base string - open map[int]*os.File + sync.Mutex + base string + nextfile int + openfiles map[int]*os.File } func NewLocalFs(path string) *LocalFs { return &LocalFs{ - base: path, - open: make(map[int]*os.File), + base: path, + nextfile: 1, + openfiles: make(map[int]*os.File), } } @@ -61,10 +65,8 @@ func (fs *LocalFs) Handle(r *kubelwagen.Request) *kubelwagen.Response { return out case kubelwagen.MethodAccess: return fs.serveFromErr(r, syscall.Access(fs.getPath(r), r.Mode)) - } - return &kubelwagen.Response{ - ID: r.ID, - Code: fuse.ENOSYS, + default: + return fs.handleFile(r) } } diff --git a/fs/localfile.go b/fs/localfile.go new file mode 100644 index 0000000..aeb7062 --- /dev/null +++ b/fs/localfile.go @@ -0,0 +1,244 @@ +package fs + +import ( + "io" + "os" + "syscall" + + "github.com/barakmich/kubelwagen" + "github.com/hanwen/go-fuse/fuse" +) + +func (fs *LocalFs) handleFile(r *kubelwagen.Request) *kubelwagen.Response { + switch r.Method { + case kubelwagen.MethodOpen: + return fs.open(r) + case kubelwagen.MethodCreate: + return fs.create(r) + case kubelwagen.MethodFileRead: + return fs.fileread(r) + case kubelwagen.MethodFileWrite: + return fs.filewrite(r) + case kubelwagen.MethodFileRelease: + return fs.filerelease(r) + case kubelwagen.MethodFileFlock: + return fs.fileflock(r) + case kubelwagen.MethodFileGetAttr: + return fs.filegetattr(r) + case kubelwagen.MethodFileFsync: + return fs.filefsync(r) + case kubelwagen.MethodFileTruncate: + return fs.filetruncate(r) + case kubelwagen.MethodFileChown: + return fs.filechown(r) + case kubelwagen.MethodFileChmod: + return fs.filechmod(r) + case kubelwagen.MethodFileAllocate: + return fs.fileallocate(r) + default: + return &kubelwagen.Response{ + ID: r.ID, + Code: fuse.ENOSYS, + } + } +} + +func (fs *LocalFs) open(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + fh := fs.nextfile + fs.nextfile++ + f, err := os.OpenFile(fs.getPath(r), int(r.Flags), 0) + if err != nil { + return kubelwagen.ErrorResp(r, err) + } + fs.openfiles[fh] = f + return &kubelwagen.Response{ + ID: r.ID, + Code: fuse.OK, + FileHandle: fh, + } +} + +func (fs *LocalFs) create(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + fh := fs.nextfile + fs.nextfile++ + f, err := os.OpenFile(fs.getPath(r), int(r.Flags)|os.O_CREATE, os.FileMode(r.Mode)) + if err != nil { + return kubelwagen.ErrorResp(r, err) + } + fs.openfiles[fh] = f + return &kubelwagen.Response{ + ID: r.ID, + Code: fuse.OK, + FileHandle: fh, + } +} + +func (fs *LocalFs) fileflock(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + code := fuse.ToStatus(syscall.Flock(int(f.Fd()), int(r.Flags))) + + return &kubelwagen.Response{ + ID: r.ID, + Code: code, + FileHandle: r.FileHandle, + } +} + +func (fs *LocalFs) filegetattr(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + var at *fuse.Attr + fi, err := f.Stat() + code := fuse.ToStatus(err) + if err == nil { + at = fuse.ToAttr(fi) + } + return &kubelwagen.Response{ + ID: r.ID, + Code: code, + FileHandle: r.FileHandle, + Stat: at, + } +} + +func (fs *LocalFs) filefsync(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + code := fuse.ToStatus(f.Sync()) + return &kubelwagen.Response{ + ID: r.ID, + Code: code, + FileHandle: r.FileHandle, + } +} + +func (fs *LocalFs) filetruncate(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + code := fuse.ToStatus(syscall.Ftruncate(int(f.Fd()), int64(r.Size))) + return &kubelwagen.Response{ + ID: r.ID, + Code: code, + FileHandle: r.FileHandle, + } +} + +func (fs *LocalFs) filechown(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + code := fuse.ToStatus(f.Chown(int(r.UID), int(r.GID))) + return &kubelwagen.Response{ + ID: r.ID, + Code: code, + FileHandle: r.FileHandle, + } +} + +func (fs *LocalFs) filechmod(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + code := fuse.ToStatus(f.Chmod(os.FileMode(r.Mode))) + return &kubelwagen.Response{ + ID: r.ID, + Code: code, + FileHandle: r.FileHandle, + } +} + +//TODO(barakmich): Linux only; support OS X +func (fs *LocalFs) fileallocate(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + code := fuse.ToStatus(syscall.Fallocate(int(f.Fd()), r.Mode, r.Offset, int64(r.Size))) + return &kubelwagen.Response{ + ID: r.ID, + Code: code, + FileHandle: r.FileHandle, + } +} + +func (fs *LocalFs) fileread(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + buf := make([]byte, r.Size) + n, err := f.ReadAt(buf, r.Offset) + if err != nil && err != io.EOF { + return kubelwagen.ErrorResp(r, err) + } + buf = buf[:n] + return &kubelwagen.Response{ + ID: r.ID, + Code: fuse.OK, + FileHandle: r.FileHandle, + Data: buf, + } +} + +func (fs *LocalFs) filewrite(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + n, err := f.WriteAt(r.Data, r.Offset) + return &kubelwagen.Response{ + ID: r.ID, + Code: fuse.ToStatus(err), + FileHandle: r.FileHandle, + WriteSize: n, + } +} + +func (fs *LocalFs) filerelease(r *kubelwagen.Request) *kubelwagen.Response { + fs.Lock() + defer fs.Unlock() + f, ok := fs.openfiles[r.FileHandle] + if !ok { + return kubelwagen.ErrorResp(r, os.ErrClosed) + } + err := f.Close() + delete(fs.openfiles, r.FileHandle) + return &kubelwagen.Response{ + ID: r.ID, + Code: fuse.ToStatus(err), + FileHandle: r.FileHandle, + } +} diff --git a/fs_file.go b/fs_file.go index 6c8f06f..cc969d6 100644 --- a/fs_file.go +++ b/fs_file.go @@ -30,6 +30,7 @@ func (f *WsFsFile) Read(buf []byte, off int64) (fuse.ReadResult, fuse.Status) { Method: MethodFileRead, FileHandle: f.h, Offset: off, + Size: uint32(len(buf)), } resp, ok := f.fs.getResponse(&r) if !ok { @@ -155,6 +156,7 @@ func (f *WsFsFile) Allocate(off uint64, size uint64, mode uint32) (code fuse.Sta FileHandle: f.h, Mode: mode, Size: uint32(size), + Offset: int64(off), } resp, ok := f.fs.getResponse(&r) if !ok {