84 lines
1.5 KiB
Go
84 lines
1.5 KiB
Go
package kubelwagen
|
|
|
|
import (
|
|
"net/http"
|
|
"sync"
|
|
|
|
"github.com/hanwen/go-fuse/fuse"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func serveHTTP(hostport string, req chan RequestCallback, closer chan bool) error {
|
|
hub := NewHub(req)
|
|
go hub.Serve(closer)
|
|
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
|
serveWs(hub, w, r)
|
|
})
|
|
return http.ListenAndServe(hostport, nil)
|
|
}
|
|
|
|
type Hub struct {
|
|
sync.Mutex
|
|
inFlight map[int]chan Response
|
|
nextID int
|
|
responses chan Response
|
|
conn *WSConn
|
|
rcb chan RequestCallback
|
|
}
|
|
|
|
func NewHub(rcb chan RequestCallback) *Hub {
|
|
return &Hub{
|
|
rcb: rcb,
|
|
inFlight: make(map[int]chan Response),
|
|
nextID: 1,
|
|
responses: make(chan Response, 10),
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Serve(closer chan bool) {
|
|
for {
|
|
select {
|
|
case resp, ok := <-h.responses:
|
|
if !ok {
|
|
h.Close()
|
|
return
|
|
}
|
|
cb, exists := h.inFlight[resp.ID]
|
|
if !exists {
|
|
logrus.Fatal("got a response id without having an initial request")
|
|
}
|
|
delete(h.inFlight, resp.ID)
|
|
cb <- resp
|
|
case rcb, ok := <-h.rcb:
|
|
if !ok {
|
|
h.Close()
|
|
return
|
|
}
|
|
if h.conn == nil {
|
|
rcb.response <- Response{
|
|
Code: fuse.EPERM,
|
|
}
|
|
continue
|
|
}
|
|
h.inFlight[h.nextID] = rcb.response
|
|
rcb.message.ID = h.nextID
|
|
h.conn.send <- rcb.message
|
|
h.nextID++
|
|
case _, ok := <-closer:
|
|
if ok {
|
|
logrus.Fatal("closer had data")
|
|
}
|
|
h.Close()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Close() {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
if h.conn != nil {
|
|
close(h.conn.send)
|
|
}
|
|
//TODO(barakmich): close inflights
|
|
}
|