runtime/internal/flow/manager: flwo manager now uses flow registered
protocol instead of rpc registered protocol.
Change-Id: I8a3d6923618d5a249daead1add0b5083da37cebd
diff --git a/runtime/factories/fake/fake.go b/runtime/factories/fake/fake.go
index 35b055a..fa0e5fe 100644
--- a/runtime/factories/fake/fake.go
+++ b/runtime/factories/fake/fake.go
@@ -15,7 +15,10 @@
"v.io/v23/context"
"v.io/v23/rpc"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
"v.io/x/ref/runtime/internal/lib/websocket"
+
+ // TODO(suharshs): Remove these once we switch to the flow protocols.
_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
diff --git a/runtime/factories/gce/gce.go b/runtime/factories/gce/gce.go
index a0d25e8..f99417c 100644
--- a/runtime/factories/gce/gce.go
+++ b/runtime/factories/gce/gce.go
@@ -20,13 +20,16 @@
"v.io/x/lib/netstate"
"v.io/x/ref/lib/flags"
"v.io/x/ref/runtime/internal"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
"v.io/x/ref/runtime/internal/gce"
"v.io/x/ref/runtime/internal/lib/appcycle"
"v.io/x/ref/runtime/internal/lib/websocket"
+ grt "v.io/x/ref/runtime/internal/rt"
+
+ // TODO(suharshs): Remove these once we switch to the flow protocols.
_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
- grt "v.io/x/ref/runtime/internal/rt"
)
var commonFlags *flags.Flags
diff --git a/runtime/factories/generic/generic.go b/runtime/factories/generic/generic.go
index 54e4149..651f710 100644
--- a/runtime/factories/generic/generic.go
+++ b/runtime/factories/generic/generic.go
@@ -15,12 +15,15 @@
"v.io/x/ref/lib/flags"
"v.io/x/ref/runtime/internal"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
"v.io/x/ref/runtime/internal/lib/appcycle"
"v.io/x/ref/runtime/internal/lib/websocket"
+ grt "v.io/x/ref/runtime/internal/rt"
+
+ // TODO(suharshs): Remove these once we switch to the flow protocols.
_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
- grt "v.io/x/ref/runtime/internal/rt"
)
var commonFlags *flags.Flags
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index e5d0bb4..c139bfc 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -29,14 +29,17 @@
"v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/runtime/internal"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
"v.io/x/ref/runtime/internal/lib/appcycle"
"v.io/x/ref/runtime/internal/lib/websocket"
irpc "v.io/x/ref/runtime/internal/rpc"
+ "v.io/x/ref/runtime/internal/rt"
+ "v.io/x/ref/services/debug/debuglib"
+
+ // TODO(suharshs): Remove these once we switch to the flow protocols.
_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
- "v.io/x/ref/runtime/internal/rt"
- "v.io/x/ref/services/debug/debuglib"
)
const (
diff --git a/runtime/factories/static/static.go b/runtime/factories/static/static.go
index 76ab113..9665d4d 100644
--- a/runtime/factories/static/static.go
+++ b/runtime/factories/static/static.go
@@ -17,13 +17,17 @@
"v.io/x/ref/lib/flags"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/runtime/internal"
+ _ "v.io/x/ref/runtime/internal/flow/protocols/tcp"
"v.io/x/ref/runtime/internal/lib/appcycle"
"v.io/x/ref/runtime/internal/lib/websocket"
+
+ "v.io/x/ref/runtime/internal/rt"
+ "v.io/x/ref/services/debug/debuglib"
+
+ // TODO(suharshs): Remove these once we switch to the flow protocols.
_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
- "v.io/x/ref/runtime/internal/rt"
- "v.io/x/ref/services/debug/debuglib"
)
var commonFlags *flags.Flags
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 122264a..588150f 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -15,12 +15,10 @@
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
- "v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/vom"
"v.io/x/ref/runtime/internal/flow/conn"
- "v.io/x/ref/runtime/internal/lib/framer"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/version"
@@ -72,16 +70,16 @@
}
func (m *manager) listen(ctx *context.T, protocol, address string) (naming.Endpoint, error) {
- netLn, err := listen(ctx, protocol, address)
+ ln, err := listen(ctx, protocol, address)
if err != nil {
return nil, flow.NewErrNetwork(ctx, err)
}
local := &inaming.Endpoint{
Protocol: protocol,
- Address: netLn.Addr().String(),
+ Address: ln.Addr().String(),
RID: m.rid,
}
- go m.netLnAcceptLoop(ctx, netLn, local)
+ go m.lnAcceptLoop(ctx, ln, local)
return local, nil
}
@@ -113,10 +111,10 @@
return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
}
-func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+func (m *manager) lnAcceptLoop(ctx *context.T, ln flow.Listener, local naming.Endpoint) {
const killConnectionsRetryDelay = 5 * time.Millisecond
for {
- netConn, err := netLn.Accept()
+ flowConn, err := ln.Accept(ctx)
for tokill := 1; isTemporaryError(err); tokill *= 2 {
if isTooManyOpenFiles(err) {
if err := m.cache.KillConnections(ctx, tokill); err != nil {
@@ -127,21 +125,21 @@
tokill = 1
}
time.Sleep(killConnectionsRetryDelay)
- netConn, err = netLn.Accept()
+ flowConn, err = ln.Accept(ctx)
}
if err != nil {
- ctx.Errorf("net.Listener.Accept on localEP %v failed: %v", local, err)
+ ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
continue
}
c, err := conn.NewAccepted(
ctx,
- framer.New(netConn),
+ flowConn,
local,
version.Supported,
&flowHandler{q: m.q, closed: m.closed},
)
if err != nil {
- netConn.Close()
+ flowConn.Close()
ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
continue
}
@@ -181,7 +179,7 @@
go func() {
c, err := conn.NewAccepted(
h.ctx,
- closer{f},
+ f,
f.Conn().LocalEndpoint(),
version.Supported,
&flowHandler{q: h.m.q, closed: h.m.closed})
@@ -255,13 +253,12 @@
return nil, flow.NewErrBadState(ctx, err)
}
var (
- d rpc.DialerFunc
+ protocol flow.Protocol
network, address string
)
if c == nil {
addr := remote.Addr()
- var r rpc.ResolverFunc
- d, r, _, _ = rpc.RegisteredProtocol(addr.Network())
+ protocol, _ = flow.RegisteredProtocol(addr.Network())
// (network, address) in the endpoint might not always match up
// with the key used for caching conns. For example:
// - conn, err := net.Dial("tcp", "www.google.com:80")
@@ -269,7 +266,7 @@
// - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
// might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
// Thus we look for Conns with the resolved address.
- network, address, err = resolve(ctx, r, addr.Network(), addr.String())
+ network, address, err = resolve(ctx, protocol, addr.Network(), addr.String())
if err != nil {
return nil, flow.NewErrResolveFailed(ctx, err)
}
@@ -280,7 +277,7 @@
defer m.cache.Unreserve(network, address, remote.BlessingNames())
}
if c == nil {
- netConn, err := dial(ctx, d, network, address)
+ flowConn, err := dial(ctx, protocol, network, address)
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
}
@@ -289,8 +286,8 @@
// "serving flow manager" by passing a 0 RID to non-serving flow managers?
c, err = conn.NewDialed(
ctx,
- framer.New(netConn), // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
- localEndpoint(netConn, m.rid),
+ flowConn,
+ localEndpoint(flowConn, m.rid),
remote,
version.Supported,
fh,
@@ -312,7 +309,7 @@
if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
c, err = conn.NewDialed(
ctx,
- closer{f},
+ f,
c.LocalEndpoint(),
remote,
version.Supported,
@@ -341,20 +338,20 @@
return m.closed
}
-func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
- if d != nil {
+func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) {
+ if p != nil {
var timeout time.Duration
if dl, ok := ctx.Deadline(); ok {
timeout = dl.Sub(time.Now())
}
- return d(ctx, protocol, address, timeout)
+ return p.Dial(ctx, protocol, address, timeout)
}
return nil, NewErrUnknownProtocol(ctx, protocol)
}
-func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
- if r != nil {
- net, addr, err := r(ctx, protocol, address)
+func resolve(ctx *context.T, p flow.Protocol, protocol, address string) (string, string, error) {
+ if p != nil {
+ net, addr, err := p.Resolve(ctx, protocol, address)
if err != nil {
return "", "", err
}
@@ -363,9 +360,9 @@
return "", "", NewErrUnknownProtocol(ctx, protocol)
}
-func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
- if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
- ln, err := l(ctx, protocol, address)
+func listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
+ if p, _ := flow.RegisteredProtocol(protocol); p != nil {
+ ln, err := p.Listen(ctx, protocol, address)
if err != nil {
return nil, err
}
@@ -374,7 +371,7 @@
return nil, NewErrUnknownProtocol(ctx, protocol)
}
-func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+func localEndpoint(conn flow.Conn, rid naming.RoutingID) naming.Endpoint {
localAddr := conn.LocalAddr()
ep := &inaming.Endpoint{
Protocol: localAddr.Network(),
@@ -393,12 +390,3 @@
oErr, ok := err.(*net.OpError)
return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
}
-
-// TODO(suharshs): should we add Close method to the Flow API?
-type closer struct {
- flow.Flow
-}
-
-func (c closer) Close() error {
- return nil
-}