runtime/internal/flow: First iteration of flow.manager.

Implements the bare minimum to make a non-proxied connection.

Still to do:
- Caching of connections
- Proxying
- Bidirectional connections

Change-Id: I96b20fcbbe79906b0bcf2370e8fe5a25ff54d596
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index ef03f4e..3e7b57e 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -11,4 +11,7 @@
 // TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
 error (
   LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
+  UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
+  ManagerClosed() {"en": "manager is already closed"}
+  AcceptFailed(err error) {"en": "accept failed{:err}"}
 )
\ No newline at end of file
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index 2029486..741c22a 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -16,13 +16,34 @@
 
 var (
 	ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+	ErrUnknownProtocol     = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
+	ErrManagerClosed       = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
+	ErrAcceptFailed        = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
 )
 
 func init() {
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
 }
 
 // NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
 func NewErrLargerThan3ByteUInt(ctx *context.T) error {
 	return verror.New(ErrLargerThan3ByteUInt, ctx)
 }
+
+// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
+func NewErrUnknownProtocol(ctx *context.T, protocol string) error {
+	return verror.New(ErrUnknownProtocol, ctx, protocol)
+}
+
+// NewErrManagerClosed returns an error with the ErrManagerClosed ID.
+func NewErrManagerClosed(ctx *context.T) error {
+	return verror.New(ErrManagerClosed, ctx)
+}
+
+// NewErrAcceptFailed returns an error with the ErrAcceptFailed ID.
+func NewErrAcceptFailed(ctx *context.T, err error) error {
+	return verror.New(ErrAcceptFailed, ctx, err)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
new file mode 100644
index 0000000..e6d9360
--- /dev/null
+++ b/runtime/internal/flow/manager/manager.go
@@ -0,0 +1,244 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager
+
+import (
+	"net"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+
+	"v.io/x/ref/runtime/internal/flow/conn"
+	"v.io/x/ref/runtime/internal/lib/upcqueue"
+	inaming "v.io/x/ref/runtime/internal/naming"
+	"v.io/x/ref/runtime/internal/rpc/version"
+)
+
+type manager struct {
+	rid    naming.RoutingID
+	closed <-chan struct{}
+	q      *upcqueue.T
+
+	mu              *sync.Mutex
+	listenEndpoints []naming.Endpoint
+}
+
+func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+	m := &manager{
+		rid:    rid,
+		closed: ctx.Done(),
+		mu:     &sync.Mutex{},
+		q:      upcqueue.New(),
+	}
+	return m
+}
+
+// Listen causes the Manager to accept flows from the provided protocol and address.
+// Listen may be called muliple times.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+	netLn, err := listen(ctx, protocol, address)
+	if err != nil {
+		return flow.NewErrNetwork(ctx, err)
+	}
+	local := &inaming.Endpoint{
+		Protocol: protocol,
+		Address:  netLn.Addr().String(),
+		RID:      m.rid,
+	}
+	m.mu.Lock()
+	m.listenEndpoints = append(m.listenEndpoints, local)
+	m.mu.Unlock()
+	go m.netLnAcceptLoop(ctx, netLn, local)
+	return nil
+}
+
+func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
+	const killConnectionsRetryDelay = 5 * time.Millisecond
+	for {
+		netConn, err := netLn.Accept()
+		for tokill := 1; isTemporaryError(err); tokill *= 2 {
+			if isTooManyOpenFiles(err) {
+				// TODO(suharshs): Find a way to kill connections here. We will need
+				// caching to be able to delete the connections.
+			} else {
+				tokill = 1
+			}
+			time.Sleep(killConnectionsRetryDelay)
+			netConn, err = netLn.Accept()
+		}
+		if err != nil {
+			ctx.VI(2).Infof("net.Listener.Accept on localEP %v failed: %v", local, err)
+		}
+		// TODO(suharshs): This conn needs to be cached instead of ignored.
+		_, err = conn.NewAccepted(
+			ctx,
+			&framer{ReadWriter: netConn},
+			local,
+			v23.GetPrincipal(ctx).BlessingStore().Default(),
+			version.Supported,
+			&flowHandler{q: m.q, closed: m.closed},
+		)
+		if err != nil {
+			netConn.Close()
+			ctx.VI(2).Infof("failed to accept flow.Conn on localEP %v failed: %v", local, err)
+		}
+	}
+}
+
+type flowHandler struct {
+	q      *upcqueue.T
+	closed <-chan struct{}
+}
+
+func (h *flowHandler) HandleFlow(f flow.Flow) error {
+	select {
+	case <-h.closed:
+		// This will make the Put call below return a upcqueue.ErrQueueIsClosed.
+		h.q.Close()
+	default:
+	}
+	return h.q.Put(f)
+}
+
+// ListeningEndpoints returns the endpoints that the Manager has explicitly
+// listened on. The Manager will accept new flows on these endpoints.
+// Returned endpoints all have a RoutingID unique to the Acceptor.
+func (m *manager) ListeningEndpoints() []naming.Endpoint {
+	m.mu.Lock()
+	ret := make([]naming.Endpoint, len(m.listenEndpoints))
+	copy(ret, m.listenEndpoints)
+	m.mu.Unlock()
+	return ret
+}
+
+// Accept blocks until a new Flow has been initiated by a remote process.
+// Flows are accepted from addresses that the Manager is listening on,
+// including outgoing dialed connections.
+//
+// For example:
+//   err := m.Listen(ctx, "tcp", ":0")
+//   for {
+//     flow, err := m.Accept(ctx)
+//     // process flow
+//   }
+//
+// can be used to accept Flows initiated by remote processes.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
+	// TODO(suharshs): Ensure that m is attached to ctx.
+	item, err := m.q.Get(m.closed)
+	switch {
+	case err == upcqueue.ErrQueueIsClosed:
+		return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx))
+	case err != nil:
+		return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err))
+	default:
+		return item.(flow.Flow), nil
+	}
+}
+
+// Dial creates a Flow to the provided remote endpoint, using 'fn' to
+// determine the blessings that will be sent to the remote end.
+//
+// To maximize re-use of connections, the Manager will also Listen on Dialed
+// connections for the lifetime of the connection.
+//
+// The flow.Manager associated with ctx must be the receiver of the method,
+// otherwise an error is returned.
+func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
+	// TODO(suharshs): Add caching of connections.
+	addr := remote.Addr()
+	d, _, _, _ := rpc.RegisteredProtocol(addr.Network())
+	netConn, err := dial(ctx, d, addr.Network(), addr.String())
+	if err != nil {
+		return nil, flow.NewErrDialFailed(ctx, err)
+	}
+
+	c, err := conn.NewDialed(
+		ctx,
+		&framer{ReadWriter: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
+		localEndpoint(netConn, m.rid),
+		remote,
+		version.Supported,
+		&flowHandler{q: m.q, closed: m.closed},
+		fn,
+	)
+	if err != nil {
+		return nil, flow.NewErrDialFailed(ctx, err)
+	}
+	return c.Dial(ctx)
+}
+
+// Closed returns a channel that remains open for the lifetime of the Manager
+// object. Once the channel is closed any operations on the Manager will
+// necessarily fail.
+func (m *manager) Closed() <-chan struct{} {
+	return m.closed
+}
+
+func dial(ctx *context.T, d rpc.DialerFunc, protocol, address string) (net.Conn, error) {
+	if d != nil {
+		var timeout time.Duration
+		if dl, ok := ctx.Deadline(); ok {
+			timeout = dl.Sub(time.Now())
+		}
+		return d(protocol, address, timeout)
+	}
+	return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func resolve(ctx *context.T, r rpc.ResolverFunc, protocol, address string) (string, string, error) {
+	if r != nil {
+		net, addr, err := r(protocol, address)
+		if err != nil {
+			return "", "", err
+		}
+		return net, addr, nil
+	}
+	return "", "", NewErrUnknownProtocol(ctx, protocol)
+}
+
+func listen(ctx *context.T, protocol, address string) (net.Listener, error) {
+	if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
+		ln, err := l(protocol, address)
+		if err != nil {
+			return nil, err
+		}
+		return ln, nil
+	}
+	return nil, NewErrUnknownProtocol(ctx, protocol)
+}
+
+func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
+	localAddr := conn.LocalAddr()
+	ep := &inaming.Endpoint{
+		Protocol: localAddr.Network(),
+		Address:  localAddr.String(),
+		RID:      rid,
+	}
+	return ep
+}
+
+func isTemporaryError(err error) bool {
+	oErr, ok := err.(*net.OpError)
+	return ok && oErr.Temporary()
+}
+
+func isTooManyOpenFiles(err error) bool {
+	oErr, ok := err.(*net.OpError)
+	return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
+}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
new file mode 100644
index 0000000..a179692
--- /dev/null
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -0,0 +1,73 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package manager_test
+
+import (
+	"bufio"
+	"strings"
+	"testing"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/ref/runtime/internal/flow/manager"
+	"v.io/x/ref/test/testutil"
+)
+
+func TestDirectConnection(t *testing.T) {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	p := testutil.NewPrincipal("test")
+	ctx, err := v23.WithPrincipal(ctx, p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rid := naming.FixedRoutingID(0x5555)
+	m := manager.New(ctx, rid)
+	want := "read this please"
+
+	if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+		t.Fatal(err)
+	}
+
+	bFn := func(*context.T, security.Call) (security.Blessings, error) { return p.BlessingStore().Default(), nil }
+	eps := m.ListeningEndpoints()
+	if len(eps) == 0 {
+		t.Fatalf("no endpoints listened on")
+	}
+	flow, err := m.Dial(ctx, eps[0], bFn)
+	if err != nil {
+		t.Error(err)
+	}
+	writeLine(flow, want)
+
+	flow, err = m.Accept(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err := readLine(flow)
+	if err != nil {
+		t.Error(err)
+	}
+	if got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+func readLine(f flow.Flow) (string, error) {
+	s, err := bufio.NewReader(f).ReadString('\n')
+	return strings.TrimRight(s, "\n"), err
+}
+
+func writeLine(f flow.Flow, data string) error {
+	data += "\n"
+	_, err := f.Write([]byte(data))
+	return err
+}
diff --git a/runtime/internal/rpc/version/version.go b/runtime/internal/rpc/version/version.go
index 63f3133..06cc3cd 100644
--- a/runtime/internal/rpc/version/version.go
+++ b/runtime/internal/rpc/version/version.go
@@ -26,6 +26,7 @@
 // Min is incremented whenever we want to remove support for old protocol
 // versions.
 var SupportedRange = &Range{Min: version.RPCVersion10, Max: version.RPCVersion11}
+var Supported = version.RPCVersionRange{Min: version.RPCVersion10, Max: version.RPCVersion11}
 
 func init() {
 	metadata.Insert("v23.RPCVersionMax", fmt.Sprint(SupportedRange.Max))