services/xproxy: Add publishing to proxy and first iteration of xproxyd
binary.
Done in this change:
(1) Moved ref/runtime/internal/lib/publisher to ref/lib/publisher so
that xproxy can use it. This involved fixing the publisher tests
to use v23.Init.
(2) Proxy publishes itself under a name.
(3) Proxy listens through other proxies if the Proxy field of the
listenSpec is specified.
(4) xproxyd binary is written.
TODO still (including things realized while writing this):
(1) The name of a proxy should be reresolved when reconnecting to it.
(2) The proxy should use a authorizer to authorize servers trying to
connect to it.
(3) The proxy should send update messages to servers as its endpoints
change.
Change-Id: I19284a241df6e17cabe9b9b0693d38005784f95c
diff --git a/runtime/internal/lib/publisher/publisher.go b/lib/publisher/publisher.go
similarity index 100%
rename from runtime/internal/lib/publisher/publisher.go
rename to lib/publisher/publisher.go
diff --git a/runtime/internal/lib/publisher/publisher_test.go b/lib/publisher/publisher_test.go
similarity index 61%
rename from runtime/internal/lib/publisher/publisher_test.go
rename to lib/publisher/publisher_test.go
index 6c93ada..4b94a9a 100644
--- a/runtime/internal/lib/publisher/publisher_test.go
+++ b/lib/publisher/publisher_test.go
@@ -11,31 +11,19 @@
"testing"
"time"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/namespace"
- "v.io/v23/vtrace"
- "v.io/x/ref/lib/flags"
- "v.io/x/ref/runtime/internal/lib/publisher"
- tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
- ivtrace "v.io/x/ref/runtime/internal/vtrace"
+ "v.io/x/ref/lib/publisher"
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/test"
)
//go:generate jiri test generate
-func testContext() *context.T {
- ctx, _ := context.RootContext()
- ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
- if err != nil {
- panic(err)
- }
- ctx, _ = vtrace.WithNewSpan(ctx, "")
- ctx, _ = context.WithDeadline(ctx, time.Now().Add(20*time.Second))
- return ctx
-}
-
func resolveWithRetry(t *testing.T, ns namespace.T, ctx *context.T, name string, expected int) []string {
- deadline := time.Now().Add(time.Minute)
+ deadline := time.Now().Add(10 * time.Second)
for {
me, err := ns.Resolve(ctx, name)
if err == nil && len(me.Names()) == expected {
@@ -43,62 +31,63 @@
}
if time.Now().After(deadline) {
t.Fatalf("failed to resolve %q", name)
- } else {
- continue
}
time.Sleep(100 * time.Millisecond)
}
}
func verifyMissing(t *testing.T, ns namespace.T, ctx *context.T, name string) {
- deadline := time.Now().Add(time.Minute)
+ deadline := time.Now().Add(10 * time.Second)
for {
- if _, err := ns.Resolve(ctx, "foo"); err == nil {
- if time.Now().After(deadline) {
- t.Errorf("%q is still mounted", name)
- }
- time.Sleep(100 * time.Millisecond)
- } else {
+ if _, err := ns.Resolve(ctx, "foo"); err != nil {
break
}
+ if time.Now().After(deadline) {
+ t.Fatalf("%q is still mounted", name)
+ }
+ time.Sleep(100 * time.Millisecond)
}
}
func TestAddAndRemove(t *testing.T) {
- tctx := testContext()
- ns := tnaming.NewSimpleNamespace()
- pub := publisher.New(testContext(), ns, time.Second)
+ ctx, shutdown := test.V23InitWithMounttable()
+ defer shutdown()
+ ns := v23.GetNamespace(ctx)
+ pub := publisher.New(ctx, ns, time.Second)
pub.AddName("foo", false, false)
- pub.AddServer("foo-addr")
- if got, want := resolveWithRetry(t, ns, tctx, "foo", 1), []string{"/foo-addr"}; !reflect.DeepEqual(got, want) {
+ pub.AddServer("foo:8000")
+ if got, want := resolveWithRetry(t, ns, ctx, "foo", 1), []string{"/foo:8000"}; !reflect.DeepEqual(got, want) {
t.Errorf("got %q, want %q", got, want)
}
- pub.AddServer("bar-addr")
- got, want := resolveWithRetry(t, ns, tctx, "foo", 2), []string{"/bar-addr", "/foo-addr"}
+ pub.AddServer("bar:8000")
+ got, want := resolveWithRetry(t, ns, ctx, "foo", 2), []string{"/bar:8000", "/foo:8000"}
sort.Strings(got)
if !reflect.DeepEqual(got, want) {
t.Errorf("got %q, want %q", got, want)
}
pub.AddName("baz", false, false)
- got = resolveWithRetry(t, ns, tctx, "baz", 2)
+ got = resolveWithRetry(t, ns, ctx, "baz", 2)
sort.Strings(got)
if !reflect.DeepEqual(got, want) {
t.Errorf("got %q, want %q", got, want)
}
pub.RemoveName("foo")
- verifyMissing(t, ns, tctx, "foo")
+ verifyMissing(t, ns, ctx, "foo")
+ pub.Stop()
+ pub.WaitForStop()
}
func TestStatus(t *testing.T) {
- tctx := testContext()
- ns := tnaming.NewSimpleNamespace()
- pub := publisher.New(testContext(), ns, time.Second)
+ ctx, shutdown := test.V23InitWithMounttable()
+ defer shutdown()
+ ns := v23.GetNamespace(ctx)
+ pub := publisher.New(ctx, ns, time.Second)
pub.AddName("foo", false, false)
status := pub.Status()
if got, want := len(status), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
- pub.AddServer("foo-addr")
+ pub.AddServer("foo:8000")
// Wait for the publisher to asynchronously publish the
// requisite number of servers.
@@ -125,7 +114,7 @@
t.Fatalf("%s", err)
}
- pub.AddServer("bar-addr")
+ pub.AddServer("bar:8000")
pub.AddName("baz", false, false)
go waitFor(4)
@@ -139,15 +128,17 @@
t.Errorf("got %q, want %q", got, want)
}
servers := status.Servers()
- if got, want := servers, []string{"bar-addr", "foo-addr"}; !reflect.DeepEqual(got, want) {
+ if got, want := servers, []string{"bar:8000", "foo:8000"}; !reflect.DeepEqual(got, want) {
t.Errorf("got %q, want %q", got, want)
}
pub.RemoveName("foo")
- verifyMissing(t, ns, tctx, "foo")
+ verifyMissing(t, ns, ctx, "foo")
status = pub.Status()
go waitFor(2)
if err := <-ch; err != nil {
t.Fatalf("%s", err)
}
+ pub.Stop()
+ pub.WaitForStop()
}
diff --git a/runtime/internal/lib/publisher/v23_internal_test.go b/lib/publisher/v23_internal_test.go
similarity index 100%
rename from runtime/internal/lib/publisher/v23_internal_test.go
rename to lib/publisher/v23_internal_test.go
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 92aa73c..7cb3921 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -311,12 +311,13 @@
func (m *manager) connectToProxy(ctx *context.T, ep naming.Endpoint) {
defer m.ls.listenLoops.Done()
var eps []naming.Endpoint
+ stopProxy := m.ls.stopProxy
for delay := reconnectDelay; ; delay *= 2 {
time.Sleep(delay - reconnectDelay)
select {
case <-ctx.Done():
return
- case <-m.ls.stopProxy:
+ case <-stopProxy:
return
default:
}
@@ -347,7 +348,7 @@
select {
case <-ctx.Done():
return
- case <-m.ls.stopProxy:
+ case <-stopProxy:
return
case <-f.Closed():
m.updateProxyEndpoints(nil)
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index dda36db..8ffe972 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -31,9 +31,9 @@
"v.io/v23/vtrace"
"v.io/x/ref/lib/apilog"
+ "v.io/x/ref/lib/publisher"
"v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
- "v.io/x/ref/runtime/internal/lib/publisher"
"v.io/x/ref/runtime/internal/lib/roaming"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
diff --git a/runtime/internal/rpc/stream/proxy/proxy.go b/runtime/internal/rpc/stream/proxy/proxy.go
index 5e3a32a..5438d35 100644
--- a/runtime/internal/rpc/stream/proxy/proxy.go
+++ b/runtime/internal/rpc/stream/proxy/proxy.go
@@ -21,10 +21,10 @@
"v.io/v23/verror"
"v.io/v23/vom"
+ "v.io/x/ref/lib/publisher"
"v.io/x/ref/runtime/internal/lib/bqueue"
"v.io/x/ref/runtime/internal/lib/bqueue/drrqueue"
"v.io/x/ref/runtime/internal/lib/iobuf"
- "v.io/x/ref/runtime/internal/lib/publisher"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
diff --git a/runtime/internal/rpc/test/proxy_test.go b/runtime/internal/rpc/test/proxy_test.go
index d5bc952..44d1bbe 100644
--- a/runtime/internal/rpc/test/proxy_test.go
+++ b/runtime/internal/rpc/test/proxy_test.go
@@ -22,8 +22,8 @@
"v.io/v23/security"
"v.io/v23/verror"
"v.io/x/ref"
+ "v.io/x/ref/lib/publisher"
_ "v.io/x/ref/runtime/factories/generic"
- "v.io/x/ref/runtime/internal/lib/publisher"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream/proxy"
"v.io/x/ref/test"
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 4f14a1d..856941a 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -26,10 +26,10 @@
"v.io/v23/vom"
"v.io/v23/vtrace"
"v.io/x/ref/lib/apilog"
+ "v.io/x/ref/lib/publisher"
"v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
"v.io/x/ref/runtime/internal/flow/manager"
- "v.io/x/ref/runtime/internal/lib/publisher"
inaming "v.io/x/ref/runtime/internal/naming"
)
@@ -139,13 +139,6 @@
s.listen(rootCtx, v23.GetListenSpec(rootCtx))
if len(name) > 0 {
- // TODO(mattr): We only call AddServer here, but if someone calls AddName
- // later there will be no servers?
- s.Lock()
- for k, _ := range s.endpoints {
- s.publisher.AddServer(k)
- }
- s.Unlock()
s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
}
diff --git a/services/wspr/internal/app/app_test.go b/services/wspr/internal/app/app_test.go
index a5d0fc1..37ca2b1 100644
--- a/services/wspr/internal/app/app_test.go
+++ b/services/wspr/internal/app/app_test.go
@@ -32,7 +32,7 @@
"v.io/x/ref/services/wspr/internal/lib"
"v.io/x/ref/services/wspr/internal/lib/testwriter"
"v.io/x/ref/services/wspr/internal/rpc/server"
- "v.io/x/ref/services/xproxyd"
+ "v.io/x/ref/services/xproxy/xproxy"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
)
@@ -335,7 +335,7 @@
var proxyEndpoint naming.Endpoint
if ref.RPCTransitionState() >= ref.XServers {
pctx, cancel := context.WithCancel(ctx)
- proxy, _, perr := xproxyd.New(v23.WithListenSpec(pctx, proxySpec))
+ proxy, perr := xproxy.New(v23.WithListenSpec(pctx, proxySpec), "")
proxyEndpoint = proxy.ListeningEndpoints()[0]
if protocol := proxyEndpoint.Addr().Network(); protocol != "tcp" {
return nil, fmt.Errorf("Got %s want tcp", protocol)
diff --git a/services/xproxyd/errors.vdl b/services/xproxy/xproxy/errors.vdl
similarity index 75%
rename from services/xproxyd/errors.vdl
rename to services/xproxy/xproxy/errors.vdl
index b67b78d..3e0167d 100644
--- a/services/xproxyd/errors.vdl
+++ b/services/xproxy/xproxy/errors.vdl
@@ -2,9 +2,10 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package xproxyd
+package xproxy
error (
NotListening() {"en": "Proxy is not listening on any endpoints."}
UnexpectedMessage(msgType string) {"en": "Unexpected message of type{:msgType}"}
+ FailedToResolveToEndpoint(name string) {"en": "Failed to resolve '{name}' to endpoint"}
)
diff --git a/services/xproxy/xproxy/errors.vdl.go b/services/xproxy/xproxy/errors.vdl.go
new file mode 100644
index 0000000..0068b13
--- /dev/null
+++ b/services/xproxy/xproxy/errors.vdl.go
@@ -0,0 +1,42 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package xproxy
+
+import (
+ // VDL system imports
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/verror"
+)
+
+var (
+ ErrNotListening = verror.Register("v.io/x/ref/services/xproxy/xproxy.NotListening", verror.NoRetry, "{1:}{2:} Proxy is not listening on any endpoints.")
+ ErrUnexpectedMessage = verror.Register("v.io/x/ref/services/xproxy/xproxy.UnexpectedMessage", verror.NoRetry, "{1:}{2:} Unexpected message of type{:3}")
+ ErrFailedToResolveToEndpoint = verror.Register("v.io/x/ref/services/xproxy/xproxy.FailedToResolveToEndpoint", verror.NoRetry, "{1:}{2:} Failed to resolve '{3}' to endpoint")
+)
+
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotListening.ID), "{1:}{2:} Proxy is not listening on any endpoints.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnexpectedMessage.ID), "{1:}{2:} Unexpected message of type{:3}")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFailedToResolveToEndpoint.ID), "{1:}{2:} Failed to resolve '{3}' to endpoint")
+}
+
+// NewErrNotListening returns an error with the ErrNotListening ID.
+func NewErrNotListening(ctx *context.T) error {
+ return verror.New(ErrNotListening, ctx)
+}
+
+// NewErrUnexpectedMessage returns an error with the ErrUnexpectedMessage ID.
+func NewErrUnexpectedMessage(ctx *context.T, msgType string) error {
+ return verror.New(ErrUnexpectedMessage, ctx, msgType)
+}
+
+// NewErrFailedToResolveToEndpoint returns an error with the ErrFailedToResolveToEndpoint ID.
+func NewErrFailedToResolveToEndpoint(ctx *context.T, name string) error {
+ return verror.New(ErrFailedToResolveToEndpoint, ctx, name)
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxy/xproxy/proxy.go
similarity index 65%
rename from services/xproxyd/proxyd.go
rename to services/xproxy/xproxy/proxy.go
index ebf4703..f0e3347 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxy/xproxy/proxy.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package xproxyd
+package xproxy
import (
"io"
@@ -14,38 +14,111 @@
"v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/naming"
+
+ "v.io/x/ref/lib/publisher"
)
+// TODO(suharshs): Make sure we don't leave any goroutines behind.
+
const reconnectDelay = 50 * time.Millisecond
type proxy struct {
- m flow.Manager
- mu sync.Mutex
- proxyEndpoints map[string][]naming.Endpoint // keyed by proxy address
+ m flow.Manager
+ pub publisher.Publisher
+ closed chan struct{}
+
+ mu sync.Mutex
+ listeningEndpoints map[string]naming.Endpoint // keyed by endpoint string
+ proxyEndpoints map[string][]naming.Endpoint // keyed by proxy address
}
-func New(ctx *context.T) (*proxy, *context.T, error) {
+func New(ctx *context.T, name string) (*proxy, error) {
mgr, err := v23.NewFlowManager(ctx)
if err != nil {
- return nil, nil, err
+ return nil, err
}
p := &proxy{
- m: mgr,
- proxyEndpoints: make(map[string][]naming.Endpoint),
+ m: mgr,
+ proxyEndpoints: make(map[string][]naming.Endpoint),
+ listeningEndpoints: make(map[string]naming.Endpoint),
+ pub: publisher.New(ctx, v23.GetNamespace(ctx), time.Minute),
+ closed: make(chan struct{}),
}
- for _, addr := range v23.GetListenSpec(ctx).Addrs {
- if addr.Protocol == "v23" {
- ep, err := v23.NewEndpoint(addr.Address)
- if err != nil {
- return nil, nil, err
- }
- go p.connectToProxy(ctx, addr.Address, ep)
- } else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
- return nil, nil, err
+ if len(name) > 0 {
+ p.pub.AddName(name, false, false)
+ }
+ lspec := v23.GetListenSpec(ctx)
+ if len(lspec.Proxy) > 0 {
+ address, ep, err := resolveToEndpoint(ctx, lspec.Proxy)
+ if err != nil {
+ return nil, err
+ }
+ go p.connectToProxy(ctx, address, ep)
+ }
+ for _, addr := range lspec.Addrs {
+ if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
+ return nil, err
}
}
+ leps, changed := p.m.ListeningEndpoints()
+ p.updateEndpoints(leps)
+ go p.updateEndpointsLoop(changed)
go p.listenLoop(ctx)
- return p, ctx, nil
+ go func() {
+ <-ctx.Done()
+ p.pub.Stop()
+ p.pub.WaitForStop()
+ close(p.closed)
+ }()
+ return p, nil
+}
+
+func (p *proxy) Closed() <-chan struct{} {
+ return p.closed
+}
+
+func (p *proxy) updateEndpointsLoop(changed <-chan struct{}) {
+ var leps []naming.Endpoint
+ for changed != nil {
+ <-changed
+ leps, changed = p.m.ListeningEndpoints()
+ p.updateEndpoints(leps)
+ }
+}
+
+func (p *proxy) updateEndpoints(leps []naming.Endpoint) {
+ p.mu.Lock()
+ endpoints := make(map[string]naming.Endpoint)
+ for _, ep := range leps {
+ endpoints[ep.String()] = ep
+ }
+ rmEps := setDiff(p.listeningEndpoints, endpoints)
+ addEps := setDiff(endpoints, p.listeningEndpoints)
+ for k := range rmEps {
+ delete(p.listeningEndpoints, k)
+ }
+ for k, ep := range addEps {
+ p.listeningEndpoints[k] = ep
+ }
+ p.mu.Unlock()
+
+ for k := range rmEps {
+ p.pub.RemoveServer(k)
+ }
+ for k := range addEps {
+ p.pub.AddServer(k)
+ }
+}
+
+// setDiff returns the endpoints in a that are not in b.
+func setDiff(a, b map[string]naming.Endpoint) map[string]naming.Endpoint {
+ ret := make(map[string]naming.Endpoint)
+ for k, ep := range a {
+ if _, ok := b[k]; !ok {
+ ret[k] = ep
+ }
+ }
+ return ret
}
func (p *proxy) ListeningEndpoints() []naming.Endpoint {
@@ -234,3 +307,20 @@
}
}
}
+
+func resolveToEndpoint(ctx *context.T, name string) (string, naming.Endpoint, error) {
+ resolved, err := v23.GetNamespace(ctx).Resolve(ctx, name)
+ if err != nil {
+ return "", nil, err
+ }
+ for _, n := range resolved.Names() {
+ address, suffix := naming.SplitAddressName(n)
+ if len(suffix) > 0 {
+ continue
+ }
+ if ep, err := v23.NewEndpoint(address); err == nil {
+ return address, ep, nil
+ }
+ }
+ return "", nil, NewErrFailedToResolveToEndpoint(ctx, name)
+}
diff --git a/services/xproxyd/proxy_test.go b/services/xproxy/xproxy/proxy_test.go
similarity index 67%
rename from services/xproxyd/proxy_test.go
rename to services/xproxy/xproxy/proxy_test.go
index a6314d2..3a1d8cb 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxy/xproxy/proxy_test.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package xproxyd_test
+package xproxy_test
import (
"bufio"
@@ -15,7 +15,8 @@
"v.io/x/ref"
_ "v.io/x/ref/runtime/factories/generic"
- "v.io/x/ref/services/xproxyd"
+ "v.io/x/ref/services/xproxy/xproxy"
+ "v.io/x/ref/test"
"v.io/x/ref/test/goroutines"
"v.io/x/ref/test/testutil"
@@ -23,6 +24,7 @@
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
+ "v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
)
@@ -44,26 +46,23 @@
t.Skip("Test only runs under 'V23_RPC_TRANSITION_STATE==xservers'")
}
defer goroutines.NoLeaks(t, leakWaitTime)()
- ctx, shutdown := v23.Init()
+ ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
// Start the proxy.
- pep := startProxy(t, ctx, address{"tcp", "127.0.0.1:0"})
+ pname, stop := startProxy(t, ctx, "proxy", "", address{"tcp", "127.0.0.1:0"})
+ defer stop()
// Start the server listening through the proxy.
- ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: pep.Name()})
- _, s, err := v23.WithNewServer(ctx, "", &testService{}, nil)
- if err != nil {
+ ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: pname})
+ sctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ if _, _, err := v23.WithNewServer(sctx, "server", &testService{}, nil); err != nil {
t.Fatal(err)
}
- // Wait for the server to finish listening through the proxy.
- eps := s.Status().Endpoints
- for ; len(eps) == 0 || eps[0].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
- time.Sleep(pollTime)
- }
var got string
- if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+ if err := v23.GetClient(ctx).Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
t.Fatal(err)
}
if want := "response:hello"; got != want {
@@ -78,27 +77,25 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
kp := newKillProtocol()
flow.RegisterProtocol("kill", kp)
- ctx, shutdown := v23.Init()
+ ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
// Start the proxies.
- pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
- p2ep := startProxy(t, ctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
+ p1name, stop := startProxy(t, ctx, "p1", "", address{"kill", "127.0.0.1:0"})
+ defer stop()
+ p2name, stop := startProxy(t, ctx, "p2", p1name, address{"kill", "127.0.0.1:0"})
+ defer stop()
// Start the server listening through the proxy.
- ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: p2ep.Name()})
- _, s, err := v23.WithNewServer(ctx, "", &testService{}, nil)
- if err != nil {
+ ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: p2name})
+ sctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ if _, _, err := v23.WithNewServer(sctx, "server", &testService{}, nil); err != nil {
t.Fatal(err)
}
- // Wait for the server to finish listening through the proxy.
- eps := s.Status().Endpoints
- for ; len(eps) == 0 || eps[0].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
- time.Sleep(pollTime)
- }
var got string
- if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+ if err := v23.GetClient(ctx).Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
t.Fatal(err)
}
if want := "response:hello"; got != want {
@@ -111,10 +108,10 @@
t.Skip("Test only runs under 'V23_RPC_TRANSITION_STATE==xservers'")
}
defer goroutines.NoLeaks(t, leakWaitTime)()
- ctx, shutdown := v23.Init()
+ ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
- // Make principals for the proxy and server.
+ // Make principals for the proxy, server, and client.
pctx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("proxy"))
if err != nil {
t.Fatal(err)
@@ -123,37 +120,40 @@
if err != nil {
t.Fatal(err)
}
-
- // Server blesses the proxy so that the server is willing to talk to it.
- ids := testutil.IDProviderFromPrincipal(v23.GetPrincipal(sctx))
- if err := ids.Bless(v23.GetPrincipal(pctx), "proxy"); err != nil {
+ cctx, err := v23.WithPrincipal(ctx, testutil.NewPrincipal("client"))
+ if err != nil {
t.Fatal(err)
}
- // Client blesses the server so that the client is willing to talk to it.
- idc := testutil.IDProviderFromPrincipal(v23.GetPrincipal(ctx))
- if err := idc.Bless(v23.GetPrincipal(sctx), "server"); err != nil {
+
+ // Have the root bless the client, server, and proxy.
+ root := testutil.IDProviderFromPrincipal(v23.GetPrincipal(ctx))
+ if err := root.Bless(v23.GetPrincipal(pctx), "proxy"); err != nil {
+ t.Fatal(err)
+ }
+ if err := root.Bless(v23.GetPrincipal(cctx), "client"); err != nil {
+ t.Fatal(err)
+ }
+ if err := root.Bless(v23.GetPrincipal(sctx), "server"); err != nil {
t.Fatal(err)
}
// Now the proxy's blessings would fail authorization from the client using the
// default authorizer.
- pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+ pname, stop := startProxy(t, pctx, "proxy", "", address{"tcp", "127.0.0.1:0"})
+ defer stop()
// Start the server listening through the proxy.
- sctx = v23.WithListenSpec(sctx, rpc.ListenSpec{Proxy: pep.Name()})
- _, s, err := v23.WithNewServer(sctx, "", &testService{}, security.AllowEveryone())
- if err != nil {
+ sctx = v23.WithListenSpec(sctx, rpc.ListenSpec{Proxy: pname})
+ sctx, cancel := context.WithCancel(sctx)
+ defer cancel()
+ if _, _, err := v23.WithNewServer(sctx, "server", &testService{}, security.AllowEveryone()); err != nil {
t.Fatal(err)
}
- // Wait for the server to finish listening through the proxy.
- eps := s.Status().Endpoints
- for ; len(eps) == 0 || eps[0].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
- time.Sleep(pollTime)
- }
// The call should succeed which means that the client did not try to authorize
// the proxy's blessings.
var got string
- if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+ if err := v23.GetClient(cctx).Call(cctx, "server", "Echo", []interface{}{"hello"},
+ []interface{}{&got}, options.ServerAuthorizer{rejectProxyAuthorizer{}}); err != nil {
t.Fatal(err)
}
if want := "response:hello"; got != want {
@@ -161,12 +161,24 @@
}
}
+type rejectProxyAuthorizer struct{}
+
+func (rejectProxyAuthorizer) Authorize(ctx *context.T, call security.Call) error {
+ names, _ := security.RemoteBlessingNames(ctx, call)
+ for _, n := range names {
+ if strings.Contains(n, "proxy") {
+ panic("should not call authorizer on proxy")
+ }
+ }
+ return nil
+}
+
// TODO(suharshs): Remove the below tests when the transition is complete.
func TestSingleProxy(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
kp := newKillProtocol()
flow.RegisterProtocol("kill", kp)
- ctx, shutdown := v23.Init()
+ ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
am, err := v23.NewFlowManager(ctx)
if err != nil {
@@ -177,7 +189,13 @@
t.Fatal(err)
}
- pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
+ pname, stop := startProxy(t, ctx, "", "", address{"kill", "127.0.0.1:0"})
+ defer stop()
+ address, _ := naming.SplitAddressName(pname)
+ pep, err := v23.NewEndpoint(address)
+ if err != nil {
+ t.Fatal(err)
+ }
if err := am.ProxyListen(ctx, pep); err != nil {
t.Fatal(err)
@@ -198,7 +216,7 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
kp := newKillProtocol()
flow.RegisterProtocol("kill", kp)
- ctx, shutdown := v23.Init()
+ ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
am, err := v23.NewFlowManager(ctx)
if err != nil {
@@ -209,11 +227,18 @@
t.Fatal(err)
}
- pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
+ p1name, stop := startProxy(t, ctx, "", "", address{"kill", "127.0.0.1:0"})
+ defer stop()
+ p2name, stop := startProxy(t, ctx, "", p1name, address{"kill", "127.0.0.1:0"})
+ defer stop()
- p2ep := startProxy(t, ctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
-
- p3ep := startProxy(t, ctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
+ p3name, stop := startProxy(t, ctx, "", p2name, address{"kill", "127.0.0.1:0"})
+ defer stop()
+ address, _ := naming.SplitAddressName(p3name)
+ p3ep, err := v23.NewEndpoint(address)
+ if err != nil {
+ t.Fatal(err)
+ }
if err := am.ProxyListen(ctx, p3ep); err != nil {
t.Fatal(err)
@@ -308,34 +333,51 @@
Protocol, Address string
}
-func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint {
+func startProxy(t *testing.T, ctx *context.T, name string, listenOnProxy string, addrs ...address) (string, func()) {
var ls rpc.ListenSpec
- hasProxies := false
+ hasProxies := len(listenOnProxy) > 0
for _, addr := range addrs {
ls.Addrs = append(ls.Addrs, addr)
- if addr.Protocol == "v23" {
- hasProxies = true
- }
}
+ ls.Proxy = listenOnProxy
ctx = v23.WithListenSpec(ctx, ls)
- proxy, _, err := xproxyd.New(ctx)
+ ctx, cancel := context.WithCancel(ctx)
+ proxy, err := xproxy.New(ctx, name)
if err != nil {
t.Fatal(err)
}
+ stop := func() {
+ cancel()
+ <-proxy.Closed()
+ }
// Wait for the proxy to connect to its proxies.
if hasProxies {
for len(proxy.MultipleProxyEndpoints()) == 0 {
time.Sleep(pollTime)
}
}
+ if len(name) > 0 {
+ waitForPublish(ctx, name)
+ return name, stop
+ }
peps := proxy.ListeningEndpoints()
for _, pep := range peps {
if pep.Addr().Network() == "tcp" || pep.Addr().Network() == "kill" {
- return pep
+ return pep.Name(), stop
}
}
t.Fatal("Proxy not listening on network address.")
- return nil
+ return "", nil
+}
+
+func waitForPublish(ctx *context.T, name string) {
+ ns := v23.GetNamespace(ctx)
+ for {
+ if entry, err := ns.Resolve(ctx, name); err == nil && len(entry.Names()) > 0 {
+ return
+ }
+ time.Sleep(pollTime)
+ }
}
type killProtocol struct {
diff --git a/services/xproxyd/util.go b/services/xproxy/xproxy/util.go
similarity index 99%
rename from services/xproxyd/util.go
rename to services/xproxy/xproxy/util.go
index 3d43430..1419caf 100644
--- a/services/xproxyd/util.go
+++ b/services/xproxy/xproxy/util.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package xproxyd
+package xproxy
import (
"fmt"
diff --git a/services/xproxy/xproxyd/doc.go b/services/xproxy/xproxyd/doc.go
new file mode 100644
index 0000000..6dcfd08
--- /dev/null
+++ b/services/xproxy/xproxyd/doc.go
@@ -0,0 +1,74 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated via go generate.
+// DO NOT UPDATE MANUALLY
+
+/*
+Command proxyd is a daemon that listens for connections from Vanadium services
+(typically behind NATs) and proxies these services to the outside world.
+
+Usage:
+ proxyd [flags]
+
+The proxyd flags are:
+ -healthz-address=
+ Network address on which the HTTP healthz server runs. It is intended to be
+ used with a load balancer. The load balancer must be able to reach this
+ address in order to verify that the proxy server is running.
+ -name=
+ Name to mount the proxy as.
+
+The global flags are:
+ -alsologtostderr=true
+ log to standard error as well as files
+ -log_backtrace_at=:0
+ when logging hits line file:N, emit a stack trace
+ -log_dir=
+ if non-empty, write log files to this directory
+ -logtostderr=false
+ log to standard error instead of files
+ -max_stack_buf_size=4292608
+ max size in bytes of the buffer to use for logging stack traces
+ -metadata=<just specify -metadata to activate>
+ Displays metadata for the program and exits.
+ -stderrthreshold=2
+ logs at or above this threshold go to stderr
+ -time=false
+ Dump timing information to stderr before exiting the program.
+ -v=0
+ log level for V logs
+ -v23.credentials=
+ directory to use for storing security credentials
+ -v23.i18n-catalogue=
+ 18n catalogue files to load, comma separated
+ -v23.namespace.root=[/(dev.v.io/role/vprod/service/mounttabled)@ns.dev.v.io:8101]
+ local namespace root; can be repeated to provided multiple roots
+ -v23.permissions.file=map[]
+ specify a perms file as <name>:<permsfile>
+ -v23.permissions.literal=
+ explicitly specify the runtime perms as a JSON-encoded access.Permissions.
+ Overrides all --v23.permissions.file flags.
+ -v23.proxy=
+ object name of proxy service to use to export services across network
+ boundaries
+ -v23.tcp.address=
+ address to listen on
+ -v23.tcp.protocol=wsh
+ protocol to listen with
+ -v23.vtrace.cache-size=1024
+ The number of vtrace traces to store in memory.
+ -v23.vtrace.collect-regexp=
+ Spans and annotations that match this regular expression will trigger trace
+ collection.
+ -v23.vtrace.dump-on-shutdown=true
+ If true, dump all stored traces on runtime shutdown.
+ -v23.vtrace.sample-rate=0
+ Rate (from 0.0 to 1.0) to sample vtrace traces.
+ -vmodule=
+ comma-separated list of pattern=N settings for filename-filtered logging
+ -vpath=
+ comma-separated list of pattern=N settings for file pathname-filtered logging
+*/
+package main
diff --git a/services/xproxy/xproxyd/main.go b/services/xproxy/xproxyd/main.go
new file mode 100644
index 0000000..834ac4d
--- /dev/null
+++ b/services/xproxy/xproxyd/main.go
@@ -0,0 +1,113 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The following enables go generate to generate the doc.go file.
+//go:generate go run $JIRI_ROOT/release/go/src/v.io/x/lib/cmdline/testdata/gendoc.go . -help
+
+package main
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+
+ "v.io/x/lib/cmdline"
+ "v.io/x/ref/lib/signals"
+ "v.io/x/ref/lib/v23cmd"
+ _ "v.io/x/ref/runtime/factories/roaming"
+
+ "v.io/x/ref/services/xproxy/xproxy"
+)
+
+// TODO(suharshs): add authorization of server listening through proxy.
+
+var healthzAddr, name string
+
+const healthTimeout = 10 * time.Second
+
+func main() {
+ cmdProxyD.Flags.StringVar(&healthzAddr, "healthz-address", "", "Network address on which the HTTP healthz server runs. It is intended to be used with a load balancer. The load balancer must be able to reach this address in order to verify that the proxy server is running.")
+ cmdProxyD.Flags.StringVar(&name, "name", "", "Name to mount the proxy as.")
+
+ cmdline.HideGlobalFlagsExcept()
+ cmdline.Main(cmdProxyD)
+}
+
+var cmdProxyD = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runProxyD),
+ Name: "proxyd",
+ Short: "Proxies services to the outside world",
+ Long: `
+Command proxyd is a daemon that listens for connections from Vanadium services
+(typically behind NATs) and proxies these services to the outside world.
+`,
+}
+
+func runProxyD(ctx *context.T, env *cmdline.Env, args []string) error {
+ // TODO(suharshs): Add ability to specify multiple proxies through this tool.
+ proxy, err := xproxy.New(ctx, name)
+ if err != nil {
+ return err
+ }
+ peps := proxy.ListeningEndpoints()
+ proxyEndpoint := peps[0]
+
+ if len(name) > 0 {
+ // Print out a directly accessible name for the proxy table so
+ // that integration tests can reliably read it from stdout.
+ fmt.Printf("NAME=%s\n", proxyEndpoint.Name())
+ } else {
+ fmt.Printf("Proxy listening on %s\n", proxyEndpoint)
+ }
+
+ if len(healthzAddr) != 0 {
+ go startHealthzServer(ctx, healthzAddr)
+ }
+
+ // Start an RPC Server that listens through the proxy itself. This
+ // server will serve reserved methods only.
+ var monitoringName string
+ if len(name) > 0 {
+ monitoringName = name + "-mon"
+ }
+ ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: proxyEndpoint.Name()})
+ if _, _, err := v23.WithNewDispatchingServer(ctx, monitoringName, &nilDispatcher{}); err != nil {
+ return fmt.Errorf("NewServer failed: %v", err)
+ }
+ <-signals.ShutdownOnSignals(ctx)
+ return nil
+}
+
+type nilDispatcher struct{}
+
+func (nilDispatcher) Lookup(*context.T, string) (interface{}, security.Authorizer, error) {
+ return nil, nil, nil
+}
+
+// healthzHandler implements net/http.Handler
+type healthzHandler struct{}
+
+func (healthzHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
+ w.Write([]byte("ok"))
+}
+
+// startHealthzServer starts a HTTP server that simply returns "ok" to every
+// request. This is needed to let the load balancer know that the proxy server
+// is running.
+func startHealthzServer(ctx *context.T, addr string) {
+ s := http.Server{
+ Addr: addr,
+ Handler: healthzHandler{},
+ ReadTimeout: healthTimeout,
+ WriteTimeout: healthTimeout,
+ }
+ if err := s.ListenAndServe(); err != nil {
+ ctx.Fatal(err)
+ }
+}
diff --git a/services/xproxyd/errors.vdl.go b/services/xproxyd/errors.vdl.go
deleted file mode 100644
index 688e8ad..0000000
--- a/services/xproxyd/errors.vdl.go
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// This file was auto-generated by the vanadium vdl tool.
-// Source: errors.vdl
-
-package xproxyd
-
-import (
- // VDL system imports
- "v.io/v23/context"
- "v.io/v23/i18n"
- "v.io/v23/verror"
-)
-
-var (
- ErrNotListening = verror.Register("v.io/x/ref/services/xproxyd.NotListening", verror.NoRetry, "{1:}{2:} Proxy is not listening on any endpoints.")
- ErrUnexpectedMessage = verror.Register("v.io/x/ref/services/xproxyd.UnexpectedMessage", verror.NoRetry, "{1:}{2:} Unexpected message of type{:3}")
-)
-
-func init() {
- i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotListening.ID), "{1:}{2:} Proxy is not listening on any endpoints.")
- i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnexpectedMessage.ID), "{1:}{2:} Unexpected message of type{:3}")
-}
-
-// NewErrNotListening returns an error with the ErrNotListening ID.
-func NewErrNotListening(ctx *context.T) error {
- return verror.New(ErrNotListening, ctx)
-}
-
-// NewErrUnexpectedMessage returns an error with the ErrUnexpectedMessage ID.
-func NewErrUnexpectedMessage(ctx *context.T, msgType string) error {
- return verror.New(ErrUnexpectedMessage, ctx, msgType)
-}