ref: Change the shutdown mechanism for the new runtime.

Now we properly lame duck servers, and avoid error messages on clean
shutdowns..

MultiPart: 2/2

Change-Id: Ifb5e3080ff28a4496819c3ebdf2a8b0f7c1a02db
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index 22f0bf0..f020b7b 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -12,13 +12,22 @@
 	"v.io/x/ref/lib/apilog"
 )
 
-// SetClient can be used to inject a mock client implementation into the context.
-func SetClient(ctx *context.T, client rpc.Client) *context.T {
+// SetClientFactory can be used to inject a mock Client implementation
+// into the context.  When v23.WithNewClient is called passed function
+// will be invoked.
+func SetClientFactory(ctx *context.T, factory func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client) *context.T {
+	client := factory(ctx)
+	ctx = context.WithValue(ctx, clientFactoryKey, factory)
 	return context.WithValue(ctx, clientKey, client)
 }
 func (r *Runtime) WithNewClient(ctx *context.T, opts ...rpc.ClientOpt) (*context.T, rpc.Client, error) {
 	defer apilog.LogCallf(ctx, "opts...=%v", opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	panic("unimplemented")
+	factory, ok := ctx.Value(clientFactoryKey).(func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client)
+	if !ok {
+		panic("Calling WithNewClient on the fake runtime, but no factory has been set.")
+	}
+	client := factory(ctx, opts...)
+	return context.WithValue(ctx, clientKey, client), client, nil
 }
 func (r *Runtime) GetClient(ctx *context.T) rpc.Client {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
@@ -26,11 +35,6 @@
 	return c
 }
 
-func (r *Runtime) WithNewStreamManager(ctx *context.T) (*context.T, error) {
-	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	panic("unimplemented")
-}
-
 func (r *Runtime) GetListenSpec(ctx *context.T) rpc.ListenSpec {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	ls, _ := ctx.Value(listenSpecKey).(rpc.ListenSpec)
@@ -43,21 +47,28 @@
 	return ctx
 }
 
-func SetFlowManager(ctx *context.T, manager flow.Manager) *context.T {
-	return context.WithValue(ctx, flowManagerKey, manager)
-}
-
-func (r *Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
-	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	fm, _ := ctx.Value(flowManagerKey).(flow.Manager)
-	return fm
-}
-
-func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+func (r *Runtime) WithNewStreamManager(ctx *context.T) (*context.T, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	panic("unimplemented")
 }
 
+// SetFlowManagerFactory can be used to inject a mock FlowManager
+// implementation into the context.  When v23.NewFlowManager is called
+// passed function will be invoked.
+func SetFlowManagerFactory(ctx *context.T, factory func(ctx *context.T) flow.Manager) *context.T {
+	return context.WithValue(ctx, flowFactoryKey, factory)
+}
+
+func (r *Runtime) NewFlowManager(ctx *context.T) (flow.Manager, error) {
+	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+
+	factory, ok := ctx.Value(flowFactoryKey).(func(ctx *context.T) flow.Manager)
+	if !ok {
+		panic("Calling NewFlowManager on the fake runtime, but no factory has been set.")
+	}
+	return factory(ctx), nil
+}
+
 func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	panic("unimplemented")
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index 464ab11..ef7e102 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -24,7 +24,9 @@
 	loggerKey
 	backgroundKey
 	listenSpecKey
-	flowManagerKey
+
+	clientFactoryKey
+	flowFactoryKey
 )
 
 type Runtime struct {
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 16b31d1..40337ec 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -14,7 +14,6 @@
 	"golang.org/x/crypto/nacl/box"
 	"v.io/v23"
 	"v.io/v23/context"
-	"v.io/v23/flow"
 	"v.io/v23/flow/message"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
@@ -33,8 +32,9 @@
 	if err != nil {
 		return err
 	}
+
 	bflow := c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true)
-	bflow.worker.Release(ctx, defaultBufferSize)
+	bflow.worker.Release(ctx, DefaultBytesBufferedPerFlow)
 	c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG, bflow, true)
 
 	if err = c.readRemoteAuth(ctx, authAcceptorTag, binding); err != nil {
@@ -217,6 +217,7 @@
 type blessingsFlow struct {
 	enc *vom.Encoder
 	dec *vom.Decoder
+	f   *flw
 
 	mu      sync.Mutex
 	cond    *sync.Cond
@@ -226,8 +227,9 @@
 	byBKey  map[uint64]*Blessings
 }
 
-func newBlessingsFlow(ctx *context.T, loopWG *sync.WaitGroup, f flow.Flow, dialed bool) *blessingsFlow {
+func newBlessingsFlow(ctx *context.T, loopWG *sync.WaitGroup, f *flw, dialed bool) *blessingsFlow {
 	b := &blessingsFlow{
+		f:       f,
 		enc:     vom.NewEncoder(f),
 		dec:     vom.NewDecoder(f),
 		nextKey: 1,
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 422eb73..c64bc5b 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -95,14 +95,18 @@
 func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
 	return nil, nil
 }
-func (fc *fakeDischargeClient) Close() {}
+func (fc *fakeDischargeClient) Close()                  {}
+func (fc *fakeDischargeClient) Closed() <-chan struct{} { return nil }
 
 func TestUnidirectional(t *testing.T) {
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+		return &fakeDischargeClient{v23.GetPrincipal(ctx)}
+	})
+	ctx, _, _ = v23.WithNewClient(ctx)
 
 	dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
 	actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
@@ -136,7 +140,10 @@
 
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+		return &fakeDischargeClient{v23.GetPrincipal(ctx)}
+	})
+	ctx, _, _ = v23.WithNewClient(ctx)
 
 	dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
 	actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 17475a5..846fce3 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -29,7 +29,7 @@
 )
 
 const mtu = 1 << 16
