ref: Add LocalAddr to flow.Conn and update implementations for the renamed flow interfaces.

MultiPart: 2/2
Change-Id: Iaf1077d5490856770c28ffd6eca50c47a4b68e6c
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 5f78b77..d463ef2 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -62,8 +62,8 @@
 	lastUsedTime   time.Time
 }
 
-// Ensure that *Conn implements flow.Conn.
-var _ flow.Conn = &Conn{}
+// Ensure that *Conn implements flow.ManagedConn.
+var _ flow.ManagedConn = &Conn{}
 
 // NewDialed dials a new Conn on the given conn.
 func NewDialed(
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index ccea002..a3207dc 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -242,7 +242,7 @@
 }
 
 // Conn returns the connection the flow is multiplexed on.
-func (f *flw) Conn() flow.Conn {
+func (f *flw) Conn() flow.ManagedConn {
 	return f.conn
 }
 
diff --git a/runtime/internal/lib/tcputil/tcputil.go b/runtime/internal/lib/tcputil/tcputil.go
index 9a4aeb5..39a167a 100644
--- a/runtime/internal/lib/tcputil/tcputil.go
+++ b/runtime/internal/lib/tcputil/tcputil.go
@@ -41,7 +41,7 @@
 type TCP struct{}
 
 // Dial dials a net.Conn to a the specific address and adds framing to the connection.
-func (TCP) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.MsgReadWriteCloser, error) {
+func (TCP) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.Conn, error) {
 	conn, err := net.DialTimeout(network, address, timeout)
 	if err != nil {
 		return nil, err
@@ -49,7 +49,7 @@
 	if err := EnableTCPKeepAlive(conn); err != nil {
 		return nil, err
 	}
-	return framer.New(conn), nil
+	return NewTCPConn(conn), nil
 }
 
 // Resolve performs a DNS resolution on the provided network and address.
@@ -63,7 +63,7 @@
 
 // Listen returns a listener that sets KeepAlive on all accepted connections.
 // Connections returned from the listener will be framed.
-func (TCP) Listen(ctx *context.T, network, address string) (flow.MsgListener, error) {
+func (TCP) Listen(ctx *context.T, network, address string) (flow.Listener, error) {
 	ln, err := net.Listen(network, address)
 	if err != nil {
 		return nil, err
@@ -72,12 +72,12 @@
 }
 
 // tcpListener is a wrapper around net.Listener that sets KeepAlive on all
-// accepted connections and returns framed flow.MsgReadWriteClosers.
+// accepted connections and returns framed flow.Conns.
 type tcpListener struct {
 	netLn net.Listener
 }
 
-func (ln *tcpListener) Accept(ctx *context.T) (flow.MsgReadWriteCloser, error) {
+func (ln *tcpListener) Accept(ctx *context.T) (flow.Conn, error) {
 	conn, err := ln.netLn.Accept()
 	if err != nil {
 		return nil, err
@@ -85,9 +85,22 @@
 	if err := EnableTCPKeepAlive(conn); err != nil {
 		return nil, err
 	}
-	return framer.New(conn), nil
+	return NewTCPConn(conn), nil
 }
 
 func (ln *tcpListener) Addr() net.Addr {
 	return ln.netLn.Addr()
 }
+
+func NewTCPConn(c net.Conn) flow.Conn {
+	return tcpConn{framer.New(c), c.LocalAddr()}
+}
+
+type tcpConn struct {
+	flow.MsgReadWriteCloser
+	localAddr net.Addr
+}
+
+func (c tcpConn) LocalAddr() net.Addr {
+	return c.localAddr
+}
diff --git a/runtime/internal/lib/xwebsocket/conn.go b/runtime/internal/lib/xwebsocket/conn.go
index 7efc3e7..7d2c248 100644
--- a/runtime/internal/lib/xwebsocket/conn.go
+++ b/runtime/internal/lib/xwebsocket/conn.go
@@ -18,12 +18,12 @@
 	"v.io/v23/flow"
 )
 
-// WebsocketConn provides a flow.MsgReadWriteCloser interface for a websocket connection.
-func WebsocketConn(ws *websocket.Conn) flow.MsgReadWriteCloser {
+// WebsocketConn provides a flow.Conn interface for a websocket connection.
+func WebsocketConn(ws *websocket.Conn) flow.Conn {
 	return &wrappedConn{ws: ws}
 }
 
-// wrappedConn provides a flow.MsgReadWriteCloser interface to a websocket.
+// wrappedConn provides a flow.Conn interface to a websocket.
 // The underlying websocket connection needs regular calls to Read to make sure
 // websocket control messages (such as pings) are processed by the websocket
 // library.
@@ -82,6 +82,10 @@
 	return c.ws.Close()
 }
 
+func (c *wrappedConn) LocalAddr() net.Addr {
+	return c.ws.LocalAddr()
+}
+
 // hybridConn is used by the 'hybrid' protocol that can accept
 // either 'tcp' or 'websocket' connections. In particular, it allows
 // for the reader to peek and buffer the first n bytes of a stream
diff --git a/runtime/internal/lib/xwebsocket/conn_test.go b/runtime/internal/lib/xwebsocket/conn_test.go
index cf7a631..c8c6357 100644
--- a/runtime/internal/lib/xwebsocket/conn_test.go
+++ b/runtime/internal/lib/xwebsocket/conn_test.go
@@ -20,14 +20,14 @@
 	"v.io/v23/flow"
 )
 
-func writer(c flow.MsgReadWriteCloser, data []byte, times int, wg *sync.WaitGroup) {
+func writer(c flow.Conn, data []byte, times int, wg *sync.WaitGroup) {
 	defer wg.Done()
 	for i := 0; i < times; i++ {
 		c.WriteMsg(data)
 	}
 }
 
-func reader(t *testing.T, c flow.MsgReadWriteCloser, expected []byte, totalWrites int) {
+func reader(t *testing.T, c flow.Conn, expected []byte, totalWrites int) {
 	totalReads := 0
 	for buf, err := c.ReadMsg(); err == nil; buf, err = c.ReadMsg() {
 		totalReads++
diff --git a/runtime/internal/lib/xwebsocket/hybrid.go b/runtime/internal/lib/xwebsocket/hybrid.go
index ac23c24..c69348a 100644
--- a/runtime/internal/lib/xwebsocket/hybrid.go
+++ b/runtime/internal/lib/xwebsocket/hybrid.go
@@ -8,7 +8,6 @@
 	"net"
 	"time"
 
-	"v.io/x/ref/runtime/internal/lib/framer"
 	"v.io/x/ref/runtime/internal/lib/tcputil"
 
 	"v.io/v23/context"
@@ -17,11 +16,11 @@
 
 type WSH struct{}
 
-// Dial returns flow.MsgReadWriteCloser that can be used with a
+// Dial returns a flow.Conn that can be used with a
 // HybridListener but always uses tcp. A client must specifically elect to use
 // websockets by calling websocket.Dialer. The returned net.Conn will report
 // 'tcp' as its Network.
-func (WSH) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.MsgReadWriteCloser, error) {
+func (WSH) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.Conn, error) {
 	tcp := mapWebSocketToTCP[network]
 	conn, err := net.DialTimeout(tcp, address, timeout)
 	if err != nil {
@@ -30,7 +29,7 @@
 	if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
 		return nil, err
 	}
-	return framer.New(conn), nil
+	return tcputil.NewTCPConn(conn), nil
 }
 
 // Resolve performs a DNS resolution on the network, address and always
@@ -44,7 +43,7 @@
 	return tcp, tcpAddr.String(), nil
 }
 
-// Listener returns a flow.MsgReadWriteCloser that supports both tcp and
+// Listen returns a flow.Listener that supports both tcp and
 // websockets over the same, single, port. A listen address of
 // --v23.tcp.protocol=wsh --v23.tcp.address=127.0.0.1:8101 means
 // that port 8101 can accept connections that use either tcp or websocket.
@@ -52,6 +51,6 @@
 // to decide if it's a websocket protocol or not. These must be 'GET ' for
 // websockets, all other protocols must guarantee to not send 'GET ' as the
 // first four bytes of the payload.
-func (WSH) Listen(ctx *context.T, protocol, address string) (flow.MsgListener, error) {
+func (WSH) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
 	return listener(protocol, address, true)
 }
diff --git a/runtime/internal/lib/xwebsocket/listener.go b/runtime/internal/lib/xwebsocket/listener.go
index 9a3511d..3e6c48b 100644
--- a/runtime/internal/lib/xwebsocket/listener.go
+++ b/runtime/internal/lib/xwebsocket/listener.go
@@ -16,7 +16,6 @@
 	"github.com/gorilla/websocket"
 
 	"v.io/x/ref/internal/logger"
-	"v.io/x/ref/runtime/internal/lib/framer"
 	"v.io/x/ref/runtime/internal/lib/tcputil"
 
 	"v.io/v23/context"
@@ -30,14 +29,14 @@
 	closed bool // GUARDED_BY(mu)
 	mu     sync.Mutex
 
-	acceptQ chan interface{} // flow.MsgReadWriteCloser or error returned by netLn.Accept
+	acceptQ chan interface{} // flow.Conn or error returned by netLn.Accept
 	httpQ   chan net.Conn    // Candidates for websocket upgrades before being added to acceptQ
 	netLn   net.Listener     // The underlying listener
 	httpReq sync.WaitGroup   // Number of active HTTP requests
 	hybrid  bool             // true if running in 'hybrid' mode
 }
 
-func listener(protocol, address string, hybrid bool) (flow.MsgListener, error) {
+func listener(protocol, address string, hybrid bool) (flow.Listener, error) {
 	netLn, err := net.Listen(mapWebSocketToTCP[protocol], address)
 	if err != nil {
 		return nil, err
@@ -54,14 +53,14 @@
 	return ln, nil
 }
 
-func (ln *wsTCPListener) Accept(ctx *context.T) (flow.MsgReadWriteCloser, error) {
+func (ln *wsTCPListener) Accept(ctx *context.T) (flow.Conn, error) {
 	for {
 		item, ok := <-ln.acceptQ
 		if !ok {
 			return nil, NewErrListenerClosed(ctx)
 		}
 		switch v := item.(type) {
-		case flow.MsgReadWriteCloser:
+		case flow.Conn:
 			return v, nil
 		case error:
 			return nil, v
@@ -159,7 +158,7 @@
 		ln.httpQ <- conn
 		return
 	}
-	ln.acceptQ <- framer.New(conn)
+	ln.acceptQ <- tcputil.NewTCPConn(conn)
 }
 
 func (ln *wsTCPListener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -213,7 +212,7 @@
 		if !ok {
 			return
 		}
-		if conn, ok := item.(flow.MsgReadWriteCloser); ok {
+		if conn, ok := item.(flow.Conn); ok {
 			conn.Close()
 		}
 	}
diff --git a/runtime/internal/lib/xwebsocket/protocol.go b/runtime/internal/lib/xwebsocket/protocol.go
index 1fcd641..0429cc8 100644
--- a/runtime/internal/lib/xwebsocket/protocol.go
+++ b/runtime/internal/lib/xwebsocket/protocol.go
@@ -27,7 +27,7 @@
 
 type WS struct{}
 
-func (WS) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.MsgReadWriteCloser, error) {
+func (WS) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) {
 	var deadline time.Time
 	if timeout > 0 {
 		deadline = time.Now().Add(timeout)
@@ -63,6 +63,6 @@
 	return "ws", tcpAddr.String(), nil
 }
 
-func (WS) Listen(ctx *context.T, protocol, address string) (flow.MsgListener, error) {
+func (WS) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
 	return listener(protocol, address, false)
 }
diff --git a/runtime/internal/lib/xwebsocket/ws_test.go b/runtime/internal/lib/xwebsocket/ws_test.go
index 6cfd8d6..2411030 100644
--- a/runtime/internal/lib/xwebsocket/ws_test.go
+++ b/runtime/internal/lib/xwebsocket/ws_test.go
@@ -51,7 +51,7 @@
 	ctx, _ := context.RootContext()
 	address := "127.0.0.1:0"
 	timeout := time.Second
-	acceptCh := make(chan flow.MsgReadWriteCloser)
+	acceptCh := make(chan flow.Conn)
 
 	ln, err := listenObj.Listen(ctx, listenP, address)
 	if err != nil {
@@ -76,7 +76,7 @@
 	go readData(t, accepted, randData)
 }
 
-func writeData(t *testing.T, c flow.MsgReadWriteCloser, data []byte) {
+func writeData(t *testing.T, c flow.Conn, data []byte) {
 	for i := 0; i < numChunks; i++ {
 		if _, err := c.WriteMsg(data[:chunkSize]); err != nil {
 			t.Fatal(err)
@@ -85,7 +85,7 @@
 	}
 }
 
-func readData(t *testing.T, c flow.MsgReadWriteCloser, expected []byte) {
+func readData(t *testing.T, c flow.Conn, expected []byte) {
 	read := make([]byte, len(expected))
 	read = read[:0]
 	for i := 0; i < numChunks; i++ {