veyron/runtimes/google/rt: Replace http debug server with exported stats.
This change adds exported function objects. When the value of the object is
requested, the user-supplied function is called and the value that it returns
becomes the value of the object.
This kind of pattern is dangerous because users could potentially block the
stats interface indefinitely. For the purpose that I have in mind (also
in the change), I don't see any good alternatives. To mitigate the risk,
if the user's function doesn't return within 100 ms, the previous value is used
instead.
Purpose: export the stream manager debug information. The format is
the same as what's on the http debug page and is suitable only for human
readers. This allows us to get rid of the http server.
We can explore better ways to export stream manager data later on.
Change-Id: I386252e5dbfc9c1a8c8b74291e359b463343c8bb
diff --git a/lib/stats/func.go b/lib/stats/func.go
new file mode 100644
index 0000000..2a9c193
--- /dev/null
+++ b/lib/stats/func.go
@@ -0,0 +1,86 @@
+package stats
+
+import (
+ "sync"
+ "time"
+)
+
+// NewIntegerFunc creates a new StatsObject with the given name. The function
+// argument must return an int64 value.
+func NewIntegerFunc(name string, function func() int64) StatsObject {
+ return newFunc(name, func() interface{} { return function() })
+}
+
+// NewFloatFunc creates a new StatsObject with the given name. The function
+// argument must return a float64 value.
+func NewFloatFunc(name string, function func() float64) StatsObject {
+ return newFunc(name, func() interface{} { return function() })
+}
+
+// NewStringFunc creates a new StatsObject with the given name. The function
+// argument must return a string value.
+func NewStringFunc(name string, function func() string) StatsObject {
+ return newFunc(name, func() interface{} { return function() })
+}
+
+func newFunc(name string, function func() interface{}) StatsObject {
+ f := funcType{function: function}
+ lock.Lock()
+ defer lock.Unlock()
+ node := findNodeLocked(name, true)
+ node.object = &f
+ return &f
+}
+
+// funcType implements the StatsObject interface by calling a user provided
+// function.
+type funcType struct {
+ mu sync.Mutex
+ function func() interface{}
+ waiters []chan interface{} // GUARDED_BY(mu)
+ lastValue interface{} // GUARDED_BY(mu)
+}
+
+// LastUpdate returns always returns the current time for this type of
+// StatsObject because Value() is expected to get a current (fresh) value.
+func (f *funcType) LastUpdate() time.Time {
+ return time.Now()
+}
+
+// Value returns the value returned by the object's function. If the function
+// takes more than 100 ms to return, the last value is used.
+func (f *funcType) Value() interface{} {
+ // There are two values that can be written to the channel, one from
+ // fetch() and one from time.AfterFunc(). In some cases, they will both
+ // be written but only one will be read. A buffer size of 1 would be
+ // sufficient to avoid deadlocks, but 2 will guarantee that fetch()
+ // never blocks on a channel.
+ ch := make(chan interface{}, 2)
+ f.mu.Lock()
+ if f.waiters = append(f.waiters, ch); len(f.waiters) == 1 {
+ go f.fetch()
+ }
+ f.mu.Unlock()
+
+ defer time.AfterFunc(100*time.Millisecond, func() {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ ch <- f.lastValue
+ }).Stop()
+
+ return <-ch
+}
+
+func (f *funcType) fetch() {
+ v := f.function()
+
+ f.mu.Lock()
+ waiters := f.waiters
+ f.waiters = nil
+ f.lastValue = v
+ f.mu.Unlock()
+
+ for _, c := range waiters {
+ c <- v
+ }
+}
diff --git a/lib/stats/stats_test.go b/lib/stats/stats_test.go
index 9a3f6b1..0773280 100644
--- a/lib/stats/stats_test.go
+++ b/lib/stats/stats_test.go
@@ -316,6 +316,50 @@
}
}
+func TestFunc(t *testing.T) {
+ libstats.NewIntegerFunc("testing/integer", func() int64 { return 123 })
+ libstats.NewFloatFunc("testing/float", func() float64 { return 456.789 })
+ libstats.NewStringFunc("testing/string", func() string { return "Hello World" })
+ ch := make(chan int64, 5)
+ libstats.NewIntegerFunc("testing/slowint", func() int64 {
+ return <-ch
+ })
+
+ testcases := []struct {
+ name string
+ expected interface{}
+ }{
+ {"testing/integer", int64(123)},
+ {"testing/float", float64(456.789)},
+ {"testing/string", "Hello World"},
+ {"testing/slowint", nil}, // Times out
+ }
+ for _, tc := range testcases {
+ checkVariable(t, tc.name, tc.expected)
+ }
+ checkVariable(t, "testing/slowint", nil) // Times out
+ checkVariable(t, "testing/slowint", nil) // Times out
+ ch <- int64(0)
+ checkVariable(t, "testing/slowint", int64(0)) // New value
+ checkVariable(t, "testing/slowint", int64(0)) // Times out
+ for i := 1; i <= 5; i++ {
+ ch <- int64(i)
+ }
+ for i := 1; i <= 5; i++ {
+ checkVariable(t, "testing/slowint", int64(i)) // New value each time
+ }
+}
+
+func checkVariable(t *testing.T, name string, expected interface{}) {
+ got, err := libstats.Value(name)
+ if err != nil {
+ t.Errorf("unexpected error for %q: %v", name, err)
+ }
+ if got != expected {
+ t.Errorf("unexpected result for %q. Got %v, want %v", name, got, expected)
+ }
+}
+
func TestDelete(t *testing.T) {
_ = libstats.NewInteger("a/b/c/d")
if _, err := libstats.GetStatsObject("a/b/c/d"); err != nil {
diff --git a/runtimes/google/ipc/stream/manager/http.go b/runtimes/google/ipc/stream/manager/http.go
deleted file mode 100644
index aa5540d..0000000
--- a/runtimes/google/ipc/stream/manager/http.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package manager
-
-import (
- "fmt"
- "net/http"
-
- "veyron.io/veyron/veyron2/ipc/stream"
-)
-
-// HTTPHandler returns an http.Handler that dumps out debug information from
-// the stream.Manager.
-//
-// If the stream.Manager was not created by InternalNew in this package, an error
-// will be returned instead.
-//
-// TODO(ashankar): This should be made a secure handler that only exposes information
-// on VCs that an identity authenticated over HTTPS has access to.
-func HTTPHandler(mgr stream.Manager) (http.Handler, error) {
- m, ok := mgr.(*manager)
- if !ok {
- return nil, fmt.Errorf("unrecognized stream.Manager implementation: %T", mgr)
- }
- return httpHandler{m}, nil
-}
-
-type httpHandler struct{ m *manager }
-
-func (h httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
- w.Write([]byte(h.m.DebugString()))
-}
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index 7afdad3..bc8284e 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -8,6 +8,7 @@
"strings"
"sync"
+ "veyron.io/veyron/veyron/lib/stats"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vif"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
@@ -28,12 +29,15 @@
// placed inside veyron/runtimes/google. Code outside the
// veyron/runtimes/google/* packages should never call this method.
func InternalNew(rid naming.RoutingID) stream.Manager {
- return &manager{
+ m := &manager{
rid: rid,
vifs: vif.NewSet(),
sessionCache: crypto.NewTLSClientSessionCache(),
listeners: make(map[listener]bool),
+ statsName: naming.Join("ipc", "stream", "routing-id", rid.String(), "debug"),
}
+ stats.NewStringFunc(m.statsName, m.DebugString)
+ return m
}
type manager struct {
@@ -44,6 +48,8 @@
muListeners sync.Mutex
listeners map[listener]bool // GUARDED_BY(muListeners)
shutdown bool // GUARDED_BY(muListeners)
+
+ statsName string
}
var _ stream.Manager = (*manager)(nil)
@@ -193,6 +199,7 @@
}
func (m *manager) Shutdown() {
+ stats.Delete(m.statsName)
m.muListeners.Lock()
if m.shutdown {
m.muListeners.Unlock()
diff --git a/runtimes/google/rt/http.go b/runtimes/google/rt/http.go
deleted file mode 100644
index 5ed3237..0000000
--- a/runtimes/google/rt/http.go
+++ /dev/null
@@ -1,104 +0,0 @@
-package rt
-
-import (
- "fmt"
- "html/template"
- "net"
- "net/http"
- "sync"
- // TODO(ashankar,cnicolaou): Remove net/http/pprof before "release"
- // since it installs default HTTP handlers.
- "net/http/pprof"
-
- "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
- "veyron.io/veyron/veyron2/ipc/stream"
- "veyron.io/veyron/veyron2/naming"
- "veyron.io/veyron/veyron2/vlog"
-)
-
-type debugServer struct {
- init sync.Once
- addr string
- mux *http.ServeMux
-
- mu sync.RWMutex
- rids []naming.RoutingID // GUARDED_BY(mu)
-}
-
-func (rt *vrt) initHTTPDebugServer() {
- // TODO(ashankar,cnicolaou): Change the default debug address to the empty
- // string.
- // In March 2014 this was temporarily set to "127.0.0.1:0" so that the
- // debugging HTTP server always runs, which was useful during initial veyron
- // development. We restrict it in this way to avoid annoying firewall warnings
- // and to provide a modicum of security.
- rt.debug.addr = "127.0.0.1:0"
- rt.debug.mux = http.NewServeMux()
-}
-
-func (rt *vrt) startHTTPDebugServerOnce() {
- rt.debug.init.Do(func() { startHTTPDebugServer(&rt.debug) })
-}
-
-func startHTTPDebugServer(info *debugServer) {
- if len(info.addr) == 0 {
- return
- }
- ln, err := net.Listen("tcp", info.addr)
- if err != nil {
- vlog.Errorf("Failed to setup debugging HTTP server. net.Listen(%q, %q): %v", "tcp", info.addr, err)
- return
- }
- vlog.Infof("Starting HTTP debug server. See http://%v/debug", ln.Addr())
- mux := info.mux
- mux.Handle("/debug/", info)
- // Since a custom ServeMux is used, net/http/pprof.init needs to be replicated
- mux.HandleFunc("/debug/pprof/", pprof.Index)
- mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
- mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
- mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
- go func() {
- server := &http.Server{Addr: ln.Addr().String(), Handler: mux}
- if err := server.Serve(ln); err != nil {
- vlog.Infof("Debug HTTP server exited: %v", err)
- }
- }()
-}
-
-func (s *debugServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- if err := tmplMain.Execute(w, s.rids); err != nil {
- vlog.Errorf("Error executing HTTP template: %v", err)
- }
-}
-
-func (s *debugServer) RegisterStreamManager(rid naming.RoutingID, sm stream.Manager) error {
- h, err := manager.HTTPHandler(sm)
- if err != nil {
- return fmt.Errorf("failed to setup HTTP handler for ipc/stream/Manager implementation: %v", err)
- }
- s.mu.Lock()
- s.rids = append(s.rids, rid)
- s.mu.Unlock()
- s.mux.Handle(fmt.Sprintf("/debug/veyron/%v", rid), h)
- return nil
-}
-
-var (
- tmplMain = template.Must(template.New("Debug").Parse(`<!doctype html>
-<html>
-<head>
-<meta charset="UTF-8">
-<title>Veyron Debug Server</title>
-</head>
-<body>
-<ul>
-<li><a href="/debug/pprof/">profiling</a></li>
-{{range .}}
-<li>ipc/stream/Manager for RoutingID <a href="/debug/veyron/{{.}}">{{.}}</a></li>
-{{end}}
-</ul>
-</body>
-</html>`))
-)
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 65e1474..7b3251c 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -153,8 +153,6 @@
return nil, fmt.Errorf("failed to create ipc/stream/Manager: %v", err)
}
}
- // Start the http debug server exactly once for this runtime.
- rt.startHTTPDebugServerOnce()
ns := rt.ns
var id security.PublicID
var otherOpts []ipc.ServerOpt
@@ -181,12 +179,10 @@
return nil, err
}
sm := imanager.InternalNew(rid)
- rt.debug.RegisterStreamManager(rid, sm)
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.cleaningUp {
sm.Shutdown() // For whatever it's worth.
- // TODO(caprita): Should we also unregister sm from debug?
return nil, errCleaningUp
}
rt.sm = append(rt.sm, sm)
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index c321b2c..d670ca8 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -40,7 +40,6 @@
store security.PublicIDStore
client ipc.Client
mgmt *mgmtImpl
- debug debugServer
nServers int // GUARDED_BY(mu)
cleaningUp bool // GUARDED_BY(mu)
@@ -58,7 +57,6 @@
func New(opts ...veyron2.ROpt) (veyron2.Runtime, error) {
rt := &vrt{mgmt: new(mgmtImpl), lang: i18n.LangIDFromEnv(), program: filepath.Base(os.Args[0])}
flag.Parse()
- rt.initHTTPDebugServer()
nsRoots := []string{}
for _, o := range opts {
switch v := o.(type) {
@@ -70,8 +68,6 @@
rt.profile = v.Profile
case options.NamespaceRoots:
nsRoots = v
- case options.HTTPDebug:
- rt.debug.addr = string(v)
case options.RuntimeName:
if v != "google" && v != "" {
return nil, fmt.Errorf("%q is the wrong name for this runtime", v)