veyron/services/mgmt/node/impl: Add proxy invoker for pprof and stats

proxyInvoker is an ipc.Invoker implementation that proxies all requests
to a remote object, i.e. requests to <suffix> are forwarded to
<remote>/<suffix> transparently.

It is used by the node manager to proxy the stats and pprof objects of
running apps.

Change-Id: I9a07b76edebd879d54bea4d6d913194d5d86a86e
diff --git a/services/mgmt/node/impl/app_invoker.go b/services/mgmt/node/impl/app_invoker.go
index e14dc1e..4de936a 100644
--- a/services/mgmt/node/impl/app_invoker.go
+++ b/services/mgmt/node/impl/app_invoker.go
@@ -794,10 +794,11 @@
 
 type treeNode struct {
 	children map[string]*treeNode
+	remote   string // Used to implement Glob for remote objects.
 }
 
 func newTreeNode() *treeNode {
-	return &treeNode{make(map[string]*treeNode)}
+	return &treeNode{children: make(map[string]*treeNode)}
 }
 
 func (n *treeNode) find(names []string, create bool) *treeNode {
@@ -862,7 +863,8 @@
 	}
 	for _, path := range instances {
 		instanceDir := filepath.Dir(path)
-		if _, err := loadInstanceInfo(instanceDir); err != nil {
+		info, err := loadInstanceInfo(instanceDir)
+		if err != nil {
 			continue
 		}
 		relpath, _ := filepath.Rel(i.config.Root, path)
@@ -877,6 +879,14 @@
 		if title, ok := appIDMap[appID]; ok {
 			n := tree.find([]string{title, installID, instanceID, "logs"}, true)
 			i.addLogFiles(n, filepath.Join(instanceDir, "logs"))
+
+			if instanceStateIs(instanceDir, started) {
+				// Set the name of the remote objects that should handle Glob.
+				for _, obj := range []string{"pprof", "stats"} {
+					n = tree.find([]string{title, installID, instanceID, obj}, true)
+					n.remote = naming.JoinAddressName(info.AppCycleMgrName, naming.Join("__debug", obj))
+				}
+			}
 		}
 	}
 	return tree
@@ -903,11 +913,15 @@
 	if n == nil {
 		return errInvalidSuffix
 	}
-	i.globStep("", g, n, stream)
+	i.globStep(ctx, "", g, n, stream)
 	return nil
 }
 
-func (i *appInvoker) globStep(prefix string, g *glob.Glob, n *treeNode, stream mounttable.GlobbableServiceGlobStream) {
+func (i *appInvoker) globStep(ctx ipc.ServerContext, prefix string, g *glob.Glob, n *treeNode, stream mounttable.GlobbableServiceGlobStream) {
+	if n.remote != "" {
+		remoteGlob(ctx, n.remote, prefix, g.String(), stream)
+		return
+	}
 	if g.Len() == 0 {
 		stream.SendStream().Send(types.MountEntry{Name: prefix})
 	}
@@ -916,7 +930,35 @@
 	}
 	for name, child := range n.children {
 		if ok, _, left := g.MatchInitialSegment(name); ok {
-			i.globStep(naming.Join(prefix, name), left, child, stream)
+			i.globStep(ctx, naming.Join(prefix, name), left, child, stream)
 		}
 	}
 }
