veyron/services/mgmt/node/impl: Add proxy invoker for pprof and stats
proxyInvoker is an ipc.Invoker implementation that proxies all requests
to a remote object, i.e. requests to <suffix> are forwarded to
<remote>/<suffix> transparently.
It is used by the node manager to proxy the stats and pprof objects of
running apps.
Change-Id: I9a07b76edebd879d54bea4d6d913194d5d86a86e
diff --git a/services/mgmt/node/impl/app_invoker.go b/services/mgmt/node/impl/app_invoker.go
index e14dc1e..4de936a 100644
--- a/services/mgmt/node/impl/app_invoker.go
+++ b/services/mgmt/node/impl/app_invoker.go
@@ -794,10 +794,11 @@
type treeNode struct {
children map[string]*treeNode
+ remote string // Used to implement Glob for remote objects.
}
func newTreeNode() *treeNode {
- return &treeNode{make(map[string]*treeNode)}
+ return &treeNode{children: make(map[string]*treeNode)}
}
func (n *treeNode) find(names []string, create bool) *treeNode {
@@ -862,7 +863,8 @@
}
for _, path := range instances {
instanceDir := filepath.Dir(path)
- if _, err := loadInstanceInfo(instanceDir); err != nil {
+ info, err := loadInstanceInfo(instanceDir)
+ if err != nil {
continue
}
relpath, _ := filepath.Rel(i.config.Root, path)
@@ -877,6 +879,14 @@
if title, ok := appIDMap[appID]; ok {
n := tree.find([]string{title, installID, instanceID, "logs"}, true)
i.addLogFiles(n, filepath.Join(instanceDir, "logs"))
+
+ if instanceStateIs(instanceDir, started) {
+ // Set the name of the remote objects that should handle Glob.
+ for _, obj := range []string{"pprof", "stats"} {
+ n = tree.find([]string{title, installID, instanceID, obj}, true)
+ n.remote = naming.JoinAddressName(info.AppCycleMgrName, naming.Join("__debug", obj))
+ }
+ }
}
}
return tree
@@ -903,11 +913,15 @@
if n == nil {
return errInvalidSuffix
}
- i.globStep("", g, n, stream)
+ i.globStep(ctx, "", g, n, stream)
return nil
}
-func (i *appInvoker) globStep(prefix string, g *glob.Glob, n *treeNode, stream mounttable.GlobbableServiceGlobStream) {
+func (i *appInvoker) globStep(ctx ipc.ServerContext, prefix string, g *glob.Glob, n *treeNode, stream mounttable.GlobbableServiceGlobStream) {
+ if n.remote != "" {
+ remoteGlob(ctx, n.remote, prefix, g.String(), stream)
+ return
+ }
if g.Len() == 0 {
stream.SendStream().Send(types.MountEntry{Name: prefix})
}
@@ -916,7 +930,35 @@
}
for name, child := range n.children {
if ok, _, left := g.MatchInitialSegment(name); ok {
- i.globStep(naming.Join(prefix, name), left, child, stream)
+ i.globStep(ctx, naming.Join(prefix, name), left, child, stream)
}
}
}
+
+func remoteGlob(ctx ipc.ServerContext, remote, prefix, pattern string, stream mounttable.GlobbableServiceGlobStream) {
+ c, err := mounttable.BindGlobbable(remote)
+ if err != nil {
+ vlog.VI(1).Infof("BindGlobbable(%q): %v", remote, err)
+ return
+ }
+ call, err := c.Glob(ctx, pattern)
+ if err != nil {
+ vlog.VI(1).Infof("%q.Glob(%q): %v", remote, pattern, err)
+ return
+ }
+ it := call.RecvStream()
+ sender := stream.SendStream()
+ for it.Advance() {
+ me := it.Value()
+ me.Name = naming.Join(prefix, me.Name)
+ sender.Send(me)
+ }
+ if err := it.Err(); err != nil {
+ vlog.VI(1).Infof("%q.Glob(%q): %v", remote, pattern, err)
+ return
+ }
+ if err := call.Finish(); err != nil {
+ vlog.VI(1).Infof("%q.Glob(%q): %v", remote, pattern, err)
+ return
+ }
+}
diff --git a/services/mgmt/node/impl/app_state.go b/services/mgmt/node/impl/app_state.go
index df5049f..9fbc1c2 100644
--- a/services/mgmt/node/impl/app_state.go
+++ b/services/mgmt/node/impl/app_state.go
@@ -79,6 +79,12 @@
}
}
+func instanceStateIs(instanceDir string, state instanceState) bool {
+ if _, err := os.Stat(filepath.Join(instanceDir, state.String())); err != nil {
+ return false
+ }
+ return true
+}
func transitionInstance(instanceDir string, initial, target instanceState) error {
return transitionState(instanceDir, initial, target)
}
diff --git a/services/mgmt/node/impl/dispatcher.go b/services/mgmt/node/impl/dispatcher.go
index 655e758..985fff0 100644
--- a/services/mgmt/node/impl/dispatcher.go
+++ b/services/mgmt/node/impl/dispatcher.go
@@ -24,6 +24,8 @@
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/services/mgmt/node"
+ "veyron.io/veyron/veyron2/services/mgmt/pprof"
+ "veyron.io/veyron/veyron2/services/mgmt/stats"
"veyron.io/veyron/veyron2/services/security/access"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
@@ -247,14 +249,43 @@
})
return ipc.ReflectInvoker(receiver), d.auth, nil
case appsSuffix:
- if method != "Glob" && len(components) >= 5 && components[4] == "logs" {
+ // Glob requests are handled by appInvoker, except for pprof and
+ // stats objects which handle Glob themselves.
+ // Requests to apps/*/*/*/logs are handled locally by LogFileInvoker.
+ // Requests to apps/*/*/*/pprof are proxied to the apps' __debug/pprof object.
+ // Requests to apps/*/*/*/stats are proxied to the apps' __debug/stats object.
+ // Everything else is handled by appInvoker.
+ if len(components) >= 5 && (method != "Glob" || components[4] != "logs") {
appInstanceDir, err := instanceDir(d.config.Root, components[1:4])
if err != nil {
return nil, nil, err
}
- logsDir := filepath.Join(appInstanceDir, "logs")
- suffix := naming.Join(components[5:]...)
- return logsimpl.NewLogFileInvoker(logsDir, suffix), d.auth, nil
+ switch kind := components[4]; kind {
+ case "logs":
+ logsDir := filepath.Join(appInstanceDir, "logs")
+ suffix := naming.Join(components[5:]...)
+ return logsimpl.NewLogFileInvoker(logsDir, suffix), d.auth, nil
+ case "pprof", "stats":
+ info, err := loadInstanceInfo(appInstanceDir)
+ if err != nil {
+ return nil, nil, err
+ }
+ if !instanceStateIs(appInstanceDir, started) {
+ return nil, nil, errInvalidSuffix
+ }
+ var label security.Label
+ var sigStub signatureStub
+ if kind == "pprof" {
+ label = security.DebugLabel
+ sigStub = &pprof.ServerStubPProf{}
+ } else {
+ label = security.DebugLabel | security.MonitoringLabel
+ sigStub = &stats.ServerStubStats{}
+ }
+ suffix := naming.Join("__debug", naming.Join(components[4:]...))
+ remote := naming.JoinAddressName(info.AppCycleMgrName, suffix)
+ return &proxyInvoker{remote, label, sigStub}, d.auth, nil
+ }
}
receiver := node.NewServerApplication(&appInvoker{
callback: d.internal.callback,
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index e71690e..8794636 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -34,9 +34,13 @@
"veyron.io/veyron/veyron2/services/mgmt/application"
"veyron.io/veyron/veyron2/services/mgmt/logreader"
"veyron.io/veyron/veyron2/services/mgmt/node"
+ "veyron.io/veyron/veyron2/services/mgmt/pprof"
+ "veyron.io/veyron/veyron2/services/mgmt/stats"
"veyron.io/veyron/veyron2/services/mounttable"
+ "veyron.io/veyron/veyron2/services/mounttable/types"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vom"
"veyron.io/veyron/veyron/lib/expect"
"veyron.io/veyron/veyron/lib/modules"
@@ -57,6 +61,9 @@
)
func init() {
+ // TODO(rthellend): Remove when vom2 is ready.
+ vom.Register(&types.MountedServer{})
+
modules.RegisterChild(execScriptCmd, "", execScript)
modules.RegisterChild(nodeManagerCmd, "", nodeManager)
modules.RegisterChild(appCmd, "", app)
@@ -974,7 +981,7 @@
nmh.Shutdown(os.Stderr, os.Stderr)
}
-func TestNodeManagerGlobAndLogs(t *testing.T) {
+func TestNodeManagerGlobAndDebug(t *testing.T) {
sh, deferFn := createShellAndMountTable(t)
defer deferFn()
@@ -1033,31 +1040,45 @@
"apps/google naps/" + installID + "/" + instance1ID + "/logs/STDOUT-<timestamp>",
"apps/google naps/" + installID + "/" + instance1ID + "/logs/bin.INFO",
"apps/google naps/" + installID + "/" + instance1ID + "/logs/bin.<*>.INFO.<timestamp>",
+ "apps/google naps/" + installID + "/" + instance1ID + "/pprof",
+ "apps/google naps/" + installID + "/" + instance1ID + "/stats",
+ "apps/google naps/" + installID + "/" + instance1ID + "/stats/ipc",
+ "apps/google naps/" + installID + "/" + instance1ID + "/stats/system",
+ "apps/google naps/" + installID + "/" + instance1ID + "/stats/system/start-time-rfc1123",
+ "apps/google naps/" + installID + "/" + instance1ID + "/stats/system/start-time-unix",
"nm",
}},
{"nm/apps", "*", []string{"google naps"}},
{"nm/apps/google naps", "*", []string{installID}},
{"nm/apps/google naps/" + installID, "*", []string{instance1ID}},
- {"nm/apps/google naps/" + installID + "/" + instance1ID, "*", []string{"logs"}},
+ {"nm/apps/google naps/" + installID + "/" + instance1ID, "*", []string{"logs", "pprof", "stats"}},
{"nm/apps/google naps/" + installID + "/" + instance1ID + "/logs", "*", []string{
"STDERR-<timestamp>",
"STDOUT-<timestamp>",
"bin.INFO",
"bin.<*>.INFO.<timestamp>",
}},
+ {"nm/apps/google naps/" + installID + "/" + instance1ID + "/stats/system", "start-time*", []string{"start-time-rfc1123", "start-time-unix"}},
}
logFileTimeStampRE := regexp.MustCompile("(STDOUT|STDERR)-[0-9]+$")
- logFileTrimINFORE := regexp.MustCompile(`bin\..*\.INFO\.[0-9.-]+$`)
+ logFileTrimInfoRE := regexp.MustCompile(`bin\..*\.INFO\.[0-9.-]+$`)
logFileRemoveErrorFatalWarningRE := regexp.MustCompile("(ERROR|FATAL|WARNING)")
+ statsTrimRE := regexp.MustCompile("/stats/(ipc|system(/start-time.*)?)$")
for _, tc := range testcases {
results := doGlob(t, tc.name, tc.pattern)
filteredResults := []string{}
for _, name := range results {
+ // Keep only the stats object names that match this RE.
+ if strings.Contains(name, "/stats/") && !statsTrimRE.MatchString(name) {
+ continue
+ }
+ // Remove ERROR, WARNING, FATAL log files because
+ // they're not consistently there.
if logFileRemoveErrorFatalWarningRE.MatchString(name) {
continue
}
name = logFileTimeStampRE.ReplaceAllString(name, "$1-<timestamp>")
- name = logFileTrimINFORE.ReplaceAllString(name, "bin.<*>.INFO.<timestamp>")
+ name = logFileTrimInfoRE.ReplaceAllString(name, "bin.<*>.INFO.<timestamp>")
filteredResults = append(filteredResults, name)
}
if !reflect.DeepEqual(filteredResults, tc.expected) {
@@ -1077,6 +1098,41 @@
t.Errorf("Size(%q) failed: %v", name, err)
}
}
+
+ // Call Value() on some of the stats objects.
+ objects := doGlob(t, "nm", "apps/google naps/"+installID+"/"+instance1ID+"/stats/system/start-time*")
+ if want, got := 2, len(objects); got != want {
+ t.Errorf("Unexpected number of matches. Got %d, want %d", got, want)
+ }
+ for _, obj := range objects {
+ name := naming.Join("nm", obj)
+ c, err := stats.BindStats(name)
+ if err != nil {
+ t.Fatalf("BindStats failed: %v", err)
+ }
+ if _, err := c.Value(rt.R().NewContext()); err != nil {
+ t.Errorf("Value(%q) failed: %v", name, err)
+ }
+ }
+
+ // Call CmdLine() on the pprof object.
+ {
+ name := "nm/apps/google naps/" + installID + "/" + instance1ID + "/pprof"
+ c, err := pprof.BindPProf(name)
+ if err != nil {
+ t.Fatalf("BindPProf failed: %v", err)
+ }
+ v, err := c.CmdLine(rt.R().NewContext())
+ if err != nil {
+ t.Errorf("CmdLine(%q) failed: %v", name, err)
+ }
+ if len(v) == 0 {
+ t.Fatalf("Unexpected empty cmdline: %v", v)
+ }
+ if got, want := filepath.Base(v[0]), "bin"; got != want {
+ t.Errorf("Unexpected value for argv[0]. Got %v, want %v", got, want)
+ }
+ }
}
func doGlob(t *testing.T, name, pattern string) []string {
diff --git a/services/mgmt/node/impl/proxy_invoker.go b/services/mgmt/node/impl/proxy_invoker.go
new file mode 100644
index 0000000..3897c50
--- /dev/null
+++ b/services/mgmt/node/impl/proxy_invoker.go
@@ -0,0 +1,135 @@
+package impl
+
+import (
+ "fmt"
+ "io"
+
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+)
+
+// proxyInvoker is an ipc.Invoker implementation that proxies all requests
+// to a remote object, i.e. requests to <suffix> are forwarded to
+// <remote> transparently.
+//
+// remote is the name of the remote object.
+// label is the security label required to access this object.
+// sigStub is used to get the signature of the remote object.
+type proxyInvoker struct {
+ remote string
+ label security.Label
+ sigStub signatureStub
+}
+
+type signatureStub interface {
+ Signature(ipc.ServerCall) (ipc.ServiceSignature, error)
+}
+
+func (p *proxyInvoker) Prepare(method string, numArgs int) (argptrs []interface{}, label security.Label, err error) {
+ argptrs = make([]interface{}, numArgs)
+ for i, _ := range argptrs {
+ var x interface{}
+ argptrs[i] = &x
+ }
+ label = p.label
+ return
+}
+
+func (p *proxyInvoker) Invoke(method string, inCall ipc.ServerCall, argptrs []interface{}) (results []interface{}, err error) {
+ // We accept any values as argument and pass them through to the remote
+ // server.
+ args := make([]interface{}, len(argptrs))
+ for i, ap := range argptrs {
+ args[i] = ap
+ }
+ outCall, err := rt.R().Client().StartCall(inCall, p.remote, method, args)
+ if err != nil {
+ return nil, err
+ }
+
+ // Each RPC has a bi-directional stream, and there is no way to know in
+ // advance how much data will be sent in either direction, if any.
+ //
+ // This method (Invoke) must return when the remote server is done with
+ // the RPC, which is when outCall.Recv() returns EOF. When that happens,
+ // we need to call outCall.Finish() to get the return values, and then
+ // return these values to the client.
+ //
+ // While we are forwarding data from the server to the client, we must
+ // also forward data from the client to the server. This happens in a
+ // separate goroutine. This goroutine may return after Invoke has
+ // returned if the client doesn't call CloseSend() explicitly.
+ //
+ // Any error, other than EOF, will be returned to the client, if
+ // possible. The only situation where it is not possible to send an
+ // error to the client is when the error comes from forwarding data from
+ // the client to the server and Invoke has already returned or is about
+ // to return. In this case, the error is lost. So, it is possible that
+ // the client could successfully Send() data that the server doesn't
+ // actually receive if the server terminates the RPC while the data is
+ // in the proxy.
+ fwd := func(src, dst ipc.Stream, errors chan<- error) {
+ for {
+ var obj interface{}
+ switch err := src.Recv(&obj); err {
+ case io.EOF:
+ if call, ok := src.(ipc.Call); ok {
+ if err := call.CloseSend(); err != nil {
+ errors <- err
+ }
+ }
+ return
+ case nil:
+ break
+ default:
+ errors <- err
+ return
+ }
+ if err := dst.Send(obj); err != nil {
+ errors <- err
+ return
+ }
+ }
+ }
+ errors := make(chan error, 2)
+ go fwd(inCall, outCall, errors)
+ fwd(outCall, inCall, errors)
+ select {
+ case err := <-errors:
+ return nil, err
+ default:
+ }
+
+ nResults, err := p.numResults(method)
+ if err != nil {
+ return nil, err
+ }
+
+ // We accept any return values, without type checking, and return them
+ // to the client.
+ res := make([]interface{}, nResults)
+ for i := 0; i < len(res); i++ {
+ var foo interface{}
+ res[i] = &foo
+ }
+ err = outCall.Finish(res...)
+ results = make([]interface{}, len(res))
+ for i, r := range res {
+ results[i] = *r.(*interface{})
+ }
+ return results, err
+}
+
+// numResults returns the number of result values for the given method.
+func (p *proxyInvoker) numResults(method string) (int, error) {
+ sig, err := p.sigStub.Signature(nil)
+ if err != nil {
+ return 0, err
+ }
+ m, ok := sig.Methods[method]
+ if !ok {
+ return 0, fmt.Errorf("unknown method %q", method)
+ }
+ return len(m.OutArgs), nil
+}
diff --git a/services/mgmt/node/impl/proxy_invoker_test.go b/services/mgmt/node/impl/proxy_invoker_test.go
new file mode 100644
index 0000000..7f84099
--- /dev/null
+++ b/services/mgmt/node/impl/proxy_invoker_test.go
@@ -0,0 +1,107 @@
+package impl
+
+import (
+ "reflect"
+ "sort"
+ "testing"
+
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/naming"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/services/mgmt/stats"
+ "veyron.io/veyron/veyron2/services/mounttable"
+
+ "veyron.io/veyron/veyron/profiles"
+)
+
+func TestProxyInvoker(t *testing.T) {
+ r := rt.R()
+
+ // server1 is a normal server with a nil dispatcher.
+ server1, err := r.NewServer()
+ if err != nil {
+ t.Fatalf("NewServer: %v", err)
+ }
+ defer server1.Stop()
+ ep1, err := server1.Listen(profiles.LocalListenSpec)
+ if err != nil {
+ t.Fatalf("Listen: %v", err)
+ }
+ if err := server1.Serve("", nil); err != nil {
+ t.Fatalf("server1.Serve: %v", err)
+ }
+
+ // server2 proxies requests to <suffix> to server1/__debug/stats/<suffix>
+ server2, err := r.NewServer()
+ if err != nil {
+ t.Fatalf("NewServer: %v", err)
+ }
+ defer server2.Stop()
+ ep2, err := server2.Listen(profiles.LocalListenSpec)
+ if err != nil {
+ t.Fatalf("Listen: %v", err)
+ }
+ disp := &proxyDispatcher{
+ naming.JoinAddressName(ep1.String(), "__debug/stats"),
+ security.Label(security.AllLabels),
+ &stats.ServerStubStats{},
+ }
+ if err := server2.Serve("", disp); err != nil {
+ t.Fatalf("server2.Serve: %v", err)
+ }
+
+ // Call Value()
+ name := naming.JoinAddressName(ep2.String(), "system/start-time-rfc1123")
+ c, err := stats.BindStats(name)
+ if err != nil {
+ t.Fatalf("BindStats error: %v", err)
+ }
+ if _, err := c.Value(r.NewContext()); err != nil {
+ t.Errorf("%q.Value() error: %v", name, err)
+ }
+
+ // Call Glob()
+ results := doGlob(t, naming.JoinAddressName(ep2.String(), "system"), "start-time-*")
+ expected := []string{
+ "start-time-rfc1123",
+ "start-time-unix",
+ }
+ if !reflect.DeepEqual(results, expected) {
+ t.Errorf("unexpected results. Got %q, want %q", results, expected)
+ }
+}
+
+func doGlob(t *testing.T, name, pattern string) []string {
+ c, err := mounttable.BindGlobbable(name)
+ if err != nil {
+ t.Fatalf("BindGlobbable failed: %v", err)
+ }
+ stream, err := c.Glob(rt.R().NewContext(), pattern)
+ if err != nil {
+ t.Errorf("Glob failed: %v", err)
+ }
+ results := []string{}
+ iterator := stream.RecvStream()
+ for iterator.Advance() {
+ results = append(results, iterator.Value().Name)
+ }
+ if err := iterator.Err(); err != nil {
+ t.Errorf("unexpected stream error: %v", err)
+ }
+ if err := stream.Finish(); err != nil {
+ t.Errorf("Finish failed: %v", err)
+ }
+ sort.Strings(results)
+ return results
+}
+
+type proxyDispatcher struct {
+ remote string
+ label security.Label
+ sigStub signatureStub
+}
+
+func (d *proxyDispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
+ return &proxyInvoker{naming.Join(d.remote, suffix), d.label, d.sigStub}, nil, nil
+}