runtime/internal/flow: First iteration of flow.manager.
Implements the bare minimum to make a non-proxied connection.
Still to do:
- Caching of connections
- Proxying
- Bidirectional connections
Change-Id: I96b20fcbbe79906b0bcf2370e8fe5a25ff54d596
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index ef03f4e..3e7b57e 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -11,4 +11,7 @@
// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
error (
LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
+ UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
+ ManagerClosed() {"en": "manager is already closed"}
+ AcceptFailed(err error) {"en": "accept failed{:err}"}
)
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 2029486..741c22a 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -16,13 +16,34 @@
var (
ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+ ErrUnknownProtocol = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+ ErrManagerClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+ ErrAcceptFailed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
)
func init() {
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
}
// NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
func NewErrLargerThan3ByteUInt(ctx *context.T) error {
return verror.New(ErrLargerThan3ByteUInt, ctx)
}
+
+// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
+func NewErrUnknownProtocol(ctx *context.T, protocol string) error {
+ return verror.New(ErrUnknownProtocol, ctx, protocol)
+}
+
+// NewErrManagerClosed returns an error with the ErrManagerClosed ID.
+func NewErrManagerClosed(ctx *context.T) error {
+ return verror.New(ErrManagerClosed, ctx)
+}
+
+// NewErrAcceptFailed returns an error with the ErrAcceptFailed ID.
+func NewErrAcceptFailed(ctx *context.T, err error) error {
+ return verror.New(ErrAcceptFailed, ctx, err)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
new file mode 100644
index 0000000..e6d9360
--- /dev/null
+++ b/runtime/internal/flow/manager/manager.go
@@ -0,0 +1,244 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "net"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+
+ "v.io/x/ref/runtime/internal/flow/conn"
+ "v.io/x/ref/runtime/internal/lib/upcqueue"
+ inaming "v.io/x/ref/runtime/internal/naming"
+ "v.io/x/ref/runtime/internal/rpc/version"
+)
+
+type manager struct {
+ rid naming.RoutingID
+ closed <-chan struct{}
+ q *upcqueue.T
+
+ mu *sync.Mutex
+ listenEndpoints []naming.Endpoint
+}
+
+func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+ m := &manager{
+ rid: rid,
+ closed: ctx.Done(),
+ mu: &sync.Mutex{},
+ q: upcqueue.New(),
+ }
+ return m
+}
+
+// Listen causes the Manager to accept flows from the provided protocol and address.
+// Listen may be called muliple times.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+ netLn, err := listen(ctx, protocol, address)
+ if err != nil {
+ return flow.NewErrNetwork(ctx, err)
+ }
+ local := &inaming.Endpoint{
+ Protocol: protocol,
+ Address: netLn.Addr().String(),
+ RID: m.rid,
+ }
+ m.mu.Lock()
+ m.listenEndpoints = append(m.listenEndpoints, local)
+ m.mu.Unlock()
+ go m.netLnAcceptLoop(ctx, netLn, local)
+ return nil
+}
+
+func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+ const killConnectionsRetryDelay = 5 * time.Millisecond
+ for {
+ netConn, err := netLn.Accept()
+ for tokill := 1; isTemporaryError(err); tokill *= 2 {
+ if isTooManyOpenFiles(err) {
+ // TODO(suharshs): Find a way to kill connections here. We will need
+ // caching to be able to delete the connections.
+ } else {
+ tokill = 1
+ }
+ time.Sleep(killConnectionsRetryDelay)
+ netConn, err = netLn.Accept()
+ }
+ if err != nil {
+ ctx.VI(2).Infof("net.Listener.Accept on localEP %v failed: %v", local, err)
+ }
+ // TODO(suharshs): This conn needs to be cached instead of ignored.
+ _, err = conn.NewAccepted(
+ ctx,
+ &framer{ReadWriter: netConn},
+ local,
+ v23.GetPrincipal(ctx).BlessingStore().Default(),
+ version.Supported,
+ &flowHandler{q: m.q, closed: m.closed},
+ )
+ if err != nil {
+ netConn.Close()
+ ctx.VI(2).Infof("failed to accept flow.Conn on localEP %v failed: %v", local, err)
+ }
+ }
+}
+
+type flowHandler struct {
+ q *upcqueue.T
+ closed <-chan struct{}
+}
+
+func (h *flowHandler) HandleFlow(f flow.Flow) error {
+ select {
+ case <-h.closed:
+ // This will make the Put call below return a upcqueue.ErrQueueIsClosed.
+ h.q.Close()
+ default:
+ }
+ return h.q.Put(f)
+}
+
+// ListeningEndpoints returns the endpoints that the Manager has explicitly
+// listened on. The Manager will accept new flows on these endpoints.
+// Returned endpoints all have a RoutingID unique to the Acceptor.
+func (m *manager) ListeningEndpoints() []naming.Endpoint {
+ m.mu.Lock()
+ ret := make([]naming.Endpoint, len(m.listenEndpoints))
+ copy(ret, m.listenEndpoints)
+ m.mu.Unlock()
+ return ret
+}
+
+// Accept blocks until a new Flow has been initiated by a remote process.
+// Flows are accepted from addresses that the Manager is listening on,
+// including outgoing dialed connections.
+//
+// For example:
+// err := m.Listen(ctx, "tcp", ":0")
+// for {
+// flow, err := m.Accept(ctx)
+// // process flow
+// }
+//
+// can be used to accept Flows initiated by remote processes.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
+ // TODO(suharshs): Ensure that m is attached to ctx.
+ item, err := m.q.Get(m.closed)
+ switch {
+ case err == upcqueue.ErrQueueIsClosed:
+ return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
+ case err != nil:
+ return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
+ default:
+ return item.(flow.Flow), nil
+ }
+}
+
+// Dial creates a Flow to the provided remote endpoint, using 'fn' to
+// determine the blessings that will be sent to the remote end.
+//
+// To maximize re-use of connections, the Manager will also Listen on Dialed
+// connections for the lifetime of the connection.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
+ // TODO(suharshs): Add caching of connections.
+ addr := remote.Addr()
+ d, _, _, _ := rpc.RegisteredProtocol(addr.Network())
+ netConn, err := dial(ctx, d, addr.Network(), addr.String())
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+
+ c, err := conn.NewDialed(
+ ctx,
+ &framer{ReadWriter: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
+ localEndpoint(netConn, m.rid),
+ remote,
+ version.Supported,
+ &flowHandler{q: m.q, closed: m.closed},
+ fn,
+ )
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+ return c.Dial(ctx)
+}
+
+// Closed returns a channel that remains open for the lifetime of the Manager
+// object. Once the channel is closed any operations on the Manager will
+// necessarily fail.
+func (m *manager) Closed() <-chan struct{} {
+ return m.closed
+}
+
+func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
+ if d != nil {
+ var timeout time.Duration
+ if dl, ok := ctx.Deadline(); ok {
+ timeout = dl.Sub(time.Now())
+ }
+ return d(protocol, address, timeout)
+ }
+ return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
+ if r != nil {
+ net, addr, err := r(protocol, address)
+ if err != nil {
+ return "", "", err
+ }
+ return net, addr, nil
+ }
+ return "", "", NewErrUnknownProtocol(ctx, protocol)
+}
+
+func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
+ if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
+ ln, err := l(protocol, address)
+ if err != nil {
+ return nil, err
+ }
+ return ln, nil
+ }
+ return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+ localAddr := conn.LocalAddr()
+ ep := &inaming.Endpoint{
+ Protocol: localAddr.Network(),
+ Address: localAddr.String(),
+ RID: rid,
+ }
+ return ep
+}
+
+func isTemporaryError(err error) bool {
+ oErr, ok := err.(*net.OpError)
+ return ok && oErr.Temporary()
+}
+
+func isTooManyOpenFiles(err error) bool {
+ oErr, ok := err.(*net.OpError)
+ return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
+}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
new file mode 100644
index 0000000..a179692
--- /dev/null
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+ "bufio"
+ "strings"
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/runtime/internal/flow/manager"
+ "v.io/x/ref/test/testutil"
+)
+
+func TestDirectConnection(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ p := testutil.NewPrincipal("test")
+ ctx, err := v23.WithPrincipal(ctx, p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ rid := naming.FixedRoutingID(0x5555)
+ m := manager.New(ctx, rid)
+ want := "read this please"
+
+ if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+ t.Fatal(err)
+ }
+
+ bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+ eps := m.ListeningEndpoints()
+ if len(eps) == 0 {
+ t.Fatalf("no endpoints listened on")
+ }
+ flow, err := m.Dial(ctx, eps[0], bFn)
+ if err != nil {
+ t.Error(err)
+ }
+ writeLine(flow, want)
+
+ flow, err = m.Accept(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := readLine(flow)
+ if err != nil {
+ t.Error(err)
+ }
+ if got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
+
+func readLine(f flow.Flow) (string, error) {
+ s, err := bufio.NewReader(f).ReadString('\n')
+ return strings.TrimRight(s, "\n"), err
+}
+
+func writeLine(f flow.Flow, data string) error {
+ data += "\n"
+ _, err := f.Write([]byte(data))
+ return err
+}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 63f3133..06cc3cd 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -26,6 +26,7 @@
// Min is incremented whenever we want to remove support for old protocol
// versions.
var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
+var Supported = version.RPCVersionRange{Min: version.RPCVersion10, Max: version.RPCVersion11}
func init() {
metadata.Insert("v23.RPCVersionMax", fmt.Sprint(SupportedRange.Max))