+
+// remoteGlob forwards a Glob request for pattern to the object named remote
+// and relays every entry to stream with prefix joined in front of its name.
+// Failures are only logged at verbosity 1; Send errors are ignored.
+func remoteGlob(ctx ipc.ServerContext, remote, prefix, pattern string, stream mounttable.GlobbableServiceGlobStream) {
+	client, err := mounttable.BindGlobbable(remote)
+	if err != nil {
+		vlog.VI(1).Infof("BindGlobbable(%q): %v", remote, err)
+		return
+	}
+	call, err := client.Glob(ctx, pattern)
+	if err != nil {
+		vlog.VI(1).Infof("%q.Glob(%q): %v", remote, pattern, err)
+		return
+	}
+	entries, out := call.RecvStream(), stream.SendStream()
+	for entries.Advance() {
+		entry := entries.Value()
+		entry.Name = naming.Join(prefix, entry.Name)
+		out.Send(entry)
+	}
+	// Finish is only reached when the receive stream ended cleanly.
+	if err := entries.Err(); err != nil {
+		vlog.VI(1).Infof("%q.Glob(%q): %v", remote, pattern, err)
+	} else if err := call.Finish(); err != nil {
+		vlog.VI(1).Infof("%q.Glob(%q): %v", remote, pattern, err)
+	}
+}
diff --git a/services/mgmt/node/impl/app_state.go b/services/mgmt/node/impl/app_state.go
index df5049f..9fbc1c2 100644
--- a/services/mgmt/node/impl/app_state.go
+++ b/services/mgmt/node/impl/app_state.go
@@ -79,6 +79,12 @@
 	}
 }
 
+// instanceStateIs reports whether a file named after state can be stat'ed
+// directly under instanceDir; any os.Stat error is treated as "not in state".
+func instanceStateIs(instanceDir string, state instanceState) bool {
+	_, err := os.Stat(filepath.Join(instanceDir, state.String()))
+	return err == nil
+}
 func transitionInstance(instanceDir string, initial, target instanceState) error {
 	return transitionState(instanceDir, initial, target)
 }
diff --git a/services/mgmt/node/impl/dispatcher.go b/services/mgmt/node/impl/dispatcher.go
index 655e758..985fff0 100644
--- a/services/mgmt/node/impl/dispatcher.go
+++ b/services/mgmt/node/impl/dispatcher.go
@@ -24,6 +24,8 @@
 	"veyron.io/veyron/veyron2/rt"
 	"veyron.io/veyron/veyron2/security"
 	"veyron.io/veyron/veyron2/services/mgmt/node"
+	"veyron.io/veyron/veyron2/services/mgmt/pprof"
+	"veyron.io/veyron/veyron2/services/mgmt/stats"
 	"veyron.io/veyron/veyron2/services/security/access"
 	"veyron.io/veyron/veyron2/verror"
 	"veyron.io/veyron/veyron2/vlog"
@@ -247,14 +249,43 @@
 		})
 		return ipc.ReflectInvoker(receiver), d.auth, nil
 	case appsSuffix:
