first servicable server

This commit is contained in:
Barak Michener 2018-03-29 14:29:05 -07:00
parent 90aa93a1d7
commit 869999fc3e
8 changed files with 309 additions and 12 deletions

1
.gitignore vendored
View file

@ -3,6 +3,7 @@
*.dll
*.so
*.dylib
kubelwagen
# Test binary, build with `go test -c`
*.test

View file

@ -4,6 +4,8 @@ import (
"os"
"github.com/spf13/cobra"
"github.com/barakmich/kubelwagen"
)
var (

35
filesystem.go Normal file
View file

@ -0,0 +1,35 @@
package kubelwagen
import "github.com/hanwen/go-fuse/fuse/pathfs"
type WsFs struct {
pathfs.FileSystem
req chan RequestCallback
}
type WsFsOpts struct {
ReadOnly bool
}
func NewWsFs(opts WsFsOpts, req chan RequestCallback, closer chan bool) *pathfs.PathNodeFs {
var fs pathfs.FileSystem
wfs := &WsFs{
FileSystem: pathfs.NewDefaultFileSystem(),
req: req,
}
// TODO(barakmich): spin up a goroutine to handle notify requests
fs = wfs
if opts.ReadOnly {
fs = pathfs.NewReadonlyFileSystem(fs)
}
return pathfs.NewPathNodeFs(fs, nil)
}
func getChannel() chan Response {
return make(chan Response)
}
func (fs *WsFs) String() string {
return "kubelwagen"
}

18
fuse.go
View file

@ -1,5 +1,21 @@
package kubelwagen
func serveFuse(dir string, closer chan bool) error {
import (
"github.com/hanwen/go-fuse/fuse/nodefs"
"github.com/sirupsen/logrus"
)
func serveFuse(dir string, req chan RequestCallback, closer chan bool) error {
opts := WsFsOpts{
ReadOnly: true,
}
nfs := NewWsFs(opts, req, closer)
server, _, err := nodefs.MountRoot(dir, nfs.Root(), nil)
if err != nil {
logrus.Fatalln("cannot mount:", err)
}
defer server.Unmount()
go server.Serve()
<-closer
return nil
}

83
http.go
View file

@ -1,5 +1,84 @@
package kubelwagen
func serveHTTP(hostport string, closer chan bool) error {
return nil
import (
"net/http"
"sync"
"github.com/hanwen/go-fuse/fuse"
"github.com/sirupsen/logrus"
)
func serveHTTP(hostport string, req chan RequestCallback, closer chan bool) error {
hub := NewHub(req)
go hub.Serve(closer)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
return http.ListenAndServe(hostport, nil)
}
type Hub struct {
sync.Mutex
inFlight map[int]chan Response
nextID int
responses chan Response
conn *WSConn
rcb chan RequestCallback
}
func NewHub(rcb chan RequestCallback) *Hub {
return &Hub{
rcb: rcb,
inFlight: make(map[int]chan Response),
nextID: 1,
responses: make(chan Response, 10),
}
}
func (h *Hub) Serve(closer chan bool) {
for {
select {
case resp, ok := <-h.responses:
if !ok {
h.Close()
return
}
cb, exists := h.inFlight[resp.ID]
if !exists {
logrus.Fatal("got a response id without having an initial request")
}
delete(h.inFlight, resp.ID)
cb <- resp
case rcb, ok := <-h.rcb:
if !ok {
h.Close()
return
}
if h.conn == nil {
rcb.response <- Response{
Code: fuse.EPERM,
}
continue
}
h.inFlight[h.nextID] = rcb.response
rcb.message.ID = h.nextID
h.conn.send <- rcb.message
h.nextID++
case _, ok := <-closer:
if ok {
logrus.Fatal("closer had data")
}
h.Close()
return
}
}
}
func (h *Hub) Close() {
h.Lock()
defer h.Unlock()
if h.conn != nil {
close(h.conn.send)
}
//TODO(barakmich): close inflights
}

41
request.go Normal file
View file

@ -0,0 +1,41 @@
package kubelwagen
import "github.com/hanwen/go-fuse/fuse"
type Method int
const (
MethodInvalid Method = iota
MethodOpenDir
MethodGetAttr
)
type Request struct {
ID int `json:"id"`
Method Method `json:"method"`
Path string `json:"path"`
Mode int32
UID int32
GID int32
Size int32
Dev int32
Flags int32
NewPath string `json:"newpath,omitempty"`
Attr string `json:"attr,omitempty"`
Data []byte `json:"data,omitempty"`
}
type RequestCallback struct {
message Request
response chan Response
}
type Response struct {
ID int `json:"id"`
Code fuse.Status `json:"status"`
Data []byte `json:"data,omitempty"`
Stat *fuse.Attr `json:"stat,omitempty"`
Attrs []string `json:"attrs,omitempty"`
Dirents []fuse.DirEntry `json:"dirents,omitempty"`
LinkStr string `json:"linkstr,omitempty"`
}

View file

@ -4,14 +4,13 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"time"
"github.com/sirupsen/logrus"
)
func Serve(hostport string, dir string) {
closer := make(chan bool)
var wg sync.WaitGroup
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
@ -22,24 +21,23 @@ func Serve(hostport string, dir string) {
close(closer)
}
}()
wg.Add(2)
req := make(chan RequestCallback, 5)
go func(dir string, closer chan bool) {
err := serveFuse(dir, closer)
err := serveFuse(dir, req, closer)
if err != nil {
logrus.Fatalf("Error serving FUSE: %s", err)
}
wg.Done()
}(dir, closer)
go func(hostport string, closer chan bool) {
err := serveHTTP(hostport, closer)
err := serveHTTP(hostport, req, closer)
if err != nil {
logrus.Fatalf("Error serving HTTP: %s", err)
}
wg.Done()
}(hostport, closer)
wg.Wait()
<-closer
time.Sleep(2 * time.Second)
}

125
wsconn.go Normal file
View file

@ -0,0 +1,125 @@
// Slightly modified from The Gorilla Websocket Authors.
//
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kubelwagen
import (
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// Client is a middleman between the websocket connection and the hub.
type WSConn struct {
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan Request
}
// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *WSConn) readPump() {
defer func() {
c.hub.Lock()
defer c.hub.Unlock()
c.hub.conn = nil
c.conn.Close()
}()
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
var resp Response
err := c.conn.ReadJSON(&resp)
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logrus.Printf("error: %v", err)
}
break
}
c.hub.responses <- resp
}
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *WSConn) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
err := c.conn.WriteJSON(message)
if err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
hub.Lock()
defer hub.Unlock()
if hub.conn != nil {
w.WriteHeader(400)
w.Write([]byte("Client already connected"))
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logrus.Println(err)
return
}
client := &WSConn{hub: hub, conn: conn, send: make(chan Request, 256)}
hub.conn = client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()
}