ref: Preliminary proxy implementation.

This change provides single hop proxying support.

TODO in subsequent cls:
(1) Implement multiple proxying.
(2) Switch encoding format of routing information away from VOM.
(3) Ensure that BidiRPC works through proxying. (once bidi is done)

MultiPart: 1/2
Change-Id: Ic5d1635b0dcf968ecaf766380f787885c999cb6e
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index a3394d8..7dc3582 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -11,10 +11,13 @@
 	"syscall"
 	"time"
 
+	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/vom"
 
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/lib/upcqueue"
@@ -49,20 +52,64 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+	var (
+		ep  naming.Endpoint
+		err error
+	)
+	if protocol == inaming.Network {
+		ep, err = m.proxyListen(ctx, address)
+	} else {
+		ep, err = m.listen(ctx, protocol, address)
+	}
+	if err != nil {
+		return err
+	}
+	m.mu.Lock()
+	m.listenEndpoints = append(m.listenEndpoints, ep)
+	m.mu.Unlock()
+	return nil
+}
+
+func (m *manager) listen(ctx *context.T, protocol, address string) (naming.Endpoint, error) {
 	netLn, err := listen(ctx, protocol, address)
 	if err != nil {
-		return flow.NewErrNetwork(ctx, err)
+		return nil, flow.NewErrNetwork(ctx, err)
 	}
 	local := &inaming.Endpoint{
 		Protocol: protocol,
 		Address:  netLn.Addr().String(),
 		RID:      m.rid,
 	}
-	m.mu.Lock()
-	m.listenEndpoints = append(m.listenEndpoints, local)
-	m.mu.Unlock()
 	go m.netLnAcceptLoop(ctx, netLn, local)
-	return nil
+	return local, nil
+}
+
+func (m *manager) proxyListen(ctx *context.T, address string) (naming.Endpoint, error) {
+	ep, err := inaming.NewEndpoint(address)
+	if err != nil {
+		return nil, flow.NewErrBadArg(ctx, err)
+	}
+	f, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run, &proxyFlowHandler{ctx: ctx, m: m})
+	if err != nil {
+		return nil, flow.NewErrNetwork(ctx, err)
+	}
+	// Write to ensure we send an openFlow message.
+	if _, err := f.Write([]byte{0}); err != nil {
+		return nil, flow.NewErrNetwork(ctx, err)
+	}
+	var lep string
+	if err := vom.NewDecoder(f).Decode(&lep); err != nil {
+		return nil, flow.NewErrNetwork(ctx, err)
+	}
+	return inaming.NewEndpoint(lep)
+}
+
+type proxyBlessingsForPeer struct{}
+
+// TODO(suharshs): Figure out what blessings to present here. And present discharges.
+func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
+	rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
+	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
 }
 
 func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
@@ -85,7 +132,7 @@
 			ctx.Errorf("net.Listener.Accept on localEP %v failed: %v", local, err)
 			continue
 		}
-		_, err = conn.NewAccepted(
+		c, err := conn.NewAccepted(
 			ctx,
 			&framer{ReadWriteCloser: netConn},
 			local,
@@ -97,12 +144,9 @@
 			ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
 			continue
 		}
-		// TODO(suharshs): We need the remote endpoint in conn to be able to insert
-		// it into the cache. This handshake has not been implemented yet. So for now
-		// we will comment it out.
-		// if err := m.cache.Insert(c); err != nil {
-		// 	ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
-		// }
+		if err := m.cache.Insert(c); err != nil {
+			ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
+		}
 	}
 }
 
@@ -121,6 +165,37 @@
 	return h.q.Put(f)
 }
 
+type proxyFlowHandler struct {
+	ctx *context.T
+	m   *manager
+}
+
+func (h *proxyFlowHandler) HandleFlow(f flow.Flow) error {
+	select {
+	case <-h.m.closed:
+		h.m.q.Close()
+		return upcqueue.ErrQueueIsClosed
+	default:
+	}
+	go func() {
+		c, err := conn.NewAccepted(
+			h.ctx,
+			closer{f},
+			f.Conn().LocalEndpoint(),
+			version.Supported,
+			&flowHandler{q: h.m.q, closed: h.m.closed})
+		if err != nil {
+			h.ctx.Errorf("failed to create accepted conn: %v", err)
+			return
+		}
+		if err := h.m.cache.Insert(c); err != nil {
+			h.ctx.Errorf("failed to create accepted conn: %v", err)
+			return
+		}
+	}()
+	return nil
+}
+
 // ListeningEndpoints returns the endpoints that the Manager has explicitly
 // listened on. The Manager will accept new flows on these endpoints.
 // Returned endpoints all have a RoutingID unique to the Acceptor.
