ref/runtime/internal/flow: Move the conncache back into the manager package.

Change-Id: Id479ee2a219ee8b1cfb3626ad6094acf62b00c66
diff --git a/runtime/internal/flow/conn/close_test.go b/runtime/internal/flow/conn/close_test.go
index fd73c50..8e65b46 100644
--- a/runtime/internal/flow/conn/close_test.go
+++ b/runtime/internal/flow/conn/close_test.go
@@ -21,7 +21,7 @@
 	d.Close(ctx, fmt.Errorf("Closing randomly."))
 	<-d.Closed()
 	<-a.Closed()
-	if !w.isClosed() {
+	if !w.IsClosed() {
 		t.Errorf("The connection should be closed")
 	}
 }
@@ -33,7 +33,7 @@
 	a.Close(ctx, fmt.Errorf("Closing randomly."))
 	<-a.Closed()
 	<-d.Closed()
-	if !w.isClosed() {
+	if !w.IsClosed() {
 		t.Errorf("The connection should be closed")
 	}
 }
@@ -42,7 +42,7 @@
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 	d, a, w := setupConns(t, ctx, ctx, nil, nil)
-	w.close()
+	w.Close()
 	<-a.Closed()
 	<-d.Closed()
 }
diff --git a/runtime/internal/flow/conn/errors.vdl b/runtime/internal/flow/conn/errors.vdl
index 40e5c0f..7e18e71 100644
--- a/runtime/internal/flow/conn/errors.vdl
+++ b/runtime/internal/flow/conn/errors.vdl
@@ -21,12 +21,10 @@
   UnknownControlMsg(cmd byte) {"en": "unknown control command{:cmd}."}
   UnexpectedMsg(typ string) {"en": "unexpected message type{:typ}."}
   ConnectionClosed() {"en": "connection closed."}
-  ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
   ConnClosedRemotely(msg string) {"en": "connection closed remotely{:msg}."}
   FlowClosedRemotely() {"en": "flow closed remotely."}
   Send(typ, dest string, err error) {"en": "failure sending {typ} message to {dest}{:err}."}
   Recv(src string, err error) {"en": "error reading from {src}{:err}"}
-  CacheClosed() {"en":"cache is closed"}
   CounterOverflow() {"en": "A remote process has sent more data than allowed."}
   BlessingsFlowClosed() {"en": "The blessings flow was closed."}
   InvalidChannelBinding() {"en": "The channel binding was invalid."}
