runtime/internal/flow/manager: flwo manager now uses flow registered
protocol instead of rpc registered protocol.

Change-Id: I8a3d6923618d5a249daead1add0b5083da37cebd
diff --git a/runtime/factories/fake/fake.go b/runtime/factories/fake/fake.go
index 35b055a..fa0e5fe 100644
--- a/runtime/factories/fake/fake.go
+++ b/runtime/factories/fake/fake.go
@@ -15,7 +15,10 @@
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 
+	_ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
 	"v.io/x/ref/runtime/internal/lib/websocket"
+
+	// TODO(suharshs): Remove these once we switch to the flow protocols.
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
diff --git a/runtime/factories/gce/gce.go b/runtime/factories/gce/gce.go
index a0d25e8..f99417c 100644
--- a/runtime/factories/gce/gce.go
+++ b/runtime/factories/gce/gce.go
@@ -20,13 +20,16 @@
 	"v.io/x/lib/netstate"
 	"v.io/x/ref/lib/flags"
 	"v.io/x/ref/runtime/internal"
+	_ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
 	"v.io/x/ref/runtime/internal/gce"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
 	"v.io/x/ref/runtime/internal/lib/websocket"
+	grt "v.io/x/ref/runtime/internal/rt"
+
+	// TODO(suharshs): Remove these once we switch to the flow protocols.
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
-	grt "v.io/x/ref/runtime/internal/rt"
 )
 
 var commonFlags *flags.Flags
diff --git a/runtime/factories/generic/generic.go b/runtime/factories/generic/generic.go
index 54e4149..651f710 100644
--- a/runtime/factories/generic/generic.go
+++ b/runtime/factories/generic/generic.go
@@ -15,12 +15,15 @@
 
 	"v.io/x/ref/lib/flags"
 	"v.io/x/ref/runtime/internal"
+	_ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
 	"v.io/x/ref/runtime/internal/lib/websocket"
+	grt "v.io/x/ref/runtime/internal/rt"
+
+	// TODO(suharshs): Remove these once we switch to the flow protocols.
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
-	grt "v.io/x/ref/runtime/internal/rt"
 )
 
 var commonFlags *flags.Flags
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index e5d0bb4..c139bfc 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -29,14 +29,17 @@
 	"v.io/x/ref/lib/pubsub"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/runtime/internal"
+	_ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
 	"v.io/x/ref/runtime/internal/lib/websocket"
 	irpc "v.io/x/ref/runtime/internal/rpc"
+	"v.io/x/ref/runtime/internal/rt"
+	"v.io/x/ref/services/debug/debuglib"
+
+	// TODO(suharshs): Remove these once we switch to the flow protocols.
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
-	"v.io/x/ref/runtime/internal/rt"
-	"v.io/x/ref/services/debug/debuglib"
 )
 
 const (
diff --git a/runtime/factories/static/static.go b/runtime/factories/static/static.go
index 76ab113..9665d4d 100644
--- a/runtime/factories/static/static.go
+++ b/runtime/factories/static/static.go
@@ -17,13 +17,17 @@
 	"v.io/x/ref/lib/flags"
 	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/runtime/internal"
+	_ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
 	"v.io/x/ref/runtime/internal/lib/websocket"
+
+	"v.io/x/ref/runtime/internal/rt"
+	"v.io/x/ref/services/debug/debuglib"
+
+	// TODO(suharshs): Remove these once we switch to the flow protocols.
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
-	"v.io/x/ref/runtime/internal/rt"
-	"v.io/x/ref/services/debug/debuglib"
 )
 
 var commonFlags *flags.Flags
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 122264a..588150f 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -15,12 +15,10 @@
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
-	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/vom"
 
 	"v.io/x/ref/runtime/internal/flow/conn"
-	"v.io/x/ref/runtime/internal/lib/framer"
 	"v.io/x/ref/runtime/internal/lib/upcqueue"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	"v.io/x/ref/runtime/internal/rpc/version"
@@ -72,16 +70,16 @@
 }
 
 func (m *manager) listen(ctx *context.T, protocol, address string) (naming.Endpoint, error) {
-	netLn, err := listen(ctx, protocol, address)
+	ln, err := listen(ctx, protocol, address)
 	if err != nil {
 		return nil, flow.NewErrNetwork(ctx, err)
 	}
 	local := &inaming.Endpoint{
 		Protocol: protocol,
-		Address:  netLn.Addr().String(),
+		Address:  ln.Addr().String(),
 		RID:      m.rid,
 	}
-	go m.netLnAcceptLoop(ctx, netLn, local)
+	go m.lnAcceptLoop(ctx, ln, local)
 	return local, nil
 }
 
@@ -113,10 +111,10 @@
 	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
 }
 
