ref: Add localAddr to flow.Conn and make changes for API modification.
MultiPart: 2/2
Change-Id: Iaf1077d5490856770c28ffd6eca50c47a4b68e6c
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 5f78b77..d463ef2 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -62,8 +62,8 @@
lastUsedTime time.Time
}
-// Ensure that *Conn implements flow.Conn.
-var _ flow.Conn = &Conn{}
+// Ensure that *Conn implements flow.ManagedConn.
+var _ flow.ManagedConn = &Conn{}
// NewDialed dials a new Conn on the given conn.
func NewDialed(
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index ccea002..a3207dc 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -242,7 +242,7 @@
}
// Conn returns the connection the flow is multiplexed on.
-func (f *flw) Conn() flow.Conn {
+func (f *flw) Conn() flow.ManagedConn {
return f.conn
}
diff --git a/runtime/internal/lib/tcputil/tcputil.go b/runtime/internal/lib/tcputil/tcputil.go
index 9a4aeb5..39a167a 100644
--- a/runtime/internal/lib/tcputil/tcputil.go
+++ b/runtime/internal/lib/tcputil/tcputil.go
@@ -41,7 +41,7 @@
type TCP struct{}
// Dial dials a net.Conn to a the specific address and adds framing to the connection.
-func (TCP) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.MsgReadWriteCloser, error) {
+func (TCP) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.Conn, error) {
conn, err := net.DialTimeout(network, address, timeout)
if err != nil {
return nil, err
@@ -49,7 +49,7 @@
if err := EnableTCPKeepAlive(conn); err != nil {
return nil, err
}
- return framer.New(conn), nil
+ return NewTCPConn(conn), nil
}
// Resolve performs a DNS resolution on the provided network and address.
@@ -63,7 +63,7 @@
// Listen returns a listener that sets KeepAlive on all accepted connections.
// Connections returned from the listener will be framed.
-func (TCP) Listen(ctx *context.T, network, address string) (flow.MsgListener, error) {
+func (TCP) Listen(ctx *context.T, network, address string) (flow.Listener, error) {
ln, err := net.Listen(network, address)
if err != nil {
return nil, err
@@ -72,12 +72,12 @@
}
// tcpListener is a wrapper around net.Listener that sets KeepAlive on all
-// accepted connections and returns framed flow.MsgReadWriteClosers.
+// accepted connections and returns framed flow.Conns.
type tcpListener struct {
netLn net.Listener
}
-func (ln *tcpListener) Accept(ctx *context.T) (flow.MsgReadWriteCloser, error) {
+func (ln *tcpListener) Accept(ctx *context.T) (flow.Conn, error) {
conn, err := ln.netLn.Accept()
if err != nil {
return nil, err
@@ -85,9 +85,22 @@
if err := EnableTCPKeepAlive(conn); err != nil {
return nil, err
}
- return framer.New(conn), nil
+ return NewTCPConn(conn), nil
}
func (ln *tcpListener) Addr() net.Addr {
return ln.netLn.Addr()
}
+
+func NewTCPConn(c net.Conn) flow.Conn {
+ return tcpConn{framer.New(c), c.LocalAddr()}
+}
+
+type tcpConn struct {
+ flow.MsgReadWriteCloser
+ localAddr net.Addr
+}
+
+func (c tcpConn) LocalAddr() net.Addr {
+ return c.localAddr
+}
diff --git a/runtime/internal/lib/xwebsocket/conn.go b/runtime/internal/lib/xwebsocket/conn.go
index 7efc3e7..7d2c248 100644
--- a/runtime/internal/lib/xwebsocket/conn.go
+++ b/runtime/internal/lib/xwebsocket/conn.go
@@ -18,12 +18,12 @@
"v.io/v23/flow"
)
-// WebsocketConn provides a flow.MsgReadWriteCloser interface for a websocket connection.
-func WebsocketConn(ws *websocket.Conn) flow.MsgReadWriteCloser {
+// WebsocketConn provides a flow.Conn interface for a websocket connection.
+func WebsocketConn(ws *websocket.Conn) flow.Conn {
return &wrappedConn{ws: ws}
}
-// wrappedConn provides a flow.MsgReadWriteCloser interface to a websocket.
+// wrappedConn provides a flow.Conn interface to a websocket.
// The underlying websocket connection needs regular calls to Read to make sure
// websocket control messages (such as pings) are processed by the websocket
// library.
@@ -82,6 +82,10 @@
return c.ws.Close()
}
+func (c *wrappedConn) LocalAddr() net.Addr {
+ return c.ws.LocalAddr()
+}
+
// hybridConn is used by the 'hybrid' protocol that can accept
// either 'tcp' or 'websocket' connections. In particular, it allows
// for the reader to peek and buffer the first n bytes of a stream
diff --git a/runtime/internal/lib/xwebsocket/conn_test.go b/runtime/internal/lib/xwebsocket/conn_test.go
index cf7a631..c8c6357 100644
--- a/runtime/internal/lib/xwebsocket/conn_test.go
+++ b/runtime/internal/lib/xwebsocket/conn_test.go
@@ -20,14 +20,14 @@
"v.io/v23/flow"
)
-func writer(c flow.MsgReadWriteCloser, data []byte, times int, wg *sync.WaitGroup) {
+func writer(c flow.Conn, data []byte, times int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < times; i++ {
c.WriteMsg(data)
}
}
-func reader(t *testing.T, c flow.MsgReadWriteCloser, expected []byte, totalWrites int) {
+func reader(t *testing.T, c flow.Conn, expected []byte, totalWrites int) {
totalReads := 0
for buf, err := c.ReadMsg(); err == nil; buf, err = c.ReadMsg() {
totalReads++
diff --git a/runtime/internal/lib/xwebsocket/hybrid.go b/runtime/internal/lib/xwebsocket/hybrid.go
index ac23c24..c69348a 100644
--- a/runtime/internal/lib/xwebsocket/hybrid.go
+++ b/runtime/internal/lib/xwebsocket/hybrid.go
@@ -8,7 +8,6 @@
"net"
"time"
- "v.io/x/ref/runtime/internal/lib/framer"
"v.io/x/ref/runtime/internal/lib/tcputil"
"v.io/v23/context"
@@ -17,11 +16,11 @@
type WSH struct{}
-// Dial returns flow.MsgReadWriteCloser that can be used with a
+// Dial returns flow.Conn that can be used with a
// HybridListener but always uses tcp. A client must specifically elect to use
// websockets by calling websocket.Dialer. The returned net.Conn will report
// 'tcp' as its Network.
-func (WSH) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.MsgReadWriteCloser, error) {
+func (WSH) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.Conn, error) {
tcp := mapWebSocketToTCP[network]
conn, err := net.DialTimeout(tcp, address, timeout)
if err != nil {
@@ -30,7 +29,7 @@
if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
return nil, err
}
- return framer.New(conn), nil
+ return tcputil.NewTCPConn(conn), nil
}
// Resolve performs a DNS resolution on the network, address and always
@@ -44,7 +43,7 @@
return tcp, tcpAddr.String(), nil
}
-// Listener returns a flow.MsgReadWriteCloser that supports both tcp and
+// Listener returns a flow.Conn that supports both tcp and
// websockets over the same, single, port. A listen address of
// --v23.tcp.protocol=wsh --v23.tcp.address=127.0.0.1:8101 means
// that port 8101 can accept connections that use either tcp or websocket.
@@ -52,6 +51,6 @@
// to decide if it's a websocket protocol or not. These must be 'GET ' for
// websockets, all other protocols must guarantee to not send 'GET ' as the
// first four bytes of the payload.
-func (WSH) Listen(ctx *context.T, protocol, address string) (flow.MsgListener, error) {
+func (WSH) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
return listener(protocol, address, true)
}
diff --git a/runtime/internal/lib/xwebsocket/listener.go b/runtime/internal/lib/xwebsocket/listener.go
index 9a3511d..3e6c48b 100644
--- a/runtime/internal/lib/xwebsocket/listener.go
+++ b/runtime/internal/lib/xwebsocket/listener.go
@@ -16,7 +16,6 @@
"github.com/gorilla/websocket"
"v.io/x/ref/internal/logger"
- "v.io/x/ref/runtime/internal/lib/framer"
"v.io/x/ref/runtime/internal/lib/tcputil"
"v.io/v23/context"
@@ -30,14 +29,14 @@
closed bool // GUARDED_BY(mu)
mu sync.Mutex
- acceptQ chan interface{} // flow.MsgReadWriteCloser or error returned by netLn.Accept
+ acceptQ chan interface{} // flow.Conn or error returned by netLn.Accept
httpQ chan net.Conn // Candidates for websocket upgrades before being added to acceptQ
netLn net.Listener // The underlying listener
httpReq sync.WaitGroup // Number of active HTTP requests
hybrid bool // true if running in 'hybrid' mode
}
-func listener(protocol, address string, hybrid bool) (flow.MsgListener, error) {
+func listener(protocol, address string, hybrid bool) (flow.Listener, error) {
netLn, err := net.Listen(mapWebSocketToTCP[protocol], address)
if err != nil {
return nil, err
@@ -54,14 +53,14 @@
return ln, nil
}
-func (ln *wsTCPListener) Accept(ctx *context.T) (flow.MsgReadWriteCloser, error) {
+func (ln *wsTCPListener) Accept(ctx *context.T) (flow.Conn, error) {
for {
item, ok := <-ln.acceptQ
if !ok {
return nil, NewErrListenerClosed(ctx)
}
switch v := item.(type) {
- case flow.MsgReadWriteCloser:
+ case flow.Conn:
return v, nil
case error:
return nil, v
@@ -159,7 +158,7 @@
ln.httpQ <- conn
return
}
- ln.acceptQ <- framer.New(conn)
+ ln.acceptQ <- tcputil.NewTCPConn(conn)
}
func (ln *wsTCPListener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -213,7 +212,7 @@
if !ok {
return
}
- if conn, ok := item.(flow.MsgReadWriteCloser); ok {
+ if conn, ok := item.(flow.Conn); ok {
conn.Close()
}
}
diff --git a/runtime/internal/lib/xwebsocket/protocol.go b/runtime/internal/lib/xwebsocket/protocol.go
index 1fcd641..0429cc8 100644
--- a/runtime/internal/lib/xwebsocket/protocol.go
+++ b/runtime/internal/lib/xwebsocket/protocol.go
@@ -27,7 +27,7 @@
type WS struct{}
-func (WS) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.MsgReadWriteCloser, error) {
+func (WS) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) {
var deadline time.Time
if timeout > 0 {
deadline = time.Now().Add(timeout)
@@ -63,6 +63,6 @@
return "ws", tcpAddr.String(), nil
}
-func (WS) Listen(ctx *context.T, protocol, address string) (flow.MsgListener, error) {
+func (WS) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
return listener(protocol, address, false)
}
diff --git a/runtime/internal/lib/xwebsocket/ws_test.go b/runtime/internal/lib/xwebsocket/ws_test.go
index 6cfd8d6..2411030 100644
--- a/runtime/internal/lib/xwebsocket/ws_test.go
+++ b/runtime/internal/lib/xwebsocket/ws_test.go
@@ -51,7 +51,7 @@
ctx, _ := context.RootContext()
address := "127.0.0.1:0"
timeout := time.Second
- acceptCh := make(chan flow.MsgReadWriteCloser)
+ acceptCh := make(chan flow.Conn)
ln, err := listenObj.Listen(ctx, listenP, address)
if err != nil {
@@ -76,7 +76,7 @@
go readData(t, accepted, randData)
}
-func writeData(t *testing.T, c flow.MsgReadWriteCloser, data []byte) {
+func writeData(t *testing.T, c flow.Conn, data []byte) {
for i := 0; i < numChunks; i++ {
if _, err := c.WriteMsg(data[:chunkSize]); err != nil {
t.Fatal(err)
@@ -85,7 +85,7 @@
}
}
-func readData(t *testing.T, c flow.MsgReadWriteCloser, expected []byte) {
+func readData(t *testing.T, c flow.Conn, expected []byte) {
read := make([]byte, len(expected))
read = read[:0]
for i := 0; i < numChunks; i++ {