diff --git a/runtime/internal/flow/conn/errors.vdl.go b/runtime/internal/flow/conn/errors.vdl.go
index ed75b03..f649ba4 100644
--- a/runtime/internal/flow/conn/errors.vdl.go
+++ b/runtime/internal/flow/conn/errors.vdl.go
@@ -15,27 +15,25 @@
 )
 
 var (
-	ErrInvalidMsg                = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
-	ErrInvalidControlMsg         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd {3} and size {4} failed decoding at field {5}{:6}.")
-	ErrInvalidSetupOption        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidSetupOption", verror.NoRetry, "{1:}{2:} setup option{:3} failed decoding at field{:4}.")
-	ErrMissingSetupOption        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.MissingSetupOption", verror.NoRetry, "{1:}{2:} missing required setup option{:3}.")
-	ErrUnknownSetupOption        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownSetupOption", verror.NoRetry, "{1:}{2:} unknown setup option{:3}.")
-	ErrUnknownMsg                = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
-	ErrUnknownControlMsg         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
-	ErrUnexpectedMsg             = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
-	ErrConnectionClosed          = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
-	ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
-	ErrConnClosedRemotely        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnClosedRemotely", verror.NoRetry, "{1:}{2:} connection closed remotely{:3}.")
-	ErrFlowClosedRemotely        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.FlowClosedRemotely", verror.NoRetry, "{1:}{2:} flow closed remotely.")
-	ErrSend                      = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
-	ErrRecv                      = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
-	ErrCacheClosed               = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
-	ErrCounterOverflow           = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
-	ErrBlessingsFlowClosed       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.BlessingsFlowClosed", verror.NoRetry, "{1:}{2:} The blessings flow was closed.")
-	ErrInvalidChannelBinding     = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidChannelBinding", verror.NoRetry, "{1:}{2:} The channel binding was invalid.")
-	ErrNoPublicKey               = verror.Register("v.io/x/ref/runtime/internal/flow/conn.NoPublicKey", verror.NoRetry, "{1:}{2:} No public key was received by the remote end.")
-	ErrDialingNonServer          = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
-	ErrAcceptorBlessingsMissing  = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
+	ErrInvalidMsg               = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidMsg", verror.NoRetry, "{1:}{2:} message of type{:3} and size{:4} failed decoding at field{:5}.")
+	ErrInvalidControlMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidControlMsg", verror.NoRetry, "{1:}{2:} control message of cmd {3} and size {4} failed decoding at field {5}{:6}.")
+	ErrInvalidSetupOption       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidSetupOption", verror.NoRetry, "{1:}{2:} setup option{:3} failed decoding at field{:4}.")
+	ErrMissingSetupOption       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.MissingSetupOption", verror.NoRetry, "{1:}{2:} missing required setup option{:3}.")
+	ErrUnknownSetupOption       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownSetupOption", verror.NoRetry, "{1:}{2:} unknown setup option{:3}.")
+	ErrUnknownMsg               = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownMsg", verror.NoRetry, "{1:}{2:} unknown message type{:3}.")
+	ErrUnknownControlMsg        = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnknownControlMsg", verror.NoRetry, "{1:}{2:} unknown control command{:3}.")
+	ErrUnexpectedMsg            = verror.Register("v.io/x/ref/runtime/internal/flow/conn.UnexpectedMsg", verror.NoRetry, "{1:}{2:} unexpected message type{:3}.")
+	ErrConnectionClosed         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnectionClosed", verror.NoRetry, "{1:}{2:} connection closed.")
+	ErrConnClosedRemotely       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.ConnClosedRemotely", verror.NoRetry, "{1:}{2:} connection closed remotely{:3}.")
+	ErrFlowClosedRemotely       = verror.Register("v.io/x/ref/runtime/internal/flow/conn.FlowClosedRemotely", verror.NoRetry, "{1:}{2:} flow closed remotely.")
+	ErrSend                     = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Send", verror.NoRetry, "{1:}{2:} failure sending {3} message to {4}{:5}.")
+	ErrRecv                     = verror.Register("v.io/x/ref/runtime/internal/flow/conn.Recv", verror.NoRetry, "{1:}{2:} error reading from {3}{:4}")
+	ErrCounterOverflow          = verror.Register("v.io/x/ref/runtime/internal/flow/conn.CounterOverflow", verror.NoRetry, "{1:}{2:} A remote process has sent more data than allowed.")
+	ErrBlessingsFlowClosed      = verror.Register("v.io/x/ref/runtime/internal/flow/conn.BlessingsFlowClosed", verror.NoRetry, "{1:}{2:} The blessings flow was closed.")
+	ErrInvalidChannelBinding    = verror.Register("v.io/x/ref/runtime/internal/flow/conn.InvalidChannelBinding", verror.NoRetry, "{1:}{2:} The channel binding was invalid.")
+	ErrNoPublicKey              = verror.Register("v.io/x/ref/runtime/internal/flow/conn.NoPublicKey", verror.NoRetry, "{1:}{2:} No public key was received by the remote end.")
+	ErrDialingNonServer         = verror.Register("v.io/x/ref/runtime/internal/flow/conn.DialingNonServer", verror.NoRetry, "{1:}{2:} You are attempting to dial on a connection with no remote server.")
+	ErrAcceptorBlessingsMissing = verror.Register("v.io/x/ref/runtime/internal/flow/conn.AcceptorBlessingsMissing", verror.NoRetry, "{1:}{2:} The acceptor did not send blessings.")
 )
 
 func init() {
@@ -48,12 +46,10 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownControlMsg.ID), "{1:}{2:} unknown control command{:3}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnexpectedMsg.ID), "{1:}{2:} unexpected message type{:3}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnectionClosed.ID), "{1:}{2:} connection closed.")
-	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnClosedRemotely.ID), "{1:}{2:} connection closed remotely{:3}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFlowClosedRemotely.ID), "{1:}{2:} flow closed remotely.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrSend.ID), "{1:}{2:} failure sending {3} message to {4}{:5}.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrRecv.ID), "{1:}{2:} error reading from {3}{:4}")
-	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCounterOverflow.ID), "{1:}{2:} A remote process has sent more data than allowed.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBlessingsFlowClosed.ID), "{1:}{2:} The blessings flow was closed.")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidChannelBinding.ID), "{1:}{2:} The channel binding was invalid.")
@@ -107,11 +103,6 @@
 	return verror.New(ErrConnectionClosed, ctx)
 }
 