-const defaultBufferSize = 1 << 20
+const DefaultBytesBufferedPerFlow = 1 << 20
 
 const (
 	expressPriority = iota
@@ -95,10 +95,11 @@
 	conn flow.MsgReadWriteCloser,
 	local, remote naming.Endpoint,
 	versions version.RPCVersionRange,
+	handshakeTimeout time.Duration,
 	handler FlowHandler,
 	events chan<- StatusUpdate) (*Conn, error) {
 	c := &Conn{
-		fc:           flowcontrol.New(defaultBufferSize, mtu),
+		fc:           flowcontrol.New(DefaultBytesBufferedPerFlow, mtu),
 		mp:           newMessagePipe(conn),
 		handler:      handler,
 		lBlessings:   v23.GetPrincipal(ctx).BlessingStore().Default(),
@@ -112,7 +113,14 @@
 		borrowing:    map[uint64]bool{},
 		events:       events,
 	}
-	if err := c.dialHandshake(ctx, versions); err != nil {
+	// TODO(mattr): This scheme for deadlines is nice, but it doesn't
+	// provide for cancellation when ctx is canceled.
+	t := time.AfterFunc(handshakeTimeout, func() { conn.Close() })
+	err := c.dialHandshake(ctx, versions)
+	if stopped := t.Stop(); !stopped {
+		err = verror.NewErrTimeout(ctx)
+	}
+	if err != nil {
 		c.Close(ctx, err)
 		return nil, err
 	}
@@ -127,10 +135,11 @@
 	conn flow.MsgReadWriteCloser,
 	local naming.Endpoint,
 	versions version.RPCVersionRange,
+	handshakeTimeout time.Duration,
 	handler FlowHandler,
 	events chan<- StatusUpdate) (*Conn, error) {
 	c := &Conn{
-		fc:           flowcontrol.New(defaultBufferSize, mtu),
+		fc:           flowcontrol.New(DefaultBytesBufferedPerFlow, mtu),
 		mp:           newMessagePipe(conn),
 		handler:      handler,
 		lBlessings:   v23.GetPrincipal(ctx).BlessingStore().Default(),
@@ -143,7 +152,14 @@
 		borrowing:    map[uint64]bool{},
 		events:       events,
 	}
-	if err := c.acceptHandshake(ctx, versions); err != nil {
+	// FIXME(mattr): This scheme for deadlines is nice, but it doesn't
+	// provide for cancellation when ctx is canceled.
+	t := time.AfterFunc(handshakeTimeout, func() { conn.Close() })
+	err := c.acceptHandshake(ctx, versions)
+	if stopped := t.Stop(); !stopped {
+		err = verror.NewErrTimeout(ctx)
+	}
+	if err != nil {
 		c.Close(ctx, err)
 		return nil, err
 	}
@@ -250,17 +266,28 @@
 			if err != nil {
 				msg = err.Error()
 			}
-			cerr := c.fc.Run(ctx, "close", expressPriority, func(_ int) (int, bool, error) {
+			// We use a root context here to ensure that the message get's sent even if
+			// the context is canceled.
+			// TODO(mattr): Consider other options like a special mode in the flow controller
+			// that doesn't care about the context.  Or perhaps 'stop' the flow controller
+			// and then just write the message directly.
+			rootctx, cancel := context.WithRootCancel(ctx)
+			cerr := c.fc.Run(rootctx, "close", expressPriority, func(_ int) (int, bool, error) {
 				return 0, true, c.mp.writeMsg(ctx, &message.TearDown{Message: msg})
 			})
 			if cerr != nil {
 				ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
 			}
+			cancel()
 		}
+		flowErr := NewErrConnectionClosed(ctx)
 		for _, f := range flows {
-			f.close(ctx, NewErrConnectionClosed(ctx))
+			f.close(ctx, flowErr)
 		}
-		if cerr := c.mp.close(); cerr != nil {
+		if c.blessingsFlow != nil {
+			c.blessingsFlow.f.close(ctx, flowErr)
+		}
+		if cerr := c.mp.rw.Close(); cerr != nil {
 			ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
 		}
 		c.loopWG.Wait()
@@ -282,9 +309,9 @@
 	c.toRelease[fid] += count
 	if c.borrowing[fid] {
 		c.toRelease[invalidFlowID] += count
-		release = c.toRelease[invalidFlowID] > defaultBufferSize/2
+		release = c.toRelease[invalidFlowID] > DefaultBytesBufferedPerFlow/2
 	} else {
-		release = c.toRelease[fid] > defaultBufferSize/2
+		release = c.toRelease[fid] > DefaultBytesBufferedPerFlow/2
 	}
 	if release {
 		toRelease = c.toRelease
@@ -358,7 +385,7 @@
 		handler := c.handler
 		f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true)
 		f.worker.Release(ctx, int(msg.InitialCounters))
-		c.toRelease[msg.ID] = defaultBufferSize
+		c.toRelease[msg.ID] = DefaultBytesBufferedPerFlow
 		c.borrowing[msg.ID] = true
 		c.mu.Unlock()
 
@@ -403,7 +430,8 @@
 		f := c.flows[msg.ID]
 		c.mu.Unlock()
 		if f == nil {
-			ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.ID)
+			ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d",
+				c.remote, msg.ID)
 			return nil
 		}
 		if err := f.q.put(ctx, msg.Payload); err != nil {
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 6837486..ce1961b 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -27,7 +27,7 @@
 
 func init() {
 	test.Init()
-	randData = make([]byte, 2*defaultBufferSize)
+	randData = make([]byte, 2*DefaultBytesBufferedPerFlow)
 	if _, err := rand.Read(randData); err != nil {
 		panic("Could not read random data.")
 	}
@@ -100,14 +100,14 @@
 	q1, q2 := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
 	fh1, fh2 := fh(q1), fh(q2)
 	go func() {
-		d, err := NewDialed(ctx, dmrw, ep, ep, versions, nil, nil)
+		d, err := NewDialed(ctx, dmrw, ep, ep, versions, time.Minute, nil, nil)
 		if err != nil {
 			panic(err)
 		}
 		dch <- d
 	}()
 	go func() {
-		a, err := NewAccepted(ctx, amrw, ep, versions, fh1, nil)
+		a, err := NewAccepted(ctx, amrw, ep, versions, time.Minute, fh1, nil)
 		if err != nil {
 			panic(err)
 		}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 04fdfcf..72929d1 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
 
 import (
 	"strconv"
+	"time"
 
 	"v.io/v23/context"
 	"v.io/v23/flow"
@@ -42,7 +43,7 @@
 		dkey:   dkey,
 		opened: preopen,
 	}
-	f.SetContext(ctx)
+	f.ctx, f.cancel = context.WithCancel(ctx)
 	if !f.opened {
 		c.unopenedFlows.Add(1)
 	}
@@ -132,7 +133,7 @@
 		} else {
 			err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
 				ID:              f.id,
-				InitialCounters: defaultBufferSize,
+				InitialCounters: DefaultBytesBufferedPerFlow,
 				BlessingsKey:    f.bkey,
 				DischargeKey:    f.dkey,
 				Flags:           d.Flags,
@@ -177,12 +178,16 @@
 //
 // TODO(mattr): update v23/flow documentation.
 // SetContext may not be called concurrently with other methods.
-func (f *flw) SetContext(ctx *context.T) error {
+func (f *flw) SetDeadlineContext(ctx *context.T, deadline time.Time) *context.T {
 	if f.cancel != nil {
 		f.cancel()
 	}
-	f.ctx, f.cancel = context.WithCancel(ctx)
-	return nil
+	if !deadline.IsZero() {
+		f.ctx, f.cancel = context.WithDeadline(ctx, deadline)
+	} else {
+		f.ctx, f.cancel = context.WithCancel(ctx)
+	}
+	return f.ctx
 }
 
 // LocalBlessings returns the blessings presented by the local end of the flow
@@ -264,36 +269,38 @@
 }
 
 func (f *flw) close(ctx *context.T, err error) {
-	f.q.close(ctx)
-	f.cancel()
-	// We want to try to send this message even if ctx is already canceled.
-	ctx, cancel := context.WithRootCancel(ctx)
-	serr := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
-		f.conn.mu.Lock()
-		delete(f.conn.flows, f.id)
-		connClosed := f.conn.flows == nil
-		f.conn.mu.Unlock()
+	if f.q.close(ctx) {
+		f.cancel()
+		// We want to try to send this message even if ctx is already canceled.
+		ctx, cancel := context.WithRootCancel(ctx)
+		serr := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
+			f.conn.mu.Lock()
+			delete(f.conn.flows, f.id)
+			connClosed := f.conn.flows == nil
+			f.conn.mu.Unlock()
 
-		if !f.opened {
-			// Closing a flow that was never opened.
-			f.conn.unopenedFlows.Done()
-			return 0, true, nil
-		} else if eid := verror.ErrorID(err); eid == ErrFlowClosedRemotely.ID || connClosed {
-			// Note: If the conn is closed there is no point in trying to send
-			// the flow close message as it will fail.  This is racy with the connection
-			// closing, but there are no ill-effects other than spamming the logs a little
-			// so it's OK.
-			return 0, true, nil
-		}
-		return 0, true, f.conn.mp.writeMsg(ctx, &message.Data{
-			ID:    f.id,
-			Flags: message.CloseFlag,
+			eid := verror.ErrorID(err)
+			if !f.opened {
+				// Closing a flow that was never opened.
+				f.conn.unopenedFlows.Done()
+				return 0, true, nil
+			} else if eid == ErrFlowClosedRemotely.ID || connClosed {
+				// Note: If the conn is closed there is no point in trying to send
+				// the flow close message as it will fail.  This is racy with the connection
+				// closing, but there are no ill-effects other than spamming the logs a little
+				// so it's OK.
+				return 0, true, nil
+			}
+			return 0, true, f.conn.mp.writeMsg(ctx, &message.Data{
+				ID:    f.id,
+				Flags: message.CloseFlag,
+			})
 		})
-	})
-	if serr != nil {
-		ctx.Errorf("Could not send close flow message: %v", err)
+		if serr != nil {
+			ctx.Errorf("Could not send close flow message: %v", err)
+		}
+		cancel()
 	}
-	cancel()
 }
 
 // Close marks the flow as closed. After Close is called, new data cannot be
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 9435354..cf51f8f 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -35,10 +35,6 @@
 	return p.cipher.ChannelBinding()
 }
 
-func (p *messagePipe) close() error {
-	return p.rw.Close()
-}
-
 func (p *messagePipe) writeMsg(ctx *context.T, m message.Message) (err error) {
 	// TODO(mattr): Because of the API of the underlying crypto library,
 	// an enormous amount of copying happens here.
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index 54db134..89abc97 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -50,7 +50,7 @@
 		return nil
 	}
 	newSize := l + r.size
-	if newSize > defaultBufferSize {
+	if newSize > DefaultBytesBufferedPerFlow {
 		return NewErrCounterOverflow(ctx)
 	}
 	newBufs := r.nbufs + len(bufs)
@@ -124,13 +124,16 @@
 	return
 }
 
-func (r *readq) close(ctx *context.T) {
+func (r *readq) close(ctx *context.T) bool {
 	r.mu.Lock()
+	closed := false
 	if r.e != -1 {
+		closed = true
 		r.e = -1
 		close(r.notify)
 	}
 	r.mu.Unlock()
+	return closed
 }
 
 func (r *readq) reserveLocked(n int) {
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index b68247b..eeb7207 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -51,7 +51,7 @@
 		if dflows != nil {
 			handler = fh(dflows)
 		}
-		d, err := NewDialed(dctx, dmrw, ep, ep, versions, handler, events)
+		d, err := NewDialed(dctx, dmrw, ep, ep, versions, time.Minute, handler, events)
 		if err != nil {
 			panic(err)
 		}
@@ -62,7 +62,7 @@
 		if aflows != nil {
 			handler = fh(aflows)
 		}
-		a, err := NewAccepted(actx, amrw, ep, versions, handler, events)
+		a, err := NewAccepted(actx, amrw, ep, versions, time.Minute, handler, events)
 		if err != nil {
 			panic(err)
 		}
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 4c31df8..4dcc990 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -7,13 +7,13 @@
 import (
 	"strconv"
 	"testing"
+	"time"
 
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
-
 	connpackage "v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	inaming "v.io/x/ref/runtime/internal/naming"
@@ -296,7 +296,7 @@
 	ach := make(chan *connpackage.Conn)
 	go func() {
 		d, err := connpackage.NewDialed(ctx, dmrw, ep, ep,
-			version.RPCVersionRange{Min: 1, Max: 5}, nil, nil)
+			version.RPCVersionRange{Min: 1, Max: 5}, time.Minute, nil, nil)
 		if err != nil {
 			t.Fatalf("Unexpected error: %v", err)
 		}
@@ -305,7 +305,7 @@
 	fh := fh{t, make(chan struct{})}
 	go func() {
 		a, err := connpackage.NewAccepted(ctx, amrw, ep,
-			version.RPCVersionRange{Min: 1, Max: 5}, fh, nil)
+			version.RPCVersionRange{Min: 1, Max: 5}, time.Minute, fh, nil)
 		if err != nil {
 			t.Fatalf("Unexpected error: %v", err)
 		}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index d8c8fee..0bc1ea2 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -20,7 +20,6 @@
 	"v.io/v23/security"
 	"v.io/v23/verror"
 
-	iflow "v.io/x/ref/runtime/internal/flow"
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/lib/upcqueue"
 	inaming "v.io/x/ref/runtime/internal/naming"
@@ -30,6 +29,7 @@
 const (
 	reconnectDelay    = 50 * time.Millisecond
 	reapCacheInterval = 5 * time.Minute
+	handshakeTimeout  = time.Minute
 )
 
 type manager struct {
@@ -37,6 +37,7 @@
 	closed chan struct{}
 	cache  *ConnCache
 	ls     *listenState
+	ctx    *context.T
 }
 
 func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
@@ -44,6 +45,7 @@
 		rid:    rid,
 		closed: make(chan struct{}),
 		cache:  NewConnCache(),
+		ctx:    ctx,
 	}
 	var events chan conn.StatusUpdate
 	if rid != naming.NullRoutingID {
@@ -122,17 +124,13 @@
 	case <-m.closed:
 	case <-done:
 	}
+	// Now nobody should send any more flows, so close the queue.
+	m.ls.q.Close()
 }
 
 // Listen causes the Manager to accept flows from the provided protocol and address.
 // Listen may be called muliple times.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
 func (m *manager) Listen(ctx *context.T, protocol, address string) error {
-	if err := m.validateContext(ctx); err != nil {
-		return err
-	}
 	if m.ls == nil {
 		return NewErrListeningWithNullRid(ctx)
 	}
@@ -165,13 +163,7 @@
 //
 // update gets passed the complete set of endpoints for the proxy every time it
 // is called.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
 func (m *manager) ProxyListen(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) error {
-	if err := m.validateContext(ctx); err != nil {
-		return err
-	}
 	if m.ls == nil {
 		return NewErrListeningWithNullRid(ctx)
 	}
@@ -263,16 +255,22 @@
 			flowConn, err = ln.Accept(ctx)
 		}
 		if err != nil {
-			ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
+			m.ls.mu.Lock()
+			closed := m.ls.listeners == nil
+			m.ls.mu.Unlock()
+			if !closed {
+				ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
+			}
 			return
 		}
 		fh := &flowHandler{m, make(chan struct{})}
 		m.ls.activeConns.Add(1)
 		c, err := conn.NewAccepted(
-			ctx,
+			m.ctx,
 			flowConn,
 			local,
 			version.Supported,
+			handshakeTimeout,
 			fh,
 			m.ls.events)
 		if err != nil {
@@ -311,6 +309,7 @@
 			f,
 			f.Conn().LocalEndpoint(),
 			version.Supported,
+			handshakeTimeout,
 			fh,
 			h.m.ls.events)
 		if err != nil {
@@ -355,13 +354,7 @@
 //   }
 //
 // can be used to accept Flows initiated by remote processes.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
 func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
-	if err := m.validateContext(ctx); err != nil {
-		return nil, err
-	}
 	if m.ls == nil {
 		return nil, NewErrListeningWithNullRid(ctx)
 	}
@@ -381,13 +374,7 @@
 //
 // To maximize re-use of connections, the Manager will also Listen on Dialed
 // connections for the lifetime of the connection.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
 func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
-	if err := m.validateContext(ctx); err != nil {
-		return nil, err
-	}
 	f, _, err := m.internalDial(ctx, remote, fn)
 	return f, err
 }
@@ -443,11 +430,12 @@
 			m.ls.activeConns.Add(1)
 		}
 		c, err = conn.NewDialed(
-			ctx,
+			m.ctx,
 			flowConn,
 			localEndpoint(flowConn, m.rid),
 			remote,
 			version.Supported,
+			handshakeTimeout,
 			fh,
 			events,
 		)
@@ -474,11 +462,12 @@
 			m.ls.activeConns.Add(1)
 		}
 		c, err = conn.NewDialed(
-			ctx,
+			m.ctx,
 			f,
 			proxyConn.LocalEndpoint(),
 			remote,
 			version.Supported,
+			handshakeTimeout,
 			fh,
 			events)
 		if err != nil {
@@ -512,13 +501,6 @@
 	return m.closed
 }
 
-func (m *manager) validateContext(ctx *context.T) error {
-	if v23.ExperimentalGetFlowManager(ctx) != m {
-		return flow.NewErrBadArg(ctx, iflow.NewErrWrongObjectInContext(ctx, "manager"))
-	}
-	return nil
-}
-
 func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) {
 	if p != nil {
 		var timeout time.Duration
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 4babdd0..c151586 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -14,8 +14,7 @@
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
-
-	"v.io/x/ref/runtime/factories/fake"
+	_ "v.io/x/ref/runtime/factories/fake"
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/flowtest"
 	"v.io/x/ref/test"
@@ -33,14 +32,12 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	actx := fake.SetFlowManager(ctx, am)
-	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
-	dctx := fake.SetFlowManager(ctx, dm)
 
-	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 
 	shutdown()
 	<-am.Closed()
@@ -52,26 +49,24 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	actx := fake.SetFlowManager(ctx, am)
-	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
-	dctx := fake.SetFlowManager(ctx, dm)
 	// At first the cache should be empty.
 	if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	// After dialing a connection the cache should hold one connection.
-	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
 	}
 	old := dm.(*manager).cache.ridCache[am.RoutingID()]
 	// After dialing another connection the cache should still hold one connection
 	// because the connections should be reused.
-	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
 		t.Errorf("got cache size %v, want %v", got, want)
 	}
@@ -89,16 +84,14 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	actx := fake.SetFlowManager(ctx, am)
-	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
-	dctx := fake.SetFlowManager(ctx, dm)
-	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	// Now am should be able to make a flow to dm even though dm is not listening.
-	testFlows(t, actx, dctx, flowtest.BlessingsForPeer)
+	testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
 
 	shutdown()
 	<-am.Closed()
@@ -110,20 +103,17 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	actx := fake.SetFlowManager(ctx, am)
-	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 	nulldm := New(ctx, naming.NullRoutingID)
-	nctx := fake.SetFlowManager(ctx, nulldm)
-	_, af := testFlows(t, nctx, actx, flowtest.BlessingsForPeer)
+	_, af := testFlows(t, ctx, nulldm, am, flowtest.BlessingsForPeer)
 	// Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
 	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
 		t.Errorf("got %v, want zero-value blessings", rBlessings)
 	}
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
-	dctx := fake.SetFlowManager(ctx, dm)
-	_, af = testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+	_, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 	// Ensure that the remote blessings of the underlying conn of the accepted flow are
 	// non-zero if we did specify a RoutingID.
 	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
@@ -141,19 +131,15 @@
 	ctx, shutdown := v23.Init()
 
 	am := New(ctx, naming.FixedRoutingID(0x5555))
-	actx := fake.SetFlowManager(ctx, am)
-	if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
-
 	dm := New(ctx, naming.FixedRoutingID(0x1111))
-	dctx := fake.SetFlowManager(ctx, dm)
+	testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
 
-	testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+	am.StopListening(ctx)
 
-	am.StopListening(actx)
-
-	if f, err := dm.Dial(dctx, am.ListeningEndpoints()[0], flowtest.BlessingsForPeer); err == nil {
+	if f, err := dm.Dial(ctx, am.ListeningEndpoints()[0], flowtest.BlessingsForPeer); err == nil {
 		t.Errorf("dialing a lame duck should fail, but didn't %#v.", f)
 	}
 
@@ -162,17 +148,16 @@
 	<-dm.Closed()
 }
 
-func testFlows(t *testing.T, dctx, actx *context.T, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
-	am := v23.ExperimentalGetFlowManager(actx)
+func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
 	ep := am.ListeningEndpoints()[0]
 	var err error
-	df, err = v23.ExperimentalGetFlowManager(dctx).Dial(dctx, ep, bFn)
+	df, err = dm.Dial(ctx, ep, bFn)
 	if err != nil {
 		t.Fatal(err)
 	}
 	want := "do you read me?"
 	writeLine(df, want)
-	af, err = am.Accept(actx)
+	af, err = am.Accept(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index a7f427f..33db6e7 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -75,35 +75,24 @@
 	ns                 namespace.T
 	vcOpts             []stream.VCOpt // vc opts passed to dial
 	preferredProtocols []string
+	vcCache            *vc.VCCache
+	wg                 sync.WaitGroup
+	dc                 vc.DischargeClient
 
-	// We cache the IP networks on the device since it is not that cheap to read
-	// network interfaces through os syscall.
-	// TODO(toddw): this can be removed since netstate now implements caching
-	// directly.
-	ipNets []*net.IPNet
-
-	vcCache *vc.VCCache
-
-	wg     sync.WaitGroup
-	mu     sync.Mutex
-	closed bool
-
-	dc vc.DischargeClient
+	mu      sync.Mutex
+	closed  bool
+	closech chan struct{}
 }
 
 var _ rpc.Client = (*client)(nil)
 
-func DeprecatedNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func DeprecatedNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) rpc.Client {
 	c := &client{
 		streamMgr: streamMgr,
 		ns:        ns,
 		vcCache:   vc.NewVCCache(),
+		closech:   make(chan struct{}),
 	}
-	ipNets, err := ipNetworks()
-	if err != nil {
-		return nil, err
-	}
-	c.ipNets = ipNets
 	c.dc = InternalNewDischargeClient(nil, c, 0)
 	for _, opt := range opts {
 		// Collect all client opts that are also vc opts.
@@ -114,8 +103,7 @@
 			c.preferredProtocols = v
 		}
 	}
-
-	return c, nil
+	return c
 }
 
 func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
@@ -446,7 +434,7 @@
 			return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
 		}
 		// An empty set of protocols means all protocols...
