services/xproxyd: Add support for multiple proxying.

Proxys can accept connections from other proxyings allowing
connections from server to client to span many proxies.

Change-Id: I5467ac762dd32cb570c7cec59beee92fc1ab3a78
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index fa8fabf..6d44609 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -80,5 +80,5 @@
 	}
 	m, err := message.Read(ctx, msg[:len(msg)-p.cipher.MACSize()])
 	ctx.VI(2).Infof("Read low-level message: %#v", m)
-	return m, nil
+	return m, err
 }
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index af90572..da7707a 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,6 +26,11 @@
 	"v.io/x/ref/runtime/internal/rpc/version"
 )
 
+const (
+	clientByte = 'c'
+	serverByte = 's'
+)
+
 type manager struct {
 	rid    naming.RoutingID
 	closed <-chan struct{}
@@ -95,7 +100,7 @@
 		return nil, flow.NewErrNetwork(ctx, err)
 	}
 	// Write to ensure we send an openFlow message.
-	if _, err := f.Write([]byte{0}); err != nil {
+	if _, err := f.Write([]byte{serverByte}); err != nil {
 		return nil, flow.NewErrNetwork(ctx, err)
 	}
 	var lep string
@@ -312,6 +317,10 @@
 	// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
 	// return a flow on that corresponding conn.
 	if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
+		// Write to tell the proxy that this should be routed.
+		if _, err := f.Write([]byte{clientByte}); err != nil {
+			return nil, flow.NewErrNetwork(ctx, err)
+		}
 		c, err = conn.NewDialed(
 			ctx,
 			f,
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index b519dce..4257409 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -32,46 +32,60 @@
 		t.Fatal(err)
 	}
 
-	// Start the proxy.
-	addr := struct {
-		Protocol, Address string
-	}{
-		Protocol: "tcp",
-		Address:  "127.0.0.1:0",
-	}
-	pctx = v23.WithListenSpec(pctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{addr}})
-	p, err := xproxyd.New(pctx)
-	if err != nil {
-		t.Fatal(err)
-	}
-	peps := p.ListeningEndpoints()
-	if len(peps) == 0 {
-		t.Fatal("Proxy not listening on any endpoints")
-	}
-	pep := peps[0]
+	pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
 
-	t.Logf("proxy endpoint: %s", pep.String())
-	// Start a accepting flow.Manager and make it listen through the proxy.
 	if err := am.Listen(actx, "v23", pep.String()); err != nil {
 		t.Fatal(err)
 	}
+	testEndToEndConnections(t, dctx, actx, dm, am)
+}
+
+func TestMultipleProxies(t *testing.T) {
+	pctx, shutdown := v23.Init()
+	defer shutdown()
+	p2ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	p3ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+
+	p2ep := startProxy(t, p2ctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+
+	p3ep := startProxy(t, p3ctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+
+	if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
+		t.Fatal(err)
+	}
+	testEndToEndConnections(t, dctx, actx, dm, am)
+}
+
+func testEndToEndConnections(t *testing.T, dctx, actx *context.T, dm, am flow.Manager) {
 	aeps := am.ListeningEndpoints()
 	if len(aeps) == 0 {
-		t.Fatal("Acceptor not listening on any endpoints")
+		t.Fatal("acceptor not listening on any endpoints")
 	}
-	aep := aeps[0]
+	for _, aep := range aeps {
+		testEndToEndConnection(t, dctx, actx, dm, am, aep)
+	}
+}
 
+func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) {
 	// The dialing flow.Manager dials a flow to the accepting flow.Manager.
 	want := "Do you read me?"
-	bFn := func(
-		ctx *context.T,
-		localEndpoint, remoteEndpoint naming.Endpoint,
-		remoteBlessings security.Blessings,
-		remoteDischarges map[string]security.Discharge,
-	) (security.Blessings, map[string]security.Discharge, error) {
-		return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
-	}
-	df, err := dm.Dial(dctx, aep, bFn)
+	df, err := dm.Dial(dctx, aep, bfp)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -83,11 +97,9 @@
 	}
 	got, err := readLine(af)
 	if err != nil {
-		pctx.Errorf("error")
 		t.Fatal(err)
 	}
 	if got != want {
-		pctx.Errorf("error")
 		t.Errorf("got %v, want %v", got, want)
 	}
 