-// NewErrConnKilledToFreeResources returns an error with the ErrConnKilledToFreeResources ID.
-func NewErrConnKilledToFreeResources(ctx *context.T) error {
-	return verror.New(ErrConnKilledToFreeResources, ctx)
-}
-
 // NewErrConnClosedRemotely returns an error with the ErrConnClosedRemotely ID.
 func NewErrConnClosedRemotely(ctx *context.T, msg string) error {
 	return verror.New(ErrConnClosedRemotely, ctx, msg)
@@ -132,11 +123,6 @@
 	return verror.New(ErrRecv, ctx, src, err)
 }
 
-// NewErrCacheClosed returns an error with the ErrCacheClosed ID.
-func NewErrCacheClosed(ctx *context.T) error {
-	return verror.New(ErrCacheClosed, ctx)
-}
-
 // NewErrCounterOverflow returns an error with the ErrCounterOverflow ID.
 func NewErrCounterOverflow(ctx *context.T) error {
 	return verror.New(ErrCounterOverflow, ctx)
diff --git a/runtime/internal/flow/conn/message_test.go b/runtime/internal/flow/conn/message_test.go
index 81ceb64..2eefacf 100644
--- a/runtime/internal/flow/conn/message_test.go
+++ b/runtime/internal/flow/conn/message_test.go
@@ -12,6 +12,7 @@
 	"v.io/v23/context"
 	"v.io/v23/rpc/version"
 	_ "v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/internal/flow/flowtest"
 	"v.io/x/ref/test"
 )
 