-		if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
+		if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
 			return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
 		}
 	}
@@ -752,7 +740,10 @@
 func (c *client) Close() {
 	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	c.mu.Lock()
-	c.closed = true
+	if !c.closed {
+		c.closed = true
+		close(c.closech)
+	}
 	c.mu.Unlock()
 	for _, v := range c.vcCache.Close() {
 		c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
@@ -760,6 +751,10 @@
 	c.wg.Wait()
 }
 
+func (c *client) Closed() <-chan struct{} {
+	return c.closech
+}
+
 // flowClient implements the RPC client-side protocol for a single RPC, over a
 // flow that's already connected to the server.
 type flowClient struct {
diff --git a/runtime/internal/rpc/errors.vdl b/runtime/internal/rpc/errors.vdl
index 8f24707..a57dafc 100644
--- a/runtime/internal/rpc/errors.vdl
+++ b/runtime/internal/rpc/errors.vdl
@@ -27,4 +27,7 @@
 	badAuth(suffix, method string, err error) {
 		"en": "not authorized to call {suffix}.{method}: {err}",
 	}
+	typeFlowFailure(err error) {
+		"en": "type flow could not be constructed{:err}",
+	}
 )
diff --git a/runtime/internal/rpc/errors.vdl.go b/runtime/internal/rpc/errors.vdl.go
index 2c543d7..949ba3b 100644
--- a/runtime/internal/rpc/errors.vdl.go
+++ b/runtime/internal/rpc/errors.vdl.go
@@ -23,6 +23,7 @@
 	errBadBlessingsCache = verror.Register("v.io/x/ref/runtime/internal/rpc.badBlessingsCache", verror.NoRetry, "{1:}{2:} failed to find blessings in cache: {3}")
 	errBadDischarge      = verror.Register("v.io/x/ref/runtime/internal/rpc.badDischarge", verror.NoRetry, "{1:}{2:} failed to decode discharge #{3}: {4}")
 	errBadAuth           = verror.Register("v.io/x/ref/runtime/internal/rpc.badAuth", verror.NoRetry, "{1:}{2:} not authorized to call {3}.{4}: {5}")
+	errTypeFlowFailure   = verror.Register("v.io/x/ref/runtime/internal/rpc.typeFlowFailure", verror.NoRetry, "{1:}{2:} type flow could not be constructed{:3}")
 )
 
 func init() {
@@ -33,6 +34,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errBadBlessingsCache.ID), "{1:}{2:} failed to find blessings in cache: {3}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errBadDischarge.ID), "{1:}{2:} failed to decode discharge #{3}: {4}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errBadAuth.ID), "{1:}{2:} not authorized to call {3}.{4}: {5}")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errTypeFlowFailure.ID), "{1:}{2:} type flow could not be constructed{:3}")
 }
 
 // newErrBadRequest returns an error with the errBadRequest ID.
@@ -69,3 +71,8 @@
 func newErrBadAuth(ctx *context.T, suffix string, method string, err error) error {
 	return verror.New(errBadAuth, ctx, suffix, method, err)
 }
+
+// newErrTypeFlowFailure returns an error with the errTypeFlowFailure ID.
+func newErrTypeFlowFailure(ctx *context.T, err error) error {
+	return verror.New(errTypeFlowFailure, ctx, err)
+}
diff --git a/runtime/internal/rpc/full_test.go b/runtime/internal/rpc/full_test.go
index 495c048..6dfe486 100644
--- a/runtime/internal/rpc/full_test.go
+++ b/runtime/internal/rpc/full_test.go
@@ -70,10 +70,7 @@
 }
 
 func testInternalNewServerWithPubsub(ctx *context.T, streamMgr stream.Manager, ns namespace.T, settingsPublisher *pubsub.Publisher, settingsStreamName string, opts ...rpc.ServerOpt) (DeprecatedServer, error) {
-	client, err := DeprecatedNewClient(streamMgr, ns)
-	if err != nil {
-		return nil, err
-	}
+	client := DeprecatedNewClient(streamMgr, ns)
 	return DeprecatedNewServer(ctx, streamMgr, ns, settingsPublisher, settingsStreamName, client, opts...)
 }
 
@@ -366,10 +363,7 @@
 	if server != nil {
 		b.ep, b.server = startServerWS(t, ctx, server, b.sm, b.ns, b.name, testServerDisp{ts}, shouldUseWebsocket)
 	}
-	var err error
-	if b.client, err = DeprecatedNewClient(b.sm, b.ns); err != nil {
-		t.Fatalf("DeprecatedNewClient failed: %v", err)
-	}
+	b.client = DeprecatedNewClient(b.sm, b.ns)
 	return
 }
 
@@ -502,10 +496,7 @@
 	if err := server.ServeDispatcher("mp/server", disp); err != nil {
 		t.Fatalf("server.Serve failed: %v", err)
 	}
-	client, err := DeprecatedNewClient(sm, ns)
-	if err != nil {
-		t.Fatalf("DeprecatedNewClient failed: %v", err)
-	}
+	client := DeprecatedNewClient(sm, ns)
 	// When using SecurityNone, all authorization checks should be skipped, so
 	// unauthorized methods should be callable.
 	var got string
@@ -535,10 +526,7 @@
 	if err := server.ServeDispatcher("mp/server", disp); err != nil {
 		t.Fatalf("server.Serve failed: %v", err)
 	}
-	client, err := DeprecatedNewClient(sm, ns)
-	if err != nil {
-		t.Fatalf("DeprecatedNewClient failed: %v", err)
-	}
+	client := DeprecatedNewClient(sm, ns)
 
 	// A call should fail if the principal in the ctx is nil and SecurityNone is not specified.
 	ctx, err = v23.WithPrincipal(ctx, nil)
@@ -583,12 +571,7 @@
 	runClient := func(server string) ([]string, error) {
 		smc := imanager.InternalNew(ctx, naming.FixedRoutingID(0xc))
 		defer smc.Shutdown()
-		client, err := DeprecatedNewClient(
-			smc,
-			ns)
-		if err != nil {
-			return nil, err
-		}
+		client := DeprecatedNewClient(smc, ns)
 		defer client.Close()
 		ctx, _ = v23.WithPrincipal(cctx, pclient)
 		call, err := client.StartCall(cctx, server, "Closure", nil)
@@ -653,10 +636,7 @@
 		}
 		smc := imanager.InternalNew(ctx, rid)
 		defer smc.Shutdown()
-		client, err := DeprecatedNewClient(smc, ns)
-		if err != nil {
-			t.Fatalf("failed to create client: %v", err)
-		}
+		client := DeprecatedNewClient(smc, ns)
 		defer client.Close()
 		var opts []rpc.CallOpt
 		if noDischarges {
@@ -718,10 +698,7 @@
 	}
 	sm := imanager.InternalNew(ctx, rid)
 
-	c, err := DeprecatedNewClient(sm, ns)
-	if err != nil {
-		t.Fatalf("failed to create client: %v", err)
-	}
+	c := DeprecatedNewClient(sm, ns)
 	dc := c.(*client).dc
 	tpcav2, err := security.NewPublicKeyCaveat(pdischarger2.PublicKey(), "mountpoint/discharger2", security.ThirdPartyRequirements{}, mkCaveat(security.NewExpiryCaveat(time.Now().Add(time.Hour))))
 	if err != nil {
@@ -768,11 +745,7 @@
 		}
 		smc := imanager.InternalNew(sctx, rid)
 		defer smc.Shutdown()