@@ -96,17 +108,14 @@
 	writeLine(af, want)
 	got, err = readLine(df)
 	if err != nil {
-		pctx.Errorf("error")
 		t.Fatal(err)
 	}
 	if got != want {
-		pctx.Errorf("error")
 		t.Errorf("got %v, want %v", got, want)
 	}
 }
 
-// TODO(suharshs): Add tests for multiple proxies and bidirectional RPC through
-// a proxy once we have bidirpc working.
+// TODO(suharshs): Add test for bidirectional RPC.
 
 func readLine(f flow.Flow) (string, error) {
 	s, err := bufio.NewReader(f).ReadString('\n')
@@ -118,3 +127,36 @@
 	_, err := f.Write([]byte(data))
 	return err
 }
+
+func bfp(
+	ctx *context.T,
+	localEndpoint, remoteEndpoint naming.Endpoint,
+	remoteBlessings security.Blessings,
+	remoteDischarges map[string]security.Discharge,
+) (security.Blessings, map[string]security.Discharge, error) {
+	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+type address struct {
+	Protocol, Address string
+}
+
+func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint {
+	var ls rpc.ListenSpec
+	for _, addr := range addrs {
+		ls.Addrs = append(ls.Addrs, addr)
+	}
+	ctx = v23.WithListenSpec(ctx, ls)
+	proxy, err := xproxyd.New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	peps := proxy.ListeningEndpoints()
+	for _, pep := range peps {
+		if pep.Addr().Network() == "tcp" {
+			return pep
+		}
+	}
+	t.Fatal("Proxy not listening on network address.")
+	return nil
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index 6857ec8..239d633 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -5,22 +5,26 @@
 package xproxyd
 
 import (
-	"fmt"
 	"io"
+	"sync"
 
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
-	"v.io/v23/flow/message"
 	"v.io/v23/naming"
-	"v.io/v23/security"
 	"v.io/v23/vom"
 )
 
 // TODO(suharshs): Make sure that we don't leak any goroutines.
 
+const proxyByte = byte('p')
+const serverByte = byte('s')
+const clientByte = byte('c')
+
 type proxy struct {
-	m flow.Manager
+	m              flow.Manager
+	mu             sync.Mutex
+	proxyEndpoints []naming.Endpoint
 }
 
 func New(ctx *context.T) (*proxy, error) {
@@ -28,7 +32,31 @@
 		m: v23.ExperimentalGetFlowManager(ctx),
 	}
 	for _, addr := range v23.GetListenSpec(ctx).Addrs {
-		if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
+		if addr.Protocol == "v23" {
+			ep, err := v23.NewEndpoint(addr.Address)
+			if err != nil {
+				return nil, err
+			}
+			f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
+			if err != nil {
+				return nil, err
+			}
+			// Send a byte telling the acceptor that we are a proxy.
+			if _, err := f.Write([]byte{proxyByte}); err != nil {
+				return nil, err
+			}
+			var lep string
+			if err := vom.NewDecoder(f).Decode(&lep); err != nil {
+				return nil, err
+			}
+			proxyEndpoint, err := v23.NewEndpoint(lep)
+			if err != nil {
+				return nil, err
+			}
+			p.mu.Lock()
+			p.proxyEndpoints = append(p.proxyEndpoints, proxyEndpoint)
+			p.mu.Unlock()
+		} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
 			return nil, err
 		}
 	}
@@ -36,6 +64,10 @@
 	return p, nil
 }
 
