blob: ec26165f68da9483c80b212769317f9dd277b411 [file] [log] [blame]
Nicolas LaCassefea49162014-11-17 15:41:03 -08001// +build !nacl
2
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -08003package websocket
Shyam Jayaramandbae76b2014-11-17 12:51:29 -08004
5import (
Shyam Jayaramandbae76b2014-11-17 12:51:29 -08006 "errors"
7 "fmt"
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -08008 "io"
Shyam Jayaramandbae76b2014-11-17 12:51:29 -08009 "net"
10 "net/http"
11 "sync"
12
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -080013 "github.com/gorilla/websocket"
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080014
Jiri Simsa764efb72014-12-25 20:57:03 -080015 "v.io/core/veyron2/vlog"
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080016
Jiri Simsa764efb72014-12-25 20:57:03 -080017 "v.io/core/veyron/runtimes/google/lib/upcqueue"
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080018)
19
20var errListenerIsClosed = errors.New("Listener has been Closed")
21
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080022const bufferSize int = 4096
23
24// A listener that is able to handle either raw tcp request or websocket requests.
25// The result of Accept is is a net.Conn interface.
26type wsTCPListener struct {
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -080027 closed bool
28 mu sync.Mutex // Guards closed
29
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080030 // The queue of net.Conn to be returned by Accept.
31 q *upcqueue.T
32
33 // The queue for the http listener when we detect an http request.
34 httpQ *upcqueue.T
35
36 // The underlying listener.
37 netLn net.Listener
38 wsServer http.Server
39
40 netLoop sync.WaitGroup
41 wsLoop sync.WaitGroup
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080042
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -080043 hybrid bool // true if we're running in 'hybrid' mode
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080044}
45
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080046// queueListener is a listener that returns connections that are in q.
47type queueListener struct {
48 q *upcqueue.T
49 // ln is needed to implement Close and Addr
50 ln net.Listener
51}
52
53func (l *queueListener) Accept() (net.Conn, error) {
54 item, err := l.q.Get(nil)
55 switch {
56 case err == upcqueue.ErrQueueIsClosed:
57 return nil, errListenerIsClosed
58 case err != nil:
59 return nil, fmt.Errorf("Accept failed: %v", err)
60 default:
61 return item.(net.Conn), nil
62 }
63}
64
65func (l *queueListener) Close() error {
66 l.q.Shutdown()
67 return l.ln.Close()
68}
69
70func (l *queueListener) Addr() net.Addr {
71 return l.ln.Addr()
72}
73
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -080074func Listener(protocol, address string) (net.Listener, error) {
75 return listener(protocol, address, false)
76}
77
78func listener(protocol, address string, hybrid bool) (net.Listener, error) {
79 tcp := mapWebSocketToTCP[protocol]
80 netLn, err := net.Listen(tcp, address)
81 if err != nil {
82 return nil, err
83 }
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080084 ln := &wsTCPListener{
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -080085 q: upcqueue.New(),
86 httpQ: upcqueue.New(),
87 netLn: netLn,
88 hybrid: hybrid,
Shyam Jayaramandbae76b2014-11-17 12:51:29 -080089 }
90 ln.netLoop.Add(1)
91 go ln.netAcceptLoop()
92 httpListener := &queueListener{
93 q: ln.httpQ,
94 ln: ln,
95 }
96 handler := func(w http.ResponseWriter, r *http.Request) {
97 defer ln.wsLoop.Done()
98 if r.Method != "GET" {
99 http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
100 return
101 }
102 ws, err := websocket.Upgrade(w, r, nil, bufferSize, bufferSize)
103 if _, ok := err.(websocket.HandshakeError); ok {
104 http.Error(w, "Not a websocket handshake", 400)
105 vlog.Errorf("Rejected a non-websocket request: %v", err)
106 return
107 } else if err != nil {
108 http.Error(w, "Internal Error", 500)
109 vlog.Errorf("Rejected a non-websocket request: %v", err)
110 return
111 }
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -0800112 conn := WebsocketConn(ws)
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800113 if err := ln.q.Put(conn); err != nil {
114 vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed: %v", ws.RemoteAddr(), ws.LocalAddr(), err)
115 ws.Close()
116 return
117 }
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800118 }
119 ln.wsServer = http.Server{
120 Handler: http.HandlerFunc(handler),
121 }
122 go ln.wsServer.Serve(httpListener)
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -0800123 return ln, nil
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800124}
125
126func (ln *wsTCPListener) netAcceptLoop() {
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800127 defer ln.netLoop.Done()
128 for {
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -0800129 netConn, err := ln.netLn.Accept()
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800130 if err != nil {
131 vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
132 return
133 }
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -0800134 vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", netConn.RemoteAddr(), netConn.LocalAddr())
135 conn := netConn
136 if ln.hybrid {
137 hconn := &hybridConn{conn: netConn}
138 conn = hconn
139 magicbuf := [1]byte{}
140 n, err := io.ReadFull(netConn, magicbuf[:])
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800141 if err != nil {
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -0800142 vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) since we failed to read the first byte: %v", netConn.RemoteAddr(), netConn.LocalAddr(), err)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800143 continue
144 }
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -0800145 hconn.buffered = magicbuf[:n]
146 if magicbuf[0] != 'G' {
147 // Can't possibly be a websocket connection
148 if err := ln.q.Put(conn); err != nil {
149 vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", netConn.RemoteAddr(), netConn.LocalAddr(), err)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800150 }
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -0800151 continue
152 }
153 // Maybe be a websocket connection now.
154 }
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800155 ln.wsLoop.Add(1)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800156 if err := ln.httpQ.Put(conn); err != nil {
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800157 ln.wsLoop.Done()
158 vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800159 conn.Close()
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800160 continue
161 }
162 }
163}
164
165func (ln *wsTCPListener) Accept() (net.Conn, error) {
166 item, err := ln.q.Get(nil)
167 switch {
168 case err == upcqueue.ErrQueueIsClosed:
169 return nil, errListenerIsClosed
170 case err != nil:
171 return nil, fmt.Errorf("Accept failed: %v", err)
172 default:
173 return item.(net.Conn), nil
174 }
175}
176
177func (ln *wsTCPListener) Close() error {
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -0800178 ln.mu.Lock()
179 if ln.closed {
180 ln.mu.Unlock()
181 return errListenerIsClosed
182 }
183 ln.closed = true
184 ln.mu.Unlock()
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800185 addr := ln.netLn.Addr()
186 err := ln.netLn.Close()
187 vlog.VI(1).Infof("Closed net.Listener on (%q, %q): %v", addr.Network(), addr, err)
188 ln.httpQ.Shutdown()
189 ln.netLoop.Wait()
190 ln.wsLoop.Wait()
191 // q has to be shutdown after the netAcceptLoop finishes because that loop
192 // could be in the process of accepting a websocket connection. The ordering
193 // relative to wsLoop is not really relevant because the wsLoop counter wil
194 // decrement every time there a websocket connection has been handled and does
195 // not block on gets from q.
196 ln.q.Shutdown()
197 vlog.VI(3).Infof("Close stream.wsTCPListener %s", ln)
198 return nil
199}
200
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800201type addr struct{ n, a string }
202
203func (a *addr) Network() string {
204 return a.n
205}
206
207func (a *addr) String() string {
208 return a.a
209}
210
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800211func (ln *wsTCPListener) Addr() net.Addr {
Cosmos Nicolaou3c50ac42014-12-23 07:40:19 -0800212 protocol := "ws"
213 if ln.hybrid {
214 protocol = "wsh"
215 }
216 a := &addr{protocol, ln.netLn.Addr().String()}
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800217 return a
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800218}