-		client, err := DeprecatedNewClient(smc, ns)
-		if err != nil {
-			t.Fatalf("failed to create client: %v", err)
-		}
-		return client
+		return DeprecatedNewClient(smc, ns)
 	}
 
 	runClient := func(client rpc.Client) {
@@ -847,14 +820,12 @@
 	defer runServer(t, sctx, ns, mountName, &testServer{}).Shutdown()
 
 	smc := imanager.InternalNew(sctx, naming.FixedRoutingID(0xc))
-	client, err := DeprecatedNewClient(smc, ns)
-	if err != nil {
-		t.Fatal(err)
-	}
+	client := DeprecatedNewClient(smc, ns)
 	defer smc.Shutdown()
 	defer client.Close()
 
 	// The call should succeed when the server presents the same public as the opt...
+	var err error
 	if _, err = client.StartCall(cctx, mountName, "Closure", nil, options.SkipServerEndpointAuthorization{}, options.ServerPublicKey{
 		PublicKey: pserver.PublicKey(),
 	}); err != nil {
@@ -918,10 +889,7 @@
 	}
 	smc := imanager.InternalNew(ctx, rid)
 	defer smc.Shutdown()
-	client, err := DeprecatedNewClient(smc, ns)
-	if err != nil {
-		t.Fatalf("failed to create client: %v", err)
-	}
+	client := DeprecatedNewClient(smc, ns)
 	defer client.Close()
 
 	dc := InternalNewDischargeClient(ctx, client, 0)
diff --git a/runtime/internal/rpc/resolve_internal_test.go b/runtime/internal/rpc/resolve_internal_test.go
index 98e9dba..56ebd6f 100644
--- a/runtime/internal/rpc/resolve_internal_test.go
+++ b/runtime/internal/rpc/resolve_internal_test.go
@@ -5,13 +5,14 @@
 package rpc
 
 import (
+	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/x/ref"
 )
 
-func InternalServerResolveToEndpoint(s rpc.Server, name string) (string, error) {
+func InternalServerResolveToEndpoint(ctx *context.T, s rpc.Server, name string) (string, error) {
 	if ref.RPCTransitionState() == ref.XServers {
-		ep, err := s.(*xserver).resolveToEndpoint(name)
+		ep, err := s.(*xserver).resolveToEndpoint(ctx, name)
 		if err != nil {
 			return "", err
 		}
diff --git a/runtime/internal/rpc/resolve_test.go b/runtime/internal/rpc/resolve_test.go
index be38047..455334b 100644
--- a/runtime/internal/rpc/resolve_test.go
+++ b/runtime/internal/rpc/resolve_test.go
@@ -141,7 +141,7 @@
 		{"unknown", "", notfound},
 	}
 	for _, tc := range testcases {
-		result, err := irpc.InternalServerResolveToEndpoint(server, tc.address)
+		result, err := irpc.InternalServerResolveToEndpoint(ctx, server, tc.address)
 		if (err == nil) != (tc.err == nil) {
 			t.Errorf("Unexpected err for %q. Got %v, expected %v", tc.address, err, tc.err)
 		}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 846ac32..ed23e5c 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -185,7 +185,6 @@
 	// We cache the IP networks on the device since it is not that cheap to read
 	// network interfaces through os syscall.
 	// TODO(jhahn): Add monitoring the network interface changes.
-	ipNets           []*net.IPNet
 	ns               namespace.T
 	servesMountTable bool
 	isLeaf           bool
@@ -273,11 +272,6 @@
 		dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
 		securityLevel         options.SecurityLevel
 	)
-	ipNets, err := ipNetworks()
-	if err != nil {
-		return nil, err
-	}
-	s.ipNets = ipNets
 
 	for _, opt := range opts {
 		switch opt := opt.(type) {
@@ -399,7 +393,7 @@
 		}}
 	}
 	// An empty set of protocols means all protocols...
-	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
+	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols); err != nil {
 		return "", err
 	}
 	for _, n := range resolved.Names() {
@@ -1023,6 +1017,10 @@
 	return nil
 }
 
+func (s *server) Closed() <-chan struct{} {
+	return s.ctx.Done()
+}
+
 // flowServer implements the RPC server-side protocol for a single RPC, over a
 // flow that's already connected to the client.
 type flowServer struct {
diff --git a/runtime/internal/rpc/sort_endpoints.go b/runtime/internal/rpc/sort_endpoints.go
index 3b5b896..c587027 100644
--- a/runtime/internal/rpc/sort_endpoints.go
+++ b/runtime/internal/rpc/sort_endpoints.go
@@ -82,7 +82,13 @@
 // will be used, but unlike the previous case, any servers that don't support
 // these protocols will be returned also, but following the default
 // preferences.
-func filterAndOrderServers(servers []naming.MountedServer, protocols []string, ipnets []*net.IPNet) ([]naming.MountedServer, error) {
+func filterAndOrderServers(servers []naming.MountedServer, protocols []string, ipnets ...*net.IPNet) ([]naming.MountedServer, error) {
+	if ipnets == nil {
+		var err error
+		if ipnets, err = ipNetworks(); err != nil {
+			return nil, err
+		}
+	}
 	var (
 		errs       = verror.SubErrs{}
 		list       = make(sortableServerList, 0, len(servers))
@@ -94,6 +100,7 @@
 	adderr := func(name string, err error) {
 		errs = append(errs, verror.SubErr{Name: "server=" + name, Err: err, Options: verror.Print})
 	}
+
 	for _, server := range servers {
 		name := server.Server
 		ep, err := name2endpoint(name)
diff --git a/runtime/internal/rpc/sort_internal_test.go b/runtime/internal/rpc/sort_internal_test.go
index 7600d2b..59edbcd 100644
--- a/runtime/internal/rpc/sort_internal_test.go
+++ b/runtime/internal/rpc/sort_internal_test.go
@@ -21,7 +21,7 @@
 func TestIncompatible(t *testing.T) {
 	servers := []naming.MountedServer{}
 
-	_, err := filterAndOrderServers(servers, []string{"tcp"}, nil)
+	_, err := filterAndOrderServers(servers, []string{"tcp"})
 	if err == nil || err.Error() != "failed to find any compatible servers" {
 		t.Errorf("expected a different error: %v", err)
 	}
@@ -31,7 +31,7 @@
 		servers = append(servers, naming.MountedServer{Server: name})
 	}
 
-	_, err = filterAndOrderServers(servers, []string{"foobar"}, nil)
+	_, err = filterAndOrderServers(servers, []string{"foobar"})
 	if err == nil || !strings.HasSuffix(err.Error(), "undesired protocol: tcp]") {
 		t.Errorf("expected a different error to: %v", err)
 	}
@@ -58,7 +58,7 @@
 		name := naming.JoinAddressName(naming.FormatEndpoint("tcp6", a), "")
 		servers = append(servers, naming.MountedServer{Server: name})
 	}
-	if _, err := filterAndOrderServers(servers, []string{"batman"}, ipnets); err == nil {
+	if _, err := filterAndOrderServers(servers, []string{"batman"}, ipnets...); err == nil {
 		t.Fatalf("expected an error")
 	}
 
@@ -77,7 +77,7 @@
 		"/@6@tcp4@127.0.0.2@@@@@@",
 		"/127.0.0.12:14141",
 	}
-	result, err := filterAndOrderServers(servers, []string{"foobar", "tcp4"}, ipnets)
+	result, err := filterAndOrderServers(servers, []string{"foobar", "tcp4"}, ipnets...)
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
@@ -101,14 +101,14 @@
 		"/@6@foobar@127.0.0.11@@@@@@",
 		"/127.0.0.12:14141",
 	}
-	if result, err = filterAndOrderServers(servers, nil, ipnets); err != nil {
+	if result, err = filterAndOrderServers(servers, nil, ipnets...); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
 	if got := servers2names(result); !reflect.DeepEqual(got, want) {
 		t.Errorf("got: %v, want %v", got, want)
 	}
 
-	if result, err = filterAndOrderServers(servers, []string{}, ipnets); err != nil {
+	if result, err = filterAndOrderServers(servers, []string{}, ipnets...); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
 	if got := servers2names(result); !reflect.DeepEqual(got, want) {
@@ -125,7 +125,7 @@
 		"/@6@tcp6@127.0.0.8@@@@@@",
 		"/127.0.0.12:14141",
 	}
-	if result, err = filterAndOrderServers(servers, []string{"tcp"}, ipnets); err != nil {
+	if result, err = filterAndOrderServers(servers, []string{"tcp"}, ipnets...); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
 	if got := servers2names(result); !reflect.DeepEqual(got, want) {
@@ -154,7 +154,7 @@
 		name := naming.JoinAddressName(naming.FormatEndpoint("foobar", a), "")
 		servers = append(servers, naming.MountedServer{Server: name})
 	}
-	if result, err = filterAndOrderServers(servers, []string{}, ipnets); err != nil {
+	if result, err = filterAndOrderServers(servers, []string{}, ipnets...); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
 	if got := servers2names(result); !reflect.DeepEqual(got, want) {
@@ -171,7 +171,7 @@
 		name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
 		servers = append(servers, naming.MountedServer{Server: name})
 	}
-	result, err := filterAndOrderServers(servers, []string{"tcp"}, ipnets)
+	result, err := filterAndOrderServers(servers, []string{"tcp"}, ipnets...)
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
@@ -190,7 +190,7 @@
 		servers = append(servers, naming.MountedServer{Server: name})
 	}
 
-	if result, err = filterAndOrderServers(servers, []string{"ws", "tcp"}, ipnets); err != nil {
+	if result, err = filterAndOrderServers(servers, []string{"ws", "tcp"}, ipnets...); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
 	want = []string{
diff --git a/runtime/internal/rpc/test/cancel_test.go b/runtime/internal/rpc/test/cancel_test.go
index b363a33..42ef4c2 100644
--- a/runtime/internal/rpc/test/cancel_test.go
+++ b/runtime/internal/rpc/test/cancel_test.go
@@ -13,6 +13,8 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/verror"
+	"v.io/x/ref"
+	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/rpc/stream/vc"
 	"v.io/x/ref/test"
 )
@@ -27,14 +29,18 @@
 func (c *canceld) Run(ctx *context.T, _ rpc.ServerCall) error {
 	close(c.started)
 	client := v23.GetClient(ctx)
-	ctx.Infof("Run: %s", c.child)
+	var done chan struct{}
 	if c.child != "" {
-		if _, err := client.StartCall(ctx, c.child, "Run", []interface{}{}); err != nil {
-			ctx.Error(err)
-			return err
-		}
+		done = make(chan struct{})
+		go func() {
+			client.Call(ctx, c.child, "Run", nil, nil)
+			close(done)
+		}()
 	}
 	<-ctx.Done()
+	if done != nil {
+		<-done
+	}
 	close(c.canceled)
 	return nil
 }
@@ -50,7 +56,6 @@
 	if err != nil {
 		return nil, err
 	}
-	ctx.Infof("Serving: %q", name)
 	return c, nil
 }
 
@@ -70,20 +75,18 @@
 	}
 
 	ctx, cancel := context.WithCancel(ctx)
-	_, err = v23.GetClient(ctx).StartCall(ctx, "c1", "Run", []interface{}{})
-	if err != nil {
-		t.Fatalf("can't call: ", err)
-	}
+	done := make(chan struct{})
+	go func() {
+		v23.GetClient(ctx).Call(ctx, "c1", "Run", nil, nil)
+		close(done)
+	}()
 
 	<-c1.started
 	<-c2.started
-
-	ctx.Info("cancelling initial call")
 	cancel()
-
-	ctx.Info("waiting for children to be canceled")
 	<-c1.canceled
 	<-c2.canceled
+	<-done
 }
 
 type cancelTestServer struct {
@@ -141,11 +144,13 @@
 		t.Fatal(err)
 	}
 	cctx, cancel := context.WithCancel(cctx)
-	_, err = v23.GetClient(cctx).StartCall(cctx, "cancel", "CancelStreamReader", []interface{}{})
-	if err != nil {
-		t.Fatalf("Start call failed: %v", err)
-	}
+	done := make(chan struct{})
+	go func() {
+		v23.GetClient(cctx).Call(cctx, "cancel", "CancelStreamReader", nil, nil)
+		close(done)
+	}()
 	waitForCancel(t, ts, cancel)
+	<-done
 }
 
 // TestCancelWithFullBuffers tests that even if the writer has filled the buffers and
@@ -163,16 +168,25 @@
 		t.Fatal(err)
 	}
 	cctx, cancel := context.WithCancel(cctx)
-	call, err := v23.GetClient(cctx).StartCall(cctx, "cancel", "CancelStreamIgnorer", []interface{}{})
+	call, err := v23.GetClient(cctx).StartCall(cctx, "cancel", "CancelStreamIgnorer", nil)
 	if err != nil {
 		t.Fatalf("Start call failed: %v", err)
 	}
 
 	// Fill up all the write buffers to ensure that cancelling works even when the stream
 	// is blocked.
-	// TODO(mattr): Update for new RPC system.
-	call.Send(make([]byte, vc.MaxSharedBytes))
-	call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
+	if ref.RPCTransitionState() >= ref.XServers {
+		call.Send(conn.DefaultBytesBufferedPerFlow)
+	} else {
+		call.Send(make([]byte, vc.MaxSharedBytes))
+		call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
+	}
+	done := make(chan struct{})
+	go func() {
+		call.Finish()
+		close(done)
+	}()
 
 	waitForCancel(t, ts, cancel)
+	<-done
 }
diff --git a/runtime/internal/rpc/transitionclient.go b/runtime/internal/rpc/transitionclient.go
index ade2e3a..2516a43 100644
--- a/runtime/internal/rpc/transitionclient.go
+++ b/runtime/internal/rpc/transitionclient.go
@@ -6,7 +6,6 @@
 
 import (
 	"v.io/v23/context"
-	"v.io/v23/flow"
 	"v.io/v23/flow/message"
 	"v.io/v23/namespace"
 	"v.io/v23/rpc"
@@ -15,22 +14,24 @@
 )
 
 type transitionClient struct {
-	c, xc rpc.Client
+	c, xc   rpc.Client
+	closech chan struct{}
 }
 
 var _ = rpc.Client((*transitionClient)(nil))
 
-func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, flowMgr flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
-	var err error
-	ret := &transitionClient{}
-	if ret.xc, err = NewXClient(ctx, flowMgr, ns, opts...); err != nil {
-		return nil, err
+func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) rpc.Client {
+	tc := &transitionClient{
+		xc:      NewXClient(ctx, ns, opts...),
+		c:       DeprecatedNewClient(streamMgr, ns, opts...),
+		closech: make(chan struct{}),
 	}
-	if ret.c, err = DeprecatedNewClient(streamMgr, ns, opts...); err != nil {
-		ret.xc.Close()
-		return nil, err
-	}
-	return ret, nil
+	go func() {
+		<-tc.xc.Closed()
+		<-tc.c.Closed()
+		close(tc.closech)
+	}()
+	return tc
 }
 
 func (t *transitionClient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
@@ -50,6 +51,10 @@
 }
 
 func (t *transitionClient) Close() {
-	t.xc.Close()
 	t.c.Close()
+	<-t.xc.Closed()
+}
+
+func (t *transitionClient) Closed() <-chan struct{} {
+	return t.closech
 }
diff --git a/runtime/internal/rpc/typecache.go b/runtime/internal/rpc/typecache.go
index c734913..2fcb6bc 100644
--- a/runtime/internal/rpc/typecache.go
+++ b/runtime/internal/rpc/typecache.go
@@ -7,6 +7,7 @@
 import (
 	"sync"
 
+	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/vom"
 )
@@ -54,7 +55,7 @@
 	return
 }
 
-func (tc *typeCache) get(c flow.ManagedConn) (*vom.TypeEncoder, *vom.TypeDecoder) {
+func (tc *typeCache) get(ctx *context.T, c flow.ManagedConn) (*vom.TypeEncoder, *vom.TypeDecoder, error) {
 	tc.mu.Lock()
 	tce := tc.flows[c]
 	if tce == nil {
@@ -62,8 +63,14 @@
 		tc.flows[c] = tce
 	}
 	tc.mu.Unlock()
-	<-tce.ready
-	return tce.enc, tce.dec
+	select {
+	case <-c.Closed():
+		return nil, nil, newErrTypeFlowFailure(ctx, nil)
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
+	case <-tce.ready:
+	}
+	return tce.enc, tce.dec, nil
 }
 
 func (tc *typeCache) collect() {
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 0c4bf18..5568371 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -9,6 +9,7 @@
 
 	"v.io/v23"
 	"v.io/v23/context"
+	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
@@ -26,21 +27,24 @@
 func TestXClientServer(t *testing.T) {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+	var i uint64 = 1
+	ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
+		i++
+		return manager.New(ctx, naming.FixedRoutingID(i))
+	})
+	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
+		return NewXClient(ctx, v23.GetNamespace(ctx), o...)
+	})
+
 	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
 		Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
 	})
