ref: Change "profiles" directory to "runtime"
As per vanadium/issues#470
MultiPart: 4/10
Change-Id: I3ac47c1d9c514f7bbe1c80507c2b3db7fcd9f6d4
diff --git a/runtime/internal/rpc/test/proxy_test.go b/runtime/internal/rpc/test/proxy_test.go
new file mode 100644
index 0000000..05e1c36
--- /dev/null
+++ b/runtime/internal/rpc/test/proxy_test.go
@@ -0,0 +1,409 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "reflect"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ "v.io/x/lib/vlog"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/namespace"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+ "v.io/v23/vtrace"
+ "v.io/x/ref/lib/flags"
+ _ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/runtime/internal/lib/publisher"
+ inaming "v.io/x/ref/runtime/internal/naming"
+ irpc "v.io/x/ref/runtime/internal/rpc"
+ imanager "v.io/x/ref/runtime/internal/rpc/stream/manager"
+ "v.io/x/ref/runtime/internal/rpc/stream/proxy"
+ tnaming "v.io/x/ref/runtime/internal/testing/mocks/naming"
+ ivtrace "v.io/x/ref/runtime/internal/vtrace"
+ "v.io/x/ref/test/modules"
+ "v.io/x/ref/test/testutil"
+)
+
+// testContext initializes the runtime and returns a context for use by a
+// test together with the function that releases it.
+//
+// The returned context carries a 20 second timeout, has vtrace initialized
+// with default flags, and has a new trace attached. The returned function
+// cancels the timeout and then shuts the runtime down, so callers should
+// defer it. testContext panics if vtrace cannot be initialized.
+func testContext() (*context.T, func()) {
+	ctx, shutdown := v23.Init()
+	// Keep the cancel function so the timeout is released when the test
+	// finishes instead of lingering until it fires.
+	ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
+	ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
+	if err != nil {
+		panic(err)
+	}
+	ctx, _ = vtrace.WithNewTrace(ctx)
+	return ctx, func() {
+		cancel()
+		shutdown()
+	}
+}
+
+// proxyServer is the body of the child process that runs a proxy.
+//
+// It starts a proxy listening on an ephemeral local tcp port that accepts
+// every caller, prints "PID=<pid>" to stdout and, when args is non-empty,
+// publishes the proxy's endpoint under each name in args, waiting until
+// the publisher reports exactly len(args) entries. It then prints
+// "PROXY_NAME=<name>", blocks until stdin reaches EOF, and prints "DONE".
+//
+// An error is returned if the proxy cannot be created (the error is also
+// written to stderr) or if any name in args is empty.
+func proxyServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	spec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
+	stopProxy, ep, err := proxy.New(ctx, spec, security.AllowEveryone())
+	if err != nil {
+		fmt.Fprintf(stderr, "%s\n", verror.DebugString(err))
+		return err
+	}
+	defer stopProxy()
+	fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
+
+	if want := len(args); want > 0 {
+		pub := publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
+		// Deferred calls run in reverse order: Stop first, then wait for
+		// the publisher to finish stopping.
+		defer pub.WaitForStop()
+		defer pub.Stop()
+		pub.AddServer(ep.String())
+		for _, name := range args {
+			if name == "" {
+				return fmt.Errorf("empty name specified on the command line")
+			}
+			pub.AddName(name, false, false)
+		}
+		// Poll once a second until every entry has been published.
+		for len(pub.Status()) != want {
+			time.Sleep(time.Second)
+		}
+	}
+
+	fmt.Fprintf(stdout, "PROXY_NAME=%s\n", ep.Name())
+	modules.WaitForEOF(stdin)
+	fmt.Fprintf(stdout, "DONE\n")
+	return nil
+}
+
+// testServer is the RPC service exported by the server under test.
+type testServer struct{}
+
+// Echo replies with a string that records the method name, the suffix the
+// call was addressed to, and the argument it received, each quoted.
+func (*testServer) Echo(_ *context.T, call rpc.ServerCall, arg string) (string, error) {
+	const format = "method:%q,suffix:%q,arg:%q"
+	reply := fmt.Sprintf(format, "Echo", call.Suffix(), arg)
+	return reply, nil
+}
+
+// testServerAuthorizer is the authorizer handed out by testServerDisp; it
+// permits every call.
+type testServerAuthorizer struct{}
+
+// Authorize ignores its arguments and always returns nil.
+func (testServerAuthorizer) Authorize(_ *context.T, _ security.Call) error {
+	return nil
+}
+
+// testServerDisp is a dispatcher that resolves every suffix to the same
+// object.
+type testServerDisp struct {
+	server interface{}
+}
+
+// Lookup returns the wrapped server object and a testServerAuthorizer,
+// regardless of suffix. It never returns an error.
+func (d testServerDisp) Lookup(suffix string) (interface{}, security.Authorizer, error) {
+	var auth security.Authorizer = testServerAuthorizer{}
+	return d.server, auth, nil
+}
+
+// proxyHandle controls a proxy running in a child process and keeps the
+// "proxy" entry of a namespace in step with it.
+type proxyHandle struct {
+	ns    namespace.T    // namespace in which the proxy is mounted as "proxy"
+	sh    *modules.Shell // shell used to launch the child; set by Start
+	proxy modules.Handle // handle on the child process; set by Start
+	name  string         // PROXY_NAME reported by the child; set by Start
+}
+
+// Start launches the "proxyServer" command in a new shell, passing it
+// args, reads the PROXY_NAME it reports and mounts that name as "proxy" in
+// the namespace for an hour.
+//
+// Failure to create the shell, to start the child or to obtain PROXY_NAME
+// aborts the test via t.Fatalf; the error from the mount is returned.
+func (h *proxyHandle) Start(t *testing.T, ctx *context.T, args ...string) error {
+	shell, err := modules.NewShell(nil, nil, testing.Verbose(), t)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	h.sh = shell
+
+	child, err := shell.Start("proxyServer", nil, args...)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	h.proxy = child
+
+	// Discard the first line of output, which precedes PROXY_NAME.
+	child.ReadLine()
+	if h.name = child.ExpectVar("PROXY_NAME"); h.name == "" {
+		h.proxy.Shutdown(os.Stderr, os.Stderr)
+		t.Fatalf("failed to get PROXY_NAME from proxyd")
+	}
+	return h.ns.Mount(ctx, "proxy", h.name, time.Hour)
+}
+
+// Stop shuts the child process down and removes its "proxy" mount.
+//
+// The shell is always cleaned up on return. If shutting the child down
+// fails that error is returned and the mount is left in place; if no name
+// was recorded by Start there is nothing to unmount and nil is returned.
+func (h *proxyHandle) Stop(ctx *context.T) error {
+	defer h.sh.Cleanup(os.Stderr, os.Stderr)
+	err := h.proxy.Shutdown(os.Stderr, os.Stderr)
+	switch {
+	case err != nil:
+		return err
+	case h.name == "":
+		return nil
+	}
+	return h.ns.Unmount(ctx, "proxy", h.name)
+}
+
+// TestProxyOnly runs testProxy with a listen spec that names a proxy but
+// no local addresses.
+func TestProxyOnly(t *testing.T) {
+	testProxy(t, rpc.ListenSpec{Proxy: "proxy"})
+}
+
+// TestProxy runs testProxy with a listen spec that names a proxy and also
+// listens on an ephemeral local tcp port.
+func TestProxy(t *testing.T) {
+	var spec rpc.ListenSpec
+	spec.Addrs = rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}
+	spec.Proxy = "proxy"
+	testProxy(t, spec)
+}
+
+// TestWSProxy runs testProxy with the same listen spec as TestProxy while
+// passing --v23.tcp.protocol=ws on to the proxy process.
+func TestWSProxy(t *testing.T) {
+	addrs := rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}
+	// The proxy uses websockets only, but the server is using tcp.
+	testProxy(t, rpc.ListenSpec{Addrs: addrs, Proxy: "proxy"}, "--v23.tcp.protocol=ws")
+}
+
+// testProxy checks that a server listening with spec can be reached
+// through a proxy, and that it recovers when the proxy is restarted.
+//
+// The proxy runs in a child process that is started with args. The test
+// proceeds in stages:
+//  1. start the proxy, then the server, and wait until both the mount
+//     table and the server's status reflect the proxied endpoint;
+//  2. remove any locally listened endpoint from the mount table and make
+//     a call that can therefore only succeed through the proxy;
+//  3. stop the proxy and check that calls fail, the name is unmounted and
+//     the server reports verror.ErrNoServers for the proxy;
+//  4. restart the proxy and check that calls succeed again.
+func testProxy(t *testing.T, spec rpc.ListenSpec, args ...string) {
+	ctx, shutdown := testContext()
+	defer shutdown()
+
+	var (
+		pserver   = testutil.NewPrincipal("server")
+		pclient   = testutil.NewPrincipal("client")
+		serverKey = pserver.PublicKey()
+		// We use different stream managers for the client and server
+		// to prevent VIF re-use (in other words, we want to test VIF
+		// creation from both the client and server end).
+		smserver = imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+		smclient = imanager.InternalNew(naming.FixedRoutingID(0x444444444))
+		ns       = tnaming.NewSimpleNamespace()
+	)
+	defer smserver.Shutdown()
+	defer smclient.Shutdown()
+	// NOTE(review): the client is created on smserver rather than
+	// smclient, which does not match the comment above -- confirm which
+	// stream manager is intended before changing it.
+	client, err := irpc.InternalNewClient(smserver, ns)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+	serverCtx, err := v23.WithPrincipal(ctx, pserver)
+	if err != nil {
+		t.Fatal(err)
+	}
+	server, err := irpc.InternalNewServer(serverCtx, smserver, ns, nil, "", nil, pserver)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer server.Stop()
+
+	// The client must recognize the server's blessings, otherwise it won't
+	// communicate with it.
+	pclient.AddToRoots(pserver.BlessingStore().Default())
+
+	// If no address is specified then we'll only 'listen' via
+	// the proxy.
+	hasLocalListener := len(spec.Addrs) > 0 && len(spec.Addrs[0].Address) != 0
+
+	name := "mountpoint/server/suffix"
+	// makeCall invokes Echo("batman") on name as the client principal with
+	// a five second deadline and returns the reply.
+	makeCall := func(opts ...rpc.CallOpt) (string, error) {
+		clientCtx, err := v23.WithPrincipal(ctx, pclient)
+		if err != nil {
+			return "", err
+		}
+		clientCtx, cancel := context.WithDeadline(clientCtx, time.Now().Add(5*time.Second))
+		// The call has finished by the time we return, so the deadline
+		// can be released.
+		defer cancel()
+		call, err := client.StartCall(clientCtx, name, "Echo", []interface{}{"batman"}, opts...)
+		if err != nil {
+			// The proxy is down; prefix the error with a well known
+			// string so that callers can test for that.
+			return "", fmt.Errorf("RESOLVE: %s", err)
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	// Named ph so that it does not shadow the imported proxy package.
+	ph := &proxyHandle{ns: ns}
+	if err := ph.Start(t, ctx, args...); err != nil {
+		t.Fatal(err)
+	}
+	defer ph.Stop(ctx)
+	addrs := verifyMount(t, ctx, ns, spec.Proxy)
+	if len(addrs) != 1 {
+		t.Fatalf("failed to lookup proxy")
+	}
+
+	eps, err := server.Listen(spec)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := server.ServeDispatcher("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
+		t.Fatal(err)
+	}
+
+	// waitFor polls cond, sleeping for interval between attempts, and
+	// reports whether it became true within a minute. It never touches t,
+	// so it is safe to use from goroutines other than the test's own.
+	waitFor := func(interval time.Duration, cond func() bool) bool {
+		deadline := time.Now().Add(time.Minute)
+		for !cond() {
+			if time.Now().After(deadline) {
+				return false
+			}
+			time.Sleep(interval)
+		}
+		return true
+	}
+	// Proxy connections are started asynchronously, so we need to wait.
+	// waitForMountTable waits for name to resolve to exactly expect
+	// servers. A failed Resolve counts as "not yet" and is retried after
+	// the usual sleep, still subject to the timeout.
+	waitForMountTable := func(expect int) error {
+		found := 0
+		ok := waitFor(100*time.Millisecond, func() bool {
+			me, err := ns.Resolve(ctx, name)
+			if err != nil {
+				return false
+			}
+			for i, s := range me.Servers {
+				vlog.Infof("%d: %s", i, s)
+			}
+			found = len(me.Servers)
+			return found == expect
+		})
+		if !ok {
+			return fmt.Errorf("timed out waiting for %d servers, found %d", expect, found)
+		}
+		return nil
+	}
+	// waitForServerStatus waits for the server to report exactly one
+	// proxy, namely proxyName.
+	waitForServerStatus := func(proxyName string) error {
+		ok := waitFor(100*time.Millisecond, func() bool {
+			status := server.Status()
+			return len(status.Proxies) == 1 && status.Proxies[0].Proxy == proxyName
+		})
+		if !ok {
+			return fmt.Errorf("timed out waiting for the server status to list proxy %q", proxyName)
+		}
+		return nil
+	}
+	proxyEP, _ := naming.SplitAddressName(addrs[0])
+	proxiedEP, err := inaming.NewEndpoint(proxyEP)
+	if err != nil {
+		t.Fatalf("unexpected error for %q: %s", proxyEP, err)
+	}
+	proxiedEP.RID = naming.FixedRoutingID(0x555555555)
+	proxiedEP.Blessings = []string{"server"}
+	expectedNames := []string{naming.JoinAddressName(proxiedEP.String(), "suffix")}
+	if hasLocalListener {
+		expectedNames = append(expectedNames, naming.JoinAddressName(eps[0].String(), "suffix"))
+	}
+
+	// Proxy connections are created asynchronously, so we wait for the
+	// expected number of endpoints to appear for the specified service name.
+	// The waiters run concurrently and hand their outcome back over a
+	// channel so that the test is only ever failed from this goroutine.
+	results := make(chan error, 2)
+	go func() { results <- waitForMountTable(len(expectedNames)) }()
+	go func() { results <- waitForServerStatus(spec.Proxy) }()
+	// The waiters time out by themselves after a minute; the guard only
+	// fires if one of them is stuck inside Resolve or Status.
+	guard := time.After(2 * time.Minute)
+	for pending := 2; pending > 0; pending-- {
+		select {
+		case err := <-results:
+			if err != nil {
+				t.Fatal(err)
+			}
+		case <-guard:
+			t.Fatalf("timedout waiting for two entries in the mount table and server status")
+		}
+	}
+
+	status := server.Status()
+	if got, want := status.Proxies[0].Endpoint, proxiedEP; !reflect.DeepEqual(got, want) {
+		t.Fatalf("got %q, want %q", got, want)
+	}
+
+	// Sort a copy so that the slice returned by verifyMount is untouched.
+	got := append([]string{}, verifyMount(t, ctx, ns, name)...)
+	sort.Strings(got)
+	sort.Strings(expectedNames)
+	if !reflect.DeepEqual(got, expectedNames) {
+		t.Errorf("got %v, want %v", got, expectedNames)
+	}
+
+	if hasLocalListener {
+		// Listen will publish both the local and proxied endpoint with the
+		// mount table, given that we're trying to test the proxy, we remove
+		// the local endpoint from the mount table entry!
+		sep := eps[0].String()
+		if err := ns.Unmount(ctx, "mountpoint/server", sep); err != nil {
+			t.Fatalf("failed to unmount local endpoint %q: %s", sep, err)
+		}
+	}
+
+	addrs = verifyMount(t, ctx, ns, name)
+	if len(addrs) != 1 {
+		t.Fatalf("failed to lookup proxy: addrs %v", addrs)
+	}
+
+	// Proxied endpoint should be published and RPC should succeed (through proxy).
+	// Additionally, any server authorization options must only apply to the end server
+	// and not the proxy.
+	const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
+	if result, err := makeCall(options.ServerPublicKey{serverKey}); result != expected || err != nil {
+		t.Fatalf("Got (%v, %v) want (%v, nil)", result, err, expected)
+	}
+
+	// Proxy dies, calls should fail and the name should be unmounted.
+	if err := ph.Stop(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if result, err := makeCall(options.NoRetry{}); err == nil || (!strings.HasPrefix(err.Error(), "RESOLVE") && !strings.Contains(err.Error(), "EOF")) {
+		t.Fatalf(`Got (%v, %v) want ("", "RESOLVE: <err>" or "EOF") as proxy is down`, result, err)
+	}
+
+	// Wait, with a bound, for the name to stop resolving.
+	unmounted := waitFor(10*time.Millisecond, func() bool {
+		_, err := ns.Resolve(ctx, name)
+		return err != nil
+	})
+	if !unmounted {
+		t.Fatalf("timed out waiting for %s to be unmounted", name)
+	}
+	verifyMountMissing(t, ctx, ns, name)
+
+	status = server.Status()
+	if got, want := len(status.Proxies), 1; got != want {
+		t.Logf("Proxies: %v", status.Proxies)
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	if got, want := status.Proxies[0].Proxy, spec.Proxy; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	if got, want := verror.ErrorID(status.Proxies[0].Error), verror.ErrNoServers.ID; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+
+	// Proxy restarts, calls should eventually start succeeding.
+	if err := ph.Start(t, ctx, args...); err != nil {
+		t.Fatal(err)
+	}
+
+	for retries := 0; ; {
+		result, err := makeCall()
+		if err == nil {
+			if result != expected {
+				t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
+			}
+			break
+		}
+		retries++
+		if retries > 10 {
+			t.Fatalf("Failed after 10 attempts: err: %s", err)
+		}
+	}
+}
+
+// verifyMount resolves name in ns and returns the names of the resulting
+// mount entry. If the name cannot be resolved it records a test error and
+// returns nil.
+func verifyMount(t *testing.T, ctx *context.T, ns namespace.T, name string) []string {
+	if me, err := ns.Resolve(ctx, name); err == nil {
+		return me.Names()
+	}
+	t.Errorf("%s not found in mounttable", name)
+	return nil
+}
+
+// verifyMountMissing records a test error, listing the names found, if
+// name still resolves in ns.
+func verifyMountMissing(t *testing.T, ctx *context.T, ns namespace.T, name string) {
+	me, err := ns.Resolve(ctx, name)
+	if err != nil {
+		// Failing to resolve is the expected outcome.
+		return
+	}
+	names := me.Names()
+	t.Errorf("%s not supposed to be found in mounttable; got %d servers instead: %v", name, len(names), names)
+}