ref: Preliminary proxy implementation.
This change provides single hop proxying support.
TODO in subsequent cls:
(1) Implement multiple proxying.
(2) Switch encoding format of routing information away from VOM.
(3) Ensure that BidiRPC works through proxying. (once bidi is done)
MultiPart: 1/2
Change-Id: Ic5d1635b0dcf968ecaf766380f787885c999cb6e
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index a3394d8..7dc3582 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -11,10 +11,13 @@
"syscall"
"time"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vom"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/lib/upcqueue"
@@ -49,20 +52,64 @@
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Listen(ctx *context.T, protocol, address string) error {
+ var (
+ ep naming.Endpoint
+ err error
+ )
+ if protocol == inaming.Network {
+ ep, err = m.proxyListen(ctx, address)
+ } else {
+ ep, err = m.listen(ctx, protocol, address)
+ }
+ if err != nil {
+ return err
+ }
+ m.mu.Lock()
+ m.listenEndpoints = append(m.listenEndpoints, ep)
+ m.mu.Unlock()
+ return nil
+}
+
+func (m *manager) listen(ctx *context.T, protocol, address string) (naming.Endpoint, error) {
netLn, err := listen(ctx, protocol, address)
if err != nil {
- return flow.NewErrNetwork(ctx, err)
+ return nil, flow.NewErrNetwork(ctx, err)
}
local := &inaming.Endpoint{
Protocol: protocol,
Address: netLn.Addr().String(),
RID: m.rid,
}
- m.mu.Lock()
- m.listenEndpoints = append(m.listenEndpoints, local)
- m.mu.Unlock()
go m.netLnAcceptLoop(ctx, netLn, local)
- return nil
+ return local, nil
+}
+
+func (m *manager) proxyListen(ctx *context.T, address string) (naming.Endpoint, error) {
+ ep, err := inaming.NewEndpoint(address)
+ if err != nil {
+ return nil, flow.NewErrBadArg(ctx, err)
+ }
+ f, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run, &proxyFlowHandler{ctx: ctx, m: m})
+ if err != nil {
+ return nil, flow.NewErrNetwork(ctx, err)
+ }
+ // Write to ensure we send an openFlow message.
+ if _, err := f.Write([]byte{0}); err != nil {
+ return nil, flow.NewErrNetwork(ctx, err)
+ }
+ var lep string
+ if err := vom.NewDecoder(f).Decode(&lep); err != nil {
+ return nil, flow.NewErrNetwork(ctx, err)
+ }
+ return inaming.NewEndpoint(lep)
+}
+
+type proxyBlessingsForPeer struct{}
+
+// TODO(suharshs): Figure out what blessings to present here. And present discharges.
+func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
+ rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
}
func (m *manager) netLnAcceptLoop(ctx *context.T, netLn net.Listener, local naming.Endpoint) {
@@ -85,7 +132,7 @@
ctx.Errorf("net.Listener.Accept on localEP %v failed: %v", local, err)
continue
}
- _, err = conn.NewAccepted(
+ c, err := conn.NewAccepted(
ctx,
&framer{ReadWriteCloser: netConn},
local,
@@ -97,12 +144,9 @@
ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
continue
}
- // TODO(suharshs): We need the remote endpoint in conn to be able to insert
- // it into the cache. This handshake has not been implemented yet. So for now
- // we will comment it out.
- // if err := m.cache.Insert(c); err != nil {
- // ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
- // }
+ if err := m.cache.Insert(c); err != nil {
+ ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
+ }
}
}
@@ -121,6 +165,37 @@
return h.q.Put(f)
}
+type proxyFlowHandler struct {
+ ctx *context.T
+ m *manager
+}
+
+func (h *proxyFlowHandler) HandleFlow(f flow.Flow) error {
+ select {
+ case <-h.m.closed:
+ h.m.q.Close()
+ return upcqueue.ErrQueueIsClosed
+ default:
+ }
+ go func() {
+ c, err := conn.NewAccepted(
+ h.ctx,
+ closer{f},
+ f.Conn().LocalEndpoint(),
+ version.Supported,
+ &flowHandler{q: h.m.q, closed: h.m.closed})
+ if err != nil {
+ h.ctx.Errorf("failed to create accepted conn: %v", err)
+ return
+ }
+ if err := h.m.cache.Insert(c); err != nil {
+ h.ctx.Errorf("failed to create accepted conn: %v", err)
+ return
+ }
+ }()
+ return nil
+}
+
// ListeningEndpoints returns the endpoints that the Manager has explicitly
// listened on. The Manager will accept new flows on these endpoints.
// Returned endpoints all have a RoutingID unique to the Acceptor.
@@ -169,24 +244,40 @@
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
- addr := remote.Addr()
- d, r, _, _ := rpc.RegisteredProtocol(addr.Network())
- // (network, address) in the endpoint might not always match up
- // with the key used for caching conns. For example:
- // - conn, err := net.Dial("tcp", "www.google.com:80")
- // fmt.Println(conn.RemoteAddr()) // Might yield the corresponding IP address
- // - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
- // might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
- // Thus we look for Conns with the resolved address.
- network, address, err := resolve(ctx, r, addr.Network(), addr.String())
- if err != nil {
- return nil, flow.NewErrResolveFailed(ctx, err)
- }
- c, err := m.cache.ReservedFind(network, address, remote.BlessingNames())
+ return m.internalDial(ctx, remote, fn, &flowHandler{q: m.q, closed: m.closed})
+}
+
+func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer, fh conn.FlowHandler) (flow.Flow, error) {
+ // Look up the connection based on RoutingID first.
+ c, err := m.cache.FindWithRoutingID(remote.RoutingID())
if err != nil {
return nil, flow.NewErrBadState(ctx, err)
}
- defer m.cache.Unreserve(network, address, remote.BlessingNames())
+ var (
+ d rpc.DialerFunc
+ network, address string
+ )
+ if c == nil {
+ addr := remote.Addr()
+ var r rpc.ResolverFunc
+ d, r, _, _ = rpc.RegisteredProtocol(addr.Network())
+ // (network, address) in the endpoint might not always match up
+ // with the key used for caching conns. For example:
+ // - conn, err := net.Dial("tcp", "www.google.com:80")
+ // fmt.Println(conn.RemoteAddr()) // Might yield the corresponding IP address
+ // - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
+ // might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
+ // Thus we look for Conns with the resolved address.
+ network, address, err = resolve(ctx, r, addr.Network(), addr.String())
+ if err != nil {
+ return nil, flow.NewErrResolveFailed(ctx, err)
+ }
+ c, err = m.cache.ReservedFind(network, address, remote.BlessingNames())
+ if err != nil {
+ return nil, flow.NewErrBadState(ctx, err)
+ }
+ defer m.cache.Unreserve(network, address, remote.BlessingNames())
+ }
if c == nil {
netConn, err := dial(ctx, d, network, address)
if err != nil {
@@ -201,7 +292,7 @@
localEndpoint(netConn, m.rid),
remote,
version.Supported,
- &flowHandler{q: m.q, closed: m.closed},
+ fh,
)
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
@@ -214,9 +305,43 @@
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
}
+
+ // If we are dialing out to a Proxy, we need to dial a conn on this flow, and
+ // return a flow on that corresponding conn.
+ if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
+ if err := sendRouteInfo(remote, f); err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+ c, err = conn.NewDialed(
+ ctx,
+ closer{f},
+ c.LocalEndpoint(),
+ remote,
+ version.Supported,
+ fh,
+ )
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+ f, err = c.Dial(ctx, fn)
+ if err != nil {
+ return nil, flow.NewErrDialFailed(ctx, err)
+ }
+ }
return f, nil
}
+func sendRouteInfo(ep naming.Endpoint, f flow.Flow) error {
+ // TODO(suharshs): Also send endpoint routes here when implementing multi proxy.
+ rid := ep.RoutingID()
+ return vom.NewEncoder(f).Encode(rid.String())
+}
+
+// RoutingID returns the naming.Routing of the flow.Manager.
+func (m *manager) RoutingID() naming.RoutingID {
+ return m.rid
+}
+
// Closed returns a channel that remains open for the lifetime of the Manager
// object. Once the channel is closed any operations on the Manager will
// necessarily fail.
@@ -276,3 +401,12 @@
oErr, ok := err.(*net.OpError)
return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error())
}
+
+// TODO(suharshs): should we add Close method to the Flow API?
+type closer struct {
+ flow.Flow
+}
+
+func (c closer) Close() error {
+ return nil
+}
diff --git a/services/xproxyd/errors.vdl b/services/xproxyd/errors.vdl
new file mode 100644
index 0000000..f0a32c4
--- /dev/null
+++ b/services/xproxyd/errors.vdl
@@ -0,0 +1,9 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd
+
+error (
+ NotListening() {"en": "Proxy is not listening on any endpoints."}
+)
\ No newline at end of file
diff --git a/services/xproxyd/errors.vdl.go b/services/xproxyd/errors.vdl.go
new file mode 100644
index 0000000..c1be22c
--- /dev/null
+++ b/services/xproxyd/errors.vdl.go
@@ -0,0 +1,28 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package xproxyd
+
+import (
+ // VDL system imports
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/verror"
+)
+
+var (
+ ErrNotListening = verror.Register("v.io/x/ref/services/xproxyd.NotListening", verror.NoRetry, "{1:}{2:} Proxy is not listening on any endpoints.")
+)
+
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotListening.ID), "{1:}{2:} Proxy is not listening on any endpoints.")
+}
+
+// NewErrNotListening returns an error with the ErrNotListening ID.
+func NewErrNotListening(ctx *context.T) error {
+ return verror.New(ErrNotListening, ctx)
+}
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
new file mode 100644
index 0000000..b519dce
--- /dev/null
+++ b/services/xproxyd/proxy_test.go
@@ -0,0 +1,120 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd_test
+
+import (
+ "bufio"
+ "strings"
+ "testing"
+
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/services/xproxyd"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+)
+
+func TestProxiedConnection(t *testing.T) {
+ pctx, shutdown := v23.Init()
+ defer shutdown()
+ actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Start the proxy.
+ addr := struct {
+ Protocol, Address string
+ }{
+ Protocol: "tcp",
+ Address: "127.0.0.1:0",
+ }
+ pctx = v23.WithListenSpec(pctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{addr}})
+ p, err := xproxyd.New(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ peps := p.ListeningEndpoints()
+ if len(peps) == 0 {
+ t.Fatal("Proxy not listening on any endpoints")
+ }
+ pep := peps[0]
+
+ t.Logf("proxy endpoint: %s", pep.String())
+ // Start a accepting flow.Manager and make it listen through the proxy.
+ if err := am.Listen(actx, "v23", pep.String()); err != nil {
+ t.Fatal(err)
+ }
+ aeps := am.ListeningEndpoints()
+ if len(aeps) == 0 {
+ t.Fatal("Acceptor not listening on any endpoints")
+ }
+ aep := aeps[0]
+
+ // The dialing flow.Manager dials a flow to the accepting flow.Manager.
+ want := "Do you read me?"
+ bFn := func(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+ ) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+ }
+ df, err := dm.Dial(dctx, aep, bFn)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // We write before accepting to ensure that the openFlow message is sent.
+ writeLine(df, want)
+ af, err := am.Accept(actx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := readLine(af)
+ if err != nil {
+ pctx.Errorf("error")
+ t.Fatal(err)
+ }
+ if got != want {
+ pctx.Errorf("error")
+ t.Errorf("got %v, want %v", got, want)
+ }
+
+ // Writes in the opposite direction should work as well.
+ want = "I read you loud and clear."
+ writeLine(af, want)
+ got, err = readLine(df)
+ if err != nil {
+ pctx.Errorf("error")
+ t.Fatal(err)
+ }
+ if got != want {
+ pctx.Errorf("error")
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
+
+// TODO(suharshs): Add tests for multiple proxies and bidirectional RPC through
+// a proxy once we have bidirpc working.
+
+func readLine(f flow.Flow) (string, error) {
+ s, err := bufio.NewReader(f).ReadString('\n')
+ return strings.TrimRight(s, "\n"), err
+}
+
+func writeLine(f flow.Flow, data string) error {
+ data += "\n"
+ _, err := f.Write([]byte(data))
+ return err
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
new file mode 100644
index 0000000..6bd5183
--- /dev/null
+++ b/services/xproxyd/proxyd.go
@@ -0,0 +1,132 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd
+
+import (
+ "io"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+ "v.io/v23/vom"
+)
+
+// TODO(suharshs): Make sure that we don't leak any goroutines.
+
+type proxy struct {
+ m flow.Manager
+}
+
+func New(ctx *context.T) (*proxy, error) {
+ p := &proxy{
+ m: v23.ExperimentalGetFlowManager(ctx),
+ }
+ for _, addr := range v23.GetListenSpec(ctx).Addrs {
+ ctx.Infof("proxy listening on %v", addr)
+ if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
+ return nil, err
+ }
+ }
+ go p.listenLoop(ctx)
+ return p, nil
+}
+
+func (p *proxy) listenLoop(ctx *context.T) {
+ for {
+ f, err := p.m.Accept(ctx)
+ if err != nil {
+ ctx.Infof("p.m.Accept failed: %v", err)
+ break
+ }
+ if p.shouldRoute(f) {
+ err = p.startRouting(ctx, f)
+ } else {
+ err = p.replyToServer(ctx, f)
+ }
+ if err != nil {
+ ctx.Errorf("failed to handle incoming connection: %v", err)
+ }
+ }
+}
+
+func (p *proxy) startRouting(ctx *context.T, f flow.Flow) error {
+ rid, err := p.readRouteInfo(ctx, f)
+ if err != nil {
+ return err
+ }
+ // TODO(suharshs): Find a better way to do this.
+ ep, err := v23.NewEndpoint("@6@@@@" + rid.String() + "@@@@")
+ if err != nil {
+ return err
+ }
+ fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
+ if err != nil {
+ return err
+ }
+ go p.forwardLoop(ctx, f, fout)
+ go p.forwardLoop(ctx, fout, f)
+ return nil
+}
+
+type proxyBlessingsForPeer struct{}
+
+// TODO(suharshs): Figure out what blessings to present here. And present discharges.
+func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
+ rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
+ eps := p.ListeningEndpoints()
+ if len(eps) == 0 {
+ return NewErrNotListening(ctx)
+ }
+ // TODO(suharshs): handle listening on multiple endpoints.
+ ep := eps[0]
+ network, address := ep.Addr().Network(), ep.Addr().String()
+ // TODO(suharshs): deal with routes and such here, if we are replying to a proxy.
+ rid := f.Conn().RemoteEndpoint().RoutingID()
+ epString := naming.FormatEndpoint(network, address, rid)
+ if err := vom.NewEncoder(f).Encode(epString); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (p *proxy) ListeningEndpoints() []naming.Endpoint {
+ return p.m.ListeningEndpoints()
+}
+
+func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
+ for {
+ _, err := io.Copy(fin, fout)
+ if err == io.EOF {
+ return
+ } else if err != nil {
+ ctx.Errorf("f.Read failed: %v", err)
+ return
+ }
+ }
+}
+
+func (p *proxy) readRouteInfo(ctx *context.T, f flow.Flow) (naming.RoutingID, error) {
+ // TODO(suharshs): Read route information here when implementing multi proxy.
+ var (
+ rid string
+ ret naming.RoutingID
+ )
+ if err := vom.NewDecoder(f).Decode(&rid); err != nil {
+ return ret, err
+ }
+ err := ret.FromString(rid)
+ return ret, err
+}
+
+func (p *proxy) shouldRoute(f flow.Flow) bool {
+ rid := f.Conn().LocalEndpoint().RoutingID()
+ return rid != p.m.RoutingID() && rid != naming.NullRoutingID
+}