kubelwagen/http.go

84 lines
1.5 KiB
Go

package kubelwagen
import (
"net/http"
"sync"
"github.com/hanwen/go-fuse/fuse"
"github.com/sirupsen/logrus"
)
// serveHTTP builds a Hub fed by req, runs the hub loop in its own goroutine,
// and then serves HTTP on hostport. Every request path is handed to serveWs
// together with the hub.
//
// The handler is registered on net/http's default mux and the server is
// started with a nil handler, so the default mux is what gets served. The call
// blocks for as long as the server runs and returns whatever error
// http.ListenAndServe reports.
func serveHTTP(hostport string, req chan RequestCallback, closer chan bool) error {
	hub := NewHub(req)
	go hub.Serve(closer)

	wsHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	})
	http.Handle("/", wsHandler)

	return http.ListenAndServe(hostport, nil)
}
type Hub struct {
sync.Mutex
inFlight map[int]chan Response
nextID int
responses chan Response
conn *WSConn
rcb chan RequestCallback
}
// NewHub returns a Hub that takes its requests from rcb. The hub starts with
// no connection attached, an empty in-flight table, message IDs beginning at
// 1, and a responses channel buffered for 10 entries.
func NewHub(rcb chan RequestCallback) *Hub {
	hub := new(Hub)
	hub.rcb = rcb
	hub.nextID = 1
	hub.inFlight = make(map[int]chan Response)
	hub.responses = make(chan Response, 10)
	return hub
}
// Serve is the hub's event loop. It multiplexes three channels until one of
// them tells it to stop, then calls Close and returns:
//
//   - h.responses: each Response is handed to the channel registered under
//     its ID, and that registration is removed.
//   - h.rcb: each request is either rejected with fuse.EPERM (no connection
//     attached) or registered under the next ID and sent on the connection.
//   - closer: closing it stops the loop.
//
// Closing h.responses or h.rcb also stops the loop. Two conditions end the
// whole process through logrus.Fatal: a Response whose ID has no registered
// request, and an actual value arriving on closer.
//
// NOTE(review): the loop reads conn and mutates inFlight/nextID without
// holding the hub mutex; confirm that whoever sets conn synchronizes with it.
func (h *Hub) Serve(closer chan bool) {
loop:
	for {
		select {
		case reply, open := <-h.responses:
			if !open {
				break loop
			}
			waiter, known := h.inFlight[reply.ID]
			if !known {
				logrus.Fatal("got a response id without having an initial request")
			}
			delete(h.inFlight, reply.ID)
			// Blocks until the requester takes the reply unless its channel
			// is buffered.
			waiter <- reply

		case pending, open := <-h.rcb:
			if !open {
				break loop
			}
			if h.conn != nil {
				// Register before sending so the reply can always be matched.
				h.inFlight[h.nextID] = pending.response
				pending.message.ID = h.nextID
				h.conn.send <- pending.message
				h.nextID++
			} else {
				// Nobody is connected to service the request.
				pending.response <- Response{
					Code: fuse.EPERM,
				}
			}

		case _, open := <-closer:
			// closer is a pure shutdown signal; it must only ever be closed.
			if open {
				logrus.Fatal("closer had data")
			}
			break loop
		}
	}
	h.Close()
}
// Close shuts down the hub's outbound side: if a connection is attached, its
// send channel is closed and the hub forgets the connection. The hub mutex is
// held for the duration.
//
// Close is safe to call more than once and on a hub that never had a
// connection; later calls are no-ops.
//
// Requests still registered as in flight are not notified.
func (h *Hub) Close() {
	h.Lock()
	defer h.Unlock()
	if h.conn != nil {
		close(h.conn.send)
		// Closing an already-closed channel panics, so drop the reference:
		// a repeated Close then finds nothing left to close.
		h.conn = nil
	}
	//TODO(barakmich): close inflights
}