-	_, err := NewServer(ctx, "server", &testService{}, nil, nil, "")
-	if err != nil {
-		t.Fatal(verror.DebugString(err))
-	}
-	ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x2)))
-	client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
+	_, _, err := WithNewServer(ctx, "server", &testService{}, nil, nil, "")
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
 	var result string
-	if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+	if err = v23.GetClient(ctx).Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
 	if want := "response:hello"; result != want {
@@ -57,21 +61,24 @@
 func TestXClientDispatchingServer(t *testing.T) {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+	var i uint64 = 1
+	ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
+		i++
+		return manager.New(ctx, naming.FixedRoutingID(i))
+	})
+	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
+		return NewXClient(ctx, v23.GetNamespace(ctx), o...)
+	})
+
 	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
 		Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
 	})
-	_, err := NewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
-	if err != nil {
-		t.Fatal(verror.DebugString(err))
-	}
-	ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x2)))
-	client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
+	_, _, err := WithNewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
 	var result string
-	if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+	if err = v23.GetClient(ctx).Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
 	if want := "response:hello"; result != want {
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 7e6c618..c96d14a 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -7,7 +7,6 @@
 import (
 	"fmt"
 	"io"
-	"net"
 	"sync"
 	"time"
 
@@ -27,6 +26,7 @@
 
 	"v.io/x/ref/lib/apilog"
 	slib "v.io/x/ref/lib/security"
+	"v.io/x/ref/runtime/internal/flow/manager"
 	inaming "v.io/x/ref/runtime/internal/naming"
 )
 
@@ -37,16 +37,17 @@
 	dischargeBuffer = time.Minute
 )
 
+type clientFlowManagerOpt struct {
+	mgr flow.Manager
+}
+
+func (clientFlowManagerOpt) RPCClientOpt() {}
+
 type xclient struct {
 	flowMgr            flow.Manager
 	ns                 namespace.T
 	preferredProtocols []string
-
-	// We cache the IP networks on the device since it is not that cheap to read
-	// network interfaces through os syscall.
-	// TODO(toddw): this can be removed since netstate now implements caching
-	// directly.
-	ipNets []*net.IPNet
+	ctx                *context.T
 
 	// typeCache maintains a cache of type encoders and decoders.
 	typeCache *typeCache
@@ -58,24 +59,37 @@
 
 var _ rpc.Client = (*xclient)(nil)
 
-func NewXClient(ctx *context.T, fm flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewXClient(ctx *context.T, ns namespace.T, opts ...rpc.ClientOpt) rpc.Client {
 	c := &xclient{
-		flowMgr:   fm,
+		ctx:       ctx,
 		ns:        ns,
 		typeCache: newTypeCache(),
 	}
-	ipNets, err := ipNetworks()
-	if err != nil {
-		return nil, err
-	}
-	c.ipNets = ipNets
+
 	for _, opt := range opts {
 		switch v := opt.(type) {
 		case PreferredProtocols:
 			c.preferredProtocols = v
+		case clientFlowManagerOpt:
+			c.flowMgr = v.mgr
 		}
 	}
-	return c, nil
+	if c.flowMgr == nil {
+		c.flowMgr = manager.New(ctx, naming.NullRoutingID)
+	}
+
+	go func() {
+		<-ctx.Done()
+		c.mu.Lock()
+		// TODO(mattr): Do we really need c.closed?
+		c.closed = true
+		c.mu.Unlock()
+
+		<-c.flowMgr.Closed()
+		c.wg.Wait()
+	}()
+
+	return c
 }
 
 func (c *xclient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
@@ -173,26 +187,43 @@
 		return
 	}
 	bfp := blessingsForPeer{auth, method, suffix, args}.run
-	if status.flow, err = c.flowMgr.Dial(ctx, ep, bfp); err != nil {
+	flow, err := c.flowMgr.Dial(ctx, ep, bfp)
+	if err != nil {
 		ctx.VI(2).Infof("rpc: failed to create Flow with %v: %v", server, err)
 		status.serverErr = suberr(err)
 		return
 	}
-	if write := c.typeCache.writer(status.flow.Conn()); write != nil {
-		if tflow, err := c.flowMgr.Dial(ctx, ep, bfp); err != nil {
-			ctx.VI(2).Infof("rpc: failed to create type Flow with %v: %v", server, err)
-			status.serverErr = suberr(err)
+	if write := c.typeCache.writer(flow.Conn()); write != nil {
+		// Create a type flow, note that we use c.ctx instead of ctx.
+		// This is because type flows have a longer lifetime than the
+		// main flow being constructed.
+		tflow, err := c.flowMgr.Dial(c.ctx, ep, bfp)
+		if err != nil {
+			status.serverErr = suberr(newErrTypeFlowFailure(ctx, err))
+			flow.Close()
 			return
-		} else if _, err = tflow.Write([]byte{typeFlow}); err != nil {
-			ctx.VI(2).Infof("rpc: Failed to write type byte. %v: %v", server, err)
-			tflow.Close()
-			status.serverErr = suberr(err)
-			return
-		} else {
-			write(tflow)
 		}
+		if tflow.Conn() != flow.Conn() {
+			status.serverErr = suberr(newErrTypeFlowFailure(ctx, nil))
+			flow.Close()
+			tflow.Close()
+			return
+		}
+		if _, err = tflow.Write([]byte{typeFlow}); err != nil {
+			status.serverErr = suberr(newErrTypeFlowFailure(ctx, nil))
+			flow.Close()
+			tflow.Close()
+			return
+		}
+		write(tflow)
 	}
-	status.typeEnc, status.typeDec = c.typeCache.get(status.flow.Conn())
+	status.typeEnc, status.typeDec, err = c.typeCache.get(ctx, flow.Conn())
+	if err != nil {
+		status.serverErr = suberr(newErrTypeFlowFailure(ctx, err))
+		flow.Close()
+		return
+	}
+	status.flow = flow
 }
 
 type blessingsForPeer struct {
@@ -259,7 +290,7 @@
 		// This should never happen.
 		return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
 	}
-	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
+	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
 		return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
 	}
 
@@ -306,7 +337,6 @@
 				}
 			}
 		case <-ctx.Done():
-			ctx.VI(2).Infof("rpc: timeout on connection to server %v ", name)
 			_, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
 			if verror.ErrorID(err) != verror.ErrTimeout.ID {
 				return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
@@ -446,12 +476,11 @@
 }
 
 func (c *xclient) Close() {
-	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	c.mu.Lock()
-	c.closed = true
-	c.mu.Unlock()
-	// TODO(toddw): Implement this!
-	c.wg.Wait()
+	panic("this method is deprecated.")
+}
+
+func (c *xclient) Closed() <-chan struct{} {
+	return c.flowMgr.Closed()
 }
 
 // flowXClient implements the RPC client-side protocol for a single RPC, over a
@@ -705,6 +734,7 @@
 			berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
 			return fc.close(berr)
 		}
+
 		// The response header must indicate the streaming results have ended.
 		if fc.response.Error == nil && !fc.response.EndStreamResults {
 			berr := verror.New(errRemainingStreamResults, fc.ctx)
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 7266ab0..f2bdbac 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -19,7 +19,6 @@
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/i18n"
-	"v.io/v23/namespace"
 	"v.io/v23/naming"
 	"v.io/v23/options"
 	"v.io/v23/rpc"
@@ -63,50 +62,46 @@
 	disp               rpc.Dispatcher // dispatcher to serve RPCs
 	dispReserved       rpc.Dispatcher // dispatcher for reserved methods
 	active             sync.WaitGroup // active goroutines we've spawned.
-	stoppedChan        chan struct{}  // closed when the server has been stopped.
 	preferredProtocols []string       // protocols to use when resolving proxy name to endpoint.
-	// We cache the IP networks on the device since it is not that cheap to read
-	// network interfaces through os syscall.
-	// TODO(jhahn): Add monitoring the network interface changes.
-	ipNets           []*net.IPNet
-	ns               namespace.T
-	servesMountTable bool
-	isLeaf           bool
+	servesMountTable   bool
+	isLeaf             bool
 
 	// TODO(cnicolaou): add roaming stats to rpcStats
 	stats *rpcStats // stats for this server.
 }
 
