From 869999fc3e723afd4520932e4ff622660d8b7836 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Thu, 29 Mar 2018 14:29:05 -0700 Subject: [PATCH] first servicable server --- .gitignore | 1 + cmd/kubelwagen/serve.go | 2 + filesystem.go | 35 ++++++++++++++ fuse.go | 18 ++++++- http.go | 83 +++++++++++++++++++++++++++++++- request.go | 41 ++++++++++++++++ serve.go | 16 +++---- wsconn.go | 125 ++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 309 insertions(+), 12 deletions(-) create mode 100644 filesystem.go create mode 100644 request.go create mode 100644 wsconn.go diff --git a/.gitignore b/.gitignore index a1338d6..98a9cdb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *.dll *.so *.dylib +kubelwagen # Test binary, build with `go test -c` *.test diff --git a/cmd/kubelwagen/serve.go b/cmd/kubelwagen/serve.go index f771c78..f9ec010 100644 --- a/cmd/kubelwagen/serve.go +++ b/cmd/kubelwagen/serve.go @@ -4,6 +4,8 @@ import ( "os" "github.com/spf13/cobra" + + "github.com/barakmich/kubelwagen" ) var ( diff --git a/filesystem.go b/filesystem.go new file mode 100644 index 0000000..594da05 --- /dev/null +++ b/filesystem.go @@ -0,0 +1,35 @@ +package kubelwagen + +import "github.com/hanwen/go-fuse/fuse/pathfs" + +type WsFs struct { + pathfs.FileSystem + req chan RequestCallback +} + +type WsFsOpts struct { + ReadOnly bool +} + +func NewWsFs(opts WsFsOpts, req chan RequestCallback, closer chan bool) *pathfs.PathNodeFs { + var fs pathfs.FileSystem + wfs := &WsFs{ + FileSystem: pathfs.NewDefaultFileSystem(), + req: req, + } + + // TODO(barakmich): spin up a goroutine to handle notify requests + fs = wfs + if opts.ReadOnly { + fs = pathfs.NewReadonlyFileSystem(fs) + } + return pathfs.NewPathNodeFs(fs, nil) +} + +func getChannel() chan Response { + return make(chan Response) +} + +func (fs *WsFs) String() string { + return "kubelwagen" +} diff --git a/fuse.go b/fuse.go index 4e75095..58a4b23 100644 --- a/fuse.go +++ b/fuse.go @@ -1,5 +1,21 @@ package kubelwagen -func serveFuse(dir string, closer chan bool) error { +import ( + "github.com/hanwen/go-fuse/fuse/nodefs" + "github.com/sirupsen/logrus" +) + +func serveFuse(dir string, req chan RequestCallback, closer chan bool) error { + opts := WsFsOpts{ + ReadOnly: true, + } + nfs := NewWsFs(opts, req, closer) + server, _, err := nodefs.MountRoot(dir, nfs.Root(), nil) + if err != nil { + logrus.Fatalln("cannot mount:", err) + } + defer server.Unmount() + go server.Serve() + <-closer return nil } diff --git a/http.go b/http.go index 8adb9ce..7c09895 100644 --- a/http.go +++ b/http.go @@ -1,5 +1,84 @@ package kubelwagen -func serveHTTP(hostport string, closer chan bool) error { - return nil +import ( + "net/http" + "sync" + + "github.com/hanwen/go-fuse/fuse" + "github.com/sirupsen/logrus" +) + +func serveHTTP(hostport string, req chan RequestCallback, closer chan bool) error { + hub := NewHub(req) + go hub.Serve(closer) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + serveWs(hub, w, r) + }) + return http.ListenAndServe(hostport, nil) +} + +type Hub struct { + sync.Mutex + inFlight map[int]chan Response + nextID int + responses chan Response + conn *WSConn + rcb chan RequestCallback +} + +func NewHub(rcb chan RequestCallback) *Hub { + return &Hub{ + rcb: rcb, + inFlight: make(map[int]chan Response), + nextID: 1, + responses: make(chan Response, 10), + } +} + +func (h *Hub) Serve(closer chan bool) { + for { + select { + case resp, ok := <-h.responses: + if !ok { + h.Close() + return + } + cb, exists := h.inFlight[resp.ID] + if !exists { + logrus.Fatal("got a response id without having an initial request") + } + delete(h.inFlight, resp.ID) + cb <- resp + case rcb, ok := <-h.rcb: + if !ok { + h.Close() + return + } + if h.conn == nil { + rcb.response <- Response{ + Code: fuse.EPERM, + } + continue + } + h.inFlight[h.nextID] = rcb.response + rcb.message.ID = h.nextID + h.conn.send <- rcb.message + h.nextID++ + case _, ok := <-closer: + if ok { + logrus.Fatal("closer had data") + } + h.Close() + return + } + } +} + +func (h *Hub) Close() { + h.Lock() + defer h.Unlock() + if h.conn != nil { + close(h.conn.send) + } + //TODO(barakmich): close inflights } diff --git a/request.go b/request.go new file mode 100644 index 0000000..fa963ba --- /dev/null +++ b/request.go @@ -0,0 +1,41 @@ +package kubelwagen + +import "github.com/hanwen/go-fuse/fuse" + +type Method int + +const ( + MethodInvalid Method = iota + MethodOpenDir + MethodGetAttr +) + +type Request struct { + ID int `json:"id"` + Method Method `json:"method"` + Path string `json:"path"` + Mode int32 + UID int32 + GID int32 + Size int32 + Dev int32 + Flags int32 + NewPath string `json:"newpath,omitempty"` + Attr string `json:"attr,omitempty"` + Data []byte `json:"data,omitempty"` +} + +type RequestCallback struct { + message Request + response chan Response +} + +type Response struct { + ID int `json:"id"` + Code fuse.Status `json:"status"` + Data []byte `json:"data,omitempty"` + Stat *fuse.Attr `json:"stat,omitempty"` + Attrs []string `json:"attrs,omitempty"` + Dirents []fuse.DirEntry `json:"dirents,omitempty"` + LinkStr string `json:"linkstr,omitempty"` +} diff --git a/serve.go b/serve.go index b372339..250f3db 100644 --- a/serve.go +++ b/serve.go @@ -4,14 +4,13 @@ import ( "fmt" "os" "os/signal" - "sync" + "time" "github.com/sirupsen/logrus" ) func Serve(hostport string, dir string) { closer := make(chan bool) - var wg sync.WaitGroup signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt) @@ -22,24 +21,23 @@ func Serve(hostport string, dir string) { close(closer) } }() - wg.Add(2) + + req := make(chan RequestCallback, 5) go func(dir string, closer chan bool) { - err := serveFuse(dir, closer) + err := serveFuse(dir, req, closer) if err != nil { logrus.Fatalf("Error serving FUSE: %s", err) } - wg.Done() }(dir, closer) go func(hostport string, closer chan bool) { - err := serveHTTP(hostport, closer) + err := serveHTTP(hostport, req, closer) if err != nil { logrus.Fatalf("Error serving HTTP: %s", err) } - wg.Done() }(hostport, closer) - wg.Wait() - + <-closer + time.Sleep(2 * time.Second) } diff --git a/wsconn.go b/wsconn.go new file mode 100644 index 0000000..6c2aa33 --- /dev/null +++ b/wsconn.go @@ -0,0 +1,125 @@ +// Slightly modified from The Gorilla Websocket Authors. +// +// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package kubelwagen + +import ( + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// Client is a middleman between the websocket connection and the hub. +type WSConn struct { + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan Request +} + +// readPump pumps messages from the websocket connection to the hub. +// +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +func (c *WSConn) readPump() { + defer func() { + c.hub.Lock() + defer c.hub.Unlock() + c.hub.conn = nil + c.conn.Close() + }() + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + var resp Response + err := c.conn.ReadJSON(&resp) + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + logrus.Printf("error: %v", err) + } + break + } + c.hub.responses <- resp + } +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *WSConn) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + err := c.conn.WriteJSON(message) + if err != nil { + return + } + + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { + hub.Lock() + defer hub.Unlock() + if hub.conn != nil { + w.WriteHeader(400) + w.Write([]byte("Client already connected")) + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logrus.Println(err) + return + } + client := &WSConn{hub: hub, conn: conn, send: make(chan Request, 256)} + hub.conn = client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump() +}