-		if method != "Glob" && len(components) >= 5 && components[4] == "logs" {
+		// Glob requests are handled by appInvoker, except for pprof and
+		// stats objects which handle Glob themselves.
+		// Requests to apps/*/*/*/logs are handled locally by LogFileInvoker.
+		// Requests to apps/*/*/*/pprof are proxied to the apps' __debug/pprof object.
+		// Requests to apps/*/*/*/stats are proxied to the apps' __debug/stats object.
+		// Everything else is handled by appInvoker.
+		if len(components) >= 5 && (method != "Glob" || components[4] != "logs") {
 			appInstanceDir, err := instanceDir(d.config.Root, components[1:4])
 			if err != nil {
 				return nil, nil, err
 			}
-			logsDir := filepath.Join(appInstanceDir, "logs")
-			suffix := naming.Join(components[5:]...)
-			return logsimpl.NewLogFileInvoker(logsDir, suffix), d.auth, nil
+			switch kind := components[4]; kind {
+			case "logs":
+				logsDir := filepath.Join(appInstanceDir, "logs")
+				suffix := naming.Join(components[5:]...)
+				return logsimpl.NewLogFileInvoker(logsDir, suffix), d.auth, nil
+			case "pprof", "stats":
+				info, err := loadInstanceInfo(appInstanceDir)
+				if err != nil {
+					return nil, nil, err
+				}
+				if !instanceStateIs(appInstanceDir, started) {
+					return nil, nil, errInvalidSuffix
+				}
+				var label security.Label
+				var sigStub signatureStub
+				if kind == "pprof" {
+					label = security.DebugLabel
+					sigStub = &pprof.ServerStubPProf{}
+				} else {
+					label = security.DebugLabel | security.MonitoringLabel
+					sigStub = &stats.ServerStubStats{}
+				}
+				suffix := naming.Join("__debug", naming.Join(components[4:]...))
+				remote := naming.JoinAddressName(info.AppCycleMgrName, suffix)
+				return &proxyInvoker{remote, label, sigStub}, d.auth, nil
+			}
 		}
 		receiver := node.NewServerApplication(&appInvoker{
 			callback: d.internal.callback,
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index e71690e..8794636 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -34,9 +34,13 @@
 	"veyron.io/veyron/veyron2/services/mgmt/application"
 	"veyron.io/veyron/veyron2/services/mgmt/logreader"
 	"veyron.io/veyron/veyron2/services/mgmt/node"
+	"veyron.io/veyron/veyron2/services/mgmt/pprof"
+	"veyron.io/veyron/veyron2/services/mgmt/stats"
 	"veyron.io/veyron/veyron2/services/mounttable"
+	"veyron.io/veyron/veyron2/services/mounttable/types"
 	"veyron.io/veyron/veyron2/verror"
 	"veyron.io/veyron/veyron2/vlog"
+	"veyron.io/veyron/veyron2/vom"
 
 	"veyron.io/veyron/veyron/lib/expect"
 	"veyron.io/veyron/veyron/lib/modules"
@@ -57,6 +61,9 @@
 )
 
 func init() {
+	// TODO(rthellend): Remove when vom2 is ready.
+	vom.Register(&types.MountedServer{})
+
 	modules.RegisterChild(execScriptCmd, "", execScript)
 	modules.RegisterChild(nodeManagerCmd, "", nodeManager)
 	modules.RegisterChild(appCmd, "", app)
@@ -974,7 +981,7 @@
 	nmh.Shutdown(os.Stderr, os.Stderr)
 }
 
-func TestNodeManagerGlobAndLogs(t *testing.T) {
+func TestNodeManagerGlobAndDebug(t *testing.T) {
 	sh, deferFn := createShellAndMountTable(t)
 	defer deferFn()
 
@@ -1033,31 +1040,45 @@
 			"apps/google naps/" + installID + "/" + instance1ID + "/logs/STDOUT-<timestamp>",
 			"apps/google naps/" + installID + "/" + instance1ID + "/logs/bin.INFO",
 			"apps/google naps/" + installID + "/" + instance1ID + "/logs/bin.<*>.INFO.<timestamp>",
+			"apps/google naps/" + installID + "/" + instance1ID + "/pprof",
+			"apps/google naps/" + installID + "/" + instance1ID + "/stats",
+			"apps/google naps/" + installID + "/" + instance1ID + "/stats/ipc",
+			"apps/google naps/" + installID + "/" + instance1ID + "/stats/system",
+			"apps/google naps/" + installID + "/" + instance1ID + "/stats/system/start-time-rfc1123",
+			"apps/google naps/" + installID + "/" + instance1ID + "/stats/system/start-time-unix",
 			"nm",
 		}},
 		{"nm/apps", "*", []string{"google naps"}},
 		{"nm/apps/google naps", "*", []string{installID}},
 		{"nm/apps/google naps/" + installID, "*", []string{instance1ID}},
-		{"nm/apps/google naps/" + installID + "/" + instance1ID, "*", []string{"logs"}},
+		{"nm/apps/google naps/" + installID + "/" + instance1ID, "*", []string{"logs", "pprof", "stats"}},
 		{"nm/apps/google naps/" + installID + "/" + instance1ID + "/logs", "*", []string{
 			"STDERR-<timestamp>",
 			"STDOUT-<timestamp>",
 			"bin.INFO",
 			"bin.<*>.INFO.<timestamp>",
 		}},
+		{"nm/apps/google naps/" + installID + "/" + instance1ID + "/stats/system", "start-time*", []string{"start-time-rfc1123", "start-time-unix"}},
 	}
 	logFileTimeStampRE := regexp.MustCompile("(STDOUT|STDERR)-[0-9]+$")
-	logFileTrimINFORE := regexp.MustCompile(`bin\..*\.INFO\.[0-9.-]+$`)
+	logFileTrimInfoRE := regexp.MustCompile(`bin\..*\.INFO\.[0-9.-]+$`)
 	logFileRemoveErrorFatalWarningRE := regexp.MustCompile("(ERROR|FATAL|WARNING)")
+	statsTrimRE := regexp.MustCompile("/stats/(ipc|system(/start-time.*)?)$")
 	for _, tc := range testcases {
 		results := doGlob(t, tc.name, tc.pattern)
 		filteredResults := []string{}
 		for _, name := range results {
+			// Keep only the stats object names that match this RE.
+			if strings.Contains(name, "/stats/") && !statsTrimRE.MatchString(name) {
+				continue
+			}
+			// Remove ERROR, WARNING, FATAL log files because
+			// they're not consistently there.
 			if logFileRemoveErrorFatalWarningRE.MatchString(name) {
 				continue
 			}
 			name = logFileTimeStampRE.ReplaceAllString(name, "$1-<timestamp>")
-			name = logFileTrimINFORE.ReplaceAllString(name, "bin.<*>.INFO.<timestamp>")
+			name = logFileTrimInfoRE.ReplaceAllString(name, "bin.<*>.INFO.<timestamp>")
 			filteredResults = append(filteredResults, name)
 		}
 		if !reflect.DeepEqual(filteredResults, tc.expected) {
@@ -1077,6 +1098,41 @@
 			t.Errorf("Size(%q) failed: %v", name, err)
 		}
 	}
+
+	// Call Value() on some of the stats objects.
+	objects := doGlob(t, "nm", "apps/google naps/"+installID+"/"+instance1ID+"/stats/system/start-time*")
+	if want, got := 2, len(objects); got != want {
+		t.Errorf("Unexpected number of matches. Got %d, want %d", got, want)
+	}
+	for _, obj := range objects {
+		name := naming.Join("nm", obj)
+		c, err := stats.BindStats(name)
+		if err != nil {
+			t.Fatalf("BindStats failed: %v", err)
+		}
+		if _, err := c.Value(rt.R().NewContext()); err != nil {
+			t.Errorf("Value(%q) failed: %v", name, err)
+		}
+	}
+
+	// Call CmdLine() on the pprof object.
+	{
+		name := "nm/apps/google naps/" + installID + "/" + instance1ID + "/pprof"
+		c, err := pprof.BindPProf(name)
+		if err != nil {
+			t.Fatalf("BindPProf failed: %v", err)
+		}
+		v, err := c.CmdLine(rt.R().NewContext())
+		if err != nil {
+			t.Errorf("CmdLine(%q) failed: %v", name, err)
+		}
+		if len(v) == 0 {
+			t.Fatalf("Unexpected empty cmdline: %v", v)
+		}
+		if got, want := filepath.Base(v[0]), "bin"; got != want {
+			t.Errorf("Unexpected value for argv[0]. Got %v, want %v", got, want)
+		}
+	}
 }
 
 func doGlob(t *testing.T, name, pattern string) []string {
diff --git a/services/mgmt/node/impl/proxy_invoker.go b/services/mgmt/node/impl/proxy_invoker.go
new file mode 100644
index 0000000..3897c50
--- /dev/null
+++ b/services/mgmt/node/impl/proxy_invoker.go
@@ -0,0 +1,135 @@
+package impl
+
+import (
+	"fmt"
+	"io"
+
+	"veyron.io/veyron/veyron2/ipc"
+	"veyron.io/veyron/veyron2/rt"
+	"veyron.io/veyron/veyron2/security"
+)
+
+// proxyInvoker is an ipc.Invoker implementation that proxies all requests
+// to a remote object: every method call, its arguments, its stream traffic
+// and its results are forwarded to and from <remote> transparently.
+//
+// remote is the name of the remote object.
+// label is the security label that Prepare returns for every method.
+// sigStub is used to look up how many results each method returns.
+type proxyInvoker struct {
+	remote  string
+	label   security.Label
+	sigStub signatureStub
+}
+
+type signatureStub interface {
+	Signature(ipc.ServerCall) (ipc.ServiceSignature, error) // numResults calls this with a nil ServerCall
+}
+
+// Prepare returns numArgs pointers to empty interface values, so arguments
+// of any type can be decoded into them, together with the invoker's label.
+func (p *proxyInvoker) Prepare(method string, numArgs int) (argptrs []interface{}, label security.Label, err error) {
+	argptrs = make([]interface{}, numArgs)
+	for i := range argptrs {
+		argptrs[i] = new(interface{})
+	}
+	return argptrs, p.label, nil
+}
+
+// Invoke relays the call (arguments, stream data, results) to p.remote.
+func (p *proxyInvoker) Invoke(method string, inCall ipc.ServerCall, argptrs []interface{}) (results []interface{}, err error) {
+	// Any argument values are accepted and passed through to the remote server.
+	args := make([]interface{}, len(argptrs))
+	for i, ap := range argptrs {
+		args[i] = ap
+	}
+	outCall, err := rt.R().Client().StartCall(inCall, p.remote, method, args)
+	if err != nil {
+		return nil, err
+	}
+
+	// Each RPC has a bi-directional stream, and there is no way to know in
+	// advance how much data will be sent in either direction, if any.
+	//
+	// This method (Invoke) must return when the remote server is done with
+	// the RPC, which is when outCall.Recv() returns EOF. When that happens,
+	// we need to call outCall.Finish() to get the return values, and then
+	// return these values to the client.
+	//
+	// While we are forwarding data from the server to the client, we must
+	// also forward data from the client to the server. This happens in a
+	// separate goroutine. This goroutine may return after Invoke has
+	// returned if the client doesn't call CloseSend() explicitly.
+	//
+	// Any error, other than EOF, will be returned to the client, if
+	// possible. The only situation where it is not possible to send an
+	// error to the client is when the error comes from forwarding data from
+	// the client to the server and Invoke has already returned or is about
+	// to return. In this case, the error is lost. So, it is possible that
+	// the client could successfully Send() data that the server doesn't
+	// actually receive if the server terminates the RPC while the data is
+	// in the proxy.
+	fwd := func(src, dst ipc.Stream, errors chan<- error) {
+		for {
+			var obj interface{}
+			switch err := src.Recv(&obj); err {
+			case io.EOF:
+				if call, ok := src.(ipc.Call); ok { // NOTE(review): asserts on src, not dst; confirm intended
+					if err := call.CloseSend(); err != nil {
+						errors <- err
+					}
+				}
+				return
+			case nil:
+				break // leaves the switch only; the value is sent to dst below
+			default:
+				errors <- err
+				return
+			}
+			if err := dst.Send(obj); err != nil {
+				errors <- err
+				return
+			}
+		}
+	}
+	errors := make(chan error, 2) // each of the two forwarders sends at most one error
+	go fwd(inCall, outCall, errors)
+	fwd(outCall, inCall, errors) // blocks until outCall's stream ends or forwarding fails
+	select {
+	case err := <-errors:
+		return nil, err
+	default: // no forwarding error has been recorded so far
+	}
+
+	nResults, err := p.numResults(method)
+	if err != nil {
+		return nil, err
+	}
+
+	// We accept any return values, without type checking, and return them
+	// to the client.
+	res := make([]interface{}, nResults)
+	for i := 0; i < len(res); i++ {
+		var foo interface{}
+		res[i] = &foo
+	}
+	err = outCall.Finish(res...)
+	results = make([]interface{}, len(res))
+	for i, r := range res {
+		results[i] = *r.(*interface{})
+	}
+	return results, err // results are returned even when Finish reported an error
+}
+
+// numResults reports how many values the given method returns, according to
+// the signature obtained from sigStub. Unknown methods yield an error.
+func (p *proxyInvoker) numResults(method string) (int, error) {
+	sig, err := p.sigStub.Signature(nil)
+	if err != nil {
+		return 0, err
+	}
+	if m, ok := sig.Methods[method]; ok {
+		return len(m.OutArgs), nil
+	}
+	return 0, fmt.Errorf("unknown method %q", method)
+}
diff --git a/services/mgmt/node/impl/proxy_invoker_test.go b/services/mgmt/node/impl/proxy_invoker_test.go
new file mode 100644
index 0000000..7f84099
--- /dev/null
+++ b/services/mgmt/node/impl/proxy_invoker_test.go
@@ -0,0 +1,107 @@
+package impl
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+
+	"veyron.io/veyron/veyron2/ipc"
+	"veyron.io/veyron/veyron2/naming"
+	"veyron.io/veyron/veyron2/rt"
+	"veyron.io/veyron/veyron2/security"
+	"veyron.io/veyron/veyron2/services/mgmt/stats"
+	"veyron.io/veyron/veyron2/services/mounttable"
+
+	"veyron.io/veyron/veyron/profiles"
+)
+
+func TestProxyInvoker(t *testing.T) {
+	r := rt.R()
+
+	// server1 is a normal server with a nil dispatcher; it is the proxy target.
+	server1, err := r.NewServer()
+	if err != nil {
+		t.Fatalf("NewServer: %v", err)
+	}
+	defer server1.Stop()
+	ep1, err := server1.Listen(profiles.LocalListenSpec)
+	if err != nil {
+		t.Fatalf("Listen: %v", err)
+	}
+	if err := server1.Serve("", nil); err != nil {
+		t.Fatalf("server1.Serve: %v", err)
+	}
+
+	// server2 proxies requests to <suffix> to server1/__debug/stats/<suffix>
+	server2, err := r.NewServer()
+	if err != nil {
+		t.Fatalf("NewServer: %v", err)
+	}
+	defer server2.Stop()
+	ep2, err := server2.Listen(profiles.LocalListenSpec)
+	if err != nil {
+		t.Fatalf("Listen: %v", err)
+	}
+	disp := &proxyDispatcher{
+		naming.JoinAddressName(ep1.String(), "__debug/stats"),
+		security.Label(security.AllLabels),
+		&stats.ServerStubStats{},
+	}
+	if err := server2.Serve("", disp); err != nil {
+		t.Fatalf("server2.Serve: %v", err)
+	}
+
+	// Call Value() on a stats object, addressed through server2 (the proxy).
+	name := naming.JoinAddressName(ep2.String(), "system/start-time-rfc1123")
+	c, err := stats.BindStats(name)
+	if err != nil {
+		t.Fatalf("BindStats error: %v", err)
+	}
+	if _, err := c.Value(r.NewContext()); err != nil {
+		t.Errorf("%q.Value() error: %v", name, err)
+	}
+
+	// Call Glob() through the proxy and check the sorted names it relays.
+	results := doGlob(t, naming.JoinAddressName(ep2.String(), "system"), "start-time-*")
+	expected := []string{
+		"start-time-rfc1123",
+		"start-time-unix",
+	}
+	if !reflect.DeepEqual(results, expected) {
+		t.Errorf("unexpected results. Got %q, want %q", results, expected)
+	}
+}
+
+func doGlob(t *testing.T, name, pattern string) []string { // returns the sorted names matching pattern under name
+	c, err := mounttable.BindGlobbable(name)
+	if err != nil {
+		t.Fatalf("BindGlobbable failed: %v", err)
+	}
+	stream, err := c.Glob(rt.R().NewContext(), pattern)
+	if err != nil {
+		t.Fatalf("Glob failed: %v", err) // Fatalf: stream must not be used after a failed Glob
+	}
+	results := []string{} // non-nil so an empty result still DeepEquals an empty expected list
+	iterator := stream.RecvStream()
+	for iterator.Advance() {
+		results = append(results, iterator.Value().Name)
+	}
+	if err := iterator.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	if err := stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+	sort.Strings(results) // deterministic order for comparison by callers
+	return results
+}
+
+type proxyDispatcher struct {
+	remote  string         // name that Lookup joins each suffix onto
+	label   security.Label // handed unchanged to every proxyInvoker
+	sigStub signatureStub  // handed unchanged to every proxyInvoker
+}
+
+func (d *proxyDispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
+	return &proxyInvoker{remote: naming.Join(d.remote, suffix), label: d.label, sigStub: d.sigStub}, nil, nil // nil Authorizer
+}