-func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func WithNewServer(ctx *context.T,
+	name string, object interface{}, authorizer security.Authorizer,
+	settingsPublisher *pubsub.Publisher, settingsName string,
+	opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	if object == nil {
-		return nil, verror.New(verror.ErrBadArg, ctx, "nil object")
+		return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil object")
 	}
 	invoker, err := objectToInvoker(object)
 	if err != nil {
-		return nil, verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("bad object: %v", err))
+		return ctx, nil, verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("bad object: %v", err))
 	}
 	d := &leafDispatcher{invoker, authorizer}
 	opts = append([]rpc.ServerOpt{options.IsLeaf(true)}, opts...)
-	return NewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
+	return WithNewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
 }
 
-func NewDispatchingServer(ctx *context.T, name string, dispatcher rpc.Dispatcher, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func WithNewDispatchingServer(ctx *context.T,
+	name string, dispatcher rpc.Dispatcher,
+	settingsPublisher *pubsub.Publisher, settingsName string,
+	opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	if dispatcher == nil {
-		return nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
+		return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
 	}
-	ctx, cancel := context.WithRootCancel(ctx)
-	flowMgr := v23.ExperimentalGetFlowManager(ctx)
-	ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx)
-	statsPrefix := naming.Join("rpc", "server", "routing-id", flowMgr.RoutingID().String())
+
+	rootCtx, cancel := context.WithRootCancel(ctx)
+	fm, err := v23.NewFlowManager(rootCtx)
+
+	statsPrefix := naming.Join("rpc", "server", "routing-id", fm.RoutingID().String())
 	s := &xserver{
-		ctx:               ctx,
 		cancel:            cancel,
-		flowMgr:           flowMgr,
-		principal:         principal,
-		blessings:         principal.BlessingStore().Default(),
-		publisher:         publisher.New(ctx, ns, publishPeriod),
-		stoppedChan:       make(chan struct{}),
-		ns:                ns,
+		flowMgr:           fm,
+		blessings:         v23.GetPrincipal(rootCtx).BlessingStore().Default(),
 		stats:             newRPCStats(statsPrefix),
 		settingsPublisher: settingsPublisher,
 		settingsName:      settingsName,
@@ -114,12 +109,6 @@
 		typeCache:         newTypeCache(),
 		proxyEndpoints:    make(map[string]map[string]*inaming.Endpoint),
 	}
-	ipNets, err := ipNetworks()
-	if err != nil {
-		return nil, err
-	}
-	s.ipNets = ipNets
-
 	for _, opt := range opts {
 		switch opt := opt.(type) {
 		case options.ServesMountTable:
@@ -132,19 +121,25 @@
 			s.preferredProtocols = []string(opt)
 		}
 	}
-
-	blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
-	// TODO(caprita): revist printing the blessings with %s, and
-	// instead expose them as a list.
-	stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
-	stats.NewStringFunc(blessingsStatsName, func() string {
-		return fmt.Sprintf("%s (default)", s.principal.BlessingStore().Default())
-	})
-	if err = s.listen(ctx, v23.GetListenSpec(ctx)); err != nil {
-		s.Stop()
-		return nil, err
+	rootCtx, _, err = v23.WithNewClient(rootCtx,
+		clientFlowManagerOpt{fm},
+		PreferredProtocols(s.preferredProtocols))
+	if err != nil {
+		cancel()
+		return ctx, nil, err
 	}
+	s.ctx = rootCtx
+	s.publisher = publisher.New(rootCtx, v23.GetNamespace(rootCtx), publishPeriod)
+
+	// TODO(caprita): revist printing the blessings with string, and
+	// instead expose them as a list.
+	blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
+	stats.NewString(blessingsStatsName).Set(s.blessings.String())
+
+	s.listen(rootCtx, v23.GetListenSpec(rootCtx))
 	if len(name) > 0 {
+		// TODO(mattr): We only call AddServer here, but if someone calls AddName
+		// later there will be no servers?
 		s.mu.Lock()
 		for k, _ := range s.chosenEndpoints {
 			s.publisher.AddServer(k)
@@ -153,7 +148,66 @@
 		s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
 		vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
 	}
-	return s, nil
+
+	go func() {
+		<-ctx.Done()
+		serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
+		s.ctx.VI(1).Infof("Stop: %s", serverDebug)
+		defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
+
+		s.stats.stop()
+		s.publisher.Stop()
+
+		done := make(chan struct{})
+		go func() {
+			s.flowMgr.StopListening(ctx)
+			s.publisher.WaitForStop()
+			// At this point no new flows should arrive.  Wait for existing calls
+			// to complete.
+			s.active.Wait()
+			close(done)
+		}()
+
+		s.Lock()
+		// TODO(mattr): I don't understand what this is.
+		if dhcp := s.dhcpState; dhcp != nil {
+			// TODO(cnicolaou,caprita): investigate not having to close and drain
+			// the channel here. It's a little awkward right now since we have to
+			// be careful to not close the channel in two places, i.e. here and
+			// and from the publisher's Shutdown method.
+			if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
+			drain:
+				for {
+					select {
+					case v := <-dhcp.ch:
+						if v == nil {
+							break drain
+						}
+					default:
+						close(dhcp.ch)
+						break drain
+					}
+				}
+			}
+		}
+		s.Unlock()
+
+		select {
+		case <-done:
+		case <-time.After(5 * time.Second): // TODO(mattr): This should be configurable.
+			s.ctx.Errorf("%s: Timedout waiting for active requests to complete", serverDebug)
+		}
+		// Now we cancel the root context which closes all the connections
+		// in the flow manager and cancels all the contexts used by
+		// ongoing requests.  Hopefully this will bring all outstanding
+		// operations to a close.
+		s.cancel()
+		<-s.flowMgr.Closed()
+		// Now we really will wait forever.  If this doesn't exit, there's something
+		// wrong with the users code.
+		<-done
+	}()
+	return rootCtx, s, nil
 }
 
 func (s *xserver) Status() rpc.ServerStatus {
@@ -185,11 +239,12 @@
 }
 
 // resolveToEndpoint resolves an object name or address to an endpoint.
-func (s *xserver) resolveToEndpoint(address string) (naming.Endpoint, error) {
+func (s *xserver) resolveToEndpoint(ctx *context.T, address string) (naming.Endpoint, error) {
 	var resolved *naming.MountEntry
 	var err error
-	if s.ns != nil {
-		if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
+	// TODO(mattr): Why should ns be nil?
+	if ns := v23.GetNamespace(ctx); ns != nil {
+		if resolved, err = ns.Resolve(ctx, address); err != nil {
 			return nil, err
 		}
 	} else {
@@ -199,7 +254,7 @@
 		}}
 	}
 	// An empty set of protocols means all protocols...
-	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
+	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols); err != nil {
 		return nil, err
 	}
 	for _, n := range resolved.Names() {
@@ -295,13 +350,14 @@
 	return ret
 }
 
-func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) error {
+func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) {
 	s.Lock()
 	defer s.Unlock()
 	var lastErr error
+	var ep naming.Endpoint
 	if len(listenSpec.Proxy) > 0 {
-		var ep naming.Endpoint
-		if ep, lastErr = s.resolveToEndpoint(listenSpec.Proxy); lastErr != nil {
+		ep, lastErr = s.resolveToEndpoint(ctx, listenSpec.Proxy)
+		if lastErr != nil {
 			s.ctx.VI(2).Infof("resolveToEndpoint(%q) failed: %v", listenSpec.Proxy, lastErr)
 		} else {
 			lastErr = s.flowMgr.ProxyListen(ctx, ep, s.update(ep))
@@ -320,7 +376,6 @@
 	}
 
 	leps := s.flowMgr.ListeningEndpoints()
-
 	s.addressChooser = listenSpec.AddressChooser
 	roaming := false
 	chosenEps := make(map[string]*inaming.Endpoint)
@@ -346,7 +401,6 @@
 
 	s.active.Add(1)
 	go s.acceptLoop(ctx)
-	return nil
 }
 
 func (s *xserver) acceptLoop(ctx *context.T) error {
@@ -389,12 +443,9 @@
 					}
 				}
 			case typeFlow:
-				write := s.typeCache.writer(fl.Conn())
-				if write == nil {
-					s.ctx.VI(1).Infof("ignoring duplicate type flow.")
-					return
+				if write := s.typeCache.writer(fl.Conn()); write != nil {
+					write(fl)
 				}
-				write(fl)
 			}
 		}(fl)
 	}
@@ -422,105 +473,23 @@
 
 func (s *xserver) Stop() error {
 	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	panic("unimplemented")
+}
 
-	serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
-	s.ctx.VI(1).Infof("Stop: %s", serverDebug)
-	defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
-
-	s.Lock()
-	if s.disp == nil {
-		s.Unlock()
-		return nil
-	}
-	s.disp = nil
-	close(s.stoppedChan)
-	s.Unlock()
-
-	// Delete the stats object.
-	s.stats.stop()
-
-	// Note, It's safe to Stop/WaitForStop on the publisher outside of the
-	// server lock, since publisher is safe for concurrent access.
-	// Stop the publisher, which triggers unmounting of published names.
-	s.publisher.Stop()
-
-	// Wait for the publisher to be done unmounting before we can proceed to
-	// close the listeners (to minimize the number of mounted names pointing
-	// to endpoint that are no longer serving).
-	//
-	// TODO(caprita): See if make sense to fail fast on rejecting
-	// connections once listeners are closed, and parallelize the publisher
-	// and listener shutdown.
-	s.publisher.WaitForStop()
-
-	s.Lock()
-
-	// TODO(mattr): What should we do when we stop a server now?  We need to
-	// interrupt Accept at some point, but it's weird to stop the flowmanager.
-	// Close all listeners.  No new flows will be accepted, while in-flight
-	// flows will continue until they terminate naturally.
-	// nListeners := len(s.listeners)
-	// errCh := make(chan error, nListeners)
-	// for ln, _ := range s.listeners {
-	// 	go func(ln stream.Listener) {
-	// 		errCh <- ln.Close()
-	// 	}(ln)
-	// }
-
-	if dhcp := s.dhcpState; dhcp != nil {
-		// TODO(cnicolaou,caprita): investigate not having to close and drain
-		// the channel here. It's a little awkward right now since we have to
-		// be careful to not close the channel in two places, i.e. here and
-		// and from the publisher's Shutdown method.
-		if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
-		drain:
-			for {
-				select {
-				case v := <-dhcp.ch:
-					if v == nil {
-						break drain
-					}
-				default:
-					close(dhcp.ch)
-					break drain
-				}
-			}
-		}
-	}
-
-	s.Unlock()
-
-	// At this point, we are guaranteed that no new requests are going to be
-	// accepted.
-
-	// Wait for the publisher and active listener + flows to finish.
-	done := make(chan struct{}, 1)
-	go func() { s.active.Wait(); done <- struct{}{} }()
-
-	select {
-	case <-done:
-	case <-time.After(5 * time.Second):
-		s.ctx.Errorf("%s: Timedout waiting for goroutines to stop", serverDebug)
-		// TODO(mattr): This doesn't make sense, shouldn't we not wait after timing out?
-		<-done
-		s.ctx.Infof("%s: Done waiting.", serverDebug)
-	}
-
-	s.cancel()
-	return nil
+func (s *xserver) Closed() <-chan struct{} {
+	return s.ctx.Done()
 }
 
 // flowServer implements the RPC server-side protocol for a single RPC, over a
 // flow that's already connected to the client.
 type xflowServer struct {
-	ctx    *context.T     // context associated with the RPC
 	server *xserver       // rpc.Server that this flow server belongs to
 	disp   rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
-	dec    *vom.Decoder   // to decode requests and args from the client
-	enc    *vom.Encoder   // to encode responses and results to the client
 	flow   flow.Flow      // underlying flow
 
 	// Fields filled in during the server invocation.
+	dec              *vom.Decoder // to decode requests and args from the client
+	enc              *vom.Encoder // to encode responses and results to the client
 	grantedBlessings security.Blessings
 	method, suffix   string
 	tags             []*vdl.Value
@@ -535,17 +504,10 @@
 )
 
 func newXFlowServer(flow flow.Flow, server *xserver) (*xflowServer, error) {
-	server.Lock()
-	disp := server.disp
-	server.Unlock()
-	typeEnc, typeDec := server.typeCache.get(flow.Conn())
 	fs := &xflowServer{
-		ctx:        server.ctx,
 		server:     server,
-		disp:       disp,
+		disp:       server.disp,
 		flow:       flow,
-		enc:        vom.NewEncoderWithTypeEncoder(flow, typeEnc),
-		dec:        vom.NewDecoderWithTypeDecoder(flow, typeDec),
 		discharges: make(map[string]security.Discharge),
 	}
 	return fs, nil
@@ -566,20 +528,19 @@
 	if fs.server.dispReserved != nil {
 		_, auth, _ = fs.server.dispReserved.Lookup(ctx, params.Suffix)
 	}
-	return authorize(fs.ctx, security.NewCall(params), auth)
+	return authorize(ctx, security.NewCall(params), auth)
 }
 
 func (fs *xflowServer) serve() error {
 	defer fs.flow.Close()
 
-	results, err := fs.processRequest()
-
-	vtrace.GetSpan(fs.ctx).Finish()
+	ctx, results, err := fs.processRequest()
+	vtrace.GetSpan(ctx).Finish()
 
 	var traceResponse vtrace.Response
 	// Check if the caller is permitted to view vtrace data.
-	if fs.authorizeVtrace(fs.ctx) == nil {
-		traceResponse = vtrace.GetResponse(fs.ctx)
+	if fs.authorizeVtrace(ctx) == nil {
+		traceResponse = vtrace.GetResponse(ctx)
 	}
 
 	// Respond to the client with the response header and positional results.
@@ -593,7 +554,7 @@
 		if err == io.EOF {
 			return err
 		}
-		return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
+		return verror.New(errResponseEncoding, ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
 	}
 	if response.Error != nil {
 		return response.Error
@@ -603,7 +564,7 @@
 			if err == io.EOF {
 				return err
 			}
-			return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
+			return verror.New(errResultEncoding, ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
 		}
 	}
 	// TODO(ashankar): Should unread data from the flow be drained?
@@ -622,69 +583,67 @@
 	return nil
 }
 
-func (fs *xflowServer) readRPCRequest() (*rpc.Request, error) {
-	// TODO(toddw): How do we set the initial timeout?  It might be shorter than
-	// the timeout we set later, which we learn after we've decoded the request.
-	/*
-		// Set a default timeout before reading from the flow. Without this timeout,
-		// a client that sends no request or a partial request will retain the flow
-		// indefinitely (and lock up server resources).
-		initTimer := newTimer(defaultCallTimeout)
-		defer initTimer.Stop()
-		fs.flow.SetDeadline(initTimer.C)
-	*/
-
+func (fs *xflowServer) readRPCRequest(ctx *context.T) (*rpc.Request, error) {
 	// Decode the initial request.
 	var req rpc.Request
 	if err := fs.dec.Decode(&req); err != nil {
-		return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err))
+		return nil, verror.New(verror.ErrBadProtocol, ctx, newErrBadRequest(ctx, err))
 	}
 	return &req, nil
 }
 
-func (fs *xflowServer) processRequest() ([]interface{}, error) {
+func (fs *xflowServer) processRequest() (*context.T, []interface{}, error) {
 	fs.starttime = time.Now()
-	req, err := fs.readRPCRequest()
+
+	// Set an initial deadline on the flow to ensure that we don't wait forever
+	// for the initial read.
+	ctx := fs.flow.SetDeadlineContext(fs.server.ctx, time.Now().Add(defaultCallTimeout))
+
+	typeEnc, typeDec, err := fs.server.typeCache.get(ctx, fs.flow.Conn())
+	if err != nil {
+		return ctx, nil, err
+	}
+	fs.enc = vom.NewEncoderWithTypeEncoder(fs.flow, typeEnc)
+	fs.dec = vom.NewDecoderWithTypeDecoder(fs.flow, typeDec)
+
+	req, err := fs.readRPCRequest(ctx)
 	if err != nil {
 		// We don't know what the rpc call was supposed to be, but we'll create
 		// a placeholder span so we can capture annotations.
-		fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
-		return nil, err
+		// TODO(mattr): I'm not sure this makes sense anymore, but I'll revisit it
+		// when I'm doing another round of vtrace next quarter.
+		ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
+		return ctx, nil, err
 	}
+
+	// Start building up a new context for the request now that we know
+	// the header information.
+	ctx = fs.server.ctx
+
 	// We must call fs.drainDecoderArgs for any error that occurs
 	// after this point, and before we actually decode the arguments.
 	fs.method = req.Method
 	fs.suffix = strings.TrimLeft(req.Suffix, "/")
-
 	if req.Language != "" {
-		fs.ctx = i18n.WithLangID(fs.ctx, i18n.LangID(req.Language))
+		ctx = i18n.WithLangID(ctx, i18n.LangID(req.Language))
 	}
 
 	// TODO(mattr): Currently this allows users to trigger trace collection
 	// on the server even if they will not be allowed to collect the
 	// results later.  This might be considered a DOS vector.
 	spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method)
-	fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest)
+	ctx, _ = vtrace.WithContinuedTrace(ctx, spanName, req.TraceRequest)
+	ctx = fs.flow.SetDeadlineContext(ctx, req.Deadline.Time)
 
-	var cancel context.CancelFunc
-	if !req.Deadline.IsZero() {
-		fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time)
-	} else {
-		fs.ctx, cancel = context.WithCancel(fs.ctx)
-	}
-	fs.flow.SetContext(fs.ctx)
-	// TODO(toddw): Explicitly cancel the context when the flow is done.
-	_ = cancel
-
-	if err := fs.readGrantedBlessings(req); err != nil {
+	if err := fs.readGrantedBlessings(ctx, req); err != nil {
 		fs.drainDecoderArgs(int(req.NumPosArgs))
-		return nil, err
+		return ctx, nil, err
 	}
 	// Lookup the invoker.
-	invoker, auth, err := fs.lookup(fs.suffix, fs.method)
+	invoker, auth, err := fs.lookup(ctx, fs.suffix, fs.method)
 	if err != nil {
 		fs.drainDecoderArgs(int(req.NumPosArgs))
-		return nil, err
+		return ctx, nil, err
 	}
 
 	// Note that we strip the reserved prefix when calling the invoker so
@@ -694,31 +653,31 @@
 
 	// Prepare invoker and decode args.
 	numArgs := int(req.NumPosArgs)
-	argptrs, tags, err := invoker.Prepare(fs.ctx, strippedMethod, numArgs)
+	argptrs, tags, err := invoker.Prepare(ctx, strippedMethod, numArgs)
 	fs.tags = tags
 	if err != nil {
 		fs.drainDecoderArgs(numArgs)
-		return nil, err
+		return ctx, nil, err
 	}
 	if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
 		fs.drainDecoderArgs(numArgs)
-		return nil, newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want)
+		return ctx, nil, newErrBadNumInputArgs(ctx, fs.suffix, fs.method, called, want)
 	}
 	for ix, argptr := range argptrs {
 		if err := fs.dec.Decode(argptr); err != nil {
-			return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err)
+			return ctx, nil, newErrBadInputArg(ctx, fs.suffix, fs.method, uint64(ix), err)
 		}
 	}
 
 	// Check application's authorization policy.
-	if err := authorize(fs.ctx, fs, auth); err != nil {
-		return nil, err
+	if err := authorize(ctx, fs, auth); err != nil {
+		return ctx, nil, err
 	}
 
 	// Invoke the method.
-	results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs)
+	results, err := invoker.Invoke(ctx, fs, strippedMethod, argptrs)
 	fs.server.stats.record(fs.method, time.Since(fs.starttime))
-	return results, err
+	return ctx, results, err
 }
 
 // drainDecoderArgs drains the next n arguments encoded onto the flows decoder.
@@ -741,7 +700,7 @@
 // with rpc.DebugKeyword, we use the internal debug dispatcher to look up the
 // invoker. Otherwise, and we use the server's dispatcher. The suffix and method
 // value may be modified to match the actual suffix and method to use.
-func (fs *xflowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
+func (fs *xflowServer) lookup(ctx *context.T, suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
 	if naming.IsReserved(method) {
 		return reservedInvoker(fs.disp, fs.server.dispReserved), security.AllowEveryone(), nil
 	}
@@ -749,26 +708,26 @@
 	if naming.IsReserved(suffix) {
 		disp = fs.server.dispReserved
 	} else if fs.server.isLeaf && suffix != "" {
-		innerErr := verror.New(errUnexpectedSuffix, fs.ctx, suffix)
-		return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix, innerErr)
+		innerErr := verror.New(errUnexpectedSuffix, ctx, suffix)
+		return nil, nil, verror.New(verror.ErrUnknownSuffix, ctx, suffix, innerErr)
 	}
 	if disp != nil {
-		obj, auth, err := disp.Lookup(fs.ctx, suffix)
+		obj, auth, err := disp.Lookup(ctx, suffix)
 		switch {
 		case err != nil:
 			return nil, nil, err
 		case obj != nil:
 			invoker, err := objectToInvoker(obj)
 			if err != nil {
-				return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err)
+				return nil, nil, verror.New(verror.ErrInternal, ctx, "invalid received object", err)
 			}
 			return invoker, auth, nil
 		}
 	}
-	return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
+	return nil, nil, verror.New(verror.ErrUnknownSuffix, ctx, suffix)
 }
 
-func (fs *xflowServer) readGrantedBlessings(req *rpc.Request) error {
+func (fs *xflowServer) readGrantedBlessings(ctx *context.T, req *rpc.Request) error {
 	if req.GrantedBlessings.IsZero() {
 		return nil
 	}
@@ -781,7 +740,8 @@
 	// this - should servers be able to assume that a blessing is something that
 	// does not have the authorizations that the server's own identity has?
 	if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
-		return verror.New(verror.ErrNoAccess, fs.ctx, verror.New(errBlessingsNotBound, fs.ctx, got, want))
+		return verror.New(verror.ErrNoAccess, ctx,
+			verror.New(errBlessingsNotBound, ctx, got, want))
 	}
 	fs.grantedBlessings = req.GrantedBlessings
 	return nil
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 77f07f9..fd3f075 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -54,7 +54,6 @@
 	backgroundKey
 	reservedNameKey
 	listenKey
-	flowManagerKey
 
 	// initKey is used to store values that are only set at init time.
 	initKey
@@ -169,12 +168,6 @@
 		return nil, nil, nil, err
 	}
 
-	// Add the flow.Manager to the context.
-	// This initial Flow Manager can only be used as a client.
-	ctx, _, err = r.setNewClientFlowManager(ctx)
-	if err != nil {
-		return nil, nil, nil, err
-	}
 	// Add the Client to the context.
 	ctx, _, err = r.WithNewClient(ctx)
 	if err != nil {
@@ -306,27 +299,6 @@
 	return sm, nil
 }
 
