Merge branch 'master' into proxyflow
diff --git a/.gerrit_commit_message b/.gerrit_commit_message
new file mode 100644
index 0000000..beb8086
--- /dev/null
+++ b/.gerrit_commit_message
@@ -0,0 +1,22 @@
+DO NOT SUBMIT: RPC2 Experiment.
+
+This change implements a prototype POC for RPC2.
+
+All new implementation is prefixed with X and we did not change any existing
+code.
+
+Doc: https://docs.google.com/a/google.com/document/d/11NlwHsNyGg-bZU0-d43s-z0HS9pz9x4QSwpe0M8bukQ/edit?usp=sharing
+
+Main points:
+(1) VIF/VC are unified into one layer. This automatically saves us a round
+    of auth when making a direct connection (not through a proxy)
+(2) When talking through a proxy, a VC is built on top of flows from the client
+    to the proxy and from the proxy to the server. This allows us to reuse the
+    stream manager for the proxy, allowing it to be built with the stream API.
+
+Code:
+x_client.go, x_server.go are mostly identical to client.go and server.go, just
+with different types.
+The real implementation are in xvc.go, x_manager.go, rpc/proxy.go, and x_listener.go.
+
+Change-Id: I650570570d597a27274430e83a5efa118e6923c6
\ No newline at end of file
diff --git a/runtime/internal/lib/bqueue/drrqueue/drrqueue.go b/runtime/internal/lib/bqueue/drrqueue/drrqueue.go
index a42731b..d9820ae 100644
--- a/runtime/internal/lib/bqueue/drrqueue/drrqueue.go
+++ b/runtime/internal/lib/bqueue/drrqueue/drrqueue.go
@@ -230,6 +230,11 @@
 	}
 	w.updateStateLocked(true, consumed)
 
+	ss := "bufs: "
+	for _, b := range bufs {
+		ss += fmt.Sprintf(" %d", b.Size())
+	}
+
 	return bufs, bufs != nil
 }
 
diff --git a/runtime/internal/lib/iobuf/allocator.go b/runtime/internal/lib/iobuf/allocator.go
index d059564..21d370c 100644
--- a/runtime/internal/lib/iobuf/allocator.go
+++ b/runtime/internal/lib/iobuf/allocator.go
@@ -66,6 +66,14 @@
 	return a.iobuf.slice(free, base, a.index)
 }
 
+func (a *Allocator) CopyExtra(buf []byte, extra int) *Slice {
+	slice := a.Alloc(uint(len(buf) + extra))
+	copy(slice.Contents, buf)
+	slice.Contents = slice.Contents[:len(buf)]
+	slice.extra = uint(extra)
+	return slice
+}
+
 // Copy allocates a Slice and copies the buf into it.
 func (a *Allocator) Copy(buf []byte) *Slice {
 	slice := a.Alloc(uint(len(buf)))
diff --git a/runtime/internal/lib/iobuf/slice.go b/runtime/internal/lib/iobuf/slice.go
index d48a463..1da3940 100644
--- a/runtime/internal/lib/iobuf/slice.go
+++ b/runtime/internal/lib/iobuf/slice.go
@@ -7,8 +7,9 @@
 // Slice refers to an iobuf and the byte slice for the actual data.
 type Slice struct {
 	iobuf    *buf
-	free     uint   // Free area before base, if any.
-	base     uint   // Index into the underlying iobuf.
+	free     uint // Free area before base, if any.
+	base     uint // Index into the underlying iobuf.
+	extra    uint
 	Contents []byte // iobuf.Contents[base:bound]
 }
 
@@ -17,6 +18,14 @@
 	return len(slice.Contents)
 }
 
