services/xproxyd: Add support for multiple proxying.
Proxys can accept connections from other proxyings allowing
connections from server to client to span many proxies.
Change-Id: I5467ac762dd32cb570c7cec59beee92fc1ab3a78
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index fa8fabf..6d44609 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -80,5 +80,5 @@
}
m, err := message.Read(ctx, msg[:len(msg)-p.cipher.MACSize()])
ctx.VI(2).Infof("Read low-level message: %#v", m)
- return m, nil
+ return m, err
}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index af90572..da7707a 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,6 +26,11 @@
"v.io/x/ref/runtime/internal/rpc/version"
)
+const (
+ clientByte = 'c'
+ serverByte = 's'
+)
+
type manager struct {
rid naming.RoutingID
closed <-chan struct{}
@@ -95,7 +100,7 @@
return nil, flow.NewErrNetwork(ctx, err)
}
// Write to ensure we send an openFlow message.
- if _, err := f.Write([]byte{0}); err != nil {
+ if _, err := f.Write([]byte{serverByte}); err != nil {
return nil, flow.NewErrNetwork(ctx, err)
}
var lep string
@@ -312,6 +317,10 @@
// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
// return a flow on that corresponding conn.
if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
+ // Write to tell the proxy that this should be routed.
+ if _, err := f.Write([]byte{clientByte}); err != nil {
+ return nil, flow.NewErrNetwork(ctx, err)
+ }
c, err = conn.NewDialed(
ctx,
f,
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index b519dce..4257409 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -32,46 +32,60 @@
t.Fatal(err)
}
- // Start the proxy.
- addr := struct {
- Protocol, Address string
- }{
- Protocol: "tcp",
- Address: "127.0.0.1:0",
- }
- pctx = v23.WithListenSpec(pctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{addr}})
- p, err := xproxyd.New(pctx)
- if err != nil {
- t.Fatal(err)
- }
- peps := p.ListeningEndpoints()
- if len(peps) == 0 {
- t.Fatal("Proxy not listening on any endpoints")
- }
- pep := peps[0]
+ pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
- t.Logf("proxy endpoint: %s", pep.String())
- // Start a accepting flow.Manager and make it listen through the proxy.
if err := am.Listen(actx, "v23", pep.String()); err != nil {
t.Fatal(err)
}
+ testEndToEndConnections(t, dctx, actx, dm, am)
+}
+
+func TestMultipleProxies(t *testing.T) {
+ pctx, shutdown := v23.Init()
+ defer shutdown()
+ p2ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ p3ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+
+ p2ep := startProxy(t, p2ctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+
+ p3ep := startProxy(t, p3ctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+
+ if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
+ t.Fatal(err)
+ }
+ testEndToEndConnections(t, dctx, actx, dm, am)
+}
+
+func testEndToEndConnections(t *testing.T, dctx, actx *context.T, dm, am flow.Manager) {
aeps := am.ListeningEndpoints()
if len(aeps) == 0 {
- t.Fatal("Acceptor not listening on any endpoints")
+ t.Fatal("acceptor not listening on any endpoints")
}
- aep := aeps[0]
+ for _, aep := range aeps {
+ testEndToEndConnection(t, dctx, actx, dm, am, aep)
+ }
+}
+func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) {
// The dialing flow.Manager dials a flow to the accepting flow.Manager.
want := "Do you read me?"
- bFn := func(
- ctx *context.T,
- localEndpoint, remoteEndpoint naming.Endpoint,
- remoteBlessings security.Blessings,
- remoteDischarges map[string]security.Discharge,
- ) (security.Blessings, map[string]security.Discharge, error) {
- return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
- }
- df, err := dm.Dial(dctx, aep, bFn)
+ df, err := dm.Dial(dctx, aep, bfp)
if err != nil {
t.Fatal(err)
}
@@ -83,11 +97,9 @@
}
got, err := readLine(af)
if err != nil {
- pctx.Errorf("error")
t.Fatal(err)
}
if got != want {
- pctx.Errorf("error")
t.Errorf("got %v, want %v", got, want)
}
@@ -96,17 +108,14 @@
writeLine(af, want)
got, err = readLine(df)
if err != nil {
- pctx.Errorf("error")
t.Fatal(err)
}
if got != want {
- pctx.Errorf("error")
t.Errorf("got %v, want %v", got, want)
}
}
-// TODO(suharshs): Add tests for multiple proxies and bidirectional RPC through
-// a proxy once we have bidirpc working.
+// TODO(suharshs): Add test for bidirectional RPC.
func readLine(f flow.Flow) (string, error) {
s, err := bufio.NewReader(f).ReadString('\n')
@@ -118,3 +127,36 @@
_, err := f.Write([]byte(data))
return err
}
+
+func bfp(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+type address struct {
+ Protocol, Address string
+}
+
+func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint {
+ var ls rpc.ListenSpec
+ for _, addr := range addrs {
+ ls.Addrs = append(ls.Addrs, addr)
+ }
+ ctx = v23.WithListenSpec(ctx, ls)
+ proxy, err := xproxyd.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ peps := proxy.ListeningEndpoints()
+ for _, pep := range peps {
+ if pep.Addr().Network() == "tcp" {
+ return pep
+ }
+ }
+ t.Fatal("Proxy not listening on network address.")
+ return nil
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index 6857ec8..239d633 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -5,22 +5,26 @@
package xproxyd
import (
- "fmt"
"io"
+ "sync"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
- "v.io/v23/flow/message"
"v.io/v23/naming"
- "v.io/v23/security"
"v.io/v23/vom"
)
// TODO(suharshs): Make sure that we don't leak any goroutines.
+const proxyByte = byte('p')
+const serverByte = byte('s')
+const clientByte = byte('c')
+
type proxy struct {
- m flow.Manager
+ m flow.Manager
+ mu sync.Mutex
+ proxyEndpoints []naming.Endpoint
}
func New(ctx *context.T) (*proxy, error) {
@@ -28,7 +32,31 @@
m: v23.ExperimentalGetFlowManager(ctx),
}
for _, addr := range v23.GetListenSpec(ctx).Addrs {
- if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
+ if addr.Protocol == "v23" {
+ ep, err := v23.NewEndpoint(addr.Address)
+ if err != nil {
+ return nil, err
+ }
+ f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
+ if err != nil {
+ return nil, err
+ }
+ // Send a byte telling the acceptor that we are a proxy.
+ if _, err := f.Write([]byte{proxyByte}); err != nil {
+ return nil, err
+ }
+ var lep string
+ if err := vom.NewDecoder(f).Decode(&lep); err != nil {
+ return nil, err
+ }
+ proxyEndpoint, err := v23.NewEndpoint(lep)
+ if err != nil {
+ return nil, err
+ }
+ p.mu.Lock()
+ p.proxyEndpoints = append(p.proxyEndpoints, proxyEndpoint)
+ p.mu.Unlock()
+ } else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
return nil, err
}
}
@@ -36,6 +64,10 @@
return p, nil
}
+func (p *proxy) ListeningEndpoints() []naming.Endpoint {
+ return p.m.ListeningEndpoints()
+}
+
func (p *proxy) listenLoop(ctx *context.T) {
for {
f, err := p.m.Accept(ctx)
@@ -43,10 +75,19 @@
ctx.Infof("p.m.Accept failed: %v", err)
break
}
- if p.shouldRoute(f) {
+ msg := make([]byte, 1)
+ if _, err := f.Read(msg); err != nil {
+ ctx.Errorf("reading type byte failed: %v", err)
+ }
+ switch msg[0] {
+ case clientByte:
err = p.startRouting(ctx, f)
- } else {
+ case proxyByte:
+ err = p.replyToProxy(ctx, f)
+ case serverByte:
err = p.replyToServer(ctx, f)
+ default:
+ continue
}
if err != nil {
ctx.Errorf("failed to handle incoming connection: %v", err)
@@ -64,27 +105,6 @@
return nil
}
-func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
- eps := p.ListeningEndpoints()
- if len(eps) == 0 {
- return NewErrNotListening(ctx)
- }
- // TODO(suharshs): handle listening on multiple endpoints.
- ep := eps[0]
- network, address := ep.Addr().Network(), ep.Addr().String()
- // TODO(suharshs): deal with routes and such here, if we are replying to a proxy.
- rid := f.Conn().RemoteEndpoint().RoutingID()
- epString := naming.FormatEndpoint(network, address, rid)
- if err := vom.NewEncoder(f).Encode(epString); err != nil {
- return err
- }
- return nil
-}
-
-func (p *proxy) ListeningEndpoints() []naming.Endpoint {
- return p.m.ListeningEndpoints()
-}
-
func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
for {
_, err := io.Copy(fin, fout)
@@ -98,54 +118,97 @@
}
func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow) (flow.Flow, error) {
- // TODO(suharshs): Read route information here when implementing multi proxy.
m, err := readSetupMessage(ctx, f)
if err != nil {
return nil, err
}
- fout, err := p.m.Dial(ctx, m.PeerRemoteEndpoint, proxyBlessingsForPeer{}.run)
+ var rid naming.RoutingID
+ var ep naming.Endpoint
+ var shouldWriteClientByte bool
+ if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
+ if err := rid.FromString(routes[0]); err != nil {
+ return nil, err
+ }
+ // Make an endpoint with the correct routingID to dial out. All other fields
+ // do not matter.
+ // TODO(suharshs): Make sure that the routingID from the route belongs to a
+ // connection that is stored in the manager's cache. (i.e. a Server has connected
+ // with the routingID before)
+ if ep, err = setEndpointRoutingID(m.PeerRemoteEndpoint, rid); err != nil {
+ return nil, err
+ }
+ // Remove the read route from the setup message endpoint.
+ if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
+ return nil, err
+ }
+ shouldWriteClientByte = true
+ } else {
+ ep = m.PeerRemoteEndpoint
+ }
+ fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
if err != nil {
return nil, err
}
+ if shouldWriteClientByte {
+ // We only write the clientByte on flows made to proxys. If we are creating
+ // the last hop flow to the end server, we don't need to send the byte.
+ if _, err := fout.Write([]byte{clientByte}); err != nil {
+ return nil, err
+ }
+ }
+
// Write the setup message back onto the flow for the next hop to read.
return fout, writeSetupMessage(ctx, m, fout)
}
-func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
- b, err := f.ReadMsg()
- if err != nil {
- return nil, err
- }
- m, err := message.Read(ctx, b)
- if err != nil {
- return nil, err
- }
- if m, isSetup := m.(*message.Setup); isSetup {
- return m, nil
- }
- return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
-}
-
-func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
- // TODO(suharshs): When reading the routes we should remove the read route from
- // the endpoint.
- w, err := message.Append(ctx, m, []byte{})
+func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
+ rid := f.Conn().RemoteEndpoint().RoutingID()
+ epString, err := p.returnEndpoint(ctx, rid, "")
if err != nil {
return err
}
- _, err = f.WriteMsg(w)
- return err
+ // TODO(suharshs): Make a low-level message for this information instead of
+ // VOM-Encoding the endpoint string.
+ return vom.NewEncoder(f).Encode(epString)
}
-func (p *proxy) shouldRoute(f flow.Flow) bool {
- rid := f.Conn().LocalEndpoint().RoutingID()
- return rid != p.m.RoutingID() && rid != naming.NullRoutingID
+func (p *proxy) replyToProxy(ctx *context.T, f flow.Flow) error {
+ // Add the routing id of the incoming proxy to the routes. The routing id of the
+ // returned endpoint doesn't matter because it will eventually be replaced
+ // by a server's rid by some later proxy.
+ // TODO(suharshs): Use a local route instead of this global routingID.
+ rid := f.Conn().RemoteEndpoint().RoutingID()
+ epString, err := p.returnEndpoint(ctx, naming.NullRoutingID, rid.String())
+ if err != nil {
+ return err
+ }
+ return vom.NewEncoder(f).Encode(epString)
}
-type proxyBlessingsForPeer struct{}
-
-// TODO(suharshs): Figure out what blessings to present here. And present discharges.
-func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
- rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
- return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+func (p *proxy) returnEndpoint(ctx *context.T, rid naming.RoutingID, route string) (string, error) {
+ p.mu.Lock()
+ eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
+ p.mu.Unlock()
+ if len(eps) == 0 {
+ return "", NewErrNotListening(ctx)
+ }
+ // TODO(suharshs): handle listening on multiple endpoints.
+ ep := eps[len(eps)-1]
+ var err error
+ if rid != naming.NullRoutingID {
+ ep, err = setEndpointRoutingID(ep, rid)
+ if err != nil {
+ return "", err
+ }
+ }
+ if len(route) > 0 {
+ var cp []string
+ cp = append(cp, ep.Routes()...)
+ cp = append(cp, route)
+ ep, err = setEndpointRoutes(ep, cp)
+ if err != nil {
+ return "", err
+ }
+ }
+ return ep.String(), nil
}
diff --git a/services/xproxyd/util.go b/services/xproxyd/util.go
new file mode 100644
index 0000000..b1fdd58
--- /dev/null
+++ b/services/xproxyd/util.go
@@ -0,0 +1,98 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd
+
+import (
+ "fmt"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/flow/message"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+)
+
+// setEndpointRoutingID returns a copy of ep with RoutingId changed to rid.
+func setEndpointRoutingID(ep naming.Endpoint, rid naming.RoutingID) (naming.Endpoint, error) {
+ network, address, routes, _, bnames, mountable := getEndpointParts(ep)
+ opts := routes
+ opts = append(opts, bnames...)
+ opts = append(opts, rid)
+ opts = append(opts, mountable)
+ epString := naming.FormatEndpoint(network, address, opts...)
+ return v23.NewEndpoint(epString)
+}
+
+// setEndpointRoutes returns a copy of ep with Routes changed to routes.
+func setEndpointRoutes(ep naming.Endpoint, routes []string) (naming.Endpoint, error) {
+ network, address, _, rid, bnames, mountable := getEndpointParts(ep)
+ opts := routeOpts(routes)
+ opts = append(opts, bnames...)
+ opts = append(opts, rid)
+ opts = append(opts, mountable)
+ epString := naming.FormatEndpoint(network, address, opts...)
+ return v23.NewEndpoint(epString)
+}
+
+// getEndpointParts returns all the fields of ep.
+func getEndpointParts(ep naming.Endpoint) (network string, address string,
+ routes []naming.EndpointOpt, rid naming.RoutingID,
+ blessingNames []naming.EndpointOpt, mountable naming.EndpointOpt) {
+ network, address = ep.Addr().Network(), ep.Addr().String()
+ routes = routeOpts(ep.Routes())
+ rid = ep.RoutingID()
+ blessingNames = blessingOpts(ep.BlessingNames())
+ mountable = naming.ServesMountTable(ep.ServesMountTable())
+ return
+}
+
+func routeOpts(routes []string) []naming.EndpointOpt {
+ var routeOpts []naming.EndpointOpt
+ for _, route := range routes {
+ routeOpts = append(routeOpts, naming.RouteOpt(route))
+ }
+ return routeOpts
+}
+
+func blessingOpts(blessingNames []string) []naming.EndpointOpt {
+ var blessingOpts []naming.EndpointOpt
+ for _, b := range blessingNames {
+ blessingOpts = append(blessingOpts, naming.BlessingOpt(b))
+ }
+ return blessingOpts
+}
+
+type proxyBlessingsForPeer struct{}
+
+// TODO(suharshs): Figure out what blessings to present here. And present discharges.
+func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
+ rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
+ b, err := f.ReadMsg()
+ if err != nil {
+ return nil, err
+ }
+ m, err := message.Read(ctx, b)
+ if err != nil {
+ return nil, err
+ }
+ if m, isSetup := m.(*message.Setup); isSetup {
+ return m, nil
+ }
+ return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
+}
+
+func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
+ w, err := message.Append(ctx, m, []byte{})
+ if err != nil {
+ return err
+ }
+ _, err = f.WriteMsg(w)
+ return err
+}