@@ -169,24 +244,40 @@
 // The flow.Manager associated with ctx must be the receiver of the method,
 // otherwise an error is returned.
 func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
-	addr := remote.Addr()
-	d, r, _, _ := rpc.RegisteredProtocol(addr.Network())
-	// (network, address) in the endpoint might not always match up
-	// with the key used for caching conns. For example:
-	// - conn, err := net.Dial("tcp", "www.google.com:80")
-	//   fmt.Println(conn.RemoteAddr()) // Might yield the corresponding IP address
-	// - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
-	//   might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
-	// Thus we look for Conns with the resolved address.
-	network, address, err := resolve(ctx, r, addr.Network(), addr.String())
-	if err != nil {
-		return nil, flow.NewErrResolveFailed(ctx, err)
-	}
-	c, err := m.cache.ReservedFind(network, address, remote.BlessingNames())
+	return m.internalDial(ctx, remote, fn, &flowHandler{q: m.q, closed: m.closed})
+}
+
+func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer, fh conn.FlowHandler) (flow.Flow, error) {
+	// Look up the connection based on RoutingID first.
+	c, err := m.cache.FindWithRoutingID(remote.RoutingID())
 	if err != nil {
 		return nil, flow.NewErrBadState(ctx, err)
 	}
-	defer m.cache.Unreserve(network, address, remote.BlessingNames())
+	var (
+		d                rpc.DialerFunc
+		network, address string
+	)
+	if c == nil {
+		addr := remote.Addr()
+		var r rpc.ResolverFunc
+		d, r, _, _ = rpc.RegisteredProtocol(addr.Network())
+		// (network, address) in the endpoint might not always match up
+		// with the key used for caching conns. For example:
+		// - conn, err := net.Dial("tcp", "www.google.com:80")
+		//   fmt.Println(conn.RemoteAddr()) // Might yield the corresponding IP address
+		// - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
+		//   might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
+		// Thus we look for Conns with the resolved address.
+		network, address, err = resolve(ctx, r, addr.Network(), addr.String())
+		if err != nil {
+			return nil, flow.NewErrResolveFailed(ctx, err)
+		}
+		c, err = m.cache.ReservedFind(network, address, remote.BlessingNames())
+		if err != nil {
+			return nil, flow.NewErrBadState(ctx, err)
+		}
+		defer m.cache.Unreserve(network, address, remote.BlessingNames())
+	}
 	if c == nil {
 		netConn, err := dial(ctx, d, network, address)
 		if err != nil {
@@ -201,7 +292,7 @@
 			localEndpoint(netConn, m.rid),
 			remote,
 			version.Supported,
-			&flowHandler{q: m.q, closed: m.closed},
+			fh,
 		)
 		if err != nil {
 			return nil, flow.NewErrDialFailed(ctx, err)
@@ -214,9 +305,43 @@
 	if err != nil {
 		return nil, flow.NewErrDialFailed(ctx, err)
 	}
+
+	// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
+	// return a flow on that corresponding conn.
+	if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
+		if err := sendRouteInfo(remote, f); err != nil {
+			return nil, flow.NewErrDialFailed(ctx, err)
+		}
+		c, err = conn.NewDialed(
+			ctx,
+			closer{f},
+			c.LocalEndpoint(),
+			remote,
+			version.Supported,
+			fh,
+		)
+		if err != nil {
+			return nil, flow.NewErrDialFailed(ctx, err)
+		}
+		f, err = c.Dial(ctx, fn)
+		if err != nil {
+			return nil, flow.NewErrDialFailed(ctx, err)
+		}
+	}
 	return f, nil
 }
 
+func sendRouteInfo(ep naming.Endpoint, f flow.Flow) error {
+	// TODO(suharshs): Also send endpoint routes here when implementing multi proxy.
+	rid := ep.RoutingID()
+	return vom.NewEncoder(f).Encode(rid.String())
+}
+
+// RoutingID returns the naming.Routing of the flow.Manager.
+func (m *manager) RoutingID() naming.RoutingID {
+	return m.rid
+}
+
 // Closed returns a channel that remains open for the lifetime of the Manager
 // object. Once the channel is closed any operations on the Manager will
 // necessarily fail.
@@ -276,3 +401,12 @@
 	oErr, ok := err.(*net.OpError)
 	return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
 }