+func (slice *Slice) Extend(n int) {
+	if int(slice.extra) < n {
+		panic("not enough reserved.")
+	}
+	slice.Contents = slice.Contents[0 : len(slice.Contents)+n]
+	slice.extra -= uint(n)
+}
+
 // FreeEntirePrefix sets the free index to zero.  Be careful when using this,
 // you should ensure that no Slices are using the free region.
 func (slice *Slice) FreeEntirePrefix() {
diff --git a/runtime/internal/rpc/proxy/proxy.go b/runtime/internal/rpc/proxy/proxy.go
new file mode 100644
index 0000000..8328311
--- /dev/null
+++ b/runtime/internal/rpc/proxy/proxy.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package proxy
+
+import (
+	"fmt"
+	"io"
+
+	"v.io/x/ref/runtime/internal/lib/iobuf"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/crypto"
+	"v.io/x/ref/runtime/internal/rpc/stream/manager"
+	"v.io/x/ref/runtime/internal/rpc/stream/message"
+
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+	"v.io/x/lib/vlog"
+)
+
+var nullCipher = crypto.NullControlCipher{}
+
+type proxy struct {
+	mgr       stream.XManager
+	principal security.Principal
+}
+
+func New(protocol, address string, rid naming.RoutingID, ctx *context.T, principal security.Principal) (*proxy, func(), naming.Endpoint, error) {
+	proxy := &proxy{
+		mgr:       manager.XInternalNew(ctx, rid),
+		principal: principal,
+	}
+	ln, ep, err := proxy.mgr.Listen(protocol, address, proxy.principal, proxy.principal.BlessingStore().Default())
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	go proxy.listenLoop(ln)
+	// TODO(suharshs): implement proxy close function.
+	return proxy, func() {}, ep, nil
+}
+
+func (p *proxy) listenLoop(ln stream.XListener) {
+	for {
+		f, err := ln.Accept()
+		if err != nil {
+			vlog.Errorf("ln.Accept failed: %v", err)
+		}
+		go func() {
+			if err := p.processFlow(f); err != nil {
+				vlog.Errorf("processFlow failed: %v", err)
+			}
+		}()
+	}
+}
+
+func (p *proxy) processFlow(f stream.XFlow) error {
+	r := iobuf.NewReader(iobuf.NewPool(0), f)
+	m, err := message.ReadFrom(r, nullCipher)
+	if err != nil {
+		return fmt.Errorf("failed to read message: %v", err)
+	}
+	switch msg := m.(type) {
+	case *message.SetupVC:
+		fout, err := p.mgr.Dial(msg.RemoteEndpoint, p.principal)
+		if err != nil {
+			return fmt.Errorf("p.mgr.Dial: %v", err)
+		}
+		// Forward the setupVC message.
+		if err := message.WriteTo(fout, msg, nullCipher); err != nil {
+			return fmt.Errorf("message.WriteTo(fout): %v", err)
+		}
+		// Disable encryption on the flows since all data sent after the setupVC message
+		// will be encryption between the ends.
+		f.DisableEncryption()
+		fout.DisableEncryption()
+		go p.forwardLoop(f, fout)
+		go p.forwardLoop(fout, f)
+		return nil
+	default:
+		return fmt.Errorf("unexpected message: %v", m)
+	}
+}
+
+func (p *proxy) forwardLoop(fin, fout stream.XFlow) {
+	for {
+		_, err := io.Copy(fin, fout)
+		if err == io.EOF {
+			return
+		} else if err != nil {
+			vlog.Errorf("f.Read failed: %v", err)
+			return
+		}
+	}
+}
diff --git a/runtime/internal/rpc/stream/manager/xlistener.go b/runtime/internal/rpc/stream/manager/xlistener.go
new file mode 100644
index 0000000..a899891
--- /dev/null
+++ b/runtime/internal/rpc/stream/manager/xlistener.go
@@ -0,0 +1,130 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+	"net"
+
+	"v.io/x/lib/vlog"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/vc"
+	iversion "v.io/x/ref/runtime/internal/rpc/version"
+
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+	"v.io/v23/verror"
+)
+
+type adapter interface {
+	Accept() (stream.XConn, naming.Endpoint, error)
+	Close() error
+}
+
+type xListener struct {
+	m          *xmanager
+	q          *upcqueue.T
+	principal  security.Principal
+	lBlessings security.Blessings
+	ln         adapter
+}
+
+func (ln *xListener) acceptLoop(ctx *context.T) {
+	for {
+		conn, lep, err := ln.ln.Accept()
+		if err != nil {
+			vlog.Errorf("netAcceptLoop: %v", err)
+			return
+		}
+		// TODO(suharshs): Get the versions here correctly.
+		vc, err := vc.XNewAccepted(ctx, conn, ln.principal, lep, ln.lBlessings, *iversion.SupportedRange, ln.q)
+		if err != nil {
+			vlog.Errorf("netAcceptLoop: %v", err)
+			return
+		}
+		if err := ln.m.vcCache.Insert(vc); err != nil {
+			vlog.Errorf("m.vcCache.Insert: %v", err)
+		}
+	}
+}
+
+func (ln *xListener) Accept() (stream.XFlow, error) {
+	item, err := ln.q.Get(nil)
+	switch {
+	case err == upcqueue.ErrQueueIsClosed:
+		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
+	case err != nil:
+		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
+	default:
+		return item.(stream.XFlow), nil
+	}
+}
+
+func (ln *xListener) Close() error {
+	// TODO(suharshs): implement this for real.
+	ln.q.Close()
+	return nil
+}
+
+type netAdapter struct {
+	net.Listener
+	rid naming.RoutingID
+}
+
+func (n *netAdapter) Accept() (stream.XConn, naming.Endpoint, error) {
+	if conn, err := n.Listener.Accept(); err != nil {
+		return nil, nil, err
+	} else {
+		return xNetConn{conn}, localEndpoint(conn, n.rid), nil
+	}
+}
+
+func newXNetListener(ctx *context.T, m *xmanager, netLn net.Listener, principal security.Principal, blessings security.Blessings) stream.XListener {
+	ln := &xListener{
+		m:          m,
+		q:          upcqueue.New(),
+		ln:         &netAdapter{netLn, m.rid},
+		principal:  principal,
+		lBlessings: blessings,
+	}
+	go ln.acceptLoop(ctx)
+	return ln
+}
+
+type proxyAdapter struct {
+	q *upcqueue.T
+}
+
+func (p *proxyAdapter) Accept() (stream.XConn, naming.Endpoint, error) {
+	item, err := p.q.Get(nil)
+	f := item.(stream.XFlow)
+	return f, f.LocalEndpoint(), err
+}
+
+func (p *proxyAdapter) Close() error {
+	p.q.Close()
+	return nil
+}
+
+func newXProxyListener(proxyEP naming.Endpoint, m *xmanager, ctx *context.T, principal security.Principal) (stream.XListener, error) {
+	pa := &proxyAdapter{upcqueue.New()}
+	ln := &xListener{
+		m:          m,
+		q:          upcqueue.New(),
+		ln:         pa,
+		principal:  principal,
+		lBlessings: principal.BlessingStore().Default(),
+	}
+	_, vc, err := m.internalDial(proxyEP, ln.principal)
+	if err != nil {
+		return nil, err
+	}
+	if err := vc.ListenTo(pa.q); err != nil {
+		return nil, err
+	}
+	go ln.acceptLoop(ctx)
+	return ln, nil
+}
diff --git a/runtime/internal/rpc/stream/manager/xmanager.go b/runtime/internal/rpc/stream/manager/xmanager.go
new file mode 100644
index 0000000..34b797e
--- /dev/null
+++ b/runtime/internal/rpc/stream/manager/xmanager.go
@@ -0,0 +1,178 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+	"net"
+	"time"
+
+	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/vc"
+	iversion "v.io/x/ref/runtime/internal/rpc/version"
+
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/verror"
+)
+
+type xmanager struct {
+	ctx     *context.T // TODO(mattr): This should be pushed into the managers interfaces.
+	rid     naming.RoutingID
+	vcCache *vc.XVCCache
+}
+
+func XInternalNew(ctx *context.T, rid naming.RoutingID) stream.XManager {
+	return &xmanager{
+		ctx:     ctx,
+		rid:     rid,
+		vcCache: vc.NewXVCCache(),
+	}
+}
+
+func xdial(network, address string, timeout time.Duration) (net.Conn, error) {
+	d, _, _, _ := rpc.RegisteredProtocol(network)
+	if d != nil {
+		conn, err := d(network, address, timeout)
+		if err != nil {
+			return nil, verror.New(stream.ErrDialFailed, nil, err)
+		}
+		return conn, nil
+	}
+	return nil, verror.New(stream.ErrDialFailed, nil, verror.New(errUnknownNetwork, nil, network))
+}
+
+func (m *xmanager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, naming.Endpoint, error) {
+	bNames, err := extractBlessingNames(principal, blessings)
+	if err != nil {
+		return nil, nil, err
+	}
+	ln, ep, err := m.internalListen(protocol, address, principal, blessings, opts...)
+	if err != nil {
+		return nil, nil, err
+	}
+	ep.Blessings = bNames
+	return ln, ep, nil
+}
+
+func (m *xmanager) internalListen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, *inaming.Endpoint, error) {
+	if protocol == inaming.Network {
+		// Act as if listening on the address of a remote proxy.
+		ep, err := inaming.NewEndpoint(address)
+		if err != nil {
+			return nil, nil, verror.New(stream.ErrBadArg, nil, verror.New(errEndpointParseError, nil, address, err))
+		}
+		return m.remoteListen(ep, principal)
+	}
+
+	netln, err := listen(protocol, address)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	ln := newXNetListener(m.ctx, m, netln, principal, blessings)
+	ep := &inaming.Endpoint{
+		Protocol: protocol,
+		Address:  netln.Addr().String(),
+		RID:      m.rid,
+	}
+	return ln, ep, nil
+}
+
+func (m *xmanager) remoteListen(proxy naming.Endpoint, principal security.Principal) (stream.XListener, *inaming.Endpoint, error) {
+	ln, err := newXProxyListener(proxy, m, m.ctx, principal)
+	if err != nil {
+		return nil, nil, err
+	}
+	ep := &inaming.Endpoint{
+		Protocol: proxy.Addr().Network(),
+		Address:  proxy.Addr().String(),
+		RID:      m.rid,
+	}
+	return ln, ep, nil
+}
+
+func (m *xmanager) Dial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, error) {
+	f, _, err := m.internalDial(remote, principal, opts...)
+	return f, err
+}
+
+func (m *xmanager) internalDial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, stream.XVC, error) {
+	var timeout time.Duration
+	for _, o := range opts {
+		switch v := o.(type) {
+		case DialTimeout:
+			timeout = time.Duration(v)
+		}
+	}
+	var v stream.XVC
+	var err error
+	if v, err = m.vcCache.ReservedFind(remote); err != nil {
+		return nil, nil, err
+	}
+	defer m.vcCache.Unreserve(remote)
+	if v == nil {
+		conn, err := xdial(remote.Addr().Network(), remote.Addr().String(), timeout)
+		if err != nil {
+			return nil, nil, err
+		}
+		local := localEndpoint(conn, m.rid)
+		if v, err = vc.XNewDialed(m.ctx, xNetConn{conn}, principal, local, remote, *iversion.SupportedRange); err != nil {
+			return nil, nil, err
+		}
+		if err := m.vcCache.Insert(v); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	if remote.RoutingID() != v.(backingVC).RemoteEndpoint().RoutingID() {
+		flow, err := v.Connect()
+		if err != nil {
+			return nil, nil, err
+		}
+		if v, err = vc.XNewDialed(m.ctx, flow, principal, flow.LocalEndpoint(), remote, *iversion.SupportedRange); err != nil {
+			return nil, nil, err
+		}
+	}
+	f, err := v.Connect()
+	return f, v, err
+}
+
+func (m *xmanager) Shutdown() {
+	// TODO(suharshs): Implement this.
+}
+
+func (m *xmanager) RoutingID() naming.RoutingID {
+	return m.rid
+}
+
+type backingVC interface {
+	LocalEndpoint() naming.Endpoint
+	RemoteEndpoint() naming.Endpoint
+	LocalPrincipal() security.Principal
+	LocalBlessings() security.Blessings
+	RemoteBlessings() security.Blessings
+	LocalDischarges() map[string]security.Discharge
+	RemoteDischarges() map[string]security.Discharge
+	VCDataCache() stream.VCDataCache
+}
+
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+	localAddr := conn.LocalAddr()
+	ep := &inaming.Endpoint{
+		Protocol: localAddr.Network(),
+		Address:  localAddr.String(),
+		RID:      rid,
+	}
+	return ep
+}
+
+type xNetConn struct {
+	net.Conn
+}
+
+func (xNetConn) DisableEncryption() {}
diff --git a/runtime/internal/rpc/stream/manager/xmanager_test.go b/runtime/internal/rpc/stream/manager/xmanager_test.go
new file mode 100644
index 0000000..6ff4dc4
--- /dev/null
+++ b/runtime/internal/rpc/stream/manager/xmanager_test.go
@@ -0,0 +1,148 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+	"testing"
+
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/vtrace"
+	"v.io/x/ref/lib/flags"
+	"v.io/x/ref/runtime/internal/rpc/proxy"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/manager"
+	ivtrace "v.io/x/ref/runtime/internal/vtrace"
+	"v.io/x/ref/test/testutil"
+)
+
+func Init() (*context.T, context.CancelFunc) {
+	ctx, cancel := context.RootContext()
+	ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{
+		CollectRegexp:  ".*",
+		DumpOnShutdown: true,
+		CacheSize:      100,
+	})
+	if err != nil {
+		panic(err)
+	}
+	return ctx, func() {
+		vtrace.FormatTraces(os.Stderr, vtrace.GetStore(ctx).TraceRecords(), nil)
+		cancel()
+	}
+}
+
+func TestDirectConnection(t *testing.T) {
+	ctx, cancel := Init()
+	defer cancel()
+
+	p := testutil.NewPrincipal("test")
+	rid := naming.FixedRoutingID(0x5555)
+	m := manager.XInternalNew(ctx, rid)
+	want := "read this please"
+
+	ln, ep, err := m.Listen("tcp", "127.0.0.1:0", p, p.BlessingStore().Default())
+	if err != nil {
+		t.Error(err)
+	}
+
+	go func(ln stream.XListener) {
+		flow, err := ln.Accept()
+		if err != nil {
+			t.Error(err)
+		}
+		got, err := readLine(flow)
+		if err != nil {
+			t.Error(err)
+		}
+		if got != want {
+			t.Errorf("got %v, want %v", got, want)
+		}
+		ln.Close()
+	}(ln)
+
+	flow, err := m.Dial(ep, p)
+	if err != nil {
+		t.Error(err)
+	}
+	writeLine(flow, want)
+}
+
+func TestProxyConnection(t *testing.T) {
+	ctx, cancel := Init()
+	defer cancel()
+
+	p := testutil.NewPrincipal("test")
+	srid := naming.FixedRoutingID(0x5555)
+	crid := naming.FixedRoutingID(0x4444)
+	prid := naming.FixedRoutingID(0x6666)
+	sm := manager.XInternalNew(ctx, srid)
+	cm := manager.XInternalNew(ctx, crid)
+	want := "read this please"
+
+	_, sd, pep, err := proxy.New("tcp", "127.0.0.1:0", prid, ctx, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer sd()
+	fmt.Printf("proxy ep: %v\n", pep)
+
+	// listen on the proxy
+	ln, ep, err := sm.Listen("v23", pep.String(), p, p.BlessingStore().Default())
+	fmt.Printf("server ep: %v\n", ep)
+	if err != nil {
+		t.Error(err)
+	}
+
+	go func(ln stream.XListener) {
+		flow, err := ln.Accept()
+		if err != nil {
+			t.Error(err)
+		}
+		got, err := readLine(flow)
+		if err != nil {
+			t.Error(err)
+		}
+		if got != want {
+			t.Errorf("got %v, want %v", got, want)
+		}
+		ln.Close()
+	}(ln)
+
+	flow, err := cm.Dial(ep, p)
+	if err != nil {
+		t.Error(err)
+	}
+
+	writeLine(flow, want)
+}
+
+func readLine(f stream.XFlow) (string, error) {
+	var result bytes.Buffer
+	var buf [5]byte
+	for {
+		n, err := f.Read(buf[:])
+		result.Write(buf[:n])
+		if err == io.EOF || buf[n-1] == '\n' {
+			return strings.TrimRight(result.String(), "\n"), nil
+		}
+		if err != nil {
+			return "", fmt.Errorf("Read returned (%d, %v)", n, err)
+		}
+	}
+}
+
+func writeLine(f stream.XFlow, data string) error {
+	data = data + "\n"
+	if n, err := f.Write([]byte(data)); err != nil {
+		return fmt.Errorf("Write returned (%d, %v)", n, err)
+	}
+	return nil
+}
diff --git a/runtime/internal/rpc/stream/message/data.go b/runtime/internal/rpc/stream/message/data.go
index 784b603..bb6196f 100644
--- a/runtime/internal/rpc/stream/message/data.go
+++ b/runtime/internal/rpc/stream/message/data.go
@@ -25,6 +25,10 @@
 // SetClose sets the Close flag of the message.
 func (d *Data) SetClose() { d.flags |= 0x1 }
 
+func (d *Data) Encrypted() bool { return d.flags&0x2 == 2 }
+
+func (d *Data) SetEncrypted() { d.flags |= 0x2 }
+
 // Release releases the Payload
 func (d *Data) Release() {
 	if d.Payload != nil {
diff --git a/runtime/internal/rpc/stream/message/message.go b/runtime/internal/rpc/stream/message/message.go
index 5e4b790..1ef3d10 100644
--- a/runtime/internal/rpc/stream/message/message.go
+++ b/runtime/internal/rpc/stream/message/message.go
@@ -159,6 +159,15 @@
 			Payload: payload,
 		}
 		m.Payload.TruncateFront(uint(dataHeaderSizeBytes + macSize))
+		if m.Encrypted() {
+			if !c.Open(m.Payload.Contents) {
+				m.Payload.Release()
+				return nil, verror.New(errCorruptedMessage, nil)
+			}
+			// TODO(mattr): This is probably not exactly right, but the iobuf code
+			// is strange and complex.
+			m.Payload.Contents = m.Payload.Contents[:m.Payload.Size()-macSize]
+		}
 		return m, nil
 	default:
 		payload.Release()
@@ -190,6 +199,12 @@
 		write4ByteUint(header[8:12], uint32(m.Flow))
 		header[12] = m.flags
 		EncryptMessage(msg.Contents, c)
+		if m.Encrypted() {
+			// TODO(mattr): Ensure that when we create an encrypted data message
+			// we leave macSize bytes free at the end.
+			msg.Extend(macSize)
+			c.Seal(msg.Contents[HeaderSizeBytes+macSize:])
+		}
 		_, err := w.Write(msg.Contents)
 		msg.Release()
 		return err
diff --git a/runtime/internal/rpc/stream/message/message_test.go b/runtime/internal/rpc/stream/message/message_test.go
index 72c108e..c5b4155 100644
--- a/runtime/internal/rpc/stream/message/message_test.go
+++ b/runtime/internal/rpc/stream/message/message_test.go
@@ -233,7 +233,7 @@
 func TestDataNoPayload(t *testing.T) {
 	tests := []Data{
 		{VCI: 10, Flow: 3},
-		{VCI: 11, Flow: 4, flags: 10},
+		{VCI: 11, Flow: 4},
 	}
 	var c testControlCipher
 	pool := iobuf.NewPool(0)
diff --git a/runtime/internal/rpc/stream/vc/auth.go b/runtime/internal/rpc/stream/vc/auth.go
index b85a4f1..7bca61d 100644
--- a/runtime/internal/rpc/stream/vc/auth.go
+++ b/runtime/internal/rpc/stream/vc/auth.go
@@ -8,10 +8,12 @@
 	"bytes"
 	"io"
 
+	"v.io/v23/context"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
 	"v.io/v23/verror"
 	"v.io/v23/vom"
+	"v.io/v23/vtrace"
 
 	"v.io/x/ref/runtime/internal/lib/iobuf"
 	"v.io/x/ref/runtime/internal/rpc/stream"
@@ -55,7 +57,7 @@
 	}
 	// Note that since the client uses a self-signed blessing to authenticate
 	// during VC setup, it does not share any discharges.
-	client, _, err := readBlessings(conn, authClientContextTag, crypter, v)
+	client, _, err := readBlessings(nil, conn, authClientContextTag, crypter, v)
 	if err != nil {
 		return security.Blessings{}, nil, err
 	}
@@ -69,7 +71,7 @@
 // The client will only share its blessings if the server (who shares its
 // blessings first) is authorized as per the authorizer for this RPC.
 func AuthenticateAsClient(conn io.ReadWriteCloser, crypter crypto.Crypter, v version.RPCVersion, params security.CallParams, auth *ServerAuthorizer) (security.Blessings, security.Blessings, map[string]security.Discharge, error) {
-	server, serverDischarges, err := readBlessings(conn, authServerContextTag, crypter, v)
+	server, serverDischarges, err := readBlessings(nil, conn, authServerContextTag, crypter, v)
 	if err != nil {
 		return security.Blessings{}, security.Blessings{}, nil, err
 	}
@@ -113,7 +115,9 @@
 	if err := enc.Encode(discharges); err != nil {
 		return verror.New(stream.ErrNetwork, nil, verror.New(errVomEncodeBlessing, nil, err))
 	}
-	msg, err := crypter.Encrypt(iobuf.NewSlice(buf.Bytes()))
+	bts := buf.Bytes()
+
+	msg, err := crypter.Encrypt(iobuf.NewSlice(bts))
 	if err != nil {
 		return err
 	}
@@ -125,36 +129,56 @@
 	return nil
 }
 
-func readBlessings(r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
+func readBlessings(ctx *context.T, r io.Reader, tag []byte, crypter crypto.Crypter, v version.RPCVersion) (security.Blessings, map[string]security.Discharge, error) {
+	var span vtrace.Span
+	if ctx != nil {
+		ctx, span = vtrace.WithNewSpan(ctx, "Read blessings")
+		defer span.Finish()
+	}
+
 	var msg []byte
 	var noBlessings security.Blessings
+	_, span = vtrace.WithNewSpan(ctx, "First vom Decoding")
 	dec := vom.NewDecoder(r)
 	if err := dec.Decode(&msg); err != nil {
 		return noBlessings, nil, verror.New(stream.ErrNetwork, nil, verror.New(errHandshakeMessage, nil, err))
 	}
+	span.Finish()
+
+	_, span = vtrace.WithNewSpan(ctx, "Checking channel binding")
 	buf, err := crypter.Decrypt(iobuf.NewSlice(msg))
 	if err != nil {
 		return noBlessings, nil, err
 	}
 	defer buf.Release()
+	span.Finish()
+
 	dec = vom.NewDecoder(bytes.NewReader(buf.Contents))
 	var (
 		blessings security.Blessings
 		sig       security.Signature
 	)
+	_, span = vtrace.WithNewSpan(ctx, "Second vom Decoding")
 	if err = dec.Decode(&sig); err != nil {
 		return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 	}
+	span.Finish()
+	_, span = vtrace.WithNewSpan(ctx, "Third vom Decoding")
 	if err = dec.Decode(&blessings); err != nil {
 		return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 	}
+	span.Finish()
+	_, span = vtrace.WithNewSpan(ctx, "Fourth vom Decoding")
 	var discharges []security.Discharge
 	if err := dec.Decode(&discharges); err != nil {
 		return noBlessings, nil, verror.New(stream.ErrNetwork, nil, err)
 	}
+	span.Finish()
+	_, span = vtrace.WithNewSpan(ctx, "Signature verify")
 	if !sig.Verify(blessings.PublicKey(), append(tag, crypter.ChannelBinding()...)) {
 		return noBlessings, nil, verror.New(stream.ErrSecurity, nil, verror.New(errInvalidSignatureInMessage, nil))
 	}
+	span.Finish()
 	return blessings, mkDischargeMap(discharges), nil
 }
 
diff --git a/runtime/internal/rpc/stream/vc/knobs.go b/runtime/internal/rpc/stream/vc/knobs.go
index 7271f7a..70539fb 100644
--- a/runtime/internal/rpc/stream/vc/knobs.go
+++ b/runtime/internal/rpc/stream/vc/knobs.go
@@ -32,4 +32,8 @@
 	// Special flow over which discharges for third-party caveats
 	// on the server's blessings are sent.
 	DischargeFlowID = 4
+
+	// These virtual flows are used as queue IDs in XVC.
+	ExpressFlowID = 5
+	StopFlowID    = 6
 )
diff --git a/runtime/internal/rpc/stream/vc/writer.go b/runtime/internal/rpc/stream/vc/writer.go
index 684a357..ff4fba1 100644
--- a/runtime/internal/rpc/stream/vc/writer.go
+++ b/runtime/internal/rpc/stream/vc/writer.go
@@ -111,6 +111,12 @@
 	w.muSharedCountersBorrowed.Unlock()
 }
 
+func (w *writer) DisableEncryption() {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	w.Sink.Put(disableEncryption, w.deadline)
+}
+
 // Write implements the Write call for a Flow.
 //
 // Flow control is achieved using receive buffers (aka counters), wherein the
@@ -151,7 +157,9 @@
 			w.muSharedCountersBorrowed.Unlock()
 			w.Sink.Release(n)
 		}