+func (p *proxy) ListeningEndpoints() []naming.Endpoint {
+	return p.m.ListeningEndpoints()
+}
+
 func (p *proxy) listenLoop(ctx *context.T) {
 	for {
 		f, err := p.m.Accept(ctx)
@@ -43,10 +75,19 @@
 			ctx.Infof("p.m.Accept failed: %v", err)
 			break
 		}
-		if p.shouldRoute(f) {
+		msg := make([]byte, 1)
+		if _, err := f.Read(msg); err != nil {
+			ctx.Errorf("reading type byte failed: %v", err)
+		}
+		switch msg[0] {
+		case clientByte:
 			err = p.startRouting(ctx, f)
-		} else {
+		case proxyByte:
+			err = p.replyToProxy(ctx, f)
+		case serverByte:
 			err = p.replyToServer(ctx, f)
+		default:
+			continue
 		}
 		if err != nil {
 			ctx.Errorf("failed to handle incoming connection: %v", err)
@@ -64,27 +105,6 @@
 	return nil
 }
 
-func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
-	eps := p.ListeningEndpoints()
-	if len(eps) == 0 {
-		return NewErrNotListening(ctx)
-	}
-	// TODO(suharshs): handle listening on multiple endpoints.
-	ep := eps[0]
-	network, address := ep.Addr().Network(), ep.Addr().String()
-	// TODO(suharshs): deal with routes and such here, if we are replying to a proxy.
-	rid := f.Conn().RemoteEndpoint().RoutingID()
-	epString := naming.FormatEndpoint(network, address, rid)
-	if err := vom.NewEncoder(f).Encode(epString); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (p *proxy) ListeningEndpoints() []naming.Endpoint {
-	return p.m.ListeningEndpoints()
-}
-
 func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
 	for {
 		_, err := io.Copy(fin, fout)
@@ -98,54 +118,97 @@
 }
 
 func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow) (flow.Flow, error) {
-	// TODO(suharshs): Read route information here when implementing multi proxy.
 	m, err := readSetupMessage(ctx, f)
 	if err != nil {
 		return nil, err
 	}
-	fout, err := p.m.Dial(ctx, m.PeerRemoteEndpoint, proxyBlessingsForPeer{}.run)
+	var rid naming.RoutingID
+	var ep naming.Endpoint
+	var shouldWriteClientByte bool
+	if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
+		if err := rid.FromString(routes[0]); err != nil {
+			return nil, err
+		}
+		// Make an endpoint with the correct routingID to dial out. All other fields
+		// do not matter.
+		// TODO(suharshs): Make sure that the routingID from the route belongs to a
+		// connection that is stored in the manager's cache. (i.e. a Server has connected
+		// with the routingID before)
+		if ep, err = setEndpointRoutingID(m.PeerRemoteEndpoint, rid); err != nil {
+			return nil, err
+		}
+		// Remove the read route from the setup message endpoint.
+		if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
+			return nil, err
+		}
+		shouldWriteClientByte = true
+	} else {
+		ep = m.PeerRemoteEndpoint
+	}
+	fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
 	if err != nil {
 		return nil, err
 	}
+	if shouldWriteClientByte {
+		// We only write the clientByte on flows made to proxys. If we are creating
+		// the last hop flow to the end server, we don't need to send the byte.
+		if _, err := fout.Write([]byte{clientByte}); err != nil {
+			return nil, err
+		}
+	}
+
 	// Write the setup message back onto the flow for the next hop to read.
 	return fout, writeSetupMessage(ctx, m, fout)
 }
 
-func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
-	b, err := f.ReadMsg()
-	if err != nil {
-		return nil, err
-	}
-	m, err := message.Read(ctx, b)
-	if err != nil {
-		return nil, err
-	}
-	if m, isSetup := m.(*message.Setup); isSetup {
-		return m, nil
-	}
-	return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
-}
-
-func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
-	// TODO(suharshs): When reading the routes we should remove the read route from
-	// the endpoint.
-	w, err := message.Append(ctx, m, []byte{})
+func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
+	rid := f.Conn().RemoteEndpoint().RoutingID()
+	epString, err := p.returnEndpoint(ctx, rid, "")
 	if err != nil {
 		return err
 	}
-	_, err = f.WriteMsg(w)
-	return err
+	// TODO(suharshs): Make a low-level message for this information instead of
+	// VOM-Encoding the endpoint string.
+	return vom.NewEncoder(f).Encode(epString)
 }
 
