File operations
This commit is contained in:
parent
4015bc3896
commit
892908fcf5
5 changed files with 260 additions and 8 deletions
|
|
@ -66,6 +66,7 @@ func (c *client) DialAndServe(addr string) error {
|
||||||
|
|
||||||
func (c *client) tx() {
|
func (c *client) tx() {
|
||||||
for x := range c.txchan {
|
for x := range c.txchan {
|
||||||
|
logrus.Printf("tx: %#v\n", x)
|
||||||
err := c.conn.WriteJSON(x)
|
err := c.conn.WriteJSON(x)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("tx error: %s\n", err)
|
logrus.Errorf("tx error: %s\n", err)
|
||||||
|
|
@ -79,6 +80,7 @@ func (c *client) rx() {
|
||||||
for {
|
for {
|
||||||
req := &kubelwagen.Request{}
|
req := &kubelwagen.Request{}
|
||||||
err := c.conn.ReadJSON(req)
|
err := c.conn.ReadJSON(req)
|
||||||
|
logrus.Printf("rx: %#v\n", req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("rx error: %s\n", err)
|
logrus.Errorf("rx error: %s\n", err)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -270,6 +270,7 @@ func (fs *WsFs) Open(name string, flags uint32, context *fuse.Context) (file nod
|
||||||
r := Request{
|
r := Request{
|
||||||
Method: MethodOpen,
|
Method: MethodOpen,
|
||||||
Flags: flags,
|
Flags: flags,
|
||||||
|
Path: name,
|
||||||
}
|
}
|
||||||
resp, ok := fs.getResponse(&r)
|
resp, ok := fs.getResponse(&r)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
@ -286,6 +287,7 @@ func (fs *WsFs) Create(name string, flags uint32, mode uint32, context *fuse.Con
|
||||||
Method: MethodCreate,
|
Method: MethodCreate,
|
||||||
Flags: flags,
|
Flags: flags,
|
||||||
Mode: mode,
|
Mode: mode,
|
||||||
|
Path: name,
|
||||||
}
|
}
|
||||||
resp, ok := fs.getResponse(&r)
|
resp, ok := fs.getResponse(&r)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
||||||
18
fs/local.go
18
fs/local.go
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/barakmich/kubelwagen"
|
"github.com/barakmich/kubelwagen"
|
||||||
|
|
@ -12,14 +13,17 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type LocalFs struct {
|
type LocalFs struct {
|
||||||
base string
|
sync.Mutex
|
||||||
open map[int]*os.File
|
base string
|
||||||
|
nextfile int
|
||||||
|
openfiles map[int]*os.File
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocalFs(path string) *LocalFs {
|
func NewLocalFs(path string) *LocalFs {
|
||||||
return &LocalFs{
|
return &LocalFs{
|
||||||
base: path,
|
base: path,
|
||||||
open: make(map[int]*os.File),
|
nextfile: 1,
|
||||||
|
openfiles: make(map[int]*os.File),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -61,10 +65,8 @@ func (fs *LocalFs) Handle(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
return out
|
return out
|
||||||
case kubelwagen.MethodAccess:
|
case kubelwagen.MethodAccess:
|
||||||
return fs.serveFromErr(r, syscall.Access(fs.getPath(r), r.Mode))
|
return fs.serveFromErr(r, syscall.Access(fs.getPath(r), r.Mode))
|
||||||
}
|
default:
|
||||||
return &kubelwagen.Response{
|
return fs.handleFile(r)
|
||||||
ID: r.ID,
|
|
||||||
Code: fuse.ENOSYS,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
244
fs/localfile.go
Normal file
244
fs/localfile.go
Normal file
|
|
@ -0,0 +1,244 @@
|
||||||
|
package fs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/barakmich/kubelwagen"
|
||||||
|
"github.com/hanwen/go-fuse/fuse"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (fs *LocalFs) handleFile(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
switch r.Method {
|
||||||
|
case kubelwagen.MethodOpen:
|
||||||
|
return fs.open(r)
|
||||||
|
case kubelwagen.MethodCreate:
|
||||||
|
return fs.create(r)
|
||||||
|
case kubelwagen.MethodFileRead:
|
||||||
|
return fs.fileread(r)
|
||||||
|
case kubelwagen.MethodFileWrite:
|
||||||
|
return fs.filewrite(r)
|
||||||
|
case kubelwagen.MethodFileRelease:
|
||||||
|
return fs.filerelease(r)
|
||||||
|
case kubelwagen.MethodFileFlock:
|
||||||
|
return fs.fileflock(r)
|
||||||
|
case kubelwagen.MethodFileGetAttr:
|
||||||
|
return fs.filegetattr(r)
|
||||||
|
case kubelwagen.MethodFileFsync:
|
||||||
|
return fs.filefsync(r)
|
||||||
|
case kubelwagen.MethodFileTruncate:
|
||||||
|
return fs.filetruncate(r)
|
||||||
|
case kubelwagen.MethodFileChown:
|
||||||
|
return fs.filechown(r)
|
||||||
|
case kubelwagen.MethodFileChmod:
|
||||||
|
return fs.filechmod(r)
|
||||||
|
case kubelwagen.MethodFileAllocate:
|
||||||
|
return fs.fileallocate(r)
|
||||||
|
default:
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: fuse.ENOSYS,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) open(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
fh := fs.nextfile
|
||||||
|
fs.nextfile++
|
||||||
|
f, err := os.OpenFile(fs.getPath(r), int(r.Flags), 0)
|
||||||
|
if err != nil {
|
||||||
|
return kubelwagen.ErrorResp(r, err)
|
||||||
|
}
|
||||||
|
fs.openfiles[fh] = f
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: fuse.OK,
|
||||||
|
FileHandle: fh,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) create(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
fh := fs.nextfile
|
||||||
|
fs.nextfile++
|
||||||
|
f, err := os.OpenFile(fs.getPath(r), int(r.Flags)|os.O_CREATE, os.FileMode(r.Mode))
|
||||||
|
if err != nil {
|
||||||
|
return kubelwagen.ErrorResp(r, err)
|
||||||
|
}
|
||||||
|
fs.openfiles[fh] = f
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: fuse.OK,
|
||||||
|
FileHandle: fh,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) fileflock(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
code := fuse.ToStatus(syscall.Flock(int(f.Fd()), int(r.Flags)))
|
||||||
|
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: code,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) filegetattr(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
var at *fuse.Attr
|
||||||
|
fi, err := f.Stat()
|
||||||
|
code := fuse.ToStatus(err)
|
||||||
|
if err == nil {
|
||||||
|
at = fuse.ToAttr(fi)
|
||||||
|
}
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: code,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
Stat: at,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) filefsync(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
code := fuse.ToStatus(f.Sync())
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: code,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) filetruncate(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
code := fuse.ToStatus(syscall.Ftruncate(int(f.Fd()), int64(r.Size)))
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: code,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) filechown(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
code := fuse.ToStatus(f.Chown(int(r.UID), int(r.GID)))
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: code,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) filechmod(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
code := fuse.ToStatus(f.Chmod(os.FileMode(r.Mode)))
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: code,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO(barakmich): Linux only; support OS X
|
||||||
|
func (fs *LocalFs) fileallocate(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
code := fuse.ToStatus(syscall.Fallocate(int(f.Fd()), r.Mode, r.Offset, int64(r.Size)))
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: code,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) fileread(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
buf := make([]byte, r.Size)
|
||||||
|
n, err := f.ReadAt(buf, r.Offset)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return kubelwagen.ErrorResp(r, err)
|
||||||
|
}
|
||||||
|
buf = buf[:n]
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: fuse.OK,
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
Data: buf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) filewrite(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
n, err := f.WriteAt(r.Data, r.Offset)
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: fuse.ToStatus(err),
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
WriteSize: n,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *LocalFs) filerelease(r *kubelwagen.Request) *kubelwagen.Response {
|
||||||
|
fs.Lock()
|
||||||
|
defer fs.Unlock()
|
||||||
|
f, ok := fs.openfiles[r.FileHandle]
|
||||||
|
if !ok {
|
||||||
|
return kubelwagen.ErrorResp(r, os.ErrClosed)
|
||||||
|
}
|
||||||
|
err := f.Close()
|
||||||
|
delete(fs.openfiles, r.FileHandle)
|
||||||
|
return &kubelwagen.Response{
|
||||||
|
ID: r.ID,
|
||||||
|
Code: fuse.ToStatus(err),
|
||||||
|
FileHandle: r.FileHandle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -30,6 +30,7 @@ func (f *WsFsFile) Read(buf []byte, off int64) (fuse.ReadResult, fuse.Status) {
|
||||||
Method: MethodFileRead,
|
Method: MethodFileRead,
|
||||||
FileHandle: f.h,
|
FileHandle: f.h,
|
||||||
Offset: off,
|
Offset: off,
|
||||||
|
Size: uint32(len(buf)),
|
||||||
}
|
}
|
||||||
resp, ok := f.fs.getResponse(&r)
|
resp, ok := f.fs.getResponse(&r)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
@ -155,6 +156,7 @@ func (f *WsFsFile) Allocate(off uint64, size uint64, mode uint32) (code fuse.Sta
|
||||||
FileHandle: f.h,
|
FileHandle: f.h,
|
||||||
Mode: mode,
|
Mode: mode,
|
||||||
Size: uint32(size),
|
Size: uint32(size),
|
||||||
|
Offset: int64(off),
|
||||||
}
|
}
|
||||||
resp, ok := f.fs.getResponse(&r)
|
resp, ok := f.fs.getResponse(&r)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue