services/xproxyd: Create non-vom proxy protocol.

(1) v23/flow/message now has Proxy request and response messages.
(2) Add support for proxy to pass multiple endpoints back to servers.

MultiPart: 1/2
Change-Id: I2ee28d690adcfad263ecc48c5f8df5448d3cd6e0
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index 4ea50bc..dbb38ae 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -15,4 +15,5 @@
   AcceptFailed(err error) {"en": "accept failed{:err}"}
   CacheClosed() {"en":"cache is closed"}
   ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
+  InvalidProxyResponse(typ string) {"en": "Invalid proxy response{:typ}"}
 )
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index be7aa07..438e6fa 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -20,6 +20,7 @@
 	ErrAcceptFailed              = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
 	ErrCacheClosed               = verror.Register("v.io/x/ref/runtime/internal/flow/manager.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
 	ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
+	ErrInvalidProxyResponse      = verror.Register("v.io/x/ref/runtime/internal/flow/manager.InvalidProxyResponse", verror.NoRetry, "{1:}{2:} Invalid proxy response{:3}")
 )
 
 func init() {
@@ -28,6 +29,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidProxyResponse.ID), "{1:}{2:} Invalid proxy response{:3}")
 }
 
 // NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
@@ -54,3 +56,8 @@
 func NewErrConnKilledToFreeResources(ctx *context.T) error {
 	return verror.New(ErrConnKilledToFreeResources, ctx)
 }
+
+// NewErrInvalidProxyResponse returns an error with the ErrInvalidProxyResponse ID.
+func NewErrInvalidProxyResponse(ctx *context.T, typ string) error {
+	return verror.New(ErrInvalidProxyResponse, ctx, typ)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index c2eb5a9..6ec43fb 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -5,6 +5,7 @@
 package manager
 
 import (
+	"fmt"
 	"net"
 	"strings"
 	"sync"
@@ -18,7 +19,6 @@
 	"v.io/v23/naming"
 	"v.io/v23/security"
 	"v.io/v23/verror"
-	"v.io/v23/vom"
 
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/lib/upcqueue"
@@ -26,11 +26,6 @@
 	"v.io/x/ref/runtime/internal/rpc/version"
 )
 
-const (
-	clientByte = 'c'
-	serverByte = 's'
-)
-
 type manager struct {
 	rid    naming.RoutingID
 	closed <-chan struct{}
@@ -59,24 +54,24 @@
 // otherwise an error is returned.
 func (m *manager) Listen(ctx *context.T, protocol, address string) error {
 	var (
-		ep  naming.Endpoint
+		eps []naming.Endpoint
 		err error
 	)
 	if protocol == inaming.Network {
-		ep, err = m.proxyListen(ctx, address)
+		eps, err = m.proxyListen(ctx, address)
 	} else {
-		ep, err = m.listen(ctx, protocol, address)
+		eps, err = m.listen(ctx, protocol, address)
 	}
 	if err != nil {
 		return err
 	}
 	m.mu.Lock()
-	m.listenEndpoints = append(m.listenEndpoints, ep)
+	m.listenEndpoints = append(m.listenEndpoints, eps...)
 	m.mu.Unlock()
 	return nil
 }
 
-func (m *manager) listen(ctx *context.T, protocol, address string) (naming.Endpoint, error) {
+func (m *manager) listen(ctx *context.T, protocol, address string) ([]naming.Endpoint, error) {
 	ln, err := listen(ctx, protocol, address)
 	if err != nil {
 		return nil, flow.NewErrNetwork(ctx, err)
@@ -87,10 +82,10 @@
 		RID:      m.rid,
 	}
 	go m.lnAcceptLoop(ctx, ln, local)
-	return local, nil
+	return []naming.Endpoint{local}, nil
 }
 
-func (m *manager) proxyListen(ctx *context.T, address string) (naming.Endpoint, error) {
+func (m *manager) proxyListen(ctx *context.T, address string) ([]naming.Endpoint, error) {
 	ep, err := inaming.NewEndpoint(address)
 	if err != nil {
 		return nil, flow.NewErrBadArg(ctx, err)
@@ -99,15 +94,31 @@
 	if err != nil {
 		return nil, flow.NewErrNetwork(ctx, err)
 	}
-	// Write to ensure we send an openFlow message.
-	if _, err := f.Write([]byte{serverByte}); err != nil {
+	w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
+	if err != nil {
+		return nil, flow.NewErrBadArg(ctx, err)
+	}
+	if _, err := f.WriteMsg(w); err != nil {
+		return nil, flow.NewErrBadArg(ctx, err)
+	}
+
+	return m.readProxyResponse(ctx, f)
+}
+
+func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
+	b, err := f.ReadMsg()
+	if err != nil {
 		return nil, flow.NewErrNetwork(ctx, err)
 	}
-	var lep string
-	if err := vom.NewDecoder(f).Decode(&lep); err != nil {
-		return nil, flow.NewErrNetwork(ctx, err)
+	msg, err := message.Read(ctx, b)
+	if err != nil {
+		return nil, flow.NewErrBadArg(ctx, err)
 	}
-	return inaming.NewEndpoint(lep)
+	res, ok := msg.(*message.ProxyResponse)
+	if !ok {
+		return nil, flow.NewErrBadArg(ctx, NewErrInvalidProxyResponse(ctx, fmt.Sprintf("%t", res)))
+	}
+	return res.Endpoints, nil
 }
 
 type proxyBlessingsForPeer struct{}
@@ -322,10 +333,6 @@
 	// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
 	// return a flow on that corresponding conn.
 	if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
-		// Write to tell the proxy that this should be routed.
-		if _, err := f.Write([]byte{clientByte}); err != nil {
-			return nil, flow.NewErrNetwork(ctx, err)
-		}
 		c, err = conn.NewDialed(
 			ctx,
 			f,
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index 239d633..fc97ceb 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -5,22 +5,19 @@
 package xproxyd
 
 import (
+	"fmt"
 	"io"
 	"sync"
 
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
+	"v.io/v23/flow/message"
 	"v.io/v23/naming"
-	"v.io/v23/vom"
 )
 
 // TODO(suharshs): Make sure that we don't leak any goroutines.
 
-const proxyByte = byte('p')
-const serverByte = byte('s')
-const clientByte = byte('c')
-
 type proxy struct {
 	m              flow.Manager
 	mu             sync.Mutex
@@ -42,19 +39,19 @@
 				return nil, err
 			}
 			// Send a byte telling the acceptor that we are a proxy.
-			if _, err := f.Write([]byte{proxyByte}); err != nil {
+			if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
 				return nil, err
 			}
-			var lep string
-			if err := vom.NewDecoder(f).Decode(&lep); err != nil {
-				return nil, err
-			}
-			proxyEndpoint, err := v23.NewEndpoint(lep)
+			msg, err := readMessage(ctx, f)
 			if err != nil {
 				return nil, err
 			}
+			m, ok := msg.(*message.ProxyResponse)
+			if !ok {
+				return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
+			}
 			p.mu.Lock()
-			p.proxyEndpoints = append(p.proxyEndpoints, proxyEndpoint)
+			p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
 			p.mu.Unlock()
 		} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
 			return nil, err
@@ -75,16 +72,16 @@
 			ctx.Infof("p.m.Accept failed: %v", err)
 			break
 		}
-		msg := make([]byte, 1)
-		if _, err := f.Read(msg); err != nil {
+		msg, err := readMessage(ctx, f)
+		if err != nil {
 			ctx.Errorf("reading type byte failed: %v", err)
 		}
-		switch msg[0] {
-		case clientByte:
-			err = p.startRouting(ctx, f)
-		case proxyByte:
+		switch m := msg.(type) {
+		case *message.Setup:
+			err = p.startRouting(ctx, f, m)
+		case *message.MultiProxyRequest:
 			err = p.replyToProxy(ctx, f)
-		case serverByte:
+		case *message.ProxyServerRequest:
 			err = p.replyToServer(ctx, f)
 		default:
 			continue
@@ -95,8 +92,8 @@
 	}
 }
 
-func (p *proxy) startRouting(ctx *context.T, f flow.Flow) error {
-	fout, err := p.dialNextHop(ctx, f)
+func (p *proxy) startRouting(ctx *context.T, f flow.Flow, m *message.Setup) error {
+	fout, err := p.dialNextHop(ctx, f, m)
 	if err != nil {
 		return err
 	}
@@ -117,14 +114,12 @@
 	}
 }
 
-func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow) (flow.Flow, error) {
-	m, err := readSetupMessage(ctx, f)
-	if err != nil {
-		return nil, err
-	}
-	var rid naming.RoutingID
-	var ep naming.Endpoint
-	var shouldWriteClientByte bool
+func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow, m *message.Setup) (flow.Flow, error) {
+	var (
+		rid naming.RoutingID
+		ep  naming.Endpoint
+		err error
+	)
 	if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
 		if err := rid.FromString(routes[0]); err != nil {
 			return nil, err
@@ -141,7 +136,6 @@
 		if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
 			return nil, err
 		}
-		shouldWriteClientByte = true
 	} else {
 		ep = m.PeerRemoteEndpoint
 	}
@@ -149,27 +143,17 @@
 	if err != nil {
 		return nil, err
 	}
-	if shouldWriteClientByte {
-		// We only write the clientByte on flows made to proxys. If we are creating
-		// the last hop flow to the end server, we don't need to send the byte.
-		if _, err := fout.Write([]byte{clientByte}); err != nil {
-			return nil, err
-		}
-	}
-
 	// Write the setup message back onto the flow for the next hop to read.
-	return fout, writeSetupMessage(ctx, m, fout)
+	return fout, writeMessage(ctx, m, fout)
 }
 
 func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
 	rid := f.Conn().RemoteEndpoint().RoutingID()
-	epString, err := p.returnEndpoint(ctx, rid, "")
+	eps, err := p.returnEndpoints(ctx, rid, "")
 	if err != nil {
 		return err
 	}
-	// TODO(suharshs): Make a low-level message for this information instead of
-	// VOM-Encoding the endpoint string.
-	return vom.NewEncoder(f).Encode(epString)
+	return writeMessage(ctx, &message.ProxyResponse{Endpoints: eps}, f)
 }
 
 func (p *proxy) replyToProxy(ctx *context.T, f flow.Flow) error {
@@ -178,37 +162,38 @@
 	// by a server's rid by some later proxy.
 	// TODO(suharshs): Use a local route instead of this global routingID.
 	rid := f.Conn().RemoteEndpoint().RoutingID()
-	epString, err := p.returnEndpoint(ctx, naming.NullRoutingID, rid.String())
+	eps, err := p.returnEndpoints(ctx, naming.NullRoutingID, rid.String())
 	if err != nil {
 		return err
 	}
-	return vom.NewEncoder(f).Encode(epString)
+	return writeMessage(ctx, &message.ProxyResponse{Endpoints: eps}, f)
 }
 
-func (p *proxy) returnEndpoint(ctx *context.T, rid naming.RoutingID, route string) (string, error) {
+func (p *proxy) returnEndpoints(ctx *context.T, rid naming.RoutingID, route string) ([]naming.Endpoint, error) {
 	p.mu.Lock()
 	eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
 	p.mu.Unlock()
 	if len(eps) == 0 {
-		return "", NewErrNotListening(ctx)
+		return nil, NewErrNotListening(ctx)
 	}
-	// TODO(suharshs): handle listening on multiple endpoints.
-	ep := eps[len(eps)-1]
-	var err error
-	if rid != naming.NullRoutingID {
-		ep, err = setEndpointRoutingID(ep, rid)
-		if err != nil {
-			return "", err
+	for idx, ep := range eps {
+		var err error
+		if rid != naming.NullRoutingID {
+			ep, err = setEndpointRoutingID(ep, rid)
+			if err != nil {
+				return nil, err
+			}
 		}
-	}
-	if len(route) > 0 {
-		var cp []string
-		cp = append(cp, ep.Routes()...)
-		cp = append(cp, route)
-		ep, err = setEndpointRoutes(ep, cp)
-		if err != nil {
-			return "", err
+		if len(route) > 0 {
+			var cp []string
+			cp = append(cp, ep.Routes()...)
+			cp = append(cp, route)
+			ep, err = setEndpointRoutes(ep, cp)
+			if err != nil {
+				return nil, err
+			}
 		}
+		eps[idx] = ep
 	}
-	return ep.String(), nil
+	return eps, nil
 }
diff --git a/services/xproxyd/util.go b/services/xproxyd/util.go
index b1fdd58..4ccad31 100644
--- a/services/xproxyd/util.go
+++ b/services/xproxyd/util.go
@@ -5,8 +5,6 @@
 package xproxyd
 
 import (
-	"fmt"
-
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
@@ -73,26 +71,19 @@
 	return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
 }
 
-func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
-	b, err := f.ReadMsg()
-	if err != nil {
-		return nil, err
-	}
-	m, err := message.Read(ctx, b)
-	if err != nil {
-		return nil, err
-	}
-	if m, isSetup := m.(*message.Setup); isSetup {
-		return m, nil
-	}
-	return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
-}
-
-func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
-	w, err := message.Append(ctx, m, []byte{})
+func writeMessage(ctx *context.T, m message.Message, f flow.Flow) error {
+	w, err := message.Append(ctx, m, nil)
 	if err != nil {
 		return err
 	}
 	_, err = f.WriteMsg(w)
 	return err
 }
+
+func readMessage(ctx *context.T, f flow.Flow) (message.Message, error) {
+	b, err := f.ReadMsg()
+	if err != nil {
+		return nil, err
+	}
+	return message.Read(ctx, b)
+}