-func (r *Runtime) setNewClientFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
-	return r.setNewFlowManager(ctx, naming.NullRoutingID)
-}
-
-func (r *Runtime) setNewBidiFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
-	rid, err := naming.NewRoutingID()
-	if err != nil {
-		return nil, nil, err
-	}
-	return r.setNewFlowManager(ctx, rid)
-}
-
-func (r *Runtime) setNewFlowManager(ctx *context.T, rid naming.RoutingID) (*context.T, flow.Manager, error) {
-	fm := manager.New(ctx, rid)
-	if err := r.addChild(ctx, fm, func() { <-fm.Closed() }); err != nil {
-		return ctx, nil, err
-	}
-	newctx := context.WithValue(ctx, flowManagerKey, fm)
-	return newctx, fm, nil
-}
-
 func (r *Runtime) setNewStreamManager(ctx *context.T) (*context.T, error) {
 	sm, err := newStreamManager(ctx)
 	if err != nil {
@@ -381,14 +353,6 @@
 	if newctx, err = r.setNewStreamManager(newctx); err != nil {
 		return ctx, err
 	}
-	if rid := r.ExperimentalGetFlowManager(newctx).RoutingID(); rid == naming.NullRoutingID {
-		newctx, _, err = r.setNewClientFlowManager(newctx)
-	} else {
-		newctx, _, err = r.setNewBidiFlowManager(newctx)
-	}
-	if err != nil {
-		return ctx, err
-	}
 	if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
 		return ctx, err
 	}
@@ -412,35 +376,28 @@
 	p, _ := ctx.Value(principalKey).(security.Principal)
 	sm, _ := ctx.Value(streamManagerKey).(stream.Manager)
 	ns, _ := ctx.Value(namespaceKey).(namespace.T)
-	fm, _ := ctx.Value(flowManagerKey).(flow.Manager)
 	otherOpts = append(otherOpts, imanager.DialTimeout(5*time.Minute))
-
 	if id, _ := ctx.Value(initKey).(*initData); id.protocols != nil {
 		otherOpts = append(otherOpts, irpc.PreferredProtocols(id.protocols))
 	}
 	var client rpc.Client
-	var err error
 	deps := []interface{}{vtraceDependency{}}
 
-	if fm != nil && ref.RPCTransitionState() >= ref.XClients {
-		client, err = irpc.NewTransitionClient(ctx, sm, fm, ns, otherOpts...)
-		deps = append(deps, fm, sm)
-	} else {
-		client, err = irpc.DeprecatedNewClient(sm, ns, otherOpts...)
+	if ref.RPCTransitionState() >= ref.XClients {
+		client = irpc.NewTransitionClient(ctx, sm, ns, otherOpts...)
 		deps = append(deps, sm)
-	}
-
-	if err != nil {
-		return ctx, nil, err
+	} else {
+		client = irpc.DeprecatedNewClient(sm, ns, otherOpts...)
+		deps = append(deps, sm)
 	}
 	newctx := context.WithValue(ctx, clientKey, client)
 	if p != nil {
 		deps = append(deps, p)
 	}
-	if err = r.addChild(ctx, client, client.Close, deps...); err != nil {
+	if err := r.addChild(ctx, client, client.Close, deps...); err != nil {
 		return ctx, nil, err
 	}
-	return newctx, client, err
+	return newctx, client, nil
 }
 
 func (*Runtime) GetClient(ctx *context.T) rpc.Client {
@@ -539,33 +496,16 @@
 	return nil
 }
 
-func (*Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
-	// nologcall
-	if d, ok := ctx.Value(flowManagerKey).(flow.Manager); ok {
-		return d
-	}
-	return nil
-}
-
-func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+func (r *Runtime) NewFlowManager(ctx *context.T) (flow.Manager, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	newctx, m, err := r.setNewBidiFlowManager(ctx)
+	rid, err := naming.NewRoutingID()
 	if err != nil {
-		return ctx, nil, err
+		return nil, err
 	}
-	// Create a new client since it depends on the flow manager.
-	newctx, _, err = r.WithNewClient(newctx)
-	if err != nil {
-		return ctx, nil, err
-	}
-	return newctx, m, nil
+	return manager.New(ctx, rid), nil
 }
 
-func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*context.T, *pubsub.Publisher, string, []rpc.ServerOpt, error) {
-	newctx, _, err := r.ExperimentalWithNewFlowManager(ctx)
-	if err != nil {
-		return ctx, nil, "", nil, err
-	}
+func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, string, []rpc.ServerOpt, error) {
 	otherOpts := append([]rpc.ServerOpt{}, opts...)
 	if reservedDispatcher := r.GetReservedNameDispatcher(ctx); reservedDispatcher != nil {
 		otherOpts = append(otherOpts, irpc.ReservedNameDispatcher{
@@ -576,20 +516,21 @@
 	if id.protocols != nil {
 		otherOpts = append(otherOpts, irpc.PreferredServerResolveProtocols(id.protocols))
 	}
-	return newctx, id.settingsPublisher, id.settingsName, otherOpts, nil
+	return id.settingsPublisher, id.settingsName, otherOpts, nil
 }
 
 func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	if ref.RPCTransitionState() >= ref.XServers {
-		// TODO(mattr): Deal with shutdown deps.
-		newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+		spub, sname, opts, err := r.commonServerInit(ctx, opts...)
 		if err != nil {
 			return ctx, nil, err
 		}
-		s, err := irpc.NewServer(newctx, name, object, auth, spub, sname, opts...)
+		newctx, s, err := irpc.WithNewServer(ctx, name, object, auth, spub, sname, opts...)
 		if err != nil {
-			// TODO(mattr): Stop the flow manager.
+			return ctx, nil, err
+		}
+		if err = r.addChild(ctx, s, func() { <-s.Closed() }); err != nil {
 			return ctx, nil, err
 		}
 		return newctx, s, nil
@@ -612,14 +553,15 @@
 func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	if ref.RPCTransitionState() >= ref.XServers {
-		// TODO(mattr): Deal with shutdown deps.
-		newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+		spub, sname, opts, err := r.commonServerInit(ctx, opts...)
 		if err != nil {
 			return ctx, nil, err
 		}
-		s, err := irpc.NewDispatchingServer(newctx, name, disp, spub, sname, opts...)
+		newctx, s, err := irpc.WithNewDispatchingServer(ctx, name, disp, spub, sname, opts...)
 		if err != nil {
-			// TODO(mattr): Stop the flow manager.
+			return ctx, nil, err
+		}
+		if err = r.addChild(ctx, s, func() { <-s.Closed() }); err != nil {
 			return ctx, nil, err
 		}
 		return newctx, s, nil
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index da50a92..0a424e7 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -141,25 +141,12 @@
 	ctx, shutdown := test.V23Init()
 	defer shutdown()
 
-	oldman := v23.ExperimentalGetFlowManager(ctx)
-	if oldman == nil {
-		t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
+	oldman, err := v23.NewFlowManager(ctx)
+	if err != nil || oldman == nil {
+		t.Error("NewFlowManager failed: %v, %v", oldman, err)
 	}
-	if rid := oldman.RoutingID(); rid != naming.NullRoutingID {
-		t.Errorf("Initial flow.Manager should have NullRoutingID, got %v", rid)
-	}
-	newctx, newman, err := v23.ExperimentalWithNewFlowManager(ctx)
+	newman, err := v23.NewFlowManager(ctx)
 	if err != nil || newman == nil || newman == oldman {
-		t.Fatalf("Could not create flow manager: %v", err)
-	}
-	if !newctx.Initialized() {
-		t.Fatal("Got uninitialized context.")
-	}
-	man := v23.ExperimentalGetFlowManager(newctx)
-	if man != newman || man == oldman {
-		t.Error("ExperimentalWithNewFlowManager didn't update the context properly")
-	}
-	if man.RoutingID() == naming.NullRoutingID {
-		t.Error("Newly created flow.Manager should not have NullRoutingID")
+		t.Fatalf("NewFlowManager failed: %v, %v", newman, err)
 	}
 }
diff --git a/services/wspr/internal/lib/signature_manager_test.go b/services/wspr/internal/lib/signature_manager_test.go
index 689df80..97240ab 100644
--- a/services/wspr/internal/lib/signature_manager_test.go
+++ b/services/wspr/internal/lib/signature_manager_test.go
@@ -11,6 +11,7 @@
 
 	"v.io/v23"
 	"v.io/v23/context"
+	"v.io/v23/rpc"
 	"v.io/v23/vdl"
 	"v.io/v23/vdlroot/signature"
 	"v.io/x/ref/runtime/factories/fake"
@@ -38,7 +39,9 @@
 			"__Signature": []interface{}{initialSig},
 		},
 	)
-	ctx = fake.SetClient(ctx, client)
+	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+		return client
+	})
 	return ctx, client, shutdown
 }
 
diff --git a/services/wspr/internal/lib/simple_client.go b/services/wspr/internal/lib/simple_client.go
index a46fd82..17d18ab 100644
--- a/services/wspr/internal/lib/simple_client.go
+++ b/services/wspr/internal/lib/simple_client.go
@@ -90,6 +90,10 @@
 func (*simpleMockClient) Close() {
 }
 
+func (*simpleMockClient) Closed() <-chan struct{} {
+	return nil
+}
+
 // mockCall implements rpc.ClientCall
 type mockCall struct {
 	mockStream
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index 4a08676..5343d26 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -89,11 +89,6 @@
 	if err != nil {
 		t.Fatal(err)
 	}
-	// Create a new flow manager for the client.
-	cctx, _, err := v23.ExperimentalWithNewFlowManager(ctx)
-	if err != nil {
-		t.Fatal(err)
-	}
 	// Wait for the server to finish listening through the proxy.
 	eps := s.Status().Endpoints
 	for ; len(eps) == 0 || eps[0].Addr().Network() == ""; eps = s.Status().Endpoints {
@@ -101,7 +96,7 @@
 	}
 
 	var got string
-	if err := v23.GetClient(cctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+	if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
 		t.Fatal(err)
 	}
 	if want := "response:hello"; got != want {
@@ -114,30 +109,29 @@
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	kp := newKillProtocol()
 	flow.RegisterProtocol("kill", kp)
-	pctx, shutdown := v23.Init()
+	ctx, shutdown := v23.Init()
 	defer shutdown()
-	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+	am, err := v23.NewFlowManager(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
-	dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+	dm, err := v23.NewFlowManager(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
+	pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
 
 	done := make(chan struct{})
 	update := func(eps []naming.Endpoint) {
 		if len(eps) > 0 {
-			if err := testEndToEndConnection(t, dctx, actx, dm, am, eps[0]); err != nil {
+			if err := testEndToEndConnection(t, ctx, dm, am, eps[0]); err != nil {
 				t.Error(err)
 			}
 			close(done)
 		}
 	}
-
-	if err := am.ProxyListen(actx, pep, update); err != nil {
+	if err := am.ProxyListen(ctx, pep, update); err != nil {
 		t.Fatal(err)
 	}
 	<-done
@@ -147,22 +141,22 @@
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	kp := newKillProtocol()
 	flow.RegisterProtocol("kill", kp)
-	pctx, shutdown := v23.Init()
+	ctx, shutdown := v23.Init()
 	defer shutdown()
-	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+	am, err := v23.NewFlowManager(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
-	dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+	dm, err := v23.NewFlowManager(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
+	pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
 
-	p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
+	p2ep := startProxy(t, ctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
 
-	p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
+	p3ep := startProxy(t, ctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
 
 	done := make(chan struct{})
 	update := func(eps []naming.Endpoint) {
@@ -173,24 +167,24 @@
 		// to each other and to the server. For now we at least check a random endpoint so the
 		// test will at least fail over many runs if something is wrong.
 		if len(eps) > 0 {
-			if err := testEndToEndConnection(t, dctx, actx, dm, am, eps[rand.Int()%3]); err != nil {
+			if err := testEndToEndConnection(t, ctx, dm, am, eps[rand.Int()%3]); err != nil {
 				t.Error(err)
 			}
 			close(done)
 		}
 	}
 
-	if err := am.ProxyListen(actx, p3ep, update); err != nil {
+	if err := am.ProxyListen(ctx, p3ep, update); err != nil {
 		t.Fatal(err)
 	}
 
 	<-done
 }
 
-func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
+func testEndToEndConnection(t *testing.T, ctx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
 	// The dialing flow.Manager dials a flow to the accepting flow.Manager.
 	want := "Do you read me?"
-	df, err := dm.Dial(dctx, aep, bfp)
+	df, err := dm.Dial(ctx, aep, bfp)
 	if err != nil {
 		return err
 	}
@@ -198,7 +192,7 @@
 	if err := writeLine(df, want); err != nil {
 		return err
 	}
-	af, err := am.Accept(actx)
+	af, err := am.Accept(ctx)
 	if err != nil {
 		return err
 	}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index 85d66f7..1ead875 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -25,7 +25,7 @@
 }
 
 func New(ctx *context.T) (*proxy, *context.T, error) {
-	ctx, mgr, err := v23.ExperimentalWithNewFlowManager(ctx)
+	mgr, err := v23.NewFlowManager(ctx)
 	if err != nil {
 		return nil, nil, err
 	}