runtimes/google/ipc: rework 'hybrid' websocket protocol support.
- introduce a new protocol type, wsh, which can listen for both
tcp and websocket connections.
- modify vc's to ensure that they use a control byte that can't
be confused with websockets. Roll is straightforward, push to
servers first, update clients, remove old support
Change-Id: I2d68eb51d1c9b50afbaa4c3164a9cef6c927aba1
diff --git a/lib/websocket/conn.go b/lib/websocket/conn.go
index 3a291ea..4d126d4 100644
--- a/lib/websocket/conn.go
+++ b/lib/websocket/conn.go
@@ -60,13 +60,12 @@
for n == 0 && err == nil {
if c.currReader == nil {
t, r, err := c.ws.NextReader()
-
- if t != websocket.BinaryMessage {
- return 0, fmt.Errorf("Unexpected message type %d", t)
- }
if err != nil {
return 0, err
}
+ if t != websocket.BinaryMessage {
+ return 0, fmt.Errorf("Unexpected message type %d", t)
+ }
c.currReader = r
}
n, err = c.readFromCurrReader(b)
@@ -86,15 +85,19 @@
func (c *wrappedConn) Close() error {
c.writeLock.Lock()
defer c.writeLock.Unlock()
+ // Send an EOF control message to the remote end so that it can
+ // handle the close gracefully.
+ msg := websocket.FormatCloseMessage(websocket.CloseGoingAway, "EOF")
+ c.ws.WriteControl(websocket.CloseMessage, msg, time.Now().Add(time.Second))
return c.ws.Close()
}
func (c *wrappedConn) LocalAddr() net.Addr {
- return websocketAddr{s: c.ws.LocalAddr().String()}
+ return &addr{"ws", c.ws.LocalAddr().String()}
}
func (c *wrappedConn) RemoteAddr() net.Addr {
- return websocketAddr{s: c.ws.RemoteAddr().String()}
+ return &addr{"ws", c.ws.RemoteAddr().String()}
}
func (c *wrappedConn) SetDeadline(t time.Time) error {
@@ -112,14 +115,53 @@
return c.ws.SetWriteDeadline(t)
}
-type websocketAddr struct {
- s string
+// hybridConn is used by the 'hybrid' protocol that can accept
+// either 'tcp' or 'websocket' connections. In particular, it allows
+// for the reader to peek and buffer the first n bytes of a stream
+// in order to determine what the connection type is.
+type hybridConn struct {
+ conn net.Conn
+ buffered []byte
}
-func (websocketAddr) Network() string {
- return "ws"
+func (wc *hybridConn) Read(b []byte) (int, error) {
+ lbuf := len(wc.buffered)
+ if lbuf == 0 {
+ return wc.conn.Read(b)
+ }
+ copyn := copy(b, wc.buffered)
+ wc.buffered = wc.buffered[copyn:]
+ if len(b) > copyn {
+ n, err := wc.conn.Read(b[copyn:])
+ return copyn + n, err
+ }
+ return copyn, nil
}
-func (w websocketAddr) String() string {
- return w.s
+func (wc *hybridConn) Write(b []byte) (n int, err error) {
+ return wc.conn.Write(b)
+}
+
+func (wc *hybridConn) Close() error {
+ return wc.conn.Close()
+}
+
+func (wc *hybridConn) LocalAddr() net.Addr {
+ return &addr{"wsh", wc.conn.LocalAddr().String()}
+}
+
+func (wc *hybridConn) RemoteAddr() net.Addr {
+ return &addr{"wsh", wc.conn.RemoteAddr().String()}
+}
+
+func (wc *hybridConn) SetDeadline(t time.Time) error {
+ return wc.conn.SetDeadline(t)
+}
+
+func (wc *hybridConn) SetReadDeadline(t time.Time) error {
+ return wc.conn.SetReadDeadline(t)
+}
+
+func (wc *hybridConn) SetWriteDeadline(t time.Time) error {
+ return wc.conn.SetWriteDeadline(t)
}
diff --git a/lib/websocket/conn_nacl.go b/lib/websocket/conn_nacl.go
index e20ced8..0aaf55e 100644
--- a/lib/websocket/conn_nacl.go
+++ b/lib/websocket/conn_nacl.go
@@ -28,7 +28,7 @@
currBuffer []byte
}
-func Dial(address string) (net.Conn, error) {
+func Dial(protocol, address string, timeout time.Duration) (net.Conn, error) {
inst := PpapiInstance
u, err := url.Parse("ws://" + address)
if err != nil {
diff --git a/lib/websocket/conn_test.go b/lib/websocket/conn_test.go
index db38cbe..6b6db66 100644
--- a/lib/websocket/conn_test.go
+++ b/lib/websocket/conn_test.go
@@ -3,12 +3,13 @@
import (
"bytes"
- "github.com/gorilla/websocket"
"net"
"net/http"
"sync"
"testing"
"time"
+
+ "github.com/gorilla/websocket"
)
func writer(c net.Conn, data []byte, times int, wg *sync.WaitGroup) {
@@ -88,7 +89,7 @@
}
// Dial out in another go routine
go func() {
- conn, err := Dial(addr.String())
+ conn, err := Dial("tcp", addr.String(), time.Second)
numTries := 0
for err != nil && numTries < 5 {
numTries++
diff --git a/lib/websocket/dialer.go b/lib/websocket/dialer.go
index 111acb2..d719a11 100644
--- a/lib/websocket/dialer.go
+++ b/lib/websocket/dialer.go
@@ -6,17 +6,23 @@
"net"
"net/http"
"net/url"
+ "time"
"github.com/gorilla/websocket"
)
-func Dial(address string) (net.Conn, error) {
- conn, err := net.Dial("tcp", address)
+func Dial(protocol, address string, timeout time.Duration) (net.Conn, error) {
+ var then time.Time
+ if timeout > 0 {
+ then = time.Now().Add(timeout)
+ }
+ tcp := mapWebSocketToTCP[protocol]
+ conn, err := net.DialTimeout(tcp, address, timeout)
if err != nil {
return nil, err
}
+ conn.SetReadDeadline(then)
u, err := url.Parse("ws://" + address)
-
if err != nil {
return nil, err
}
@@ -24,5 +30,7 @@
if err != nil {
return nil, err
}
+ var zero time.Time
+ conn.SetDeadline(zero)
return WebsocketConn(ws), nil
}
diff --git a/lib/websocket/hybrid.go b/lib/websocket/hybrid.go
new file mode 100644
index 0000000..e1bbc37
--- /dev/null
+++ b/lib/websocket/hybrid.go
@@ -0,0 +1,29 @@
+package websocket
+
+import (
+ "net"
+ "time"
+)
+
+var mapWebSocketToTCP = map[string]string{"ws": "tcp", "ws4": "tcp4", "ws6": "tcp6", "wsh": "tcp", "wsh4": "tcp4", "wsh6": "tcp6", "tcp": "tcp", "tcp4": "tcp4", "tcp6": "tcp6"}
+
+// HybridDial returns net.Conn that can be used with a HybridListener but
+// always uses tcp. A client must specifically elect to use websockets by
+// calling websocket.Dialer. The returned net.Conn will report 'tcp' as its
+// Network.
+func HybridDial(network, address string, timeout time.Duration) (net.Conn, error) {
+ tcp := mapWebSocketToTCP[network]
+ return net.DialTimeout(tcp, address, timeout)
+}
+
+// HybridListener returns a net.Listener that supports both tcp and
+// websockets over the same, single, port. A listen address of
+// --veyron.tcp.protocol=wsh --veyron.tcp.address=127.0.0.1:8101 means
+// that port 8101 can accept connections that use either tcp or websocket.
+// The listener looks at the first 4 bytes of the incoming data stream
+// to decide if it's a websocket protocol or not. These must be 'GET ' for
+// websockets, all other protocols must guarantee to not send 'GET ' as the
+// first four bytes of the payload.
+func HybridListener(protocol, address string) (net.Listener, error) {
+ return listener(protocol, address, true)
+}
diff --git a/lib/websocket/init.go b/lib/websocket/init.go
deleted file mode 100644
index 314c07a..0000000
--- a/lib/websocket/init.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package websocket
-
-import (
- "net"
- "time"
-
- "veyron.io/veyron/veyron2/ipc/stream"
-)
-
-var mapWebSocketToTCP = map[string]string{"ws": "tcp", "ws4": "tcp4", "ws6": "tcp6"}
-
-func wsListener(protocol, address string) (net.Listener, error) {
- tcp := mapWebSocketToTCP[protocol]
- ln, err := net.Listen(tcp, address)
- if err != nil {
- return nil, err
- }
- return NewListener(ln)
-}
-
-func wsDialer(protocol, address string, timeout time.Duration) (net.Conn, error) {
- // TODO(cnicolaou): implement timeout support.
- return Dial(address)
-}
-
-func init() {
- // ws, ws4, ws6 represent websocket protocol instances.
- for _, p := range []string{"ws", "ws4", "ws6"} {
- stream.RegisterProtocol(p, wsDialer, wsListener)
- }
-
- // TODO(cnicolaou): fully enable and test this 'hybrid mode'.
- // hws, hws4, hws6 represent a 'hybrid' protocol that can accept
- // both websockets and tcp, using a 'magic' byte to discriminate
- // between the two. These are needed when a single network port must
- // be use to serve both websocket and tcp clients, we prefer to use
- // tcp whenever we can to avoid the overhead of websockets. Clients
- // decide whether to use hybrid tcp or websockets by electing to dial
- // using the hws protocol or the ws protocol respectively.
- //for _, p := range []string{"wsh", "wsh4", "wsh6"} {
- // stream.RegisterProtocol(p, tcpHybridDialer, wsHybridListener)
- //}
-
- // The implementation strategy is as follows:
- // tcpHybridDialer will create and return a wrapped net.Conn which will
- // write the 'magic' time the first time that its Write method is called
- // but will otherwise be indistinguishable from the underlying net.Conn.
- // This first write will require an extra copy, but avoid potentially
- // sending two packets.
- // wsHybridListener is essentially the same as the current wsTCPListener,
- // but the magic byte handling implemented on a conditional basis.
-}
-
-/*
-func dialer(network, address string, timeout time.Duration) (net.Conn, error) {
- conn, err := net.DialTimeout(network, address, timeout)
- if err != nil {
- return nil, err
- }
- // For tcp connections we add an extra magic byte so we can differentiate between
- // raw tcp and websocket on the same port.
- switch n, err := conn.Write([]byte{websocket.BinaryMagicByte}); {
- case err != nil:
- return nil, err
- case n != 1:
- return nil, fmt.Errorf("Unable to write the magic byte")
- }
- return conn, nil
- }
-}
-*/
diff --git a/lib/websocket/listener.go b/lib/websocket/listener.go
index 10d3f51..2971a03 100644
--- a/lib/websocket/listener.go
+++ b/lib/websocket/listener.go
@@ -5,6 +5,7 @@
import (
"errors"
"fmt"
+ "io"
"net"
"net/http"
"sync"
@@ -18,15 +19,14 @@
var errListenerIsClosed = errors.New("Listener has been Closed")
-// We picked 0xFF because it's obviously outside the range of ASCII,
-// and is completely unused in UTF-8.
-const BinaryMagicByte byte = 0xFF
-
const bufferSize int = 4096
// A listener that is able to handle either raw tcp request or websocket requests.
// The result of Accept is is a net.Conn interface.
type wsTCPListener struct {
+ closed bool
+ mu sync.Mutex // Guards closed
+
// The queue of net.Conn to be returned by Accept.
q *upcqueue.T
@@ -39,32 +39,10 @@
netLoop sync.WaitGroup
wsLoop sync.WaitGroup
-}
-/*
-// bufferedConn is used to allow us to Peek at the first byte to see if it
-// is the magic byte used by veyron tcp requests. Other than that it behaves
-// like a normal net.Conn.
-type bufferedConn struct {
- net.Conn
- // TODO(bjornick): Remove this buffering because we have way too much
- // buffering anyway. We really only need to buffer the first byte.
- r *bufio.Reader
+ hybrid bool // true if we're running in 'hybrid' mode
}
-func newBufferedConn(c net.Conn) bufferedConn {
- return bufferedConn{Conn: c, r: bufio.NewReaderSize(c, bufferSize)}
-}
-
-func (c *bufferedConn) Peek(n int) ([]byte, error) {
- return c.r.Peek(n)
-}
-
-func (c *bufferedConn) Read(p []byte) (int, error) {
- return c.r.Read(p)
-}
-*/
-
// queueListener is a listener that returns connections that are in q.
type queueListener struct {
q *upcqueue.T
@@ -93,11 +71,21 @@
return l.ln.Addr()
}
-func NewListener(netLn net.Listener) (net.Listener, error) {
+func Listener(protocol, address string) (net.Listener, error) {
+ return listener(protocol, address, false)
+}
+
+func listener(protocol, address string, hybrid bool) (net.Listener, error) {
+ tcp := mapWebSocketToTCP[protocol]
+ netLn, err := net.Listen(tcp, address)
+ if err != nil {
+ return nil, err
+ }
ln := &wsTCPListener{
- q: upcqueue.New(),
- httpQ: upcqueue.New(),
- netLn: netLn,
+ q: upcqueue.New(),
+ httpQ: upcqueue.New(),
+ netLn: netLn,
+ hybrid: hybrid,
}
ln.netLoop.Add(1)
go ln.netAcceptLoop()
@@ -136,47 +124,38 @@
}
func (ln *wsTCPListener) netAcceptLoop() {
- defer ln.Close()
defer ln.netLoop.Done()
for {
- conn, err := ln.netLn.Accept()
+ netConn, err := ln.netLn.Accept()
if err != nil {
vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
return
}
- vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
- /*
- bc := newBufferedConn(conn)
- magic, err := bc.Peek(1)
+ vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", netConn.RemoteAddr(), netConn.LocalAddr())
+ conn := netConn
+ if ln.hybrid {
+ hconn := &hybridConn{conn: netConn}
+ conn = hconn
+ magicbuf := [1]byte{}
+ n, err := io.ReadFull(netConn, magicbuf[:])
if err != nil {
- vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as the magic byte failed to be read: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- bc.Close()
+ vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) since we failed to read the first byte: %v", netConn.RemoteAddr(), netConn.LocalAddr(), err)
continue
}
-
- vlog.VI(1).Infof("Got a connection from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
- // Check to see if it is a regular connection or a http connection.
- if magic[0] == BinaryMagicByte {
- if _, err := bc.r.ReadByte(); err != nil {
- vlog.VI(1).Infof("Shutting down conn from %s (local address: %s), could read past the magic byte: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- bc.Close()
- continue
- }
- if err := ln.q.Put(&bc); err != nil {
- vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- bc.Close()
- continue
- }
- continue
+ hconn.buffered = magicbuf[:n]
+ if magicbuf[0] != 'G' {
+ // Can't possibly be a websocket connection
+ if err := ln.q.Put(conn); err != nil {
+ vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", netConn.RemoteAddr(), netConn.LocalAddr(), err)
}
- */
-
+ continue
+ }
+ // Maybe be a websocket connection now.
+ }
ln.wsLoop.Add(1)
- // if err := ln.httpQ.Put(&bc); err != nil {
if err := ln.httpQ.Put(conn); err != nil {
ln.wsLoop.Done()
vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- //bc.Close()
conn.Close()
continue
}
@@ -196,6 +175,13 @@
}
func (ln *wsTCPListener) Close() error {
+ ln.mu.Lock()
+ if ln.closed {
+ ln.mu.Unlock()
+ return errListenerIsClosed
+ }
+ ln.closed = true
+ ln.mu.Unlock()
addr := ln.netLn.Addr()
err := ln.netLn.Close()
vlog.VI(1).Infof("Closed net.Listener on (%q, %q): %v", addr.Network(), addr, err)
@@ -223,6 +209,10 @@
}
func (ln *wsTCPListener) Addr() net.Addr {
- a := &addr{"ws", ln.netLn.Addr().String()}
+ protocol := "ws"
+ if ln.hybrid {
+ protocol = "wsh"
+ }
+ a := &addr{protocol, ln.netLn.Addr().String()}
return a
}
diff --git a/lib/websocket/listener_nacl.go b/lib/websocket/listener_nacl.go
index df89e2b..41dd95e 100644
--- a/lib/websocket/listener_nacl.go
+++ b/lib/websocket/listener_nacl.go
@@ -10,8 +10,6 @@
// Websocket listeners are not supported in NaCl.
// This file is needed for compilation only.
-const BinaryMagicByte byte = 0x90
-
-func NewListener(netLn net.Listener) (net.Listener, error) {
- return nil, fmt.Errorf("Websocket NewListener called in nacl code!")
+func Listener(protocol, address string) (net.Listener, error) {
+ return nil, fmt.Errorf("Websocket Listener called in nacl code!")
}
diff --git a/lib/websocket/util_test.go b/lib/websocket/util_test.go
new file mode 100644
index 0000000..f900dd7
--- /dev/null
+++ b/lib/websocket/util_test.go
@@ -0,0 +1,287 @@
+package websocket_test
+
+import (
+ "encoding/gob"
+ "fmt"
+ "hash/crc64"
+ "io"
+ "math/rand"
+ "net"
+ "sync"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2/ipc/stream"
+
+ "veyron.io/veyron/veyron/lib/testutil"
+)
+
+var crcTable *crc64.Table
+
+func init() {
+ testutil.Init()
+ crcTable = crc64.MakeTable(crc64.ISO)
+}
+
+func newSender(t *testing.T, dialer stream.DialerFunc, protocol, address string) net.Conn {
+ conn, err := dialer(protocol, address, time.Minute)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ return nil
+ }
+ return conn
+}
+
+func checkProtocols(conn net.Conn, tx string) error {
+ expectedProtocol := map[string]string{
+ "ws": "ws", "wsh": "tcp", "tcp": "tcp",
+ }
+ if got, want := conn.LocalAddr().Network(), expectedProtocol[tx]; got != want {
+ return fmt.Errorf("wrong local protocol: got %q, want %q", got, want)
+ }
+ // Can't tell that the remote protocol is really 'wsh'
+ if got, want := conn.RemoteAddr().Network(), expectedProtocol[tx]; got != want {
+ return fmt.Errorf("wrong remote protocol: got %q, want %q", got, want)
+ }
+ return nil
+}
+
+type packet struct {
+ Data []byte
+ Size int
+ CRC64 uint64
+}
+
+func createPacket() *packet {
+ p := &packet{}
+ p.Size = rand.Intn(4 * 1024)
+ p.Data = make([]byte, p.Size)
+ for i := 0; i < p.Size; i++ {
+ p.Data[i] = byte(rand.Int() & 0xff)
+ }
+ p.CRC64 = crc64.Checksum([]byte(p.Data), crcTable)
+ return p
+}
+
+func checkPacket(p *packet) error {
+ if got, want := len(p.Data), p.Size; got != want {
+ return fmt.Errorf("wrong sizes: got %d, want %d", got, want)
+ }
+ crc := crc64.Checksum(p.Data, crcTable)
+ if got, want := crc, p.CRC64; got != want {
+ return fmt.Errorf("wrong crc: got %d, want %d", got, want)
+ }
+ return nil
+}
+
+type backChannel struct {
+ crcChan chan uint64
+ byteChan chan []byte
+ errChan chan error
+}
+
+type bcTable struct {
+ ready *sync.Cond
+ sync.Mutex
+ bc map[string]*backChannel
+}
+
+var globalBCTable bcTable
+
+func init() {
+ globalBCTable.ready = sync.NewCond(&globalBCTable)
+ globalBCTable.bc = make(map[string]*backChannel)
+}
+
+func (bt *bcTable) waitfor(key string) *backChannel {
+ bt.Lock()
+ defer bt.Unlock()
+ for {
+ bc := bt.bc[key]
+ if bc != nil {
+ delete(bt.bc, key)
+ return bc
+ }
+ bt.ready.Wait()
+ }
+}
+
+func (bt *bcTable) add(key string, bc *backChannel) {
+ bt.Lock()
+ bt.bc[key] = bc
+ bt.Unlock()
+ bt.ready.Broadcast()
+}
+
+func packetReceiver(t *testing.T, ln net.Listener, bc *backChannel) {
+ conn, err := ln.Accept()
+ if err != nil {
+ close(bc.crcChan)
+ close(bc.errChan)
+ return
+ }
+
+ globalBCTable.add(conn.RemoteAddr().String(), bc)
+
+ defer conn.Close()
+ dec := gob.NewDecoder(conn)
+ rxed := 0
+ for {
+ var p packet
+ err := dec.Decode(&p)
+ if err != nil {
+ if err != io.EOF {
+ bc.errChan <- fmt.Errorf("unexpected error: %s", err)
+ }
+ close(bc.crcChan)
+ close(bc.errChan)
+ return
+ }
+ if err := checkPacket(&p); err != nil {
+ bc.errChan <- fmt.Errorf("unexpected error: %s", err)
+ }
+ bc.crcChan <- p.CRC64
+ rxed++
+ }
+}
+
+func packetSender(t *testing.T, nPackets int, conn net.Conn) {
+ txCRCs := make([]uint64, nPackets)
+ enc := gob.NewEncoder(conn)
+ for i := 0; i < nPackets; i++ {
+ p := createPacket()
+ txCRCs[i] = p.CRC64
+ if err := enc.Encode(p); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ }
+ conn.Close() // Close the connection so that the receiver quits.
+
+ bc := globalBCTable.waitfor(conn.LocalAddr().String())
+ for err := range bc.errChan {
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ }
+
+ rxed := 0
+ for rxCRC := range bc.crcChan {
+ if got, want := rxCRC, txCRCs[rxed]; got != want {
+ t.Errorf("%s -> %s: packet %d: mismatched CRCs: got %d, want %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), rxed, got, want)
+ }
+ rxed++
+ }
+ if got, want := rxed, nPackets; got != want {
+ t.Fatalf("%s -> %s: got %d, want %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
+ }
+}
+
+func packetRunner(t *testing.T, ln net.Listener, dialer stream.DialerFunc, protocol, address string) {
+ nPackets := 100
+ go packetReceiver(t, ln, &backChannel{
+ crcChan: make(chan uint64, nPackets),
+ errChan: make(chan error, nPackets),
+ })
+
+ conn := newSender(t, dialer, protocol, address)
+ if err := checkProtocols(conn, protocol); err != nil {
+ t.Fatalf(err.Error())
+ }
+ packetSender(t, nPackets, conn)
+}
+
+func byteReceiver(t *testing.T, ln net.Listener, bc *backChannel) {
+ conn, err := ln.Accept()
+ if err != nil {
+ close(bc.byteChan)
+ close(bc.errChan)
+ return
+ }
+ globalBCTable.add(conn.RemoteAddr().String(), bc)
+
+ defer conn.Close()
+ rxed := 0
+ for {
+ buf := make([]byte, rxed+1)
+ n, err := conn.Read(buf)
+ if err != nil {
+ if err != io.EOF {
+ bc.errChan <- fmt.Errorf("unexpected error: %s", err)
+ }
+ close(bc.byteChan)
+ close(bc.errChan)
+ return
+ }
+ if got, want := n, len(buf[:n]); got != want {
+ bc.errChan <- fmt.Errorf("%s -> %s: got %d bytes, expected %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
+ }
+ if got, want := buf[0], byte(0xff); got != want {
+ bc.errChan <- fmt.Errorf("%s -> %s: got %x, want %x", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
+ }
+ bc.byteChan <- buf[:n]
+ rxed++
+ }
+}
+
+func byteSender(t *testing.T, nIterations int, conn net.Conn) {
+ txBytes := make([][]byte, nIterations+1)
+ for i := 0; i < nIterations; i++ {
+ p := make([]byte, i+1)
+ p[0] = 0xff
+ for j := 1; j <= i; j++ {
+ p[j] = byte(64 + i) // start at ASCII A
+ }
+ txBytes[i] = p
+ n, err := conn.Write(p)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if got, want := n, i+1; got != want {
+ t.Fatalf("wrote %d, not %d bytes", got, want)
+ }
+ }
+ conn.Close()
+
+ bc := globalBCTable.waitfor(conn.LocalAddr().String())
+
+ for err := range bc.errChan {
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ }
+
+ addr := fmt.Sprintf("%s -> %s", conn.LocalAddr().String(), conn.RemoteAddr().String())
+ rxed := 0
+ for rxBytes := range bc.byteChan {
+ if got, want := len(rxBytes), rxed+1; got != want {
+ t.Fatalf("%s: got %d, want %d bytes", addr, got, want)
+ }
+ if got, want := rxBytes[0], byte(0xff); got != want {
+ t.Fatalf("%s: got %x, want %x", addr, got, want)
+ }
+ for i := 0; i < len(rxBytes); i++ {
+ if got, want := rxBytes[i], txBytes[rxed][i]; got != want {
+ t.Fatalf("%s: got %c, want %c", addr, got, want)
+ }
+ }
+ rxed++
+ }
+ if got, want := rxed, nIterations; got != want {
+ t.Fatalf("%s: got %d, want %d", addr, got, want)
+ }
+}
+
+func byteRunner(t *testing.T, ln net.Listener, dialer stream.DialerFunc, protocol, address string) {
+ nIterations := 10
+ go byteReceiver(t, ln, &backChannel{
+ byteChan: make(chan []byte, nIterations),
+ errChan: make(chan error, nIterations),
+ })
+
+ conn := newSender(t, dialer, protocol, address)
+ defer conn.Close()
+ if err := checkProtocols(conn, protocol); err != nil {
+ t.Fatalf(err.Error())
+ }
+ byteSender(t, nIterations, conn)
+}
diff --git a/lib/websocket/ws_test.go b/lib/websocket/ws_test.go
new file mode 100644
index 0000000..f126893
--- /dev/null
+++ b/lib/websocket/ws_test.go
@@ -0,0 +1,94 @@
+package websocket_test
+
+import (
+ "net"
+ "sync"
+ "testing"
+
+ "veyron.io/veyron/veyron2/ipc/stream"
+
+ "veyron.io/veyron/veyron/lib/websocket"
+)
+
+func packetTester(t *testing.T, dialer stream.DialerFunc, listener stream.ListenerFunc, txProtocol, rxProtocol string) {
+ ln, err := listener(rxProtocol, "127.0.0.1:0")
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ defer ln.Close()
+ if got, want := ln.Addr().Network(), rxProtocol; got != want {
+ t.Fatalf("got %q, want %q", got, want)
+ }
+
+ packetRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+ packetRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+}
+
+func byteTester(t *testing.T, dialer stream.DialerFunc, listener stream.ListenerFunc, txProtocol, rxProtocol string) {
+ ln, err := listener(rxProtocol, "127.0.0.1:0")
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ defer ln.Close()
+ if got, want := ln.Addr().Network(), rxProtocol; got != want {
+ t.Fatalf("got %q, want %q", got, want)
+ }
+
+ byteRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+ byteRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+
+}
+
+func TestWSToWS(t *testing.T) {
+ byteTester(t, websocket.Dial, websocket.Listener, "ws", "ws")
+ packetTester(t, websocket.Dial, websocket.Listener, "ws", "ws")
+}
+
+func TestWSToWSH(t *testing.T) {
+ byteTester(t, websocket.Dial, websocket.HybridListener, "ws", "wsh")
+ //packetTester(t, websocket.Dial, websocket.HybridListener, "ws", "wsh")
+}
+
+func TestWSHToWSH(t *testing.T) {
+ byteTester(t, websocket.HybridDial, websocket.HybridListener, "wsh", "wsh")
+ packetTester(t, websocket.HybridDial, websocket.HybridListener, "wsh", "wsh")
+}
+
+func TestTCPToWSH(t *testing.T) {
+ byteTester(t, net.DialTimeout, websocket.HybridListener, "tcp", "wsh")
+ packetTester(t, net.DialTimeout, websocket.HybridListener, "tcp", "wsh")
+}
+
+func TestMixed(t *testing.T) {
+ ln, err := websocket.HybridListener("wsh", "127.0.0.1:0")
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ defer ln.Close()
+
+ var pwg sync.WaitGroup
+ packetTest := func(dialer stream.DialerFunc, protocol string) {
+ packetRunner(t, ln, dialer, protocol, ln.Addr().String())
+ pwg.Done()
+ }
+
+ pwg.Add(4)
+ go packetTest(websocket.Dial, "ws")
+ go packetTest(net.DialTimeout, "tcp")
+ go packetTest(websocket.Dial, "ws")
+ go packetTest(websocket.HybridDial, "wsh")
+ pwg.Wait()
+
+ var bwg sync.WaitGroup
+ byteTest := func(dialer stream.DialerFunc, protocol string) {
+ byteRunner(t, ln, dialer, protocol, ln.Addr().String())
+ bwg.Done()
+ }
+ bwg.Add(4)
+ go byteTest(websocket.Dial, "ws")
+ go byteTest(net.DialTimeout, "tcp")
+ go byteTest(websocket.Dial, "ws")
+ go byteTest(websocket.HybridDial, "wsh")
+
+ bwg.Wait()
+}
diff --git a/profiles/chrome/chrome.go b/profiles/chrome/chrome.go
index ada91dd..f2df385 100644
--- a/profiles/chrome/chrome.go
+++ b/profiles/chrome/chrome.go
@@ -9,8 +9,8 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/rt"
- _ "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/profiles/internal/platform"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
_ "veyron.io/veyron/veyron/runtimes/google/rt"
)
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 815b6dd..12e64d8 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -17,10 +17,11 @@
"veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netstate"
- _ "veyron.io/veyron/veyron/lib/tcp"
- _ "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/profiles/internal/gce"
"veyron.io/veyron/veyron/profiles/internal/platform"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
_ "veyron.io/veyron/veyron/runtimes/google/rt"
)
diff --git a/profiles/generic.go b/profiles/generic.go
index 5b6f2d7..c377531 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -7,10 +7,11 @@
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron/lib/appcycle"
- _ "veyron.io/veyron/veyron/lib/tcp"
- _ "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/profiles/internal"
"veyron.io/veyron/veyron/profiles/internal/platform"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
_ "veyron.io/veyron/veyron/runtimes/google/rt"
)
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index c772848..b2e6bbc 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -22,10 +22,11 @@
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netconfig"
"veyron.io/veyron/veyron/lib/netstate"
- _ "veyron.io/veyron/veyron/lib/tcp"
- _ "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/profiles/internal"
"veyron.io/veyron/veyron/profiles/internal/platform"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
_ "veyron.io/veyron/veyron/runtimes/google/rt"
"veyron.io/veyron/veyron/services/mgmt/debug"
diff --git a/profiles/static/static.go b/profiles/static/static.go
index a6b03c3..91ab924 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -14,12 +14,14 @@
"veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netstate"
- _ "veyron.io/veyron/veyron/lib/tcp"
- _ "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/profiles/internal"
"veyron.io/veyron/veyron/profiles/internal/platform"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
_ "veyron.io/veyron/veyron/runtimes/google/rt"
"veyron.io/veyron/veyron/services/mgmt/debug"
+
// TODO(cnicolaou,ashankar): move this into flags.
sflag "veyron.io/veyron/veyron/security/flag"
)
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 6705ef0..5f203f4 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -29,10 +29,9 @@
"veyron.io/veyron/veyron/lib/netstate"
"veyron.io/veyron/veyron/lib/stats"
- _ "veyron.io/veyron/veyron/lib/tcp"
"veyron.io/veyron/veyron/lib/testutil"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
- _ "veyron.io/veyron/veyron/lib/websocket"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
"veyron.io/veyron/veyron/runtimes/google/lib/publisher"
diff --git a/lib/tcp/init.go b/runtimes/google/ipc/protocols/tcp/init.go
similarity index 100%
rename from lib/tcp/init.go
rename to runtimes/google/ipc/protocols/tcp/init.go
diff --git a/runtimes/google/ipc/protocols/ws/init.go b/runtimes/google/ipc/protocols/ws/init.go
new file mode 100644
index 0000000..587db00
--- /dev/null
+++ b/runtimes/google/ipc/protocols/ws/init.go
@@ -0,0 +1,14 @@
+package websocket
+
+import (
+ "veyron.io/veyron/veyron2/ipc/stream"
+
+ "veyron.io/veyron/veyron/lib/websocket"
+)
+
+func init() {
+ // ws, ws4, ws6 represent websocket protocol instances.
+ for _, p := range []string{"ws", "ws4", "ws6"} {
+ stream.RegisterProtocol(p, websocket.Dial, websocket.Listener)
+ }
+}
diff --git a/runtimes/google/ipc/protocols/wsh/init.go b/runtimes/google/ipc/protocols/wsh/init.go
new file mode 100644
index 0000000..53f34e0
--- /dev/null
+++ b/runtimes/google/ipc/protocols/wsh/init.go
@@ -0,0 +1,15 @@
+// Package wsh registers the websocket 'hybrid' protocol.
+// We prefer to use tcp whenever we can to avoid the overhead of websockets.
+package wsh
+
+import (
+ "veyron.io/veyron/veyron2/ipc/stream"
+
+ "veyron.io/veyron/veyron/lib/websocket"
+)
+
+func init() {
+ for _, p := range []string{"wsh", "wsh4", "wsh6"} {
+ stream.RegisterProtocol(p, websocket.HybridDial, websocket.HybridListener)
+ }
+}
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 4c7cc05..6ac8b20 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -16,7 +16,6 @@
"veyron.io/veyron/veyron/lib/modules"
"veyron.io/veyron/veyron/lib/modules/core"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
- _ "veyron.io/veyron/veyron/lib/websocket"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
diff --git a/runtimes/google/ipc/stream/benchmark/dial_vc.go b/runtimes/google/ipc/stream/benchmark/dial_vc.go
index 2995374..e0275cf 100644
--- a/runtimes/google/ipc/stream/benchmark/dial_vc.go
+++ b/runtimes/google/ipc/stream/benchmark/dial_vc.go
@@ -5,8 +5,8 @@
"testing"
"time"
- _ "veyron.io/veyron/veyron/lib/tcp"
"veyron.io/veyron/veyron/lib/testutil"
+ _ "veyron.io/veyron/veyron/profiles/static"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
"veyron.io/veyron/veyron2/naming"
diff --git a/runtimes/google/ipc/stream/benchmark/throughput_flow.go b/runtimes/google/ipc/stream/benchmark/throughput_flow.go
index 902aa27..374464d 100644
--- a/runtimes/google/ipc/stream/benchmark/throughput_flow.go
+++ b/runtimes/google/ipc/stream/benchmark/throughput_flow.go
@@ -4,7 +4,6 @@
"io"
"testing"
- _ "veyron.io/veyron/veyron/lib/tcp"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
"veyron.io/veyron/veyron2/ipc/stream"
diff --git a/runtimes/google/ipc/stream/manager/manager_test.go b/runtimes/google/ipc/stream/manager/manager_test.go
index dc2b32d..551e0e5 100644
--- a/runtimes/google/ipc/stream/manager/manager_test.go
+++ b/runtimes/google/ipc/stream/manager/manager_test.go
@@ -19,10 +19,10 @@
"veyron.io/veyron/veyron/lib/expect"
"veyron.io/veyron/veyron/lib/modules"
- _ "veyron.io/veyron/veyron/lib/tcp"
"veyron.io/veyron/veyron/lib/testutil"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
- _ "veyron.io/veyron/veyron/lib/websocket"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+ _ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
diff --git a/runtimes/google/ipc/stream/message/message.go b/runtimes/google/ipc/stream/message/message.go
index 7c5358b..0928254 100644
--- a/runtimes/google/ipc/stream/message/message.go
+++ b/runtimes/google/ipc/stream/message/message.go
@@ -78,8 +78,14 @@
commonHeaderSizeBytes = 4 // 1 byte type + 3 bytes payload length
dataHeaderSizeBytes = 9 // 4 byte id.VC + 4 byte id.Flow + 1 byte flags
- controlType = 0
- dataType = 1
+ // Make sure the first byte can't be ASCII to ensure that a VC
+ // header can never be confused with a web socket request.
+ // TODO(cnicolaou): remove the original controlType and dataType values
+ // when new binaries are pushed.
+ controlType = 0
+ controlTypeWS = 0x80
+ dataType = 1
+ dataTypeWS = 0x81
)
var (
@@ -119,7 +125,7 @@
}
macSize := c.MACSize()
switch msgType {
- case controlType:
+ case controlType, controlTypeWS:
if !c.Open(payload.Contents) {
payload.Release()
return nil, corruptedMessageErr
@@ -127,7 +133,7 @@
m, err := readControl(bytes.NewBuffer(payload.Contents[:msgPayloadSize-macSize]))
payload.Release()
return m, err
- case dataType:
+ case dataType, dataTypeWS:
if !c.Open(payload.Contents[0 : dataHeaderSizeBytes+macSize]) {
payload.Release()
return nil, corruptedMessageErr
diff --git a/runtimes/google/ipc/stream/proxy/proxy_test.go b/runtimes/google/ipc/stream/proxy/proxy_test.go
index efb08e0..20500bb 100644
--- a/runtimes/google/ipc/stream/proxy/proxy_test.go
+++ b/runtimes/google/ipc/stream/proxy/proxy_test.go
@@ -11,9 +11,9 @@
"veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
- _ "veyron.io/veyron/veyron/lib/tcp"
"veyron.io/veyron/veyron/lib/testutil"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
+ _ "veyron.io/veyron/veyron/profiles"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"