kubelwagen/client/connect.go

91 lines
1.7 KiB
Go

package client
import (
"fmt"
"net/url"
"os"
"os/signal"
"time"
"github.com/barakmich/kubelwagen"
"github.com/barakmich/kubelwagen/fs"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
func Connect(addr string, dir string) {
closer := make(chan bool)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
go func() {
for range signalChan {
fmt.Println("\nReceived an interrupt, disconnecting...")
close(closer)
}
}()
handler := fs.NewLocalFs(dir)
c := &client{
handler: handler,
closer: closer,
}
err := c.DialAndServe(addr)
if err != nil {
logrus.Fatalf("Error dialing %s: %s\n", addr, err)
}
<-closer
time.Sleep(500 * time.Millisecond)
}
type client struct {
handler kubelwagen.FS
closer chan bool
txchan chan *kubelwagen.Response
conn *websocket.Conn
}
func (c *client) DialAndServe(addr string) error {
var err error
c.txchan = make(chan *kubelwagen.Response, 20)
u := url.URL{Scheme: "ws", Host: addr, Path: "/"}
logrus.Printf("connecting to %s", u.String())
c.conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
return err
}
defer c.conn.Close()
go c.tx()
go c.rx()
<-c.closer
return nil
}
func (c *client) tx() {
for x := range c.txchan {
err := c.conn.WriteJSON(x)
if err != nil {
logrus.Errorf("tx error: %s\n", err)
}
}
}
func (c *client) rx() {
defer close(c.closer)
defer close(c.txchan)
for {
req := &kubelwagen.Request{}
err := c.conn.ReadJSON(req)
if err != nil {
logrus.Errorf("rx error: %s\n", err)
return
}
go func(req *kubelwagen.Request) {
resp := c.handler.Handle(req)
c.txchan <- resp
}(req)
}
}