-		slice := w.Alloc.Copy(b[:n])
+		// TODO(mattr): We're leaving space here for the MAC.  This is a gross
+		// way to do it.  The whole allocator thing is done in an unpleasant way here.
+		slice := w.Alloc.CopyExtra(b[:n], 16)
 		if err := w.Sink.Put(slice, w.deadline); err != nil {
 			slice.Release()
 			atomic.AddUint32(&w.totalBytes, uint32(written))
diff --git a/runtime/internal/rpc/stream/vc/xvc.go b/runtime/internal/rpc/stream/vc/xvc.go
new file mode 100644
index 0000000..14e3a93
--- /dev/null
+++ b/runtime/internal/rpc/stream/vc/xvc.go
@@ -0,0 +1,824 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/rpc/version"
+	"v.io/v23/security"
+	"v.io/v23/vom"
+	"v.io/v23/vtrace"
+	"v.io/x/lib/vlog"
+	"v.io/x/ref/runtime/internal/lib/bqueue"
+	"v.io/x/ref/runtime/internal/lib/bqueue/drrqueue"
+	"v.io/x/ref/runtime/internal/lib/iobuf"
+	vsync "v.io/x/ref/runtime/internal/lib/sync"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/crypto"
+	"v.io/x/ref/runtime/internal/rpc/stream/id"
+	"v.io/x/ref/runtime/internal/rpc/stream/message"
+	iversion "v.io/x/ref/runtime/internal/rpc/version"
+)
+
+var pool = iobuf.NewPool(0)
+var nullCipher = crypto.NullControlCipher{}
+var stopToken = iobuf.NewSlice(make([]byte, 1))
+
+// This is kind of a hack.  If you send this down a flow then all
+// bytes sent after will be sent unencyrpted.
+var disableEncryption = iobuf.NewSlice(make([]byte, 1))
+
+const (
+	// Priorities of the buffered queues used for flow control of writes.
+	authPriority bqueue.Priority = iota
+	expressPriority
+	systemXFlowPriority
+	normalXFlowPriority
+	stopPriority
+)
+
+type flowState struct {
+	flow               *flow
+	encryptionDisabled bool
+}
+
+type xvc struct {
+	mu                       sync.Mutex
+	flowMap                  map[id.Flow]*flowState
+	lDischarges, rDischarges map[string]security.Discharge
+	nextConnectFID           id.Flow
+
+	incommingMu sync.Mutex
+	incomming   *upcqueue.T
+
+	cipher                 crypto.ControlCipher
+	version                version.RPCVersion
+	principal              security.Principal
+	lBlessings, rBlessings security.Blessings
+	local, remote          naming.Endpoint
+	conn                   stream.XConn
+	outgoing               bqueue.T
+	expressQ, stopQ        bqueue.Writer
+	sharedCounters         *vsync.Semaphore
+	reader                 *iobuf.Reader
+	closed                 chan struct{}
+	dataCache              *dataCache
+	dClient                DischargeClient
+	dBuffer                time.Duration
+}
+
+func commonInit(
+	ctx *context.T,
+	conn stream.XConn,
+	principal security.Principal,
+	versions iversion.Range) (*xvc, stream.XFlow, error) {
+	vc := &xvc{
+		principal:      principal,
+		cipher:         nullCipher,
+		conn:           conn,
+		outgoing:       drrqueue.New(MaxPayloadSizeBytes),
+		flowMap:        make(map[id.Flow]*flowState),
+		sharedCounters: vsync.NewSemaphore(),
+		closed:         make(chan struct{}),
+		dataCache:      newDataCache(),
+	}
+	// Both sides create reserved flows ahead of time.
+	authFlow, err := vc.newFlow(AuthFlowID, authPriority)
+	authFlow.Release(DefaultBytesBufferedPerFlow)
+	return vc, authFlow, err
+}
+
+func XNewDialed(
+	ctx *context.T,
+	conn stream.XConn,
+	principal security.Principal,
+	local, remote naming.Endpoint,
+	versions iversion.Range,
+	opts ...stream.VCOpt) (stream.XVC, error) {
+	ctx, span := vtrace.WithNewSpan(ctx, "Dialing VC")
+	defer span.Finish()
+
+	vc, authFlow, err := commonInit(ctx, conn, principal, versions)
+	if err != nil {
+		return nil, err
+	}
+
+	vc.local = local
+	vc.nextConnectFID = NumReservedFlows
+
+	errch := make(chan error)
+	go func() {
+		errch <- vc.setup(ctx, local, remote, versions, principal != nil)
+	}()
+	if err := <-errch; err != nil {
+		vc.Close(err)
+		return nil, err
+	}
+	if principal == nil {
+		// Exit early for unencrypted VCs.
+		return vc, nil
+	}
+	if err := vc.authenticateClient(ctx, authFlow, opts...); err != nil {
+		vc.Close(err)
+		return nil, err
+	}
+	return vc, nil
+}
+
+func XNewAccepted(
+	ctx *context.T,
+	conn stream.XConn,
+	principal security.Principal,
+	local naming.Endpoint,
+	lBlessings security.Blessings,
+	versions iversion.Range,
+	incomming *upcqueue.T,
+	opts ...stream.ListenerOpt) (stream.XVC, error) {
+	ctx, span := vtrace.WithNewSpan(ctx, "Accepting VC")
+	defer span.Finish()
+
+	vc, authFlow, err := commonInit(ctx, conn, principal, versions)
+	if err != nil {
+		return nil, err
+	}
+	vc.incommingMu.Lock()
+	// We unlock this only AFTER authentication is finished to ensure
+	// that no flows are announced until we're fully initialized.
+	// TODO(mattr): Find a better way.
+	defer vc.incommingMu.Unlock()
+	vc.incomming = incomming
+
+	vc.local = local
+	vc.lBlessings = lBlessings
+	vc.nextConnectFID = NumReservedFlows + 1
+	vc.dClient, vc.dBuffer = dischargeOptions(opts)
+
+	errch := make(chan error)
+	go func() {
+		// TODO(mattr): Sending local twice, but... what should we send?
+		errch <- vc.setup(ctx, local, local, versions, principal != nil)
+	}()
+	if err := <-errch; err != nil {
+		vc.Close(err)
+		return nil, err
+	}
+	if principal == nil {
+		//Exit early for unencrypted VCs.
+		return vc, nil
+	}
+	if err := vc.authenticateServer(ctx, authFlow, opts...); err != nil {
+		vc.Close(err)
+		return nil, err
+	}
+	return vc, nil
+}
+
+func (vc *xvc) setup(
+	ctx *context.T,
+	local, remote naming.Endpoint,
+	versions iversion.Range,
+	encrypted bool) error {
+	ctx, span := vtrace.WithNewSpan(ctx, "Setup")
+	defer span.Finish()
+
+	_, keySpan := vtrace.WithNewSpan(ctx, "Generate Key")
+	pk, sk, err := crypto.GenerateBoxKey()
+	if err != nil {
+		return err
+	}
+	keySpan.Finish()
+
+	setup := &message.SetupVC{
+		LocalEndpoint:  local,
+		RemoteEndpoint: remote,
+		Counters:       message.NewCounters(),
+		Setup: message.Setup{
+			Versions: versions,
+			Options:  []message.SetupOption{&message.NaclBox{PublicKey: *pk}},
+		},
+	}
+	setup.Counters.Add(0, SharedFlowID, DefaultBytesBufferedPerFlow)
+
+	// Send and receive a SetupVC message.
+	// Note we use the nullCipher here.  The XConn is handling encryption.
+	errch := make(chan error, 1)
+	go func() {
+		_, span := vtrace.WithNewSpan(ctx, "Writing setup")
+		errch <- message.WriteTo(vc.conn, setup, nullCipher)
+		span.Finish()
+	}()
+	_, readSpan := vtrace.WithNewSpan(ctx, "Reading setup")
+	vc.reader = iobuf.NewReader(pool, vc.conn)
+	msg, err := message.ReadFrom(vc.reader, nullCipher)
+	if err != nil {
+		return err
+	}
+	remoteSetup, ok := msg.(*message.SetupVC)
+	if !ok {
+		return fmt.Errorf("wrong message.")
+	}
+	readSpan.Finish()
+
+	if err := <-errch; err != nil {
+		return err
+	}
+	vc.remote = remoteSetup.LocalEndpoint
+
+	// Release shared counters
+	vc.distributeCounters(remoteSetup.Counters)
+
+	// Choose a version.
+	vrange, err := versions.Intersect(&remoteSetup.Setup.Versions)
+	if err != nil {
+		return err
+	}
+	vc.version = vrange.Max
+
+	// Set up the cipher.
+	if encrypted {
+		_, naclSpan := vtrace.WithNewSpan(ctx, "Creating Cipher")
+		remoteKey := remoteSetup.Setup.NaclBox()
+		if remoteKey == nil {
+			return fmt.Errorf("missing key.")
+		}
+		// Disable the underlying encryption.  We'll do our own from now on.
+		vc.conn.DisableEncryption()
+
+		vc.cipher = crypto.NewControlCipherRPC11(pk, sk, &remoteKey.PublicKey)
+		naclSpan.Finish()
+	}
+
+	vc.expressQ, err = vc.outgoing.NewWriter(ExpressFlowID, expressPriority, DefaultBytesBufferedPerFlow)
+	if err != nil {
+		return err
+	}
+	vc.expressQ.Release(-1) // Disable flow control
+	vc.stopQ, err = vc.outgoing.NewWriter(StopFlowID, stopPriority, DefaultBytesBufferedPerFlow)
+	if err != nil {
+		return err
+	}
+	vc.stopQ.Release(-1) // Disable flow control
+
+	// Start read and write loops
+	go vc.readLoop()
+	go vc.writeLoop()
+
+	return nil
+}
+
+func (vc *xvc) connectSystemFlows() error {
+	conn, err := vc.connect(TypeFlowID, systemXFlowPriority)
+	if err != nil {
+		return err
+	}
+	vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(conn))
+	vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(conn))
+
+	if len(vc.rBlessings.ThirdPartyCaveats()) > 0 {
+		conn, err = vc.connect(DischargeFlowID, systemXFlowPriority)
+		if err != nil {
+			return err
+		}
+		go vc.recvDischargesLoop(conn)
+	}
+	return nil
+}
+
+func (vc *xvc) sendDischargesLoop(conn io.WriteCloser, tpCavs []security.Caveat) {
+	defer conn.Close()
+	if vc.dClient == nil {
+		return
+	}
+	enc := vom.NewEncoder(conn)
+	discharges := vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
+	for expiry := minExpiryTime(discharges, tpCavs); !expiry.IsZero(); expiry = minExpiryTime(discharges, tpCavs) {
+		select {
+		case <-time.After(fetchDuration(expiry, vc.dBuffer)):
+			discharges = vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
+			if err := enc.Encode(discharges); err != nil {
+				vlog.Errorf("encoding discharges on VC %v failed: %v", vc, err)
+				return
+			}
+			if len(discharges) == 0 {
+				continue
+			}
+			vc.mu.Lock()
+			if vc.lDischarges == nil {
+				vc.lDischarges = make(map[string]security.Discharge)
+			}
+			for _, d := range discharges {
+				vc.lDischarges[d.ID()] = d
+			}
+			vc.mu.Unlock()
+		case <-vc.closed:
+			vlog.VI(3).Infof("closing sendDischargesLoop on VC %v", vc)
+			return
+		}
+	}
+}
+
+func (vc *xvc) recvDischargesLoop(conn io.ReadCloser) {
+	defer conn.Close()
+	dec := vom.NewDecoder(conn)
+	for {
+		var discharges []security.Discharge
+		if err := dec.Decode(&discharges); err != nil {
+			vlog.VI(3).Infof("decoding discharges on %v failed: %v", vc, err)
+			return
+		}
+		if len(discharges) == 0 {
+			continue
+		}
+		vc.mu.Lock()
+		if vc.rDischarges == nil {
+			vc.rDischarges = make(map[string]security.Discharge)
+		}
+		for _, d := range discharges {
+			vc.rDischarges[d.ID()] = d
+		}
+		vc.mu.Unlock()
+	}
+}
+
+func (vc *xvc) handleMessage(msg message.T) error {
+	switch m := msg.(type) {
+
+	case *message.Data:
+		payload := m.Payload
+		defer func() {
+			if payload != nil {
+				payload.Release()
+			}
+		}()
+		vc.mu.Lock()
+		fs := vc.flowMap[m.Flow]
+		if m.Close() && fs != nil {
+			defer fs.flow.Shutdown()
+			delete(vc.flowMap, m.Flow)
+		}
+		vc.mu.Unlock()
+		if payload.Size() == 0 {
+			return nil
+		}
+		if fs == nil {
+			vlog.Infof("Ignoring data packet for unkown flow: %#v", msg)
+			return nil
+		}
+		if err := fs.flow.reader.Put(payload); err != nil {
+			vlog.Errorf("Could not queue data packet: %#v", msg)
+			return err
+		}
+		payload = nil
+
+	case *message.AddReceiveBuffers:
+		vc.distributeCounters(m.Counters)
+
+	case *message.OpenFlow:
+		if err := vc.accept(m.Flow, int(m.InitialCounters)); err != nil {
+			cm := &message.Data{Flow: m.Flow}
+			cm.SetClose()
+			if err := vc.sendOnExpressQ(cm); err != nil {
+				return err
+			}
+		}
+		// TODO(mattr): The first write uses shared counters, but the first write is
+		// always just 1 byte :(
+		counters := message.NewCounters()
+		counters.Add(0, m.Flow, uint32(DefaultBytesBufferedPerFlow))
+		err := vc.sendOnExpressQ(&message.AddReceiveBuffers{
+			Counters: counters,
+		})
+		return err
+
+	case *message.CloseVC:
+		vc.stopQ.Put(stopToken, nil)
+		if m.Error != "" {
+			vlog.Errorf("VC Closed from remote end due to error: %s", m.Error)
+			return fmt.Errorf(m.Error)
+		}
+		return io.EOF
+
+	default:
+		vlog.Infof("Ignoring irrelevant or unrecognized message %T: ", m)
+	}
+	return nil
+}
+
+func (vc *xvc) distributeCounters(counters message.Counters) {
+	for cid, bytes := range counters {
+		if cid.Flow() == SharedFlowID {
+			vc.sharedCounters.IncN(uint(bytes))
+			continue
+		}
+		vc.mu.Lock()
+		f := vc.flowMap[cid.Flow()]
+		vc.mu.Unlock()
+		if f == nil {
+			vlog.Infof("ignoring counters for unknown flow: %d", cid.Flow())
+			continue
+		}
+		f.flow.Release(int(bytes))
+	}
+}
+
+func (vc *xvc) readLoop() {
+	for {
+		msg, err := message.ReadFrom(vc.reader, vc.cipher)
+		if err != nil {
+			vlog.Errorf("Exiting VC read loop: %v", err)
+			return
+		}
+		if err := vc.handleMessage(msg); err != nil {
+			if err != io.EOF {
+				vlog.Errorf("Could not handle message: %v", err)
+			}
+			return
+		}
+	}
+}
+
+func releaseBufs(bufs []*iobuf.Slice) {
+	for _, b := range bufs {
+		if b != disableEncryption && b != stopToken {
+			b.Release()
+		}
+	}
+}
+
+func (vc *xvc) writeData(writer bqueue.Writer, bufs []*iobuf.Slice) error {
+	defer releaseBufs(bufs)
+	fid := id.Flow(writer.ID())
+	var encryptionDisabled bool
+	vc.mu.Lock()
+	fs := vc.flowMap[fid]
+	if fs != nil {
+		encryptionDisabled = fs.encryptionDisabled
+	}
+	vc.mu.Unlock()
+
+	// TODO(mattr): coalesce, but only after finding the encryption boundary.
+	last := len(bufs) - 1
+	// TODO(mattr): This is broken if disableEncryption is last.
+	drained := writer.IsDrained()
+
+	if drained {
+		defer func() {
+			vc.mu.Lock()
+			delete(vc.flowMap, fid)
+			vc.mu.Unlock()
+		}()
+	}
+
+	for i, b := range bufs {
+		if b == disableEncryption {
+			encryptionDisabled = true
+			vc.mu.Lock()
+			if fs != nil {
+				fs.encryptionDisabled = true
+			}
+			vc.mu.Unlock()
+			continue
+		}
+
+		d := &message.Data{Flow: fid, Payload: b}
+		if !encryptionDisabled {
+			d.SetEncrypted()
+		}
+		if drained && i == last {
+			d.SetClose()
+		}
+		if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
+			return err
+		}
+	}
+	if len(bufs) == 0 && drained {
+		d := &message.Data{Flow: fid}
+		d.SetClose()
+		if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (vc *xvc) writeSerializedMessage(msg *iobuf.Slice) error {
+	// TODO(mattr): All of this would be easier if the queues contained messages
+	// instead of byte arrays.
+	if err := message.EncryptMessage(msg.Contents, vc.cipher); err != nil {
+		return err
+	}
+	if _, err := vc.conn.Write(msg.Contents); err != nil {
+		return err
+	}
+	msg.Release()
+	return nil
+}
+
+func (vc *xvc) writeLoop() {
+	defer vc.outgoing.Close()
+	defer close(vc.closed)
+	for {
+		writer, bufs, err := vc.outgoing.Get(nil)
+		if err != nil {
+			return
+		}
+		switch writer {
+		case vc.expressQ:
+			for _, b := range bufs {
+				if err = vc.writeSerializedMessage(b); err != nil {
+					vlog.Errorf("Could not write control message: %v", err)
+					return
+				}
+			}
+		case vc.stopQ:
+			if len(bufs) > 0 {
+				// If we get stopToken here, then the vc was closed by the remote end.
+				if bufs[0] != stopToken {
+					if err := vc.writeSerializedMessage(bufs[0]); err != nil {
+						vlog.Errorf("Could not write CloseVC: %v", err)
+					}
+				}
+			}
+			return
+		default:
+			if err = vc.writeData(writer, bufs); err != nil {
+				vlog.Errorf("Error writing: %v", err)
+				return
+			}
+		}
+	}
+}
+
+func (vc *xvc) authenticateServer(ctx *context.T, authFlow stream.XFlow, opts ...stream.ListenerOpt) error {
+	ctx, span := vtrace.WithNewSpan(ctx, "Server Authentication")
+	defer span.Finish()
+	defer authFlow.Close()
+
+	if vc.lBlessings.IsZero() {
+		return fmt.Errorf("no server blessings")
+	}
+
+	// TODO(mattr): Check that authFlow has the right fid.
+
+	// This is a wart.  We always pass a null crypter (with channel binding) even
+	// though this is all encrypted by the flow write.  This is just because we
+	// want to re-use the existing Authenticate methods.
+	crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())
+
+	var serverDischarges []security.Discharge
+	if tpcavs := vc.lBlessings.ThirdPartyCaveats(); len(tpcavs) > 0 && vc.dClient != nil {
+		_, span := vtrace.WithNewSpan(ctx, "Prepare Discharages")
+		serverDischarges = vc.dClient.PrepareDischarges(ctx, tpcavs, security.DischargeImpetus{})
+		span.Finish()
+	}
+	_, wspan := vtrace.WithNewSpan(ctx, "writeBlessings")
+	if err := writeBlessings(authFlow, authServerContextTag, crypter, vc.principal, vc.lBlessings, serverDischarges, vc.version); err != nil {
+		return err
+	}
+	wspan.Finish()
+
+	// Note that since the client uses a self-signed blessing to authenticate
+	// during VC setup, it does not share any discharges.
+	rBlessings, _, err := readBlessings(ctx, authFlow, authClientContextTag, crypter, vc.version)
+	if err != nil {
+		return err
+	}
+	vc.rBlessings = rBlessings
+	vc.lDischarges = mkDischargeMap(serverDischarges)
+	return nil
+}
+
+func (vc *xvc) authenticateClient(ctx *context.T, authFlow stream.XFlow, opts ...stream.VCOpt) error {
+	ctx, span := vtrace.WithNewSpan(ctx, "Client Authentication")
+	defer span.Finish()
+	defer authFlow.Close()
+
+	// This is a wart.  We always pass a null crypter (with channel binding) even
+	// though this is all encrypted by the flow write.  This is just because we
+	// want to re-use the existing Authenticate methods.
+	crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())
+
+	server, serverDischarges, err := readBlessings(ctx, authFlow, authServerContextTag, crypter, vc.version)
+	if err != nil {
+		return err
+	}
+
+	// Authorize the server based on the provided authorizer.
+	auth := serverAuthorizer(opts)
+	if auth != nil {
+		_, authSpan := vtrace.WithNewSpan(ctx, "Authorize Server")
+		params := security.CallParams{
+			LocalPrincipal:   vc.principal,
+			LocalEndpoint:    vc.local,
+			RemoteEndpoint:   vc.remote,
+			RemoteBlessings:  server,
+			RemoteDischarges: serverDischarges,
+		}
+		if err := auth.Authorize(params); err != nil {
+			return fmt.Errorf("Not trusted.")
+		}
+		authSpan.Finish()
+	}
+
+	// The client shares its blessings at RPC time (as the blessings may vary
+	// across RPCs). During VC handshake, the client simply sends a self-signed
+	// blessing in order to reveal its public key to the server.
+	// TODO(suharshs): Actually we should just send the public key.
+	_, blessSpan := vtrace.WithNewSpan(ctx, "Bless self")
+	client, err := vc.principal.BlessSelf("vcauth")
+	if err != nil {
+		return fmt.Errorf("Could not create blessing")
+	}
+	blessSpan.Finish()
+	_, writeSpan := vtrace.WithNewSpan(ctx, "Write blessings")
+	if err := writeBlessings(authFlow, authClientContextTag, crypter, vc.principal, client, nil, vc.version); err != nil {
+		return err
+	}
+	writeSpan.Finish()
+
+	vc.lBlessings = client
+	vc.rBlessings = server
+	vc.rDischarges = serverDischarges
+
+	return vc.connectSystemFlows()
+}
+
+func (vc *xvc) newFlow(fid id.Flow, priority bqueue.Priority) (*flow, error) {
+	qw, err := vc.outgoing.NewWriter(bqueue.ID(fid), priority, MaxPayloadSizeBytes)
+	if err != nil {
+		return nil, err
+	}
+	f := &flow{
+		backingVC: vc,
+		reader:    newReader(&xReadHandler{fid, vc}),
+		writer:    newWriter(MaxPayloadSizeBytes, qw, iobuf.NewAllocator(pool, 0), vc.sharedCounters),
+	}
+
+	vc.mu.Lock()
+	defer vc.mu.Unlock()
+	if vc.flowMap == nil {
+		qw.Close()
+		return nil, fmt.Errorf("Could not create flow on closed VC.")
+	}
+	// TODO(mattr): it's an error if fid already exists.
+	vc.flowMap[fid] = &flowState{f, true}
+
+	return f, nil
+}
+
+func (vc *xvc) sendOnExpressQ(msg message.T) error {
+	return vc.sendOn(vc.expressQ, msg)
+}
+
+func (vc *xvc) sendOn(w bqueue.Writer, msg message.T) error {
+	var buf bytes.Buffer
+	if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(vc.cipher)); err != nil {
+		return err
+	}
+	err := w.Put(iobuf.NewSlice(buf.Bytes()), nil)
+	return err
+}
+
+func (vc *xvc) connect(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.XFlow, error) {
+	f, err := vc.newFlow(fid, priority)
+	if err != nil {
+		return nil, err
+	}
+
+	vc.sendOnExpressQ(&message.OpenFlow{
+		Flow:            fid,
+		InitialCounters: uint32(DefaultBytesBufferedPerFlow),
+	})
+	return f, nil
+}
+
+func (vc *xvc) accept(fid id.Flow, initialCounters int) error {
+	priority := normalXFlowPriority
+	if fid < NumReservedFlows {
+		priority = systemXFlowPriority
+	}
+	f, err := vc.newFlow(fid, priority)
+	if err != nil {
+		return err
+	}
+	f.Release(initialCounters)
+	switch fid {
+	case TypeFlowID:
+		vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(f))
+		vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(f))
+		return nil
+	case DischargeFlowID:
+		tpCaveats := vc.lBlessings.ThirdPartyCaveats()
+		if len(tpCaveats) > 0 {
+			go vc.sendDischargesLoop(f, tpCaveats)
+		}
+		return nil
+	default:
+		vc.incommingMu.Lock()
+		q := vc.incomming
+		vc.incommingMu.Unlock()
+
+		if q == nil {
+			err = fmt.Errorf("VC not listening")
+		} else {
+			err = q.Put(f)
+		}
+		if err != nil {
+			vc.mu.Lock()
+			f.Shutdown()
+			delete(vc.flowMap, fid)
+			vc.mu.Unlock()
+		}
+		return err
+	}
+}
+
+func (vc *xvc) Connect(opts ...stream.FlowOpt) (stream.XFlow, error) {
+	vc.mu.Lock()
+	fid := vc.nextConnectFID
+	vc.nextConnectFID += 2
+	vc.mu.Unlock()
+	return vc.connect(fid, normalXFlowPriority, opts...)
+}
+
+func (vc *xvc) ListenTo(q *upcqueue.T) error {
+	vc.incommingMu.Lock()
+	defer vc.incommingMu.Unlock()
+	if vc.incomming != nil {
+		return fmt.Errorf("The given address already has a listener.")
+	}
+	vc.incomming = q
+	return nil
+}
+
+func (vc *xvc) Closed() chan struct{} {
+	return vc.closed
+}
+
+func (vc *xvc) Close(reason error) error {
+	vc.mu.Lock()
+	flows := vc.flowMap
+	vc.flowMap = nil
+	vc.mu.Unlock()
+
+	vc.incommingMu.Lock()
+	q := vc.incomming
+	vc.incomming = nil
+	vc.incommingMu.Unlock()
+
+	if q != nil {
+		q.Close()
+	}
+	vc.sharedCounters.Close()
+
+	for _, fs := range flows {
+		fs.flow.Close()
+	}
+
+	msg := ""
+	if reason != nil {
+		msg = reason.Error()
+	}
+	err := vc.sendOn(vc.stopQ, &message.CloseVC{
+		Error: msg,
+	})
+	return err
+}
+
+// Implement the backingVC interface for flows.
+// Note I don't bother with locking because these are all set by the time the
+// constructor is done.
+func (vc *xvc) LocalEndpoint() naming.Endpoint                  { return vc.local }
+func (vc *xvc) RemoteEndpoint() naming.Endpoint                 { return vc.remote }
+func (vc *xvc) LocalPrincipal() security.Principal              { return vc.principal }
+func (vc *xvc) LocalBlessings() security.Blessings              { return vc.lBlessings }
+func (vc *xvc) RemoteBlessings() security.Blessings             { return vc.rBlessings }
+func (vc *xvc) LocalDischarges() map[string]security.Discharge  { return vc.lDischarges }
+func (vc *xvc) RemoteDischarges() map[string]security.Discharge { return vc.rDischarges }
+func (vc *xvc) VCDataCache() stream.VCDataCache                 { return vc.dataCache }
+
+type xReadHandler struct {
+	flow id.Flow
+	vc   *xvc
+}
+
+func (h *xReadHandler) HandleRead(bytes uint) {
+	counters := message.NewCounters()
+	counters.Add(0, h.flow, uint32(bytes))
+	err := h.vc.sendOnExpressQ(&message.AddReceiveBuffers{
+		Counters: counters,
+	})
+	if err != nil {
+		vlog.Errorf("Unable to send counters: %v", err)
+	}
+}
diff --git a/runtime/internal/rpc/stream/vc/xvc_cache.go b/runtime/internal/rpc/stream/vc/xvc_cache.go
new file mode 100644
index 0000000..73818d8
--- /dev/null
+++ b/runtime/internal/rpc/stream/vc/xvc_cache.go
@@ -0,0 +1,119 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+	"fmt"
+	"sync"
+
+	"v.io/x/ref/runtime/internal/rpc/stream"
+
+	"v.io/v23/naming"
+	"v.io/v23/verror"
+)
+
+var errXVCCacheClosed = reg(".errXVCCacheClosed", "vc cache has been closed")
+
+// XVCCache implements a set of VIFs keyed by the endpoint of the remote end and the
+// local principal. Multiple goroutines can invoke methods on the XVCCache simultaneously.
+type XVCCache struct {
+	mu       sync.Mutex
+	epCache  map[string]*xvc           // GUARDED_BY(mu)
+	ridCache map[naming.RoutingID]*xvc // GUARDED_BY(mu)
+	started  map[string]bool           // GUARDED_BY(mu)
+	cond     *sync.Cond
+}
+
+// NewXVCCache returns a new cache for XVCs.
+func NewXVCCache() *XVCCache {
+	c := &XVCCache{
+		epCache:  make(map[string]*xvc),
+		ridCache: make(map[naming.RoutingID]*xvc),
+		started:  make(map[string]bool),
+	}
+	c.cond = sync.NewCond(&c.mu)
+	return c
+}
+
+// ReservedFind returns a XVC where the remote end of the underlying connection
+// is identified by the provided (ep). Returns nil if there is no
+// such XVC.
+//
+// Iff the cache is closed, ReservedFind will return an error.
+// If ReservedFind has no error, the caller is required to call Unreserve, to avoid deadlock.
+// The ep in Unreserve must be the same as the one used in the ReservedFind call.
+// During this time, all new ReservedFind calls for this ep will Block until
+// the corresponding Unreserve call is made.
+func (c *XVCCache) ReservedFind(ep naming.Endpoint) (stream.XVC, error) {
+	if ret, ok := c.ridCache[ep.RoutingID()]; ok {
+		return ret, nil
+	}
+	k := ep.String()
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	for c.started[k] {
+		c.cond.Wait()
+	}
+	if c.epCache == nil {
+		return nil, verror.New(errXVCCacheClosed, nil)
+	}
+	c.started[k] = true
+	ret := c.epCache[k]
+	if ret == nil {
+		return nil, nil
+	}
+	return ret, nil
+}
+
+// Unreserve marks the status of the ep as no longer started, and
+// broadcasts waiting threads.
+func (c *XVCCache) Unreserve(ep naming.Endpoint) {
+	c.mu.Lock()
+	delete(c.started, ep.String())
+	c.cond.Broadcast()
+	c.mu.Unlock()
+}
+
+// Insert adds vc to the cache and returns an error iff the cache has been closed.
+func (c *XVCCache) Insert(v stream.XVC) error {
+	vc, ok := v.(*xvc)
+	if !ok {
+		return fmt.Errorf("vc %#v is not of type *vc.xvc", vc)
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.epCache == nil {
+		return verror.New(errXVCCacheClosed, nil)
+	}
+	c.epCache[vc.RemoteEndpoint().String()] = vc
+	c.ridCache[vc.RemoteEndpoint().RoutingID()] = vc
+	return nil
+}
+
+// Close marks the XVCCache as closed and returns the XVCs remaining in the cache.
+func (c *XVCCache) Close() []*xvc {
+	c.mu.Lock()
+	vcs := make([]*xvc, 0, len(c.epCache))
+	for _, vc := range c.epCache {
+		vcs = append(vcs, vc)
+	}
+	c.epCache = nil
+	c.ridCache = nil
+	c.started = nil
+	c.mu.Unlock()
+	return vcs
+}
+
+// Delete removes vc from the cache, returning an error iff the cache has been closed.
+func (c *XVCCache) Delete(vc *xvc) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.epCache == nil {
+		return verror.New(errXVCCacheClosed, nil)
+	}
+	delete(c.epCache, vc.RemoteEndpoint().String())
+	delete(c.ridCache, vc.RemoteEndpoint().RoutingID())
+	return nil
+}
diff --git a/runtime/internal/rpc/stream/vc/xvc_test.go b/runtime/internal/rpc/stream/vc/xvc_test.go
new file mode 100644
index 0000000..da6eab0
--- /dev/null
+++ b/runtime/internal/rpc/stream/vc/xvc_test.go
@@ -0,0 +1,142 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vc
+
+import (
+	"bytes"
+	"crypto/rand"
+	"io"
+	"net"
+	"os"
+	"testing"
+
+	"v.io/v23/context"
+	"v.io/v23/vtrace"
+	"v.io/x/ref/lib/flags"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+	"v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/version"
+	ivtrace "v.io/x/ref/runtime/internal/vtrace"
+	"v.io/x/ref/test/testutil"
+)
+
+var (
+	provider = testutil.NewIDProvider("root")
+	pClient  = testutil.NewPrincipal()
+	pServer  = testutil.NewPrincipal()
+)
+
+func init() {
+	provider.Bless(pClient, "client")
+	provider.Bless(pServer, "server")
+}
+
+func Init() (*context.T, context.CancelFunc) {
+	ctx, cancel := context.RootContext()
+	ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{
+		CollectRegexp:  ".*",
+		DumpOnShutdown: true,
+		CacheSize:      100,
+	})
+	if err != nil {
+		panic(err)
+	}
+	return ctx, func() {
+		vtrace.FormatTraces(os.Stderr, vtrace.GetStore(ctx).TraceRecords(), nil)
+		cancel()
+	}
+}
+
+type xConn struct {
+	net.Conn
+}
+
+func (xConn) DisableEncryption() {}
+
+func pipe() (xConn, xConn) {
+	a, b := net.Pipe()
+	return xConn{a}, xConn{b}
+}
+
+func createVCs(clientCtx, serverCtx *context.T, q *upcqueue.T) (client, server stream.XVC) {
+	xc1, xc2 := pipe()
+	clientEP := &naming.Endpoint{Address: "client"}
+	serverEP := &naming.Endpoint{Address: "server"}
+	vrange := version.Range{Min: 10, Max: 10}
+	blessings := pServer.BlessingStore().Default()
+	schan := make(chan stream.XVC, 1)
+	go func() {
+		server, err := XNewAccepted(serverCtx, xc2, pServer, serverEP, blessings, vrange, q)
+		if err != nil {
+			panic(err)
+		}
+		schan <- server
+	}()
+	client, err := XNewDialed(clientCtx, xc1, pClient, clientEP, serverEP, vrange)
+	if err != nil {
+		panic(err)
+	}
+	return client, <-schan
+}
+
+func TestXVC(t *testing.T) {
+	ctx, cancel := Init()
+	defer cancel()
+
+	clientCtx, clientspan := vtrace.WithNewTrace(ctx)
+	defer clientspan.Finish()
+	serverCtx, serverspan := vtrace.WithNewTrace(ctx)
+	defer serverspan.Finish()
+
+	q := upcqueue.New()
+
+	client, server := createVCs(clientCtx, serverCtx, q)
+
+	ch := make(chan []byte)
+	go func() {
+		_, span := vtrace.WithNewSpan(serverCtx, "server read")
+		defer span.Finish()
+		item, err := q.Get(nil)
+		if err != nil {
+			panic(err)
+		}
+		sflow := item.(stream.XFlow)
+		recvbuf := make([]byte, 4<<20)
+		n, err := io.ReadFull(sflow, recvbuf)
+		if err != nil {
+			panic(err)
+		}
+		if n != 4<<20 {
+			t.Errorf("Expected %d bytes got %d", 4<<20, n)
+		}
+		ch <- recvbuf
+	}()
+
+	_, span := vtrace.WithNewSpan(clientCtx, "client write")
+	cflow, err := client.Connect()
+	if err != nil {
+		panic(err)
+	}
+	sendbuf := make([]byte, 4<<20)
+	_, err = io.ReadFull(rand.Reader, sendbuf)
+	if err != nil {
+		panic(err)
+	}
+	_, err = cflow.Write(sendbuf)
+	if err != nil {
+		panic(err)
+	}
+	cflow.Close()
+	span.Finish()
+
+	if bytes.Compare(sendbuf, <-ch) != 0 {
+		t.Errorf("Didn't get the sent data.")
+	}
+
+	// Close the client and wait for the server to close.
+	client.Close(nil)
+	<-server.Closed()
+}
diff --git a/runtime/internal/rpc/stream/xmodel.go b/runtime/internal/rpc/stream/xmodel.go
new file mode 100644
index 0000000..3981a2c
--- /dev/null
+++ b/runtime/internal/rpc/stream/xmodel.go
@@ -0,0 +1,55 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stream
+
+import (
+	"io"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+)
+
+type XFlow interface {
+	Flow
+
+	// DisableEncryption disables encryption for the flow.  All bytes
+	// written afterward will be sent in the clear. All calls after the
+	// first have no effect.
+	DisableEncryption()
+}
+
+type XManager interface {
+	Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...ListenerOpt) (XListener, naming.Endpoint, error)
+	Dial(remote naming.Endpoint, principal security.Principal, opts ...VCOpt) (XFlow, error)
+	Shutdown()
+	RoutingID() naming.RoutingID
+}
+
+type XConn interface {
+	io.ReadWriteCloser
+
+	// DisableEncryption disables encryption for the flow.  All bytes
+	// written afterward will be sent in the clear. All calls after the
+	// first have no effect.
+	DisableEncryption()
+}
+
+type XListener interface {
+	Accept() (XFlow, error)
+	Close() error
+}
+
+// TODO(mattr): XVC doesn't need to be an interface, and shouldn't be in the
+// model file.
+type XVC interface {
+	Connect(opts ...FlowOpt) (XFlow, error)
+	ListenTo(q *upcqueue.T) error
+	// Close closes the VC and all flows on it, allowing any pending writes in
+	// flows to drain.
+	Close(reason error) error
+	Closed() chan struct{}
+	VCDataCache() VCDataCache
+}
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
new file mode 100644
index 0000000..c7886ec
--- /dev/null
+++ b/runtime/internal/rpc/x_test.go
@@ -0,0 +1,81 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"v.io/x/ref/runtime/internal/rpc/proxy"
+	"v.io/x/ref/runtime/internal/rpc/stream/manager"
+	tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
+	"v.io/x/ref/test/testutil"
+
+	"v.io/v23"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+)
+
+func TestProxyRPCX(t *testing.T) {
+	ctx, shutdown := initForTest()
+	defer shutdown()
+
+	p := testutil.NewPrincipal("test")
+	ctx, err := v23.WithPrincipal(ctx, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	srid := naming.FixedRoutingID(0x5555)
+	crid := naming.FixedRoutingID(0x4444)
+	prid := naming.FixedRoutingID(0x6666)
+	sm := manager.XInternalNew(ctx, srid)
+	cm := manager.XInternalNew(ctx, crid)
+	ns := tnaming.NewSimpleNamespace()
+
+	client, err := XInternalNewClient(cm, ns)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	server, err := XInternalNewServer(ctx, sm, ns, nil, "", client, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, sd, pep, err := proxy.New("tcp", "127.0.0.1:0", prid, ctx, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer sd()
+	fmt.Printf("proxy ep: %v\n", pep)
+
+	spec := rpc.ListenSpec{Proxy: pep.Name()}
+	// the endpoint to the proxy wont be returned here because connecting to the
+	// proxy happens asynchronously.
+	_, err = server.Listen(spec)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := server.Serve("", &testServer{}, nil); err != nil {
+		t.Fatal(err)
+	}
+
+	var proxies []rpc.ProxyStatus
+	for len(proxies) == 0 {
+		time.Sleep(100 * time.Millisecond)
+		t.Log("checking for proxies")
+		proxies = server.Status().Proxies
+	}
+	ep := proxies[0].Endpoint
+
+	var got string
+	if err := client.Call(ctx, ep.Name(), "Echo", []interface{}{"arg"}, []interface{}{&got}); err != nil {
+		t.Fatal(err)
+	}
+	if want := fmt.Sprintf("method:%q,suffix:%q,arg:%q", "Echo", "", "arg"); got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
new file mode 100644
index 0000000..6d32f6b
--- /dev/null
+++ b/runtime/internal/rpc/xclient.go
@@ -0,0 +1,436 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+	"fmt"
+	"net"
+	"time"
+
+	"v.io/x/lib/vlog"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/i18n"
+	"v.io/v23/namespace"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/verror"
+	"v.io/v23/vtrace"
+
+	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/vc"
+)
+
+type xclient struct {
+	streamMgr          stream.XManager
+	ns                 namespace.T
+	vcOpts             []stream.VCOpt // vc opts passed to dial
+	preferredProtocols []string
+
+	// We cache the IP networks on the device since it is not that cheap to read
+	// network interfaces through os syscall.
+	// TODO(jhahn): Add monitoring the network interface changes.
+	ipNets []*net.IPNet
+
+	dc vc.DischargeClient
+}
+
+var _ rpc.Client = (*xclient)(nil)
+
+func XInternalNewClient(streamMgr stream.XManager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+	c := &xclient{
+		streamMgr: streamMgr,
+		ns:        ns,
+		ipNets:    ipNetworks(),
+	}
+	c.dc = InternalNewDischargeClient(nil, c, 0)
+	for _, opt := range opts {
+		// Collect all client opts that are also vc opts.
+		switch v := opt.(type) {
+		case stream.VCOpt:
+			c.vcOpts = append(c.vcOpts, v)
+		case PreferredProtocols:
+			c.preferredProtocols = v
+		}
+	}
+
+	return c, nil
+}
+
+func (c *xclient) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
+	suberr := func(err error) *verror.SubErr {
+		return &verror.SubErr{Err: err, Options: verror.Print}
+	}
+	sm := c.streamMgr
+	flow, err := sm.Dial(ep, principal, vcOpts...)
+	if err != nil {
+		return nil, suberr(err)
+	}
+	return flow, nil
+}
+
+func (c *xclient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
+	return c.startCall(ctx, name, method, args, opts)
+}
+
+func (c *xclient) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
+	deadline := getDeadline(ctx, opts)
+
+	var lastErr error
+	for retries := uint(0); ; retries++ {
+		call, err := c.startCall(ctx, name, method, inArgs, opts)
+		if err != nil {
+			return err
+		}
+		err = call.Finish(outArgs...)
+		if err == nil {
+			return nil
+		}
+		lastErr = err
+		// We only retry if RetryBackoff is returned by the application because other
+		// RetryConnection and RetryRefetch required actions by the client before
+		// retrying.
+		if !shouldRetryBackoff(verror.Action(lastErr), deadline, opts) {
+			vlog.VI(4).Infof("Cannot retry after error: %s", lastErr)
+			break
+		}
+		if !backoff(retries, deadline) {
+			break
+		}
+		vlog.VI(4).Infof("Retrying due to error: %s", lastErr)
+	}
+	return lastErr
+}
+
+// startCall ensures StartCall always returns verror.E.
+func (c *xclient) startCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
+	if !ctx.Initialized() {
+		return nil, verror.ExplicitNew(verror.ErrBadArg, i18n.LangID("en-us"), "<rpc.Client>", "StartCall", "context not initialized")
+	}
+	ctx, span := vtrace.WithNewSpan(ctx, fmt.Sprintf("<rpc.Client>%q.%s", name, method))
+	if err := canCreateServerAuthorizer(ctx, opts); err != nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, err)
+	}
+
+	deadline := getDeadline(ctx, opts)
+
+	var lastErr error
+	for retries := uint(0); ; retries++ {
+		call, action, requireResolve, err := c.tryCall(ctx, name, method, args, opts)
+		if err == nil {
+			return call, nil
+		}
+		lastErr = err
+		if !shouldRetry(action, requireResolve, deadline, opts) {
+			span.Annotatef("Cannot retry after error: %s", err)
+			break
+		}
+		if !backoff(retries, deadline) {
+			break
+		}
+		span.Annotatef("Retrying due to error: %s", err)
+	}
+	return nil, lastErr
+}
+
+// tryCreateFlow attempts to establish a Flow to "server" (which must be a
+// rooted name), over which a method invocation request could be sent.
+//
+// The server at the remote end of the flow is authorized using the provided
+// authorizer, both during creation of the VC underlying the flow and the
+// flow itself.
+// TODO(cnicolaou): implement real, configurable load balancing.
+func (c *xclient) tryCreateFlow(ctx *context.T, principal security.Principal, index int, name, server, method string, auth security.Authorizer, ch chan<- *serverStatus, vcOpts []stream.VCOpt) {
+	status := &serverStatus{index: index, server: server}
+	var span vtrace.Span
+	ctx, span = vtrace.WithNewSpan(ctx, "<client>tryCreateFlow")
+	span.Annotatef("address:%v", server)
+	defer func() {
+		ch <- status
+		span.Finish()
+	}()
+
+	suberr := func(err error) *verror.SubErr {
+		return &verror.SubErr{
+			Name:    suberrName(server, name, method),
+			Err:     err,
+			Options: verror.Print,
+		}
+	}
+
+	address, suffix := naming.SplitAddressName(server)
+	if len(address) == 0 {
+		status.serverErr = suberr(verror.New(errNonRootedName, ctx, server))
+		return
+	}
+	status.suffix = suffix
+
+	ep, err := inaming.NewEndpoint(address)
+	if err != nil {
+		status.serverErr = suberr(verror.New(errInvalidEndpoint, ctx))
+		return
+	}
+	if status.flow, status.serverErr = c.createFlow(ctx, principal, ep, append(vcOpts, &vc.ServerAuthorizer{Suffix: status.suffix, Method: method, Policy: auth})); status.serverErr != nil {
+		status.serverErr.Name = suberrName(server, name, method)
+		vlog.VI(2).Infof("rpc: Failed to create Flow with %v: %v", server, status.serverErr.Err)
+		return
+	}
+
+	// Authorize the remote end of the flow using the provided authorizer.
+	if status.flow.LocalPrincipal() == nil {
+		// LocalPrincipal is nil which means we are operating under
+		// SecurityNone.
+		return
+	}
+
+	seccall := security.NewCall(&security.CallParams{
+		LocalPrincipal:   status.flow.LocalPrincipal(),
+		LocalBlessings:   status.flow.LocalBlessings(),
+		RemoteBlessings:  status.flow.RemoteBlessings(),
+		LocalEndpoint:    status.flow.LocalEndpoint(),
+		RemoteEndpoint:   status.flow.RemoteEndpoint(),
+		RemoteDischarges: status.flow.RemoteDischarges(),
+		Method:           method,
+		Suffix:           status.suffix,
+	})
+	if err := auth.Authorize(ctx, seccall); err != nil {
+		// We will test for errServerAuthorizeFailed in failedTryCall and report
+		// verror.ErrNotTrusted
+		status.serverErr = suberr(verror.New(errServerAuthorizeFailed, ctx, status.flow.RemoteBlessings(), err))
+		vlog.VI(2).Infof("rpc: Failed to authorize Flow created with server %v: %s", server, status.serverErr.Err)
+		status.flow.Close()
+		status.flow = nil
+		return
+	}
+	status.blessings, status.rejectedBlessings = security.RemoteBlessingNames(ctx, seccall)
+	return
+}
+
+// tryCall makes a single attempt at a call. It may connect to multiple servers
+// (all that serve "name"), but will invoke the method on at most one of them
+// (the server running on the most preferred protcol and network amongst all
+// the servers that were successfully connected to and authorized).
+// if requireResolve is true on return, then we shouldn't bother retrying unless
+// you can re-resolve.
+func (c *xclient) tryCall(ctx *context.T, name, method string, args []interface{}, opts []rpc.CallOpt) (call rpc.ClientCall, action verror.ActionCode, requireResolve bool, err error) {
+	var resolved *naming.MountEntry
+	var blessingPattern security.BlessingPattern
+	blessingPattern, name = security.SplitPatternName(name)
+	if resolved, err = c.ns.Resolve(ctx, name, getNamespaceOpts(opts)...); err != nil {
+		// We always return NoServers as the error so that the caller knows
+		// that's ok to retry the operation since the name may be registered
+		// in the near future.
+		switch {
+		case verror.ErrorID(err) == naming.ErrNoSuchName.ID:
+			return nil, verror.RetryRefetch, false, verror.New(verror.ErrNoServers, ctx, name)
+		case verror.ErrorID(err) == verror.ErrNoServers.ID:
+			// Avoid wrapping errors unnecessarily.
+			return nil, verror.NoRetry, false, err
+		default:
+			return nil, verror.NoRetry, false, verror.New(verror.ErrNoServers, ctx, name, err)
+		}
+	} else {
+		if len(resolved.Servers) == 0 {
+			// This should never happen.
+			return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
+		}
+		// An empty set of protocols means all protocols...
+		if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
+			return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
+		}
+	}
+
+	// We need to ensure calls to v23 factory methods do not occur during runtime
+	// initialization. Currently, the agent, which uses SecurityNone, is the only caller
+	// during runtime initialization. We would like to set the principal in the context
+	// to nil if we are running in SecurityNone, but this always results in a panic since
+	// the agent client would trigger the call v23.WithPrincipal during runtime
+	// initialization. So, we gate the call to v23.GetPrincipal instead since the agent
+	// client will have callEncrypted == false.
+	// Potential solutions to this are:
+	// (1) Create a separate client for the agent so that this code doesn't have to
+	//     account for its use during runtime initialization.
+	// (2) Have a ctx.IsRuntimeInitialized() method that we can additionally predicate
+	//     on here.
+	var principal security.Principal
+	if callEncrypted(opts) {
+		if principal = v23.GetPrincipal(ctx); principal == nil {
+			return nil, verror.NoRetry, false, verror.New(errNoPrincipal, ctx)
+		}
+	}
+
+	// servers is now ordered by the priority heurestic implemented in
+	// filterAndOrderServers.
+	//
+	// Try to connect to all servers in parallel.  Provide sufficient
+	// buffering for all of the connections to finish instantaneously. This
+	// is important because we want to process the responses in priority
+	// order; that order is indicated by the order of entries in servers.
+	// So, if two respones come in at the same 'instant', we prefer the
+	// first in the resolved.Servers)
+	attempts := len(resolved.Servers)
+
+	responses := make([]*serverStatus, attempts)
+	ch := make(chan *serverStatus, attempts)
+	vcOpts := append(getVCOpts(opts), c.vcOpts...)
+	authorizer := newServerAuthorizer(blessingPattern, opts...)
+	for i, server := range resolved.Names() {
+		// Create a copy of vcOpts for each call to tryCreateFlow
+		// to avoid concurrent tryCreateFlows from stepping on each
+		// other while manipulating their copy of the options.
+		vcOptsCopy := make([]stream.VCOpt, len(vcOpts))
+		copy(vcOptsCopy, vcOpts)
+		go c.tryCreateFlow(ctx, principal, i, name, server, method, authorizer, ch, vcOptsCopy)
+	}
+
+	var timeoutChan <-chan time.Time
+	if deadline, ok := ctx.Deadline(); ok {
+		timeoutChan = time.After(deadline.Sub(time.Now()))
+	}
+
+	for {
+		// Block for at least one new response from the server, or the timeout.
+		select {
+		case r := <-ch:
+			responses[r.index] = r
+			// Read as many more responses as we can without blocking.
+		LoopNonBlocking:
+			for {
+				select {
+				default:
+					break LoopNonBlocking
+				case r := <-ch:
+					responses[r.index] = r
+				}
+			}
+		case <-timeoutChan:
+			vlog.VI(2).Infof("rpc: timeout on connection to server %v ", name)
+			_, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
+			if verror.ErrorID(err) != verror.ErrTimeout.ID {
+				return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
+			}
+			return nil, verror.NoRetry, false, err
+		}
+
+		dc := c.dc
+		if shouldNotFetchDischarges(opts) {
+			dc = nil
+		}
+		// Process new responses, in priority order.
+		numResponses := 0
+		for _, r := range responses {
+			if r != nil {
+				numResponses++
+			}
+			if r == nil || r.flow == nil {
+				continue
+			}
+
+			doneChan := ctx.Done()
+			r.flow.SetDeadline(doneChan)
+			fc, err := newFlowClient(ctx, r.flow, r.blessings, dc)
+			if err != nil {
+				return nil, verror.NoRetry, false, err
+			}
+
+			if err := fc.prepareBlessingsAndDischarges(ctx, method, r.suffix, args, r.rejectedBlessings, opts); err != nil {
+				r.serverErr = &verror.SubErr{
+					Name:    suberrName(r.server, name, method),
+					Options: verror.Print,
+					Err:     verror.New(verror.ErrNotTrusted, nil, verror.New(errPrepareBlessingsAndDischarges, ctx, r.flow.RemoteBlessings(), err)),
+				}
+				vlog.VI(2).Infof("rpc: err: %s", r.serverErr)
+				r.flow.Close()
+				r.flow = nil
+				continue
+			}
+
+			// This is the 'point of no return'; once the RPC is started (fc.start
+			// below) we can't be sure if it makes it to the server or not so, this
+			// code will never call fc.start more than once to ensure that we provide
+			// 'at-most-once' rpc semantics at this level. Retrying the network
+			// connections (i.e. creating flows) is fine since we can cleanup that
+			// state if we abort a call (i.e. close the flow).
+			//
+			// We must ensure that all flows other than r.flow are closed.
+			//
+			// TODO(cnicolaou): all errors below are marked as NoRetry
+			// because we want to provide at-most-once rpc semantics so
+			// we only ever attempt an RPC once. In the future, we'll cache
+			// responses on the server and then we can retry in-flight
+			// RPCs.
+			go cleanupTryCall(r, responses, ch)
+
+			if doneChan != nil {
+				go func() {
+					select {
+					case <-doneChan:
+						vtrace.GetSpan(fc.ctx).Annotate("Canceled")
+						fc.flow.Cancel()
+					case <-fc.flow.Closed():
+					}
+				}()
+			}
+
+			deadline, _ := ctx.Deadline()
+			if verr := fc.start(r.suffix, method, args, deadline); verr != nil {
+				return nil, verror.NoRetry, false, verr
+			}
+			return fc, verror.NoRetry, false, nil
+		}
+		if numResponses == len(responses) {
+			return c.failedTryCall(ctx, name, method, responses, ch)
+		}
+	}
+}
+
+// failedTryCall performs ©asynchronous cleanup for tryCall, and returns an
+// appropriate error from the responses we've already received.  All parallel
+// calls in tryCall failed or we timed out if we get here.
+func (c *xclient) failedTryCall(ctx *context.T, name, method string, responses []*serverStatus, ch chan *serverStatus) (rpc.ClientCall, verror.ActionCode, bool, error) {
+	go cleanupTryCall(nil, responses, ch)
+	c.ns.FlushCacheEntry(name)
+	suberrs := []verror.SubErr{}
+	topLevelError := verror.ErrNoServers
+	topLevelAction := verror.RetryRefetch
+	onlyErrNetwork := true
+	for _, r := range responses {
+		if r != nil && r.serverErr != nil && r.serverErr.Err != nil {
+			switch verror.ErrorID(r.serverErr.Err) {
+			case stream.ErrNotTrusted.ID, verror.ErrNotTrusted.ID, errServerAuthorizeFailed.ID:
+				topLevelError = verror.ErrNotTrusted
+				topLevelAction = verror.NoRetry
+				onlyErrNetwork = false
+			case stream.ErrAborted.ID, stream.ErrNetwork.ID:
+				// do nothing
+			default:
+				onlyErrNetwork = false
+			}
+			suberrs = append(suberrs, *r.serverErr)
+		}
+	}
+
+	if onlyErrNetwork {
+		// If we only encountered network errors, then report ErrBadProtocol.
+		topLevelError = verror.ErrBadProtocol
+	}
+
+	// TODO(cnicolaou): we get system errors for things like dialing using
+	// the 'ws' protocol which can never succeed even if we retry the connection,
+	// hence we return RetryRefetch below except for the case where the servers
+	// are not trusted, in case there's no point in retrying at all.
+	// TODO(cnicolaou): implementing at-most-once rpc semantics in the future
+	// will require thinking through all of the cases where the RPC can
+	// be retried by the client whilst it's actually being executed on the
+	// server.
+	return nil, topLevelAction, false, verror.AddSubErrs(verror.New(topLevelError, ctx), ctx, suberrs...)
+}
+
+func (c *xclient) Close() {
+	// TODO(suharshs): Implement this.
+}
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
new file mode 100644
index 0000000..840c83f
--- /dev/null
+++ b/runtime/internal/rpc/xserver.go
@@ -0,0 +1,1277 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"reflect"
+	"strings"
+	"sync"
+	"time"
+
+	"v.io/x/lib/netstate"
+	"v.io/x/lib/pubsub"
+	"v.io/x/lib/vlog"
+
+	"v.io/v23/context"
+	"v.io/v23/i18n"
+	"v.io/v23/namespace"
+	"v.io/v23/naming"
+	"v.io/v23/options"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	"v.io/v23/vdl"
+	"v.io/v23/verror"
+	"v.io/v23/vom"
+	"v.io/v23/vtrace"
+
+	"v.io/x/ref/lib/apilog"
+	"v.io/x/ref/lib/stats"
+	"v.io/x/ref/runtime/internal/lib/publisher"
+	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/stream"
+	"v.io/x/ref/runtime/internal/rpc/stream/manager"
+	"v.io/x/ref/runtime/internal/rpc/stream/vc"
+)
+
+// state for each requested listen address
+type xlistenState struct {
+	protocol, address string
+	ln                stream.XListener
+	lep               naming.Endpoint
+	lnerr, eperr      error
+	roaming           bool
+	// We keep track of all of the endpoints, the port and a copy of
+	// the original listen endpoint for use with roaming network changes.
+	ieps     []*inaming.Endpoint // list of currently active eps
+	port     string              // port to use for creating new eps
+	protoIEP inaming.Endpoint    // endpoint to use as template for new eps (includes rid, versions etc)
+}
+
+type xserver struct {
+	sync.Mutex
+	// context used by the server to make internal RPCs, error messages etc.
+	ctx               *context.T
+	cancel            context.CancelFunc   // function to cancel the above context.
+	state             serverState          // track state of the server.
+	streamMgr         stream.XManager      // stream manager to listen for new flows.
+	publisher         publisher.Publisher  // publisher to publish mounttable mounts.
+	dc                vc.DischargeClient   // fetches discharges of blessings
+	listenerOpts      []stream.ListenerOpt // listener opts for Listen.
+	settingsPublisher *pubsub.Publisher    // pubsub publisher for dhcp
+	settingsName      string               // pubwsub stream name for dhcp
+	dhcpState         *dhcpState           // dhcpState, nil if not using dhcp
+	principal         security.Principal
+	blessings         security.Blessings
+
+	// maps that contain state on listeners.
+	listenState map[*xlistenState]struct{}
+	listeners   map[stream.XListener]struct{}
+
+	// state of proxies keyed by the name of the proxy
+	proxies map[string]proxyState
+
+	disp               rpc.Dispatcher // dispatcher to serve RPCs
+	dispReserved       rpc.Dispatcher // dispatcher for reserved methods
+	active             sync.WaitGroup // active goroutines we've spawned.
+	stoppedChan        chan struct{}  // closed when the server has been stopped.
+	preferredProtocols []string       // protocols to use when resolving proxy name to endpoint.
+	// We cache the IP networks on the device since it is not that cheap to read
+	// network interfaces through os syscall.
+	// TODO(jhahn): Add monitoring the network interface changes.
+	ipNets           []*net.IPNet
+	ns               namespace.T
+	servesMountTable bool
+	isLeaf           bool
+
+	// TODO(cnicolaou): add roaming stats to rpcStats
+	stats *rpcStats // stats for this server.
+}
+
+func (s *xserver) allowed(next serverState, method string) error {
+	if states[s.state][next] {
+		s.state = next
+		return nil
+	}
+	return verror.New(verror.ErrBadState, s.ctx, fmt.Sprintf("%s called out of order or more than once", method))
+}
+
+func (s *xserver) isStopState() bool {
+	return s.state == stopping || s.state == stopped
+}
+
+func XInternalNewServer(
+	ctx *context.T,
+	streamMgr stream.XManager,
+	ns namespace.T,
+	settingsPublisher *pubsub.Publisher,
+	settingsName string,
+	client rpc.Client,
+	principal security.Principal,
+	opts ...rpc.ServerOpt) (rpc.Server, error) {
+	ctx, cancel := context.WithRootCancel(ctx)
+	ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
+	statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
+	s := &xserver{
+		ctx:               ctx,
+		cancel:            cancel,
+		streamMgr:         streamMgr,
+		principal:         principal,
+		publisher:         publisher.New(ctx, ns, publishPeriod),
+		listenState:       make(map[*xlistenState]struct{}),
+		listeners:         make(map[stream.XListener]struct{}),
+		proxies:           make(map[string]proxyState),
+		stoppedChan:       make(chan struct{}),
+		ipNets:            ipNetworks(),
+		ns:                ns,
+		stats:             newRPCStats(statsPrefix),
+		settingsPublisher: settingsPublisher,
+		settingsName:      settingsName,
+	}
+	var (
+		dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
+		securityLevel         options.SecurityLevel
+	)
+	for _, opt := range opts {
+		switch opt := opt.(type) {
+		case stream.ListenerOpt:
+			// Collect all ServerOpts that are also ListenerOpts.
+			s.listenerOpts = append(s.listenerOpts, opt)
+			switch opt := opt.(type) {
+			case vc.DischargeExpiryBuffer:
+				dischargeExpiryBuffer = time.Duration(opt)
+			}
+		case options.ServerBlessings:
+			s.blessings = opt.Blessings
+		case options.ServesMountTable:
+			s.servesMountTable = bool(opt)
+		case options.IsLeaf:
+			s.isLeaf = bool(opt)
+		case ReservedNameDispatcher:
+			s.dispReserved = opt.Dispatcher
+		case PreferredServerResolveProtocols:
+			s.preferredProtocols = []string(opt)
+		case options.SecurityLevel:
+			securityLevel = opt
+		}
+	}
+	if s.blessings.IsZero() && principal != nil {
+		s.blessings = principal.BlessingStore().Default()
+	}
+	if securityLevel == options.SecurityNone {
+		s.principal = nil
+		s.blessings = security.Blessings{}
+		s.dispReserved = nil
+	}
+	// Make dischargeExpiryBuffer shorter than the VC discharge buffer to ensure we have fetched
+	// the discharges by the time the VC asks for them.`
+	s.dc = InternalNewDischargeClient(ctx, client, dischargeExpiryBuffer-(5*time.Second))
+	s.listenerOpts = append(s.listenerOpts, s.dc)
+	s.listenerOpts = append(s.listenerOpts, vc.DialContext{ctx})
+	blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
+	// TODO(caprita): revist printing the blessings with %s, and
+	// instead expose them as a list.
+	stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
+	if principal != nil {
+		stats.NewStringFunc(blessingsStatsName, func() string {
+			return fmt.Sprintf("%s (default)", principal.BlessingStore().Default())
+		})
+	}
+	return s, nil
+}
+
+func (s *xserver) Status() rpc.ServerStatus {
+	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	status := rpc.ServerStatus{}
+	s.Lock()
+	defer s.Unlock()
+	status.State = externalStates[s.state]
+	status.ServesMountTable = s.servesMountTable
+	status.Mounts = s.publisher.Status()
+	status.Endpoints = []naming.Endpoint{}
+	for ls, _ := range s.listenState {
+		if ls.eperr != nil {
+			status.Errors = append(status.Errors, ls.eperr)
+		}
+		if ls.lnerr != nil {
+			status.Errors = append(status.Errors, ls.lnerr)
+		}
+		for _, iep := range ls.ieps {
+			status.Endpoints = append(status.Endpoints, iep)
+		}
+	}
+	status.Proxies = make([]rpc.ProxyStatus, 0, len(s.proxies))
+	for k, v := range s.proxies {
+		status.Proxies = append(status.Proxies, rpc.ProxyStatus{k, v.endpoint, v.err})
+	}
+	return status
+}
+
+func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
+	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	s.Lock()
+	defer s.Unlock()
+	if s.dhcpState != nil {
+		s.dhcpState.watchers[ch] = struct{}{}
+	}
+}
+
+func (s *xserver) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
+	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	s.Lock()
+	defer s.Unlock()
+	if s.dhcpState != nil {
+		delete(s.dhcpState.watchers, ch)
+	}
+}
+
+// resolveToEndpoint resolves an object name or address to an endpoint.
+func (s *xserver) resolveToEndpoint(address string) (string, error) {
+	var resolved *naming.MountEntry
+	var err error
+	if s.ns != nil {
+		if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
+			return "", err
+		}
+	} else {
+		// Fake a namespace resolution
+		resolved = &naming.MountEntry{Servers: []naming.MountedServer{
+			{Server: address},
+		}}
+	}
+	// An empty set of protocols means all protocols...
+	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
+		return "", err
+	}
+	for _, n := range resolved.Names() {
+		address, suffix := naming.SplitAddressName(n)
+		if suffix != "" {
+			continue
+		}
+		if ep, err := inaming.NewEndpoint(address); err == nil {
+			return ep.String(), nil
+		}
+	}
+	return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
+}
+
+// createEndpoints creates appropriate inaming.Endpoint instances for
+// all of the externally accessible network addresses that can be used
+// to reach this server.
+func (s *xserver) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
+	iep, ok := lep.(*inaming.Endpoint)
+	if !ok {
+		return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
+	}
+	if !strings.HasPrefix(iep.Protocol, "tcp") &&
+		!strings.HasPrefix(iep.Protocol, "ws") {
+		// If not tcp, ws, or wsh, just return the endpoint we were given.
+		return []*inaming.Endpoint{iep}, "", false, nil
+	}
+	host, port, err := net.SplitHostPort(iep.Address)
+	if err != nil {
+		return nil, "", false, err
+	}
+	addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
+	if err != nil {
+		return nil, port, false, err
+	}
+
+	ieps := make([]*inaming.Endpoint, 0, len(addrs))
+	for _, addr := range addrs {
+		n, err := inaming.NewEndpoint(lep.String())
+		if err != nil {
+			return nil, port, false, err
+		}
+		n.IsMountTable = s.servesMountTable
+		n.Address = net.JoinHostPort(addr.String(), port)
+		ieps = append(ieps, n)
+	}
+	return ieps, port, unspecified, nil
+}
+
+func (s *xserver) Listen(listenSpec rpc.ListenSpec) ([]naming.Endpoint, error) {
+	defer apilog.LogCallf(nil, "listenSpec=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	useProxy := len(listenSpec.Proxy) > 0
+	if !useProxy && len(listenSpec.Addrs) == 0 {
+		return nil, verror.New(verror.ErrBadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
+	}
+
+	s.Lock()
+	defer s.Unlock()
+
+	if err := s.allowed(listening, "Listen"); err != nil {
+		return nil, err
+	}
+
+	// Start the proxy as early as possible, ignore duplicate requests
+	// for the same proxy.
+	if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
+		// Pre-emptively fetch discharges on the blessings (they will be cached
+		// within s.dc for future calls).
+		// This shouldn't be required, but is a hack to reduce flakiness in
+		// JavaScript browser integration tests.
+		// See https://v.io/i/392
+		s.dc.PrepareDischarges(s.ctx, s.blessings.ThirdPartyCaveats(), security.DischargeImpetus{})
+		// We have a goroutine for listening on proxy connections.
+		s.active.Add(1)
+		go func() {
+			s.proxyListenLoop(listenSpec.Proxy)
+			s.active.Done()
+		}()
+	}
+
+	roaming := false
+	lnState := make([]*xlistenState, 0, len(listenSpec.Addrs))
+	for _, addr := range listenSpec.Addrs {
+		if len(addr.Address) > 0 {
+			// Listen if we have a local address to listen on.
+			ls := &xlistenState{
+				protocol: addr.Protocol,
+				address:  addr.Address,
+			}
+			ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(addr.Protocol, addr.Address, s.principal, s.blessings, s.listenerOpts...)
+			lnState = append(lnState, ls)
+			if ls.lnerr != nil {
+				vlog.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, ls.lnerr)
+				continue
+			}
+			ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
+			if ls.roaming && ls.eperr == nil {
+				ls.protoIEP = *ls.lep.(*inaming.Endpoint)
+				roaming = true
+			}
+		}
+	}
+
+	found := false
+	for _, ls := range lnState {
+		if ls.ln != nil {
+			found = true
+			break
+		}
+	}
+	if !found && !useProxy {
+		return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
+	}
+
+	if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
+		// Create a dhcp listener if we haven't already done so.
+		dhcp := &dhcpState{
+			name:      s.settingsName,
+			publisher: s.settingsPublisher,
+			watchers:  make(map[chan<- rpc.NetworkChange]struct{}),
+		}
+		s.dhcpState = dhcp
+		dhcp.ch = make(chan pubsub.Setting, 10)
+		dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
+		if dhcp.err == nil {
+			// We have a goroutine to listen for dhcp changes.
+			s.active.Add(1)
+			go func() {
+				s.dhcpLoop(dhcp.ch)
+				s.active.Done()
+			}()
+		}
+	}
+
+	eps := make([]naming.Endpoint, 0, 10)
+	for _, ls := range lnState {
+		s.listenState[ls] = struct{}{}
+		if ls.ln != nil {
+			// We have a goroutine per listener to accept new flows.
+			// Each flow is served from its own goroutine.
+			s.active.Add(1)
+			go func(ln stream.XListener, ep naming.Endpoint) {
+				s.listenLoop(ln, ep)
+				s.active.Done()
+			}(ls.ln, ls.lep)
+		}
+
+		for _, iep := range ls.ieps {
+			eps = append(eps, iep)
+		}
+	}
+
+	return eps, nil
+}
+
+func (s *xserver) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.XListener, error) {
+	resolved, err := s.resolveToEndpoint(proxy)
+	if err != nil {
+		return nil, nil, verror.New(errFailedToResolveProxy, s.ctx, proxy, err)
+	}
+	opts := append([]stream.ListenerOpt{xproxyAuth{s}}, s.listenerOpts...)
+	ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.principal, s.blessings, opts...)
+	if err != nil {
+		return nil, nil, verror.New(errFailedToListenForProxy, s.ctx, resolved, err)
+	}
+	iep, ok := ep.(*inaming.Endpoint)
+	if !ok {
+		ln.Close()
+		return nil, nil, verror.New(errInternalTypeConversion, s.ctx, fmt.Sprintf("%T", ep))
+	}
+	s.Lock()
+	s.proxies[proxy] = proxyState{iep, nil}
+	s.Unlock()
+	iep.IsMountTable = s.servesMountTable
+	iep.IsLeaf = s.isLeaf
+	s.publisher.AddServer(iep.String())
+	return iep, ln, nil
+}
+
+func (s *xserver) proxyListenLoop(proxy string) {
+	const (
+		min = 5 * time.Millisecond
+		max = 5 * time.Minute
+	)
+
+	iep, ln, err := s.reconnectAndPublishProxy(proxy)
+	if err != nil {
+		vlog.Errorf("Failed to connect to proxy: %s", err)
+	}
+	// the initial connection maybe have failed, but we enter the retry
+	// loop anyway so that we will continue to try and connect to the
+	// proxy.
+	s.Lock()
+	if s.isStopState() {
+		s.Unlock()
+		return
+	}
+	s.Unlock()
+
+	for {
+		if ln != nil && iep != nil {
+			err := s.listenLoop(ln, iep)
+			// The listener is done, so:
+			// (1) Unpublish its name
+			s.publisher.RemoveServer(iep.String())
+			s.Lock()
+			if err != nil {
+				s.proxies[proxy] = proxyState{iep, verror.New(verror.ErrNoServers, s.ctx, err)}
+			} else {
+				// err will be nil if we're stopping.
+				s.proxies[proxy] = proxyState{iep, nil}
+				s.Unlock()
+				return
+			}
+			s.Unlock()
+		}
+
+		s.Lock()
+		if s.isStopState() {
+			s.Unlock()
+			return
+		}
+		s.Unlock()
+
+		// (2) Reconnect to the proxy unless the server has been stopped
+		backoff := min
+		ln = nil
+		for {
+			select {
+			case <-time.After(backoff):
+				if backoff = backoff * 2; backoff > max {
+					backoff = max
+				}
+			case <-s.stoppedChan:
+				return
+			}
+			// (3) reconnect, publish new address
+			if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
+				vlog.Errorf("Failed to reconnect to proxy %q: %s", proxy, err)
+			} else {
+				vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
+				break
+			}
+		}
+	}
+}
+
+// addListener adds the supplied listener taking care to
+// check to see if we're already stopping. It returns true
+// if the listener was added.
+func (s *xserver) addListener(ln stream.XListener) bool {
+	s.Lock()
+	defer s.Unlock()
+	if s.isStopState() {
+		return false
+	}
+	s.listeners[ln] = struct{}{}
+	return true
+}
+
+// rmListener removes the supplied listener taking care to
+// check if we're already stopping. It returns true if the
+// listener was removed.
+func (s *xserver) rmListener(ln stream.XListener) bool {
+	s.Lock()
+	defer s.Unlock()
+	if s.isStopState() {
+		return false
+	}
+	delete(s.listeners, ln)
+	return true
+}
+
+func (s *xserver) listenLoop(ln stream.XListener, ep naming.Endpoint) error {
+	defer vlog.VI(1).Infof("rpc: Stopped listening on %s", ep)
+	var calls sync.WaitGroup
+
+	if !s.addListener(ln) {
+		// We're stopping.
+		return nil
+	}
+
+	defer func() {
+		calls.Wait()
+		s.rmListener(ln)
+	}()
+	for {
+		flow, err := ln.Accept()
+		if err != nil {
+			vlog.VI(10).Infof("rpc: Accept on %v failed: %v", ep, err)
+			return err
+		}
+		calls.Add(1)
+		go func(flow stream.Flow) {
+			defer calls.Done()
+			fs, err := newXFlowServer(flow, s)
+			if err != nil {
+				vlog.VI(1).Infof("newFlowServer on %v failed: %v", ep, err)
+				return
+			}
+			if err := fs.serve(); err != nil {
+				// TODO(caprita): Logging errors here is too spammy. For example, "not
+				// authorized" errors shouldn't be logged as server errors.
+				// TODO(cnicolaou): revisit this when verror2 transition is
+				// done.
+				if err != io.EOF {
+					vlog.VI(2).Infof("Flow.serve on %v failed: %v", ep, err)
+				}
+			}
+		}(flow)
+	}
+}
+
+func (s *xserver) dhcpLoop(ch chan pubsub.Setting) {
+	defer vlog.VI(1).Infof("rpc: Stopped listen for dhcp changes")
+	vlog.VI(2).Infof("rpc: dhcp loop")
+	for setting := range ch {
+		if setting == nil {
+			return
+		}
+		switch v := setting.Value().(type) {
+		case []net.Addr:
+			s.Lock()
+			if s.isStopState() {
+				s.Unlock()
+				return
+			}
+			change := rpc.NetworkChange{
+				Time:  time.Now(),
+				State: externalStates[s.state],
+			}
+			switch setting.Name() {
+			case NewAddrsSetting:
+				change.Changed = s.addAddresses(v)
+				change.AddedAddrs = v
+			case RmAddrsSetting:
+				change.Changed, change.Error = s.removeAddresses(v)
+				change.RemovedAddrs = v
+			}
+			vlog.VI(2).Infof("rpc: dhcp: change %v", change)
+			for ch, _ := range s.dhcpState.watchers {
+				select {
+				case ch <- change:
+				default:
+				}
+			}
+			s.Unlock()
+		default:
+			vlog.Errorf("rpc: dhcpLoop: unhandled setting type %T", v)
+		}
+	}
+}
+
+// Remove all endpoints that have the same host address as the supplied
+// address parameter.
+func (s *xserver) removeAddresses(addrs []net.Addr) ([]naming.Endpoint, error) {
+	var removed []naming.Endpoint
+	for _, address := range addrs {
+		host := getHost(address)
+		for ls, _ := range s.listenState {
+			if ls != nil && ls.roaming && len(ls.ieps) > 0 {
+				remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
+				for _, iep := range ls.ieps {
+					lnHost, _, err := net.SplitHostPort(iep.Address)
+					if err != nil {
+						lnHost = iep.Address
+					}
+					if lnHost == host {
+						vlog.VI(2).Infof("rpc: dhcp removing: %s", iep)
+						removed = append(removed, iep)
+						s.publisher.RemoveServer(iep.String())
+						continue
+					}
+					remaining = append(remaining, iep)
+				}
+				ls.ieps = remaining
+			}
+		}
+	}
+	return removed, nil
+}
+
+// Add new endpoints for the new address. There is no way to know with
+// 100% confidence which new endpoints to publish without shutting down
+// all network connections and reinitializing everything from scratch.
+// Instead, we find all roaming listeners with at least one endpoint
+// and create a new endpoint with the same port as the existing ones
+// but with the new address supplied to us to by the dhcp code. As
+// an additional safeguard we reject the new address if it is not
+// externally accessible.
+// This places the onus on the dhcp/roaming code that sends us addresses
+// to ensure that those addresses are externally reachable.
+func (s *xserver) addAddresses(addrs []net.Addr) []naming.Endpoint {
+	var added []naming.Endpoint
+	for _, address := range netstate.ConvertToAddresses(addrs) {
+		if !netstate.IsAccessibleIP(address) {
+			return added
+		}
+		host := getHost(address)
+		for ls, _ := range s.listenState {
+			if ls != nil && ls.roaming {
+				niep := ls.protoIEP
+				niep.Address = net.JoinHostPort(host, ls.port)
+				niep.IsMountTable = s.servesMountTable
+				niep.IsLeaf = s.isLeaf
+				ls.ieps = append(ls.ieps, &niep)
+				vlog.VI(2).Infof("rpc: dhcp adding: %s", niep)
+				s.publisher.AddServer(niep.String())
+				added = append(added, &niep)
+			}
+		}
+	}
+	return added
+}
+
+func (s *xserver) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
+	defer apilog.LogCallf(nil, "name=%.10s...,obj=,authorizer=", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	if obj == nil {
+		return verror.New(verror.ErrBadArg, s.ctx, "nil object")
+	}
+	invoker, err := objectToInvoker(obj)
+	if err != nil {
+		return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
+	}
+	s.setLeaf(true)
+	return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
+}
+
+func (s *xserver) setLeaf(value bool) {
+	s.Lock()
+	defer s.Unlock()
+	s.isLeaf = value
+	for ls, _ := range s.listenState {
+		for i := range ls.ieps {
+			ls.ieps[i].IsLeaf = s.isLeaf
+		}
+	}
+}
+
+func (s *xserver) ServeDispatcher(name string, disp rpc.Dispatcher) error {
+	defer apilog.LogCallf(nil, "name=%.10s...,disp=", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	if disp == nil {
+		return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
+	}
+	s.Lock()
+	defer s.Unlock()
+	if err := s.allowed(serving, "Serve or ServeDispatcher"); err != nil {
+		return err
+	}
+	vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
+	s.disp = disp
+	if len(name) > 0 {
+		for ls, _ := range s.listenState {
+			for _, iep := range ls.ieps {
+				s.publisher.AddServer(iep.String())
+			}
+		}
+		s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+	}
+	return nil
+}
+
+func (s *xserver) AddName(name string) error {
+	defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	if len(name) == 0 {
+		return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
+	}
+	s.Lock()
+	defer s.Unlock()
+	if err := s.allowed(publishing, "AddName"); err != nil {
+		return err
+	}
+	vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
+	s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+	return nil
+}
+
+func (s *xserver) RemoveName(name string) {
+	defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	s.Lock()
+	defer s.Unlock()
+	if err := s.allowed(publishing, "RemoveName"); err != nil {
+		return
+	}
+	vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
+	s.publisher.RemoveName(name)
+}
+
+func (s *xserver) Stop() error {
+	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
+	vlog.VI(1).Infof("Stop: %s", serverDebug)
+	defer vlog.VI(1).Infof("Stop done: %s", serverDebug)
+	s.Lock()
+	if s.isStopState() {
+		s.Unlock()
+		return nil
+	}
+	s.state = stopping
+	close(s.stoppedChan)
+	s.Unlock()
+
+	// Delete the stats object.
+	s.stats.stop()
+
+	// Note, It's safe to Stop/WaitForStop on the publisher outside of the
+	// server lock, since publisher is safe for concurrent access.
+
+	// Stop the publisher, which triggers unmounting of published names.
+	s.publisher.Stop()
+	// Wait for the publisher to be done unmounting before we can proceed to
+	// close the listeners (to minimize the number of mounted names pointing
+	// to endpoint that are no longer serving).
+	//
+	// TODO(caprita): See if make sense to fail fast on rejecting
+	// connections once listeners are closed, and parallelize the publisher
+	// and listener shutdown.
+	s.publisher.WaitForStop()
+
+	s.Lock()
+
+	// Close all listeners.  No new flows will be accepted, while in-flight
+	// flows will continue until they terminate naturally.
+	nListeners := len(s.listeners)
+	errCh := make(chan error, nListeners)
+
+	for ln, _ := range s.listeners {
+		go func(ln stream.XListener) {
+			errCh <- ln.Close()
+		}(ln)
+	}
+
+	drain := func(ch chan pubsub.Setting) {
+		for {
+			select {
+			case v := <-ch:
+				if v == nil {
+					return
+				}
+			default:
+				close(ch)
+				return
+			}
+		}
+	}
+
+	if dhcp := s.dhcpState; dhcp != nil {
+		// TODO(cnicolaou,caprita): investigate not having to close and drain
+		// the channel here. It's a little awkward right now since we have to
+		// be careful to not close the channel in two places, i.e. here and
+		// and from the publisher's Shutdown method.
+		if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
+			drain(dhcp.ch)
+		}
+	}
+
+	s.Unlock()
+
+	var firstErr error
+	for i := 0; i < nListeners; i++ {
+		if err := <-errCh; err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	// At this point, we are guaranteed that no new requests are going to be
+	// accepted.
+
+	// Wait for the publisher and active listener + flows to finish.
+	done := make(chan struct{}, 1)
+	go func() { s.active.Wait(); done <- struct{}{} }()
+
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		vlog.Errorf("%s: Listener Close Error: %v", serverDebug, firstErr)
+		vlog.Errorf("%s: Timedout waiting for goroutines to stop: listeners: %d (currently: %d)", serverDebug, nListeners, len(s.listeners))
+		for ln, _ := range s.listeners {
+			vlog.Errorf("%s: Listener: %v", serverDebug, ln)
+		}
+		for ls, _ := range s.listenState {
+			vlog.Errorf("%s: ListenState: %v", serverDebug, ls)
+		}
+		<-done
+		vlog.Infof("%s: Done waiting.", serverDebug)
+	}
+
+	s.Lock()
+	defer s.Unlock()
+	s.disp = nil
+	if firstErr != nil {
+		return verror.New(verror.ErrInternal, s.ctx, firstErr)
+	}
+	s.state = stopped
+	s.cancel()
+	return nil
+}
+
+// flowServer implements the RPC server-side protocol for a single RPC, over a
+// flow that's already connected to the client.
+type xflowServer struct {
+	ctx    *context.T     // context associated with the RPC
+	server *xserver       // rpc.Server that this flow server belongs to
+	disp   rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
+	dec    *vom.Decoder   // to decode requests and args from the client
+	enc    *vom.Encoder   // to encode responses and results to the client
+	flow   stream.Flow    // underlying flow
+
+	// Fields filled in during the server invocation.
+	clientBlessings  security.Blessings
+	ackBlessings     bool
+	grantedBlessings security.Blessings
+	method, suffix   string
+	tags             []*vdl.Value
+	discharges       map[string]security.Discharge
+	starttime        time.Time
+	endStreamArgs    bool // are the stream args at EOF?
+}
+
+var (
+	_ rpc.StreamServerCall = (*xflowServer)(nil)
+	_ security.Call        = (*xflowServer)(nil)
+)
+
+func newXFlowServer(flow stream.Flow, server *xserver) (*xflowServer, error) {
+	server.Lock()
+	disp := server.disp
+	server.Unlock()
+
+	fs := &xflowServer{
+		ctx:        server.ctx,
+		server:     server,
+		disp:       disp,
+		flow:       flow,
+		discharges: make(map[string]security.Discharge),
+	}
+	typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+	if typeenc == nil {
+		fs.enc = vom.NewEncoder(flow)
+		fs.dec = vom.NewDecoder(flow)
+	} else {
+		fs.enc = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder))
+		typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+		fs.dec = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder))
+	}
+	return fs, nil
+}
+
+// authorizeVtrace works by simulating a call to __debug/vtrace.Trace.  That
+// rpc is essentially equivalent in power to the data we are attempting to
+// attach here.
+func (fs *xflowServer) authorizeVtrace() error {
+	// Set up a context as though we were calling __debug/vtrace.
+	params := &security.CallParams{}
+	params.Copy(fs)
+	params.Method = "Trace"
+	params.MethodTags = []*vdl.Value{vdl.ValueOf(access.Debug)}
+	params.Suffix = "__debug/vtrace"
+
+	var auth security.Authorizer
+	if fs.server.dispReserved != nil {
+		_, auth, _ = fs.server.dispReserved.Lookup(params.Suffix)
+	}
+	return authorize(fs.ctx, security.NewCall(params), auth)
+}
+
+func (fs *xflowServer) serve() error {
+	defer fs.flow.Close()
+
+	results, err := fs.processRequest()
+
+	vtrace.GetSpan(fs.ctx).Finish()
+
+	var traceResponse vtrace.Response
+	// Check if the caller is permitted to view vtrace data.
+	if fs.authorizeVtrace() == nil {
+		traceResponse = vtrace.GetResponse(fs.ctx)
+	}
+
+	// Respond to the client with the response header and positional results.
+	response := rpc.Response{
+		Error:            err,
+		EndStreamResults: true,
+		NumPosResults:    uint64(len(results)),
+		TraceResponse:    traceResponse,
+		AckBlessings:     fs.ackBlessings,
+	}
+	if err := fs.enc.Encode(response); err != nil {
+		if err == io.EOF {
+			return err
+		}
+		return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
+	}
+	if response.Error != nil {
+		return response.Error
+	}
+	for ix, res := range results {
+		if err := fs.enc.Encode(res); err != nil {
+			if err == io.EOF {
+				return err
+			}
+			return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
+		}
+	}
+	// TODO(ashankar): Should unread data from the flow be drained?
+	//
+	// Reason to do so:
+	// The common stream.Flow implementation (v.io/x/ref/runtime/internal/rpc/stream/vc/reader.go)
+	// uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
+	// slices will not be returned to the pool leading to possibly increased memory usage.
+	//
+	// Reason to not do so:
+	// Draining here will conflict with any Reads on the flow in a separate goroutine
+	// (for example, see TestStreamReadTerminatedByServer in full_test.go).
+	//
+	// For now, go with the reason to not do so as having unread data in the stream
+	// should be a rare case.
+	return nil
+}
+
+func (fs *xflowServer) readRPCRequest() (*rpc.Request, error) {
+	// Set a default timeout before reading from the flow. Without this timeout,
+	// a client that sends no request or a partial request will retain the flow
+	// indefinitely (and lock up server resources).
+	initTimer := newTimer(defaultCallTimeout)
+	defer initTimer.Stop()
+	fs.flow.SetDeadline(initTimer.C)
+
+	// Decode the initial request.
+	var req rpc.Request
+	if err := fs.dec.Decode(&req); err != nil {
+		return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err))
+	}
+	return &req, nil
+}
+
+func (fs *xflowServer) processRequest() ([]interface{}, error) {
+	fs.starttime = time.Now()
+	req, err := fs.readRPCRequest()
+	if err != nil {
+		// We don't know what the rpc call was supposed to be, but we'll create
+		// a placeholder span so we can capture annotations.
+		fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
+		return nil, err
+	}
+	fs.method = req.Method
+	fs.suffix = strings.TrimLeft(req.Suffix, "/")
+
+	if req.Language != "" {
+		fs.ctx = i18n.WithLangID(fs.ctx, i18n.LangID(req.Language))
+	}
+
+	// TODO(mattr): Currently this allows users to trigger trace collection
+	// on the server even if they will not be allowed to collect the
+	// results later.  This might be considered a DOS vector.
+	spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method)
+	fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest)
+
+	var cancel context.CancelFunc
+	if !req.Deadline.IsZero() {
+		fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time)
+	} else {
+		fs.ctx, cancel = context.WithCancel(fs.ctx)
+	}
+	fs.flow.SetDeadline(fs.ctx.Done())
+	go fs.cancelContextOnClose(cancel)
+
+	// Initialize security: blessings, discharges, etc.
+	if err := fs.initSecurity(req); err != nil {
+		return nil, err
+	}
+	// Lookup the invoker.
+	invoker, auth, err := fs.lookup(fs.suffix, fs.method)
+	if err != nil {
+		return nil, err
+	}
+
+	// Note that we strip the reserved prefix when calling the invoker so
+	// that __Glob will call Glob.  Note that we've already assigned a
+	// special invoker so that we never call the wrong method by mistake.
+	strippedMethod := naming.StripReserved(fs.method)
+
+	// Prepare invoker and decode args.
+	numArgs := int(req.NumPosArgs)
+	argptrs, tags, err := invoker.Prepare(strippedMethod, numArgs)
+	fs.tags = tags
+	if err != nil {
+		return nil, err
+	}
+	if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
+		err := newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want)
+		// If the client is sending the wrong number of arguments, try to drain the
+		// arguments sent by the client before returning an error to ensure the client
+		// receives the correct error in call.Finish(). Otherwise, the client may get
+		// an EOF error while encoding args since the server closes the flow upon returning.
+		var any interface{}
+		for i := 0; i < int(req.NumPosArgs); i++ {
+			if decerr := fs.dec.Decode(&any); decerr != nil {
+				return nil, err
+			}
+		}
+		return nil, err
+	}
+	for ix, argptr := range argptrs {
+		if err := fs.dec.Decode(argptr); err != nil {
+			return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err)
+		}
+	}
+
+	// Check application's authorization policy.
+	if err := authorize(fs.ctx, fs, auth); err != nil {
+		return nil, err
+	}
+
+	// Invoke the method.
+	results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs)
+	fs.server.stats.record(fs.method, time.Since(fs.starttime))
+	return results, err
+}
+
+func (fs *xflowServer) cancelContextOnClose(cancel context.CancelFunc) {
+	// Ensure that the context gets cancelled if the flow is closed
+	// due to a network error, or client cancellation.
+	select {
+	case <-fs.flow.Closed():
+		// Here we remove the contexts channel as a deadline to the flow.
+		// We do this to ensure clients get a consistent error when they read/write
+		// after the flow is closed.  Since the flow is already closed, it doesn't
+		// matter that the context is also cancelled.
+		fs.flow.SetDeadline(nil)
+		cancel()
+	case <-fs.ctx.Done():
+	}
+}
+
+// lookup returns the invoker and authorizer responsible for serving the given
+// name and method.  The suffix is stripped of any leading slashes. If it begins
+// with rpc.DebugKeyword, we use the internal debug dispatcher to look up the
+// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
+// value may be modified to match the actual suffix and method to use.
+func (fs *xflowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
+	if naming.IsReserved(method) {
+		return reservedInvoker(fs.disp, fs.server.dispReserved), security.AllowEveryone(), nil
+	}
+	disp := fs.disp
+	if naming.IsReserved(suffix) {
+		disp = fs.server.dispReserved
+	} else if fs.server.isLeaf && suffix != "" {
+		innerErr := verror.New(errUnexpectedSuffix, fs.ctx, suffix)
+		return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix, innerErr)
+	}
+	if disp != nil {
+		obj, auth, err := disp.Lookup(suffix)
+		switch {
+		case err != nil:
+			return nil, nil, err
+		case obj != nil:
+			invoker, err := objectToInvoker(obj)
+			if err != nil {
+				return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err)
+			}
+			return invoker, auth, nil
+		}
+	}
+	return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
+}
+
+func (fs *xflowServer) initSecurity(req *rpc.Request) error {
+	// LocalPrincipal is nil which means we are operating under
+	// SecurityNone.
+	if fs.LocalPrincipal() == nil {
+		return nil
+	}
+
+	// If additional credentials are provided, make them available in the context
+	// Detect unusable blessings now, rather then discovering they are unusable on
+	// first use.
+	//
+	// TODO(ashankar,ataly): Potential confused deputy attack: The client provides
+	// the server's identity as the blessing. Figure out what we want to do about
+	// this - should servers be able to assume that a blessing is something that
+	// does not have the authorizations that the server's own identity has?
+	if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
+		return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
+	}
+	fs.grantedBlessings = req.GrantedBlessings
+
+	var err error
+	if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
+		// When the server can't access the blessings cache, the client is not following
+		// protocol, so the server closes the VCs corresponding to the client endpoint.
+		// TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
+		// of all VCs connected to the RemoteEndpoint.
+		// TODO(mattr): commented out for experiment.
+		//fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
+		return verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadBlessingsCache(fs.ctx, err))
+	}
+	// Verify that the blessings sent by the client in the request have the same public
+	// key as those sent by the client during VC establishment.
+	if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
+		return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
+	}
+	fs.ackBlessings = true
+
+	for _, d := range req.Discharges {
+		fs.discharges[d.ID()] = d
+	}
+	return nil
+}
+
+// Send implements the rpc.Stream method.
+func (fs *xflowServer) Send(item interface{}) error {
+	defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	// The empty response header indicates what follows is a streaming result.
+	if err := fs.enc.Encode(rpc.Response{}); err != nil {
+		return err
+	}
+	return fs.enc.Encode(item)
+}
+
+// Recv implements the rpc.Stream method.
+func (fs *xflowServer) Recv(itemptr interface{}) error {
+	defer apilog.LogCallf(nil, "itemptr=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	var req rpc.Request
+	if err := fs.dec.Decode(&req); err != nil {
+		return err
+	}
+	if req.EndStreamArgs {
+		fs.endStreamArgs = true
+		return io.EOF
+	}
+	return fs.dec.Decode(itemptr)
+}
+
+// Implementations of rpc.ServerCall and security.Call methods.
+
+func (fs *xflowServer) Security() security.Call {
+	//nologcall
+	return fs
+}
+func (fs *xflowServer) LocalDischarges() map[string]security.Discharge {
+	//nologcall
+	return fs.flow.LocalDischarges()
+}
+func (fs *xflowServer) RemoteDischarges() map[string]security.Discharge {
+	//nologcall
+	return fs.discharges
+}
+func (fs *xflowServer) Server() rpc.Server {
+	//nologcall
+	return fs.server
+}
+func (fs *xflowServer) Timestamp() time.Time {
+	//nologcall
+	return fs.starttime
+}
+func (fs *xflowServer) Method() string {
+	//nologcall
+	return fs.method
+}
+func (fs *xflowServer) MethodTags() []*vdl.Value {
+	//nologcall
+	return fs.tags
+}
+func (fs *xflowServer) Suffix() string {
+	//nologcall
+	return fs.suffix
+}
+func (fs *xflowServer) LocalPrincipal() security.Principal {
+	//nologcall
+	return fs.flow.LocalPrincipal()
+}
+func (fs *xflowServer) LocalBlessings() security.Blessings {
+	//nologcall
+	return fs.flow.LocalBlessings()
+}
+func (fs *xflowServer) RemoteBlessings() security.Blessings {
+	//nologcall
+	if !fs.clientBlessings.IsZero() {
+		return fs.clientBlessings
+	}
+	return fs.flow.RemoteBlessings()
+}
+func (fs *xflowServer) GrantedBlessings() security.Blessings {
+	//nologcall
+	return fs.grantedBlessings
+}
+func (fs *xflowServer) LocalEndpoint() naming.Endpoint {
+	//nologcall
+	return fs.flow.LocalEndpoint()
+}
+func (fs *xflowServer) RemoteEndpoint() naming.Endpoint {
+	//nologcall
+	return fs.flow.RemoteEndpoint()
+}
+
+type xproxyAuth struct {
+	s *xserver
+}
+
+func (a xproxyAuth) RPCStreamListenerOpt() {}
+
+func (a xproxyAuth) Login(proxy stream.Flow) (security.Blessings, []security.Discharge, error) {
+	var (
+		principal = a.s.principal
+		dc        = a.s.dc
+		ctx       = a.s.ctx
+	)
+	if principal == nil {
+		return security.Blessings{}, nil, nil
+	}
+	proxyNames, _ := security.RemoteBlessingNames(ctx, security.NewCall(&security.CallParams{
+		LocalPrincipal:   principal,
+		RemoteBlessings:  proxy.RemoteBlessings(),
+		RemoteDischarges: proxy.RemoteDischarges(),
+		RemoteEndpoint:   proxy.RemoteEndpoint(),
+		LocalEndpoint:    proxy.LocalEndpoint(),
+	}))
+	blessings := principal.BlessingStore().ForPeer(proxyNames...)
+	tpc := blessings.ThirdPartyCaveats()
+	if len(tpc) == 0 {
+		return blessings, nil, nil
+	}
+	// Set DischargeImpetus.Server = proxyNames.
+	// See https://v.io/i/392
+	discharges := dc.PrepareDischarges(ctx, tpc, security.DischargeImpetus{})
+	return blessings, discharges, nil
+}
+
+var _ manager.ProxyAuthenticator = proxyAuth{}