-func (p *proxy) shouldRoute(f flow.Flow) bool {
-	rid := f.Conn().LocalEndpoint().RoutingID()
-	return rid != p.m.RoutingID() && rid != naming.NullRoutingID
+func (p *proxy) replyToProxy(ctx *context.T, f flow.Flow) error {
+	// Add the routing id of the incoming proxy to the routes. The routing id of the
+	// returned endpoint doesn't matter because it will eventually be replaced
+	// by a server's rid by some later proxy.
+	// TODO(suharshs): Use a local route instead of this global routingID.
+	rid := f.Conn().RemoteEndpoint().RoutingID()
+	epString, err := p.returnEndpoint(ctx, naming.NullRoutingID, rid.String())
+	if err != nil {
+		return err
+	}
+	return vom.NewEncoder(f).Encode(epString)
 }
 
-type proxyBlessingsForPeer struct{}
-
-// TODO(suharshs): Figure out what blessings to present here. And present discharges.
-func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
-	rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
-	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+func (p *proxy) returnEndpoint(ctx *context.T, rid naming.RoutingID, route string) (string, error) {
+	p.mu.Lock()
+	eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
+	p.mu.Unlock()
+	if len(eps) == 0 {
+		return "", NewErrNotListening(ctx)
+	}
+	// TODO(suharshs): handle listening on multiple endpoints.
+	ep := eps[len(eps)-1]
+	var err error
+	if rid != naming.NullRoutingID {
+		ep, err = setEndpointRoutingID(ep, rid)
+		if err != nil {
+			return "", err
+		}
+	}
+	if len(route) > 0 {
+		var cp []string
+		cp = append(cp, ep.Routes()...)
+		cp = append(cp, route)
+		ep, err = setEndpointRoutes(ep, cp)
+		if err != nil {
+			return "", err
+		}
+	}
+	return ep.String(), nil
 }
diff --git a/services/xproxyd/util.go b/services/xproxyd/util.go
new file mode 100644
index 0000000..b1fdd58
--- /dev/null
+++ b/services/xproxyd/util.go
@@ -0,0 +1,98 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd
+
+import (
+	"fmt"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/flow/message"
+	"v.io/v23/naming"
+	"v.io/v23/security"
+)
+
+// setEndpointRoutingID returns a copy of ep with RoutingId changed to rid.
+func setEndpointRoutingID(ep naming.Endpoint, rid naming.RoutingID) (naming.Endpoint, error) {
+	network, address, routes, _, bnames, mountable := getEndpointParts(ep)
+	opts := routes
+	opts = append(opts, bnames...)
+	opts = append(opts, rid)
+	opts = append(opts, mountable)
+	epString := naming.FormatEndpoint(network, address, opts...)
+	return v23.NewEndpoint(epString)
+}
+
+// setEndpointRoutes returns a copy of ep with Routes changed to routes.
+func setEndpointRoutes(ep naming.Endpoint, routes []string) (naming.Endpoint, error) {
+	network, address, _, rid, bnames, mountable := getEndpointParts(ep)
+	opts := routeOpts(routes)
+	opts = append(opts, bnames...)
+	opts = append(opts, rid)
+	opts = append(opts, mountable)
+	epString := naming.FormatEndpoint(network, address, opts...)
+	return v23.NewEndpoint(epString)
+}
+
+// getEndpointParts returns all the fields of ep.
+func getEndpointParts(ep naming.Endpoint) (network string, address string,
+	routes []naming.EndpointOpt, rid naming.RoutingID,
+	blessingNames []naming.EndpointOpt, mountable naming.EndpointOpt) {
+	network, address = ep.Addr().Network(), ep.Addr().String()
+	routes = routeOpts(ep.Routes())
+	rid = ep.RoutingID()
+	blessingNames = blessingOpts(ep.BlessingNames())
+	mountable = naming.ServesMountTable(ep.ServesMountTable())
+	return
+}
+
+func routeOpts(routes []string) []naming.EndpointOpt {
+	var routeOpts []naming.EndpointOpt
+	for _, route := range routes {
+		routeOpts = append(routeOpts, naming.RouteOpt(route))
+	}
+	return routeOpts
+}
+
+func blessingOpts(blessingNames []string) []naming.EndpointOpt {
+	var blessingOpts []naming.EndpointOpt
+	for _, b := range blessingNames {
+		blessingOpts = append(blessingOpts, naming.BlessingOpt(b))
+	}
+	return blessingOpts
+}
+
+type proxyBlessingsForPeer struct{}
+
+// TODO(suharshs): Figure out what blessings to present here. And present discharges.
+func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
+	rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
+	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
+	b, err := f.ReadMsg()
+	if err != nil {
+		return nil, err
+	}
+	m, err := message.Read(ctx, b)
+	if err != nil {
+		return nil, err
+	}
+	if m, isSetup := m.(*message.Setup); isSetup {
+		return m, nil
+	}
+	return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
+}
+
+func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
+	w, err := message.Append(ctx, m, []byte{})
+	if err != nil {
+		return err
+	}
+	_, err = f.WriteMsg(w)
+	return err
+}