@@ -49,7 +50,7 @@
 }
 
 func testMessages(t *testing.T, ctx *context.T, cases []message) {
-	w, r, _ := newMRWPair(ctx)
+	w, r, _ := flowtest.NewMRWPair(ctx)
 	wp, rp := newMessagePipe(w), newMessagePipe(r)
 	for _, want := range cases {
 		ch := make(chan struct{})
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 140515a..40bb0e2 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -5,8 +5,6 @@
 package conn
 
 import (
-	"io"
-	"sync"
 	"testing"
 
 	"v.io/v23"
@@ -15,89 +13,9 @@
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
 	"v.io/v23/security"
-	"v.io/x/ref/internal/logger"
+	"v.io/x/ref/runtime/internal/flow/flowtest"
 )
 
-type wire struct {
-	ctx    *context.T
-	mu     sync.Mutex
-	c      *sync.Cond
-	closed bool
-}
-
-func (w *wire) close() {
-	w.mu.Lock()
-	w.closed = true
-	w.c.Broadcast()
-	w.mu.Unlock()
-}
-
-func (w *wire) isClosed() bool {
-	w.mu.Lock()
-	c := w.closed
-	w.mu.Unlock()
-	return c
-}
-
-type mRW struct {
-	wire *wire
-	in   []byte
-	peer *mRW
-}
-
-func newMRWPair(ctx *context.T) (MsgReadWriteCloser, MsgReadWriteCloser, *wire) {
-	w := &wire{ctx: ctx}
-	w.c = sync.NewCond(&w.mu)
-	a, b := &mRW{wire: w}, &mRW{wire: w}
-	a.peer, b.peer = b, a
-	return a, b, w
-}
-
-func (f *mRW) WriteMsg(data ...[]byte) (int, error) {
-	buf := []byte{}
-	for _, d := range data {
-		buf = append(buf, d...)
-	}
-	logbuf := buf
-	if len(buf) > 128 {
-		logbuf = buf[:128]
-	}
-	logger.Global().VI(2).Infof("Writing %d bytes to the wire: %#v", len(buf), logbuf)
-	defer f.wire.mu.Unlock()
-	f.wire.mu.Lock()
-	for f.peer.in != nil && !f.wire.closed {
-		f.wire.c.Wait()
-	}
-	if f.wire.closed {
-		return 0, io.EOF
-	}
-	f.peer.in = buf
-	f.wire.c.Broadcast()
-	return len(buf), nil
-}
-func (f *mRW) ReadMsg() (buf []byte, err error) {
-	defer f.wire.mu.Unlock()
-	f.wire.mu.Lock()
-	for f.in == nil && !f.wire.closed {
-		f.wire.c.Wait()
-	}
-	if f.wire.closed {
-		return nil, io.EOF
-	}
-	buf, f.in = f.in, nil
-	f.wire.c.Broadcast()
-	logbuf := buf
-	if len(buf) > 128 {
-		logbuf = buf[:128]
-	}
-	logger.Global().VI(2).Infof("Reading %d bytes from the wire: %#v", len(buf), logbuf)
-	return buf, nil
-}
-func (f *mRW) Close() error {
-	f.wire.close()
-	return nil
-}
-
 type fh chan<- flow.Flow
 
 func (fh fh) HandleFlow(f flow.Flow) error {
@@ -108,8 +26,8 @@
 	return nil
 }
 
-func setupConns(t *testing.T, dctx, actx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *wire) {
-	dmrw, amrw, w := newMRWPair(dctx)
+func setupConns(t *testing.T, dctx, actx *context.T, dflows, aflows chan<- flow.Flow) (dialed, accepted *Conn, _ *flowtest.Wire) {
+	dmrw, amrw, w := flowtest.NewMRWPair(dctx)
 	versions := version.RPCVersionRange{Min: 3, Max: 5}
 	ep, err := v23.NewEndpoint("localhost:80")
 	if err != nil {
diff --git a/runtime/internal/flow/flowtest/flowtest.go b/runtime/internal/flow/flowtest/flowtest.go
new file mode 100644
index 0000000..652e8b4
--- /dev/null
+++ b/runtime/internal/flow/flowtest/flowtest.go
@@ -0,0 +1,93 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package flowtest
+
+import (
+	"io"
+	"sync"
+
+	"v.io/v23/context"
+	"v.io/x/ref/internal/logger"
+)
+
+type Wire struct {
+	ctx    *context.T
+	mu     sync.Mutex
+	c      *sync.Cond
+	closed bool
+}
+
+func (w *Wire) Close() {
+	w.mu.Lock()
+	w.closed = true
+	w.c.Broadcast()
+	w.mu.Unlock()
+}
+
+func (w *Wire) IsClosed() bool {
+	w.mu.Lock()
+	c := w.closed
+	w.mu.Unlock()
+	return c
+}
+
+type MRW struct {
+	wire *Wire
+	in   []byte
+	peer *MRW
+}
+
+func NewMRWPair(ctx *context.T) (*MRW, *MRW, *Wire) {
+	w := &Wire{ctx: ctx}
+	w.c = sync.NewCond(&w.mu)
+	a, b := &MRW{wire: w}, &MRW{wire: w}
+	a.peer, b.peer = b, a
+	return a, b, w
+}
+
+func (f *MRW) WriteMsg(data ...[]byte) (int, error) {
+	buf := []byte{}
+	for _, d := range data {
+		buf = append(buf, d...)
+	}
+	logbuf := buf
+	if len(buf) > 128 {
+		logbuf = buf[:128]
+	}
+	logger.Global().VI(2).Infof("Writing %d bytes to the wire: %#v", len(buf), logbuf)
+	defer f.wire.mu.Unlock()
+	f.wire.mu.Lock()
+	for f.peer.in != nil && !f.wire.closed {
+		f.wire.c.Wait()
+	}
+	if f.wire.closed {
+		return 0, io.EOF
+	}
+	f.peer.in = buf
+	f.wire.c.Broadcast()
+	return len(buf), nil
+}
+func (f *MRW) ReadMsg() (buf []byte, err error) {
+	defer f.wire.mu.Unlock()
+	f.wire.mu.Lock()
+	for f.in == nil && !f.wire.closed {
+		f.wire.c.Wait()
+	}
+	if f.wire.closed {
+		return nil, io.EOF
+	}
+	buf, f.in = f.in, nil
+	f.wire.c.Broadcast()
+	logbuf := buf
+	if len(buf) > 128 {
+		logbuf = buf[:128]
+	}
+	logger.Global().VI(2).Infof("Reading %d bytes from the wire: %#v", len(buf), logbuf)
+	return buf, nil
+}
+func (f *MRW) Close() error {
+	f.wire.Close()
+	return nil
+}
diff --git a/runtime/internal/flow/conn/conncache.go b/runtime/internal/flow/manager/conncache.go
similarity index 94%
rename from runtime/internal/flow/conn/conncache.go
rename to runtime/internal/flow/manager/conncache.go
index ae77f97..cc5943a 100644
--- a/runtime/internal/flow/conn/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package conn
+package manager
 
 import (
 	"strings"
@@ -10,6 +10,7 @@
 
 	"v.io/v23/context"
 	"v.io/v23/naming"
+	"v.io/x/ref/runtime/internal/flow/conn"
 )
 
 // ConnCache is a cache of Conns keyed by (protocol, address, blessingNames)
@@ -26,7 +27,7 @@
 }
 
 type connEntry struct {
-	conn       *Conn
+	conn       *conn.Conn
 	rid        naming.RoutingID
 	addrKey    string
 	next, prev *connEntry
@@ -57,7 +58,7 @@
 // be the same as the arguments provided to ReservedFind.
 // All new ReservedFind calls for the (protocol, address, blessings) will Block
 // until the corresponding Unreserve call is made.
-func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*Conn, error) {
+func (c *ConnCache) ReservedFind(protocol, address string, blessingNames []string) (*conn.Conn, error) {
 	k := key(protocol, address, blessingNames)
 	defer c.mu.Unlock()
 	c.mu.Lock()
@@ -93,7 +94,7 @@
 
 // Insert adds conn to the cache.
 // An error will be returned iff the cache has been closed.
-func (c *ConnCache) Insert(conn *Conn) error {
+func (c *ConnCache) Insert(conn *conn.Conn) error {
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
@@ -155,7 +156,7 @@
 // FindWithRoutingID returns a Conn where the remote end of the connection
 // is identified by the provided rid. nil is returned if there is no such Conn.
 // FindWithRoutingID will return an error iff the cache is closed.
-func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*Conn, error) {
+func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*conn.Conn, error) {
 	defer c.mu.Unlock()
 	c.mu.Lock()
 	if c.addrCache == nil {
@@ -206,9 +207,9 @@
 	prev.next = c
 }
 
-func isClosed(conn *Conn) bool {
+func isClosed(conn *conn.Conn) bool {
 	select {
-	case <-conn.closed:
+	case <-conn.Closed():
 		return true
 	default:
 		return false
diff --git a/runtime/internal/flow/conn/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
similarity index 84%
rename from runtime/internal/flow/conn/conncache_test.go
rename to runtime/internal/flow/manager/conncache_test.go
index 2686b0b..80892a7 100644
--- a/runtime/internal/flow/conn/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package conn
+package manager
 
 import (
 	"strconv"
@@ -12,7 +12,8 @@
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/rpc/version"
-
+	connpackage "v.io/x/ref/runtime/internal/flow/conn"
+	"v.io/x/ref/runtime/internal/flow/flowtest"
 	inaming "v.io/x/ref/runtime/internal/naming"
 )
 
@@ -73,8 +74,8 @@
 		t.Errorf("got %v, want <nil>, err: %v", got, err)
 	}
 	// Looking it up again should block until a matching Unreserve call is made.
-	ch := make(chan *Conn, 1)
-	go func(ch chan *Conn) {
+	ch := make(chan *connpackage.Conn, 1)
+	go func(ch chan *connpackage.Conn) {
 		conn, err := c.ReservedFind(otherEP.Protocol, otherEP.Address, otherEP.Blessings)
 		if err != nil {
 			t.Fatal(err)
@@ -150,10 +151,11 @@
 		}
 	}
 	for _, conn := range conns[:7] {
-		if got, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames()); err != nil || got != conn {
+		rep := conn.RemoteEndpoint()
+		if got, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames()); err != nil || got != conn {
 			t.Errorf("got %v, want %v, err: %v", got, conn, err)
 		}
-		c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+		c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
 	}
 	if err := c.KillConnections(ctx, 3); err != nil {
 		t.Fatal(err)
@@ -187,7 +189,8 @@
 		}
 	}
 	for _, conn := range conns[:7] {
-		if got, err := c.FindWithRoutingID(conn.remote.RoutingID()); err != nil || got != conn {
+		rep := conn.RemoteEndpoint()
+		if got, err := c.FindWithRoutingID(rep.RoutingID()); err != nil || got != conn {
 			t.Errorf("got %v, want %v, err: %v", got, conn, err)
 		}
 	}
@@ -215,13 +218,14 @@
 	}
 }
 
-func isInCache(t *testing.T, c *ConnCache, conn *Conn) bool {
-	rfconn, err := c.ReservedFind(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
+func isInCache(t *testing.T, c *ConnCache, conn *connpackage.Conn) bool {
+	rep := conn.RemoteEndpoint()
+	rfconn, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
 	if err != nil {
 		t.Errorf("got %v, want %v, err: %v", rfconn, conn, err)
 	}
-	c.Unreserve(conn.remote.Addr().Network(), conn.remote.Addr().String(), conn.remote.BlessingNames())
-	ridconn, err := c.FindWithRoutingID(conn.remote.RoutingID())
+	c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
+	ridconn, err := c.FindWithRoutingID(rep.RoutingID())
 	if err != nil {
 		t.Errorf("got %v, want %v, err: %v", ridconn, conn, err)
 	}
@@ -243,8 +247,8 @@
 	return size
 }
 
-func nConns(t *testing.T, ctx *context.T, n int) []*Conn {
-	conns := make([]*Conn, n)
+func nConns(t *testing.T, ctx *context.T, n int) []*connpackage.Conn {
+	conns := make([]*connpackage.Conn, n)
 	for i := 0; i < n; i++ {
 		conns[i] = makeConn(t, ctx, &inaming.Endpoint{
 			Protocol: strconv.Itoa(i),
@@ -254,19 +258,21 @@
 	return conns
 }
 
-func makeConn(t *testing.T, ctx *context.T, ep naming.Endpoint) *Conn {
-	dmrw, amrw, _ := newMRWPair(ctx)
-	dch := make(chan *Conn)
-	ach := make(chan *Conn)
+func makeConn(t *testing.T, ctx *context.T, ep naming.Endpoint) *connpackage.Conn {
+	dmrw, amrw, _ := flowtest.NewMRWPair(ctx)
+	dch := make(chan *connpackage.Conn)
+	ach := make(chan *connpackage.Conn)
 	go func() {
-		d, err := NewDialed(ctx, dmrw, ep, ep, version.RPCVersionRange{1, 5}, nil)
+		d, err := connpackage.NewDialed(ctx, dmrw, ep, ep,
+			version.RPCVersionRange{Min: 1, Max: 5}, nil)
 		if err != nil {
 			t.Fatalf("Unexpected error: %v", err)
 		}
 		dch <- d
 	}()
 	go func() {
-		a, err := NewAccepted(ctx, amrw, ep, version.RPCVersionRange{1, 5}, nil)
+		a, err := connpackage.NewAccepted(ctx, amrw, ep,
+			version.RPCVersionRange{Min: 1, Max: 5}, nil)
 		if err != nil {
 			t.Fatalf("Unexpected error: %v", err)
 		}
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index 3e7b57e..463e2cb 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -14,4 +14,6 @@
   UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
   ManagerClosed() {"en": "manager is already closed"}
   AcceptFailed(err error) {"en": "accept failed{:err}"}
+  CacheClosed() {"en":"cache is closed"}
+  ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
 )
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 741c22a..bfa716e 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -15,10 +15,12 @@
 )
 
 var (
-	ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
-	ErrUnknownProtocol     = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
-	ErrManagerClosed       = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
-	ErrAcceptFailed        = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
+	ErrLargerThan3ByteUInt       = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+	ErrUnknownProtocol           = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+	ErrManagerClosed             = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+	ErrAcceptFailed              = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
+	ErrCacheClosed               = verror.Register("v.io/x/ref/runtime/internal/flow/manager.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
+	ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
 )
 
 func init() {
@@ -26,6 +28,8 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
 }
 
 // NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
@@ -47,3 +51,13 @@
 func NewErrAcceptFailed(ctx *context.T, err error) error {
 	return verror.New(ErrAcceptFailed, ctx, err)
 }
+
+// NewErrCacheClosed returns an error with the ErrCacheClosed ID.
+func NewErrCacheClosed(ctx *context.T) error {
+	return verror.New(ErrCacheClosed, ctx)
+}
+
+// NewErrConnKilledToFreeResources returns an error with the ErrConnKilledToFreeResources ID.
+func NewErrConnKilledToFreeResources(ctx *context.T) error {
+	return verror.New(ErrConnKilledToFreeResources, ctx)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index ac26aa1..a3394d8 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,7 +26,7 @@
 	rid    naming.RoutingID
 	closed <-chan struct{}
 	q      *upcqueue.T
-	cache  *conn.ConnCache
+	cache  *ConnCache
 
 	mu              *sync.Mutex
 	listenEndpoints []naming.Endpoint
@@ -37,7 +37,7 @@
 		rid:    rid,
 		closed: ctx.Done(),
 		q:      upcqueue.New(),
-		cache:  conn.NewConnCache(),
+		cache:  NewConnCache(),
 		mu:     &sync.Mutex{},
 	}
 	return m