+
+// TODO(suharshs): should we add Close method to the Flow API?
+type closer struct {
+	flow.Flow
+}
+
+func (c closer) Close() error {
+	return nil
+}
diff --git a/services/xproxyd/errors.vdl b/services/xproxyd/errors.vdl
new file mode 100644
index 0000000..f0a32c4
--- /dev/null
+++ b/services/xproxyd/errors.vdl
@@ -0,0 +1,9 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd
+
+error (
+  NotListening() {"en": "Proxy is not listening on any endpoints."}
+)
\ No newline at end of file
diff --git a/services/xproxyd/errors.vdl.go b/services/xproxyd/errors.vdl.go
new file mode 100644
index 0000000..c1be22c
--- /dev/null
+++ b/services/xproxyd/errors.vdl.go
@@ -0,0 +1,28 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package xproxyd
+
+import (
+	// VDL system imports
+	"v.io/v23/context"
+	"v.io/v23/i18n"
+	"v.io/v23/verror"
+)
+
+var (
+	ErrNotListening = verror.Register("v.io/x/ref/services/xproxyd.NotListening", verror.NoRetry, "{1:}{2:} Proxy is not listening on any endpoints.")
+)
+
+func init() {
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotListening.ID), "{1:}{2:} Proxy is not listening on any endpoints.")
+}
+
+// NewErrNotListening returns an error with the ErrNotListening ID.
+func NewErrNotListening(ctx *context.T) error {
+	return verror.New(ErrNotListening, ctx)
+}
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
new file mode 100644
index 0000000..b519dce
--- /dev/null
+++ b/services/xproxyd/proxy_test.go
@@ -0,0 +1,120 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd_test
+
+import (
+	"bufio"
+	"strings"
+	"testing"
+
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/ref/services/xproxyd"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+)
+
+func TestProxiedConnection(t *testing.T) {
+	pctx, shutdown := v23.Init()
+	defer shutdown()
+	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Start the proxy.
+	addr := struct {
+		Protocol, Address string
+	}{
+		Protocol: "tcp",
+		Address:  "127.0.0.1:0",
+	}
+	pctx = v23.WithListenSpec(pctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{addr}})
+	p, err := xproxyd.New(pctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	peps := p.ListeningEndpoints()
+	if len(peps) == 0 {
+		t.Fatal("Proxy not listening on any endpoints")
+	}
+	pep := peps[0]
+
+	t.Logf("proxy endpoint: %s", pep.String())
+	// Start a accepting flow.Manager and make it listen through the proxy.
+	if err := am.Listen(actx, "v23", pep.String()); err != nil {
+		t.Fatal(err)
+	}
+	aeps := am.ListeningEndpoints()
+	if len(aeps) == 0 {
+		t.Fatal("Acceptor not listening on any endpoints")
+	}
+	aep := aeps[0]
+
+	// The dialing flow.Manager dials a flow to the accepting flow.Manager.
+	want := "Do you read me?"
+	bFn := func(
+		ctx *context.T,
+		localEndpoint, remoteEndpoint naming.Endpoint,
+		remoteBlessings security.Blessings,
+		remoteDischarges map[string]security.Discharge,
+	) (security.Blessings, map[string]security.Discharge, error) {
+		return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+	}
+	df, err := dm.Dial(dctx, aep, bFn)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// We write before accepting to ensure that the openFlow message is sent.
+	writeLine(df, want)
+	af, err := am.Accept(actx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err := readLine(af)
+	if err != nil {
+		pctx.Errorf("error")
+		t.Fatal(err)
+	}
+	if got != want {
+		pctx.Errorf("error")
+		t.Errorf("got %v, want %v", got, want)
+	}
+
+	// Writes in the opposite direction should work as well.
+	want = "I read you loud and clear."
+	writeLine(af, want)
+	got, err = readLine(df)
+	if err != nil {
+		pctx.Errorf("error")
+		t.Fatal(err)
+	}
+	if got != want {
+		pctx.Errorf("error")
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+// TODO(suharshs): Add tests for multiple proxies and bidirectional RPC through
+// a proxy once we have bidirpc working.
+
+func readLine(f flow.Flow) (string, error) {
+	s, err := bufio.NewReader(f).ReadString('\n')
+	return strings.TrimRight(s, "\n"), err
+}
+
+func writeLine(f flow.Flow, data string) error {
+	data += "\n"
+	_, err := f.Write([]byte(data))
+	return err
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
new file mode 100644
index 0000000..6bd5183
--- /dev/null
+++ b/services/xproxyd/proxyd.go
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd
+
+import (
+	"io"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+	"v.io/v23/vom"
+)
+
+// TODO(suharshs): Make sure that we don't leak any goroutines.
+
+type proxy struct {
+	m flow.Manager
+}
+
+func New(ctx *context.T) (*proxy, error) {
+	p := &proxy{
+		m: v23.ExperimentalGetFlowManager(ctx),
+	}
+	for _, addr := range v23.GetListenSpec(ctx).Addrs {
+		ctx.Infof("proxy listening on %v", addr)
+		if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
+			return nil, err
+		}
+	}
+	go p.listenLoop(ctx)
+	return p, nil
+}
+
+func (p *proxy) listenLoop(ctx *context.T) {
+	for {
+		f, err := p.m.Accept(ctx)
+		if err != nil {
+			ctx.Infof("p.m.Accept failed: %v", err)
+			break
+		}
+		if p.shouldRoute(f) {
+			err = p.startRouting(ctx, f)
+		} else {
+			err = p.replyToServer(ctx, f)
+		}
+		if err != nil {
+			ctx.Errorf("failed to handle incoming connection: %v", err)
+		}
+	}
+}
+
+func (p *proxy) startRouting(ctx *context.T, f flow.Flow) error {
+	rid, err := p.readRouteInfo(ctx, f)
+	if err != nil {
+		return err
+	}
+	// TODO(suharshs): Find a better way to do this.
+	ep, err := v23.NewEndpoint("@6@@@@" + rid.String() + "@@@@")
+	if err != nil {
+		return err
+	}
+	fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
+	if err != nil {
+		return err
+	}
+	go p.forwardLoop(ctx, f, fout)
+	go p.forwardLoop(ctx, fout, f)
+	return nil
+}
+
+type proxyBlessingsForPeer struct{}
+
+// TODO(suharshs): Figure out what blessings to present here. And present discharges.
+func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
+	rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
+	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
+	eps := p.ListeningEndpoints()
+	if len(eps) == 0 {
+		return NewErrNotListening(ctx)
+	}
+	// TODO(suharshs): handle listening on multiple endpoints.
+	ep := eps[0]
+	network, address := ep.Addr().Network(), ep.Addr().String()
+	// TODO(suharshs): deal with routes and such here, if we are replying to a proxy.
+	rid := f.Conn().RemoteEndpoint().RoutingID()
+	epString := naming.FormatEndpoint(network, address, rid)
+	if err := vom.NewEncoder(f).Encode(epString); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (p *proxy) ListeningEndpoints() []naming.Endpoint {
+	return p.m.ListeningEndpoints()
+}
+
+func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
+	for {
+		_, err := io.Copy(fin, fout)
+		if err == io.EOF {
+			return
+		} else if err != nil {
+			ctx.Errorf("f.Read failed: %v", err)
+			return
+		}
+	}
+}
+
+func (p *proxy) readRouteInfo(ctx *context.T, f flow.Flow) (naming.RoutingID, error) {
+	// TODO(suharshs): Read route information here when implementing multi proxy.
+	var (
+		rid string
+		ret naming.RoutingID
+	)
+	if err := vom.NewDecoder(f).Decode(&rid); err != nil {
+		return ret, err
+	}
+	err := ret.FromString(rid)
+	return ret, err
+}
+
+func (p *proxy) shouldRoute(f flow.Flow) bool {
+	rid := f.Conn().LocalEndpoint().RoutingID()
+	return rid != p.m.RoutingID() && rid != naming.NullRoutingID
+}