// Slightly modified from The Gorilla Websocket Authors. // // Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package kubelwagen import ( "net/http" "time" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" ) const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } // Client is a middleman between the websocket connection and the hub. type WSConn struct { hub *Hub // The websocket connection. conn *websocket.Conn // Buffered channel of outbound messages. send chan Request } // readPump pumps messages from the websocket connection to the hub. // // The application runs readPump in a per-connection goroutine. The application // ensures that there is at most one reader on a connection by executing all // reads from this goroutine. func (c *WSConn) readPump() { defer func() { c.hub.Lock() defer c.hub.Unlock() c.hub.conn = nil c.conn.Close() }() c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { var resp Response err := c.conn.ReadJSON(&resp) if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { logrus.Printf("error: %v", err) } break } c.hub.responses <- resp } } // writePump pumps messages from the hub to the websocket connection. // // A goroutine running writePump is started for each connection. The // application ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *WSConn) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case message, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // The hub closed the channel. c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } err := c.conn.WriteJSON(message) if err != nil { return } case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { hub.Lock() defer hub.Unlock() if hub.conn != nil { w.WriteHeader(400) w.Write([]byte("Client already connected")) } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { logrus.Println(err) return } client := &WSConn{hub: hub, conn: conn, send: make(chan Request, 256)} hub.conn = client // Allow collection of memory referenced by the caller by doing all work in // new goroutines. go client.writePump() go client.readPump() }