-func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+func (m *manager) lnAcceptLoop(ctx *context.T, ln flow.Listener, local naming.Endpoint) {
 	const killConnectionsRetryDelay = 5 * time.Millisecond
 	for {
-		netConn, err := netLn.Accept()
+		flowConn, err := ln.Accept(ctx)
 		for tokill := 1; isTemporaryError(err); tokill *= 2 {
 			if isTooManyOpenFiles(err) {
 				if err := m.cache.KillConnections(ctx, tokill); err != nil {
@@ -127,21 +125,21 @@
 				tokill = 1
 			}
 			time.Sleep(killConnectionsRetryDelay)
-			netConn, err = netLn.Accept()
+			flowConn, err = ln.Accept(ctx)
 		}
 		if err != nil {
-			ctx.Errorf("net.Listener.Accept on localEP %v failed: %v", local, err)
+			ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
 			continue
 		}
 		c, err := conn.NewAccepted(
 			ctx,
-			framer.New(netConn),
+			flowConn,
 			local,
 			version.Supported,
 			&flowHandler{q: m.q, closed: m.closed},
 		)
 		if err != nil {
-			netConn.Close()
+			flowConn.Close()
 			ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
 			continue
 		}
@@ -181,7 +179,7 @@
 	go func() {
 		c, err := conn.NewAccepted(
 			h.ctx,
-			closer{f},
+			f,
 			f.Conn().LocalEndpoint(),
 			version.Supported,
 			&flowHandler{q: h.m.q, closed: h.m.closed})
@@ -255,13 +253,12 @@
 		return nil, flow.NewErrBadState(ctx, err)
 	}
 	var (
-		d                rpc.DialerFunc
+		protocol         flow.Protocol
 		network, address string
 	)
 	if c == nil {
 		addr := remote.Addr()
-		var r rpc.ResolverFunc
-		d, r, _, _ = rpc.RegisteredProtocol(addr.Network())
+		protocol, _ = flow.RegisteredProtocol(addr.Network())
 		// (network, address) in the endpoint might not always match up
 		// with the key used for caching conns. For example:
 		// - conn, err := net.Dial("tcp", "www.google.com:80")
@@ -269,7 +266,7 @@
 		// - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
 		//   might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
 		// Thus we look for Conns with the resolved address.
-		network, address, err = resolve(ctx, r, addr.Network(), addr.String())
+		network, address, err = resolve(ctx, protocol, addr.Network(), addr.String())
 		if err != nil {
 			return nil, flow.NewErrResolveFailed(ctx, err)
 		}
@@ -280,7 +277,7 @@
 		defer m.cache.Unreserve(network, address, remote.BlessingNames())
 	}
 	if c == nil {
-		netConn, err := dial(ctx, d, network, address)
+		flowConn, err := dial(ctx, protocol, network, address)
 		if err != nil {
 			return nil, flow.NewErrDialFailed(ctx, err)
 		}
@@ -289,8 +286,8 @@
 		// "serving flow manager" by passing a 0 RID to non-serving flow managers?
 		c, err = conn.NewDialed(
 			ctx,
-			framer.New(netConn), // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
-			localEndpoint(netConn, m.rid),
+			flowConn,
+			localEndpoint(flowConn, m.rid),
 			remote,
 			version.Supported,
 			fh,
@@ -312,7 +309,7 @@
 	if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
 		c, err = conn.NewDialed(
 			ctx,
-			closer{f},
+			f,
 			c.LocalEndpoint(),
 			remote,
 			version.Supported,
@@ -341,20 +338,20 @@
 	return m.closed
 }
 
-func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
-	if d != nil {
+func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) {
+	if p != nil {
 		var timeout time.Duration
 		if dl, ok := ctx.Deadline(); ok {
 			timeout = dl.Sub(time.Now())
 		}
-		return d(ctx, protocol, address, timeout)
+		return p.Dial(ctx, protocol, address, timeout)
 	}
 	return nil, NewErrUnknownProtocol(ctx, protocol)
 }
 
-func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
-	if r != nil {
-		net, addr, err := r(ctx, protocol, address)
+func resolve(ctx *context.T, p flow.Protocol, protocol, address string) (string, string, error) {
+	if p != nil {
+		net, addr, err := p.Resolve(ctx, protocol, address)
 		if err != nil {
 			return "", "", err
 		}
@@ -363,9 +360,9 @@
 	return "", "", NewErrUnknownProtocol(ctx, protocol)
 }
 
-func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
-	if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
-		ln, err := l(ctx, protocol, address)
+func listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
+	if p, _ := flow.RegisteredProtocol(protocol); p != nil {
+		ln, err := p.Listen(ctx, protocol, address)
 		if err != nil {
 			return nil, err
 		}
@@ -374,7 +371,7 @@
 	return nil, NewErrUnknownProtocol(ctx, protocol)
 }
 
-func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+func localEndpoint(conn flow.Conn, rid naming.RoutingID) naming.Endpoint {
 	localAddr := conn.LocalAddr()
 	ep := &inaming.Endpoint{
 		Protocol: localAddr.Network(),
@@ -393,12 +390,3 @@
 	oErr, ok := err.(*net.OpError)
 	return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
 }
-
-// TODO(suharshs): should we add Close method to the Flow API?
-type closer struct {
-	flow.Flow
-}
-
-func (c closer) Close() error {
-	return nil
-}