runtimes/google/ipc: rework 'hybrid' websocket protocol support.

- introduce a new protocol type, wsh, which can listen for both
  tcp and websocket connections.
- modify vc's to ensure that they use a control byte that can't
  be confused with websockets. Roll is straightforward, push to
  servers first, update clients, remove old support

Change-Id: I2d68eb51d1c9b50afbaa4c3164a9cef6c927aba1
diff --git a/lib/websocket/conn.go b/lib/websocket/conn.go
index 3a291ea..4d126d4 100644
--- a/lib/websocket/conn.go
+++ b/lib/websocket/conn.go
@@ -60,13 +60,12 @@
 	for n == 0 && err == nil {
 		if c.currReader == nil {
 			t, r, err := c.ws.NextReader()
-
-			if t != websocket.BinaryMessage {
-				return 0, fmt.Errorf("Unexpected message type %d", t)
-			}
 			if err != nil {
 				return 0, err
 			}
+			if t != websocket.BinaryMessage {
+				return 0, fmt.Errorf("Unexpected message type %d", t)
+			}
 			c.currReader = r
 		}
 		n, err = c.readFromCurrReader(b)
@@ -86,15 +85,19 @@
 func (c *wrappedConn) Close() error {
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
+	// Send an EOF control message to the remote end so that it can
+	// handle the close gracefully.
+	msg := websocket.FormatCloseMessage(websocket.CloseGoingAway, "EOF")
+	c.ws.WriteControl(websocket.CloseMessage, msg, time.Now().Add(time.Second))
 	return c.ws.Close()
 }
 
 func (c *wrappedConn) LocalAddr() net.Addr {
-	return websocketAddr{s: c.ws.LocalAddr().String()}
+	return &addr{"ws", c.ws.LocalAddr().String()}
 }
 
 func (c *wrappedConn) RemoteAddr() net.Addr {
-	return websocketAddr{s: c.ws.RemoteAddr().String()}
+	return &addr{"ws", c.ws.RemoteAddr().String()}
 }
 
 func (c *wrappedConn) SetDeadline(t time.Time) error {
@@ -112,14 +115,53 @@
 	return c.ws.SetWriteDeadline(t)
 }
 
-type websocketAddr struct {
-	s string
+// hybridConn is used by the 'hybrid' protocol that can accept
+// either 'tcp' or 'websocket' connections. In particular, it allows
+// for the reader to peek and buffer the first n bytes of a stream
+// in order to determine what the connection type is.
+type hybridConn struct {
+	conn     net.Conn
+	buffered []byte
 }
 
-func (websocketAddr) Network() string {
-	return "ws"
+func (wc *hybridConn) Read(b []byte) (int, error) {
+	lbuf := len(wc.buffered)
+	if lbuf == 0 {
+		return wc.conn.Read(b)
+	}
+	copyn := copy(b, wc.buffered)
+	wc.buffered = wc.buffered[copyn:]
+	if len(b) > copyn {
+		n, err := wc.conn.Read(b[copyn:])
+		return copyn + n, err
+	}
+	return copyn, nil
 }
 
-func (w websocketAddr) String() string {
-	return w.s
+func (wc *hybridConn) Write(b []byte) (n int, err error) {
+	return wc.conn.Write(b)
+}
+
+func (wc *hybridConn) Close() error {
+	return wc.conn.Close()
+}
+
+func (wc *hybridConn) LocalAddr() net.Addr {
+	return &addr{"wsh", wc.conn.LocalAddr().String()}
+}
+
+func (wc *hybridConn) RemoteAddr() net.Addr {
+	return &addr{"wsh", wc.conn.RemoteAddr().String()}
+}
+
+func (wc *hybridConn) SetDeadline(t time.Time) error {
+	return wc.conn.SetDeadline(t)
+}
+
+func (wc *hybridConn) SetReadDeadline(t time.Time) error {
+	return wc.conn.SetReadDeadline(t)
+}
+
+func (wc *hybridConn) SetWriteDeadline(t time.Time) error {
+	return wc.conn.SetWriteDeadline(t)
 }
diff --git a/lib/websocket/conn_nacl.go b/lib/websocket/conn_nacl.go
index e20ced8..0aaf55e 100644
--- a/lib/websocket/conn_nacl.go
+++ b/lib/websocket/conn_nacl.go
@@ -28,7 +28,7 @@
 	currBuffer []byte
 }
 
-func Dial(address string) (net.Conn, error) {
+func Dial(protocol, address string, timeout time.Duration) (net.Conn, error) {
 	inst := PpapiInstance
 	u, err := url.Parse("ws://" + address)
 	if err != nil {
diff --git a/lib/websocket/conn_test.go b/lib/websocket/conn_test.go
index db38cbe..6b6db66 100644
--- a/lib/websocket/conn_test.go
+++ b/lib/websocket/conn_test.go
@@ -3,12 +3,13 @@
 
 import (
 	"bytes"
-	"github.com/gorilla/websocket"
 	"net"
 	"net/http"
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/gorilla/websocket"
 )
 
 func writer(c net.Conn, data []byte, times int, wg *sync.WaitGroup) {
@@ -88,7 +89,7 @@
 	}
 	// Dial out in another go routine
 	go func() {
-		conn, err := Dial(addr.String())
+		conn, err := Dial("tcp", addr.String(), time.Second)
 		numTries := 0
 		for err != nil && numTries < 5 {
 			numTries++
diff --git a/lib/websocket/dialer.go b/lib/websocket/dialer.go
index 111acb2..d719a11 100644
--- a/lib/websocket/dialer.go
+++ b/lib/websocket/dialer.go
@@ -6,17 +6,23 @@
 	"net"
 	"net/http"
 	"net/url"
+	"time"
 
 	"github.com/gorilla/websocket"
 )
 
-func Dial(address string) (net.Conn, error) {
-	conn, err := net.Dial("tcp", address)
+func Dial(protocol, address string, timeout time.Duration) (net.Conn, error) {
+	var then time.Time
+	if timeout > 0 {
+		then = time.Now().Add(timeout)
+	}
+	tcp := mapWebSocketToTCP[protocol]
+	conn, err := net.DialTimeout(tcp, address, timeout)
 	if err != nil {
 		return nil, err
 	}
+	conn.SetReadDeadline(then)
 	u, err := url.Parse("ws://" + address)
-
 	if err != nil {
 		return nil, err
 	}
@@ -24,5 +30,7 @@
 	if err != nil {
 		return nil, err
 	}
+	var zero time.Time
+	conn.SetDeadline(zero)
 	return WebsocketConn(ws), nil
 }
diff --git a/lib/websocket/hybrid.go b/lib/websocket/hybrid.go
new file mode 100644
index 0000000..e1bbc37
--- /dev/null
+++ b/lib/websocket/hybrid.go
@@ -0,0 +1,29 @@
+package websocket
+
+import (
+	"net"
+	"time"
+)
+
+var mapWebSocketToTCP = map[string]string{"ws": "tcp", "ws4": "tcp4", "ws6": "tcp6", "wsh": "tcp", "wsh4": "tcp4", "wsh6": "tcp6", "tcp": "tcp", "tcp4": "tcp4", "tcp6": "tcp6"}
+
+// HybridDial returns net.Conn that can be used with a HybridListener but
+// always uses tcp. A client must specifically elect to use websockets by
+// calling websocket.Dialer. The returned net.Conn will report 'tcp' as its
+// Network.
+func HybridDial(network, address string, timeout time.Duration) (net.Conn, error) {
+	tcp := mapWebSocketToTCP[network]
+	return net.DialTimeout(tcp, address, timeout)
+}
+
+// HybridListener returns a net.Listener that supports both tcp and
+// websockets over the same, single, port. A listen address of
+// --veyron.tcp.protocol=wsh --veyron.tcp.address=127.0.0.1:8101 means
+// that port 8101 can accept connections that use either tcp or websocket.
+// The listener looks at the first 4 bytes of the incoming data stream
+// to decide if it's a websocket protocol or not. These must be 'GET ' for
+// websockets, all other protocols must guarantee to not send 'GET ' as the
+// first four bytes of the payload.
+func HybridListener(protocol, address string) (net.Listener, error) {
+	return listener(protocol, address, true)
+}
diff --git a/lib/websocket/init.go b/lib/websocket/init.go
deleted file mode 100644
index 314c07a..0000000
--- a/lib/websocket/init.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package websocket
-
-import (
-	"net"
-	"time"
-
-	"veyron.io/veyron/veyron2/ipc/stream"
-)
-
-var mapWebSocketToTCP = map[string]string{"ws": "tcp", "ws4": "tcp4", "ws6": "tcp6"}
-
-func wsListener(protocol, address string) (net.Listener, error) {
-	tcp := mapWebSocketToTCP[protocol]
-	ln, err := net.Listen(tcp, address)
-	if err != nil {
-		return nil, err
-	}
-	return NewListener(ln)
-}
-
-func wsDialer(protocol, address string, timeout time.Duration) (net.Conn, error) {
-	// TODO(cnicolaou): implement timeout support.
-	return Dial(address)
-}
-
-func init() {
-	// ws, ws4, ws6 represent websocket protocol instances.
-	for _, p := range []string{"ws", "ws4", "ws6"} {
-		stream.RegisterProtocol(p, wsDialer, wsListener)
-	}
-
-	// TODO(cnicolaou): fully enable and test this 'hybrid mode'.
-	// hws, hws4, hws6 represent a 'hybrid' protocol that can accept
-	// both websockets and tcp, using a 'magic' byte to discriminate
-	// between the two. These are needed when a single network port must
-	// be use to serve both websocket and tcp clients, we prefer to use
-	// tcp whenever we can to avoid the overhead of websockets. Clients
-	// decide whether to use hybrid tcp or websockets by electing to dial
-	// using the hws protocol or the ws protocol respectively.
-	//for _, p := range []string{"wsh", "wsh4", "wsh6"} {
-	//	stream.RegisterProtocol(p, tcpHybridDialer, wsHybridListener)
-	//}
-
-	// The implementation strategy is as follows:
-	// tcpHybridDialer will create and return a wrapped net.Conn which will
-	// write the 'magic' time the first time that its Write method is called
-	// but will otherwise be indistinguishable from the underlying net.Conn.
-	// This first write will require an extra copy, but avoid potentially
-	// sending two packets.
-	// wsHybridListener is essentially the same as the current wsTCPListener,
-	// but the magic byte handling implemented on a conditional basis.
-}
-
-/*
-func dialer(network, address string, timeout time.Duration) (net.Conn, error) {
-		conn, err := net.DialTimeout(network, address, timeout)
-		if err != nil {
-			return nil, err
-		}
-		// For tcp connections we add an extra magic byte so we can differentiate between
-		// raw tcp and websocket on the same port.
-		switch n, err := conn.Write([]byte{websocket.BinaryMagicByte}); {
-		case err != nil:
-			return nil, err
-		case n != 1:
-			return nil, fmt.Errorf("Unable to write the magic byte")
-		}
-		return conn, nil
-	}
-}
-*/
diff --git a/lib/websocket/listener.go b/lib/websocket/listener.go
index 10d3f51..2971a03 100644
--- a/lib/websocket/listener.go
+++ b/lib/websocket/listener.go
@@ -5,6 +5,7 @@
 import (
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"net/http"
 	"sync"
@@ -18,15 +19,14 @@
 
 var errListenerIsClosed = errors.New("Listener has been Closed")
 
-// We picked 0xFF because it's obviously outside the range of ASCII,
-// and is completely unused in UTF-8.
-const BinaryMagicByte byte = 0xFF
-
 const bufferSize int = 4096
 
 // A listener that is able to handle either raw tcp request or websocket requests.
 // The result of Accept is is a net.Conn interface.
 type wsTCPListener struct {
+	closed bool
+	mu     sync.Mutex // Guards closed
+
 	// The queue of net.Conn to be returned by Accept.
 	q *upcqueue.T
 
@@ -39,32 +39,10 @@
 
 	netLoop sync.WaitGroup
 	wsLoop  sync.WaitGroup
-}
 
-/*
-// bufferedConn is used to allow us to Peek at the first byte to see if it
-// is the magic byte used by veyron tcp requests.  Other than that it behaves
-// like a normal net.Conn.
-type bufferedConn struct {
-	net.Conn
-	// TODO(bjornick): Remove this buffering because we have way too much
-	// buffering anyway.  We really only need to buffer the first byte.
-	r *bufio.Reader
+	hybrid bool // true if we're running in 'hybrid' mode
 }
 
-func newBufferedConn(c net.Conn) bufferedConn {
-	return bufferedConn{Conn: c, r: bufio.NewReaderSize(c, bufferSize)}
-}
-
-func (c *bufferedConn) Peek(n int) ([]byte, error) {
-	return c.r.Peek(n)
-}
-
-func (c *bufferedConn) Read(p []byte) (int, error) {
-	return c.r.Read(p)
-}
-*/
-
 // queueListener is a listener that returns connections that are in q.
 type queueListener struct {
 	q *upcqueue.T
@@ -93,11 +71,21 @@
 	return l.ln.Addr()
 }
 
-func NewListener(netLn net.Listener) (net.Listener, error) {
+func Listener(protocol, address string) (net.Listener, error) {
+	return listener(protocol, address, false)
+}
+
+func listener(protocol, address string, hybrid bool) (net.Listener, error) {
+	tcp := mapWebSocketToTCP[protocol]
+	netLn, err := net.Listen(tcp, address)
+	if err != nil {
+		return nil, err
+	}
 	ln := &wsTCPListener{
-		q:     upcqueue.New(),
-		httpQ: upcqueue.New(),
-		netLn: netLn,
+		q:      upcqueue.New(),
+		httpQ:  upcqueue.New(),
+		netLn:  netLn,
+		hybrid: hybrid,
 	}
 	ln.netLoop.Add(1)
 	go ln.netAcceptLoop()
@@ -136,47 +124,38 @@
 }
 
 func (ln *wsTCPListener) netAcceptLoop() {
-	defer ln.Close()
 	defer ln.netLoop.Done()
 	for {
-		conn, err := ln.netLn.Accept()
+		netConn, err := ln.netLn.Accept()
 		if err != nil {
 			vlog.VI(1).Infof("Exiting netAcceptLoop: net.Listener.Accept() failed on %v with %v", ln.netLn, err)
 			return
 		}
-		vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
-		/*
-			bc := newBufferedConn(conn)
-			magic, err := bc.Peek(1)
+		vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", netConn.RemoteAddr(), netConn.LocalAddr())
+		conn := netConn
+		if ln.hybrid {
+			hconn := &hybridConn{conn: netConn}
+			conn = hconn
+			magicbuf := [1]byte{}
+			n, err := io.ReadFull(netConn, magicbuf[:])
 			if err != nil {
-				vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as the magic byte failed to be read: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
-				bc.Close()
+				vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) since we failed to read the first byte: %v", netConn.RemoteAddr(), netConn.LocalAddr(), err)
 				continue
 			}
-
-				vlog.VI(1).Infof("Got a connection from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
-				// Check to see if it is a regular connection or a http connection.
-				if magic[0] == BinaryMagicByte {
-					if _, err := bc.r.ReadByte(); err != nil {
-						vlog.VI(1).Infof("Shutting down conn from %s (local address: %s), could read past the magic byte: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
-						bc.Close()
-						continue
-					}
-					if err := ln.q.Put(&bc); err != nil {
-						vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
-						bc.Close()
-						continue
-					}
-					continue
+			hconn.buffered = magicbuf[:n]
+			if magicbuf[0] != 'G' {
+				// Can't possibly be a websocket connection
+				if err := ln.q.Put(conn); err != nil {
+					vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", netConn.RemoteAddr(), netConn.LocalAddr(), err)
 				}
-		*/
-
+				continue
+			}
+			// Maybe be a websocket connection now.
+		}
 		ln.wsLoop.Add(1)
-		//		if err := ln.httpQ.Put(&bc); err != nil {
 		if err := ln.httpQ.Put(conn); err != nil {
 			ln.wsLoop.Done()
 			vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
-			//bc.Close()
 			conn.Close()
 			continue
 		}
@@ -196,6 +175,13 @@
 }
 
 func (ln *wsTCPListener) Close() error {
+	ln.mu.Lock()
+	if ln.closed {
+		ln.mu.Unlock()
+		return errListenerIsClosed
+	}
+	ln.closed = true
+	ln.mu.Unlock()
 	addr := ln.netLn.Addr()
 	err := ln.netLn.Close()
 	vlog.VI(1).Infof("Closed net.Listener on (%q, %q): %v", addr.Network(), addr, err)
@@ -223,6 +209,10 @@
 }
 
 func (ln *wsTCPListener) Addr() net.Addr {
-	a := &addr{"ws", ln.netLn.Addr().String()}
+	protocol := "ws"
+	if ln.hybrid {
+		protocol = "wsh"
+	}
+	a := &addr{protocol, ln.netLn.Addr().String()}
 	return a
 }
diff --git a/lib/websocket/listener_nacl.go b/lib/websocket/listener_nacl.go
index df89e2b..41dd95e 100644
--- a/lib/websocket/listener_nacl.go
+++ b/lib/websocket/listener_nacl.go
@@ -10,8 +10,6 @@
 // Websocket listeners are not supported in NaCl.
 // This file is needed for compilation only.
 
-const BinaryMagicByte byte = 0x90
-
-func NewListener(netLn net.Listener) (net.Listener, error) {
-	return nil, fmt.Errorf("Websocket NewListener called in nacl code!")
+func Listener(protocol, address string) (net.Listener, error) {
+	return nil, fmt.Errorf("Websocket Listener called in nacl code!")
 }
diff --git a/lib/websocket/util_test.go b/lib/websocket/util_test.go
new file mode 100644
index 0000000..f900dd7
--- /dev/null
+++ b/lib/websocket/util_test.go
@@ -0,0 +1,287 @@
+package websocket_test
+
+import (
+	"encoding/gob"
+	"fmt"
+	"hash/crc64"
+	"io"
+	"math/rand"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"veyron.io/veyron/veyron2/ipc/stream"
+
+	"veyron.io/veyron/veyron/lib/testutil"
+)
+
+var crcTable *crc64.Table
+
+func init() {
+	testutil.Init()
+	crcTable = crc64.MakeTable(crc64.ISO)
+}
+
+func newSender(t *testing.T, dialer stream.DialerFunc, protocol, address string) net.Conn {
+	conn, err := dialer(protocol, address, time.Minute)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+		return nil
+	}
+	return conn
+}
+
+func checkProtocols(conn net.Conn, tx string) error {
+	expectedProtocol := map[string]string{
+		"ws": "ws", "wsh": "tcp", "tcp": "tcp",
+	}
+	if got, want := conn.LocalAddr().Network(), expectedProtocol[tx]; got != want {
+		return fmt.Errorf("wrong local protocol: got %q, want %q", got, want)
+	}
+	// Can't tell that the remote protocol is really 'wsh'
+	if got, want := conn.RemoteAddr().Network(), expectedProtocol[tx]; got != want {
+		return fmt.Errorf("wrong remote protocol: got %q, want %q", got, want)
+	}
+	return nil
+}
+
+type packet struct {
+	Data  []byte
+	Size  int
+	CRC64 uint64
+}
+
+func createPacket() *packet {
+	p := &packet{}
+	p.Size = rand.Intn(4 * 1024)
+	p.Data = make([]byte, p.Size)
+	for i := 0; i < p.Size; i++ {
+		p.Data[i] = byte(rand.Int() & 0xff)
+	}
+	p.CRC64 = crc64.Checksum([]byte(p.Data), crcTable)
+	return p
+}
+
+func checkPacket(p *packet) error {
+	if got, want := len(p.Data), p.Size; got != want {
+		return fmt.Errorf("wrong sizes: got %d, want %d", got, want)
+	}
+	crc := crc64.Checksum(p.Data, crcTable)
+	if got, want := crc, p.CRC64; got != want {
+		return fmt.Errorf("wrong crc: got %d, want %d", got, want)
+	}
+	return nil
+}
+
+type backChannel struct {
+	crcChan  chan uint64
+	byteChan chan []byte
+	errChan  chan error
+}
+
+type bcTable struct {
+	ready *sync.Cond
+	sync.Mutex
+	bc map[string]*backChannel
+}
+
+var globalBCTable bcTable
+
+func init() {
+	globalBCTable.ready = sync.NewCond(&globalBCTable)
+	globalBCTable.bc = make(map[string]*backChannel)
+}
+
+func (bt *bcTable) waitfor(key string) *backChannel {
+	bt.Lock()
+	defer bt.Unlock()
+	for {
+		bc := bt.bc[key]
+		if bc != nil {
+			delete(bt.bc, key)
+			return bc
+		}
+		bt.ready.Wait()
+	}
+}
+
+func (bt *bcTable) add(key string, bc *backChannel) {
+	bt.Lock()
+	bt.bc[key] = bc
+	bt.Unlock()
+	bt.ready.Broadcast()
+}
+
+func packetReceiver(t *testing.T, ln net.Listener, bc *backChannel) {
+	conn, err := ln.Accept()
+	if err != nil {
+		close(bc.crcChan)
+		close(bc.errChan)
+		return
+	}
+
+	globalBCTable.add(conn.RemoteAddr().String(), bc)
+
+	defer conn.Close()
+	dec := gob.NewDecoder(conn)
+	rxed := 0
+	for {
+		var p packet
+		err := dec.Decode(&p)
+		if err != nil {
+			if err != io.EOF {
+				bc.errChan <- fmt.Errorf("unexpected error: %s", err)
+			}
+			close(bc.crcChan)
+			close(bc.errChan)
+			return
+		}
+		if err := checkPacket(&p); err != nil {
+			bc.errChan <- fmt.Errorf("unexpected error: %s", err)
+		}
+		bc.crcChan <- p.CRC64
+		rxed++
+	}
+}
+
+func packetSender(t *testing.T, nPackets int, conn net.Conn) {
+	txCRCs := make([]uint64, nPackets)
+	enc := gob.NewEncoder(conn)
+	for i := 0; i < nPackets; i++ {
+		p := createPacket()
+		txCRCs[i] = p.CRC64
+		if err := enc.Encode(p); err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+	}
+	conn.Close() // Close the connection so that the receiver quits.
+
+	bc := globalBCTable.waitfor(conn.LocalAddr().String())
+	for err := range bc.errChan {
+		if err != nil {
+			t.Fatalf(err.Error())
+		}
+	}
+
+	rxed := 0
+	for rxCRC := range bc.crcChan {
+		if got, want := rxCRC, txCRCs[rxed]; got != want {
+			t.Errorf("%s -> %s: packet %d: mismatched CRCs: got %d, want %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), rxed, got, want)
+		}
+		rxed++
+	}
+	if got, want := rxed, nPackets; got != want {
+		t.Fatalf("%s -> %s: got %d, want %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
+	}
+}
+
+func packetRunner(t *testing.T, ln net.Listener, dialer stream.DialerFunc, protocol, address string) {
+	nPackets := 100
+	go packetReceiver(t, ln, &backChannel{
+		crcChan: make(chan uint64, nPackets),
+		errChan: make(chan error, nPackets),
+	})
+
+	conn := newSender(t, dialer, protocol, address)
+	if err := checkProtocols(conn, protocol); err != nil {
+		t.Fatalf(err.Error())
+	}
+	packetSender(t, nPackets, conn)
+}
+
+func byteReceiver(t *testing.T, ln net.Listener, bc *backChannel) {
+	conn, err := ln.Accept()
+	if err != nil {
+		close(bc.byteChan)
+		close(bc.errChan)
+		return
+	}
+	globalBCTable.add(conn.RemoteAddr().String(), bc)
+
+	defer conn.Close()
+	rxed := 0
+	for {
+		buf := make([]byte, rxed+1)
+		n, err := conn.Read(buf)
+		if err != nil {
+			if err != io.EOF {
+				bc.errChan <- fmt.Errorf("unexpected error: %s", err)
+			}
+			close(bc.byteChan)
+			close(bc.errChan)
+			return
+		}
+		if got, want := n, len(buf[:n]); got != want {
+			bc.errChan <- fmt.Errorf("%s -> %s: got %d bytes, expected %d", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
+		}
+		if got, want := buf[0], byte(0xff); got != want {
+			bc.errChan <- fmt.Errorf("%s -> %s: got %x, want %x", conn.LocalAddr().String(), conn.RemoteAddr().String(), got, want)
+		}
+		bc.byteChan <- buf[:n]
+		rxed++
+	}
+}
+
+func byteSender(t *testing.T, nIterations int, conn net.Conn) {
+	txBytes := make([][]byte, nIterations+1)
+	for i := 0; i < nIterations; i++ {
+		p := make([]byte, i+1)
+		p[0] = 0xff
+		for j := 1; j <= i; j++ {
+			p[j] = byte(64 + i) // start at ASCII A
+		}
+		txBytes[i] = p
+		n, err := conn.Write(p)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if got, want := n, i+1; got != want {
+			t.Fatalf("wrote %d, not %d bytes", got, want)
+		}
+	}
+	conn.Close()
+
+	bc := globalBCTable.waitfor(conn.LocalAddr().String())
+
+	for err := range bc.errChan {
+		if err != nil {
+			t.Fatalf(err.Error())
+		}
+	}
+
+	addr := fmt.Sprintf("%s -> %s", conn.LocalAddr().String(), conn.RemoteAddr().String())
+	rxed := 0
+	for rxBytes := range bc.byteChan {
+		if got, want := len(rxBytes), rxed+1; got != want {
+			t.Fatalf("%s: got %d, want %d bytes", addr, got, want)
+		}
+		if got, want := rxBytes[0], byte(0xff); got != want {
+			t.Fatalf("%s: got %x, want %x", addr, got, want)
+		}
+		for i := 0; i < len(rxBytes); i++ {
+			if got, want := rxBytes[i], txBytes[rxed][i]; got != want {
+				t.Fatalf("%s: got %c, want %c", addr, got, want)
+			}
+		}
+		rxed++
+	}
+	if got, want := rxed, nIterations; got != want {
+		t.Fatalf("%s: got %d, want %d", addr, got, want)
+	}
+}
+
+func byteRunner(t *testing.T, ln net.Listener, dialer stream.DialerFunc, protocol, address string) {
+	nIterations := 10
+	go byteReceiver(t, ln, &backChannel{
+		byteChan: make(chan []byte, nIterations),
+		errChan:  make(chan error, nIterations),
+	})
+
+	conn := newSender(t, dialer, protocol, address)
+	defer conn.Close()
+	if err := checkProtocols(conn, protocol); err != nil {
+		t.Fatalf(err.Error())
+	}
+	byteSender(t, nIterations, conn)
+}
diff --git a/lib/websocket/ws_test.go b/lib/websocket/ws_test.go
new file mode 100644
index 0000000..f126893
--- /dev/null
+++ b/lib/websocket/ws_test.go
@@ -0,0 +1,94 @@
+package websocket_test
+
+import (
+	"net"
+	"sync"
+	"testing"
+
+	"veyron.io/veyron/veyron2/ipc/stream"
+
+	"veyron.io/veyron/veyron/lib/websocket"
+)
+
+func packetTester(t *testing.T, dialer stream.DialerFunc, listener stream.ListenerFunc, txProtocol, rxProtocol string) {
+	ln, err := listener(rxProtocol, "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	defer ln.Close()
+	if got, want := ln.Addr().Network(), rxProtocol; got != want {
+		t.Fatalf("got %q, want %q", got, want)
+	}
+
+	packetRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+	packetRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+}
+
+func byteTester(t *testing.T, dialer stream.DialerFunc, listener stream.ListenerFunc, txProtocol, rxProtocol string) {
+	ln, err := listener(rxProtocol, "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	defer ln.Close()
+	if got, want := ln.Addr().Network(), rxProtocol; got != want {
+		t.Fatalf("got %q, want %q", got, want)
+	}
+
+	byteRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+	byteRunner(t, ln, dialer, txProtocol, ln.Addr().String())
+
+}
+
+func TestWSToWS(t *testing.T) {
+	byteTester(t, websocket.Dial, websocket.Listener, "ws", "ws")
+	packetTester(t, websocket.Dial, websocket.Listener, "ws", "ws")
+}
+
+func TestWSToWSH(t *testing.T) {
+	byteTester(t, websocket.Dial, websocket.HybridListener, "ws", "wsh")
+	//packetTester(t, websocket.Dial, websocket.HybridListener, "ws", "wsh")
+}
+
+func TestWSHToWSH(t *testing.T) {
+	byteTester(t, websocket.HybridDial, websocket.HybridListener, "wsh", "wsh")
+	packetTester(t, websocket.HybridDial, websocket.HybridListener, "wsh", "wsh")
+}
+
+func TestTCPToWSH(t *testing.T) {
+	byteTester(t, net.DialTimeout, websocket.HybridListener, "tcp", "wsh")
+	packetTester(t, net.DialTimeout, websocket.HybridListener, "tcp", "wsh")
+}
+
+func TestMixed(t *testing.T) {
+	ln, err := websocket.HybridListener("wsh", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	defer ln.Close()
+
+	var pwg sync.WaitGroup
+	packetTest := func(dialer stream.DialerFunc, protocol string) {
+		packetRunner(t, ln, dialer, protocol, ln.Addr().String())
+		pwg.Done()
+	}
+
+	pwg.Add(4)
+	go packetTest(websocket.Dial, "ws")
+	go packetTest(net.DialTimeout, "tcp")
+	go packetTest(websocket.Dial, "ws")
+	go packetTest(websocket.HybridDial, "wsh")
+	pwg.Wait()
+
+	var bwg sync.WaitGroup
+	byteTest := func(dialer stream.DialerFunc, protocol string) {
+		byteRunner(t, ln, dialer, protocol, ln.Addr().String())
+		bwg.Done()
+	}
+	bwg.Add(4)
+	go byteTest(websocket.Dial, "ws")
+	go byteTest(net.DialTimeout, "tcp")
+	go byteTest(websocket.Dial, "ws")
+	go byteTest(websocket.HybridDial, "wsh")
+
+	bwg.Wait()
+}
diff --git a/profiles/chrome/chrome.go b/profiles/chrome/chrome.go
index ada91dd..f2df385 100644
--- a/profiles/chrome/chrome.go
+++ b/profiles/chrome/chrome.go
@@ -9,8 +9,8 @@
 	"veyron.io/veyron/veyron2/options"
 	"veyron.io/veyron/veyron2/rt"
 
-	_ "veyron.io/veyron/veyron/lib/websocket"
 	"veyron.io/veyron/veyron/profiles/internal/platform"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
 	_ "veyron.io/veyron/veyron/runtimes/google/rt"
 )
 
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 815b6dd..12e64d8 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -17,10 +17,11 @@
 	"veyron.io/veyron/veyron/lib/appcycle"
 	"veyron.io/veyron/veyron/lib/flags"
 	"veyron.io/veyron/veyron/lib/netstate"
-	_ "veyron.io/veyron/veyron/lib/tcp"
-	_ "veyron.io/veyron/veyron/lib/websocket"
 	"veyron.io/veyron/veyron/profiles/internal/gce"
 	"veyron.io/veyron/veyron/profiles/internal/platform"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
 	_ "veyron.io/veyron/veyron/runtimes/google/rt"
 )
 
diff --git a/profiles/generic.go b/profiles/generic.go
index 5b6f2d7..c377531 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -7,10 +7,11 @@
 	"veyron.io/veyron/veyron2/rt"
 
 	"veyron.io/veyron/veyron/lib/appcycle"
-	_ "veyron.io/veyron/veyron/lib/tcp"
-	_ "veyron.io/veyron/veyron/lib/websocket"
 	"veyron.io/veyron/veyron/profiles/internal"
 	"veyron.io/veyron/veyron/profiles/internal/platform"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
 	_ "veyron.io/veyron/veyron/runtimes/google/rt"
 )
 
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index c772848..b2e6bbc 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -22,10 +22,11 @@
 	"veyron.io/veyron/veyron/lib/flags"
 	"veyron.io/veyron/veyron/lib/netconfig"
 	"veyron.io/veyron/veyron/lib/netstate"
-	_ "veyron.io/veyron/veyron/lib/tcp"
-	_ "veyron.io/veyron/veyron/lib/websocket"
 	"veyron.io/veyron/veyron/profiles/internal"
 	"veyron.io/veyron/veyron/profiles/internal/platform"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
 	_ "veyron.io/veyron/veyron/runtimes/google/rt"
 	"veyron.io/veyron/veyron/services/mgmt/debug"
 
diff --git a/profiles/static/static.go b/profiles/static/static.go
index a6b03c3..91ab924 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -14,12 +14,14 @@
 	"veyron.io/veyron/veyron/lib/appcycle"
 	"veyron.io/veyron/veyron/lib/flags"
 	"veyron.io/veyron/veyron/lib/netstate"
-	_ "veyron.io/veyron/veyron/lib/tcp"
-	_ "veyron.io/veyron/veyron/lib/websocket"
 	"veyron.io/veyron/veyron/profiles/internal"
 	"veyron.io/veyron/veyron/profiles/internal/platform"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/wsh"
 	_ "veyron.io/veyron/veyron/runtimes/google/rt"
 	"veyron.io/veyron/veyron/services/mgmt/debug"
+
 	// TODO(cnicolaou,ashankar): move this into flags.
 	sflag "veyron.io/veyron/veyron/security/flag"
 )
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 6705ef0..5f203f4 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -29,10 +29,9 @@
 
 	"veyron.io/veyron/veyron/lib/netstate"
 	"veyron.io/veyron/veyron/lib/stats"
-	_ "veyron.io/veyron/veyron/lib/tcp"
 	"veyron.io/veyron/veyron/lib/testutil"
 	tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
-	_ "veyron.io/veyron/veyron/lib/websocket"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
 	imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
 	"veyron.io/veyron/veyron/runtimes/google/lib/publisher"
diff --git a/lib/tcp/init.go b/runtimes/google/ipc/protocols/tcp/init.go
similarity index 100%
rename from lib/tcp/init.go
rename to runtimes/google/ipc/protocols/tcp/init.go
diff --git a/runtimes/google/ipc/protocols/ws/init.go b/runtimes/google/ipc/protocols/ws/init.go
new file mode 100644
index 0000000..587db00
--- /dev/null
+++ b/runtimes/google/ipc/protocols/ws/init.go
@@ -0,0 +1,14 @@
+package websocket
+
+import (
+	"veyron.io/veyron/veyron2/ipc/stream"
+
+	"veyron.io/veyron/veyron/lib/websocket"
+)
+
+func init() {
+	// ws, ws4, ws6 represent websocket protocol instances.
+	for _, p := range []string{"ws", "ws4", "ws6"} {
+		stream.RegisterProtocol(p, websocket.Dial, websocket.Listener)
+	}
+}
diff --git a/runtimes/google/ipc/protocols/wsh/init.go b/runtimes/google/ipc/protocols/wsh/init.go
new file mode 100644
index 0000000..53f34e0
--- /dev/null
+++ b/runtimes/google/ipc/protocols/wsh/init.go
@@ -0,0 +1,15 @@
+// Package wsh registers the websocket 'hybrid' protocol.
+// We prefer to use tcp whenever we can to avoid the overhead of websockets.
+package wsh
+
+import (
+	"veyron.io/veyron/veyron2/ipc/stream"
+
+	"veyron.io/veyron/veyron/lib/websocket"
+)
+
+func init() {
+	for _, p := range []string{"wsh", "wsh4", "wsh6"} {
+		stream.RegisterProtocol(p, websocket.HybridDial, websocket.HybridListener)
+	}
+}
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 4c7cc05..6ac8b20 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -16,7 +16,6 @@
 	"veyron.io/veyron/veyron/lib/modules"
 	"veyron.io/veyron/veyron/lib/modules/core"
 	tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
-	_ "veyron.io/veyron/veyron/lib/websocket"
 	imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
 	inaming "veyron.io/veyron/veyron/runtimes/google/naming"
diff --git a/runtimes/google/ipc/stream/benchmark/dial_vc.go b/runtimes/google/ipc/stream/benchmark/dial_vc.go
index 2995374..e0275cf 100644
--- a/runtimes/google/ipc/stream/benchmark/dial_vc.go
+++ b/runtimes/google/ipc/stream/benchmark/dial_vc.go
@@ -5,8 +5,8 @@
 	"testing"
 	"time"
 
-	_ "veyron.io/veyron/veyron/lib/tcp"
 	"veyron.io/veyron/veyron/lib/testutil"
+	_ "veyron.io/veyron/veyron/profiles/static"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
 
 	"veyron.io/veyron/veyron2/naming"
diff --git a/runtimes/google/ipc/stream/benchmark/throughput_flow.go b/runtimes/google/ipc/stream/benchmark/throughput_flow.go
index 902aa27..374464d 100644
--- a/runtimes/google/ipc/stream/benchmark/throughput_flow.go
+++ b/runtimes/google/ipc/stream/benchmark/throughput_flow.go
@@ -4,7 +4,6 @@
 	"io"
 	"testing"
 
-	_ "veyron.io/veyron/veyron/lib/tcp"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
 
 	"veyron.io/veyron/veyron2/ipc/stream"
diff --git a/runtimes/google/ipc/stream/manager/manager_test.go b/runtimes/google/ipc/stream/manager/manager_test.go
index dc2b32d..551e0e5 100644
--- a/runtimes/google/ipc/stream/manager/manager_test.go
+++ b/runtimes/google/ipc/stream/manager/manager_test.go
@@ -19,10 +19,10 @@
 
 	"veyron.io/veyron/veyron/lib/expect"
 	"veyron.io/veyron/veyron/lib/modules"
-	_ "veyron.io/veyron/veyron/lib/tcp"
 	"veyron.io/veyron/veyron/lib/testutil"
 	tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
-	_ "veyron.io/veyron/veyron/lib/websocket"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/tcp"
+	_ "veyron.io/veyron/veyron/runtimes/google/ipc/protocols/ws"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/version"
 	inaming "veyron.io/veyron/veyron/runtimes/google/naming"
diff --git a/runtimes/google/ipc/stream/message/message.go b/runtimes/google/ipc/stream/message/message.go
index 7c5358b..0928254 100644
--- a/runtimes/google/ipc/stream/message/message.go
+++ b/runtimes/google/ipc/stream/message/message.go
@@ -78,8 +78,14 @@
 	commonHeaderSizeBytes = 4 // 1 byte type + 3 bytes payload length
 	dataHeaderSizeBytes   = 9 // 4 byte id.VC + 4 byte id.Flow + 1 byte flags
 
-	controlType = 0
-	dataType    = 1
+	// Make sure the first byte can't be ASCII to ensure that a VC
+	// header can never be confused with a web socket request.
+	// TODO(cnicolaou): remove the original controlType and dataType values
+	// when new binaries are pushed.
+	controlType   = 0
+	controlTypeWS = 0x80
+	dataType      = 1
+	dataTypeWS    = 0x81
 )
 
 var (
@@ -119,7 +125,7 @@
 	}
 	macSize := c.MACSize()
 	switch msgType {
-	case controlType:
+	case controlType, controlTypeWS:
 		if !c.Open(payload.Contents) {
 			payload.Release()
 			return nil, corruptedMessageErr
@@ -127,7 +133,7 @@
 		m, err := readControl(bytes.NewBuffer(payload.Contents[:msgPayloadSize-macSize]))
 		payload.Release()
 		return m, err
-	case dataType:
+	case dataType, dataTypeWS:
 		if !c.Open(payload.Contents[0 : dataHeaderSizeBytes+macSize]) {
 			payload.Release()
 			return nil, corruptedMessageErr
diff --git a/runtimes/google/ipc/stream/proxy/proxy_test.go b/runtimes/google/ipc/stream/proxy/proxy_test.go
index efb08e0..20500bb 100644
--- a/runtimes/google/ipc/stream/proxy/proxy_test.go
+++ b/runtimes/google/ipc/stream/proxy/proxy_test.go
@@ -11,9 +11,9 @@
 	"veyron.io/veyron/veyron2/ipc/stream"
 	"veyron.io/veyron/veyron2/naming"
 
-	_ "veyron.io/veyron/veyron/lib/tcp"
 	"veyron.io/veyron/veyron/lib/testutil"
 	tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
+	_ "veyron.io/veyron/veyron/profiles"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"