kubelwagen/wsconn.go

125 lines
3.1 KiB
Go

// Slightly modified from code by the Gorilla WebSocket authors.
//
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kubelwagen
import (
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
// Timing parameters for the websocket connection.
const (
	// pongWait is the time allowed to read the next pong message from the
	// peer.
	pongWait time.Duration = 60 * time.Second

	// pingPeriod is the interval at which pings are sent to the peer. It is
	// derived as nine tenths of pongWait because it must be less than
	// pongWait.
	pingPeriod time.Duration = pongWait * 9 / 10

	// writeWait is the time allowed to write a message to the peer.
	writeWait time.Duration = 10 * time.Second
)
// upgrader holds the websocket upgrade settings used for incoming HTTP
// requests: 1 KiB read and write buffers, everything else left at the
// library defaults.
var upgrader = websocket.Upgrader{
	WriteBufferSize: 1024,
	ReadBufferSize:  1024,
}
// WSConn is a middleman between the websocket connection and the hub.
//
// Responses decoded from the connection are forwarded to the hub by readPump,
// and requests queued on send are written to the connection by writePump.
type WSConn struct {
// The hub this connection is registered with.
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages. Closing it makes writePump send a
// close message to the peer and exit.
send chan Request
}
// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
//
// Each message is decoded as a JSON Response and sent on the hub's responses
// channel. The loop ends on the first read error (including a read deadline
// expiring because no pong arrived within pongWait); on exit the connection is
// closed and, if it is still the hub's registered connection, deregistered.
func (c *WSConn) readPump() {
	defer func() {
		c.hub.Lock()
		defer c.hub.Unlock()
		// Only clear the registration if it still refers to this connection,
		// so that a connection which has been superseded in the hub does not
		// wipe out its successor's registration when it shuts down.
		if c.hub.conn == c {
			c.hub.conn = nil
		}
		c.conn.Close()
	}()

	// Every pong received from the peer pushes the read deadline forward, so
	// a peer that stops answering pings makes the next read fail.
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		var resp Response
		if err := c.conn.ReadJSON(&resp); err != nil {
			// Going-away and abnormal closures are expected ways for a peer
			// to disappear; anything else is worth logging.
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				logrus.Printf("error: %v", err)
			}
			return
		}
		c.hub.responses <- resp
	}
}
// writePump forwards outbound requests from the send channel to the websocket
// connection and keeps the connection alive with periodic pings.
//
// One writePump goroutine is started per connection, and every write to the
// connection happens here, so there is at most one writer at a time. The pump
// stops, closing the connection, when the send channel is closed or when any
// write fails.
func (c *WSConn) writePump() {
	pinger := time.NewTicker(pingPeriod)
	// Deferred calls run last-in first-out: the ticker is stopped first, then
	// the connection is closed.
	defer c.conn.Close()
	defer pinger.Stop()

	for {
		select {
		case <-pinger.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if c.conn.WriteMessage(websocket.PingMessage, nil) != nil {
				return
			}

		case req, open := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if open {
				if c.conn.WriteJSON(req) != nil {
					return
				}
				continue
			}
			// The send channel was closed: tell the peer we are closing and
			// stop.
			c.conn.WriteMessage(websocket.CloseMessage, []byte{})
			return
		}
	}
}
// serveWs upgrades an HTTP request to a websocket connection and registers it
// as the hub's client.
//
// Only one client may be connected at a time: if the hub already has a
// connection, the request is rejected with 400 Bad Request and no upgrade is
// attempted. Otherwise the new connection is stored in hub.conn and its write
// and read pumps are started in their own goroutines. The hub lock is held
// for the whole call, so the "already connected" check and the registration
// cannot interleave with another holder of the lock.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	hub.Lock()
	defer hub.Unlock()

	if hub.conn != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Client already connected"))
		// Stop here. Falling through would upgrade the request anyway and
		// replace the existing client's registration in hub.conn.
		return
	}

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		logrus.Println(err)
		return
	}

	client := &WSConn{hub: hub, conn: conn, send: make(chan Request, 256)}
	hub.conn = client

	// Allow collection of memory referenced by the caller by doing all work in
	// new goroutines.
	go client.writePump()
	go client.readPump()
}