Added x/net/trace support to VRPC
The RPCs are now also monitored by the x/net/trace package. This
exports an HTTP interface at the /debug/requests endpoint. In order to
access this interface, an RPC endpoint __debug/http has been created
to serve HTTP requests over RPC. A tool has been added to send these
type of RPCs and serve the HTML responses over a local HTTP server.
MultiPart: 2/2
Change-Id: I794882fd588f1c38eebd5c396ce1b67d3f979132
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index aae918d..1a4a685 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -27,6 +27,8 @@
slib "v.io/x/ref/lib/security"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/manager"
+
+ "golang.org/x/net/trace"
)
const pkgPath = "v.io/x/ref/runtime/internal/rpc"
@@ -153,12 +155,18 @@
func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,inArgs=,outArgs=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+
+ tr := trace.New("Sent."+name, method)
+ defer tr.Finish()
+
connOpts := getConnectionOptions(ctx, opts)
var prevErr error
for retries := uint(0); ; retries++ {
call, err := c.startCall(ctx, name, method, inArgs, connOpts, opts)
if err != nil {
// See explanation in connectToName.
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return preferNonTimeout(err, prevErr)
}
switch err := call.Finish(outArgs...); {
@@ -167,11 +175,17 @@
case !shouldRetryBackoff(verror.Action(err), connOpts):
ctx.VI(4).Infof("Cannot retry after error: %s", err)
// See explanation in connectToName.
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return preferNonTimeout(err, prevErr)
case !backoff(retries, connOpts.connDeadline):
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return err
default:
ctx.VI(4).Infof("Retrying due to error: %s", err)
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
}
prevErr = err
}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index f694f16..ef1f321 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "golang.org/x/net/trace"
"io"
"net"
"reflect"
@@ -495,7 +496,7 @@
sep := s.createEndpoint(ep)
endpoints[sep.String()] = sep
}
- // Endpoints to add and remaove.
+ // Endpoints to add and remove.
rmEps := setDiff(s.endpoints, endpoints)
addEps := setDiff(endpoints, s.endpoints)
for k := range rmEps {
@@ -763,6 +764,19 @@
ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
return ctx, nil, err
}
+
+ tid := req.TraceRequest.TraceId.String()
+ sid := req.TraceRequest.SpanId.String()
+ title := fmt.Sprintf("(trace_id: %s span_id: %s)", tid, sid)
+
+ tr := trace.New("Recv."+req.Suffix, req.Method+" "+title)
+
+ if deadline, ok := ctx.Deadline(); ok {
+ tr.LazyPrintf("RPC has a deadline: %v", deadline)
+ }
+
+ defer tr.Finish()
+
fs.removeStat = fs.server.outstanding.start(req.Method, fs.flow.RemoteEndpoint())
// Start building up a new context for the request now that we know
@@ -786,12 +800,16 @@
if err := fs.readGrantedBlessings(ctx, req); err != nil {
fs.drainDecoderArgs(int(req.NumPosArgs))
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return ctx, nil, err
}
// Lookup the invoker.
invoker, auth, err := fs.lookup(ctx, fs.suffix, fs.method)
if err != nil {
fs.drainDecoderArgs(int(req.NumPosArgs))
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return ctx, nil, err
}
@@ -806,23 +824,45 @@
fs.tags = tags
if err != nil {
fs.drainDecoderArgs(numArgs)
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return ctx, nil, err
}
if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
fs.drainDecoderArgs(numArgs)
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return ctx, nil, newErrBadNumInputArgs(ctx, fs.suffix, fs.method, called, want)
}
for ix, argptr := range argptrs {
if err := fs.dec.Decode(argptr); err != nil {
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
+
return ctx, nil, newErrBadInputArg(ctx, fs.suffix, fs.method, uint64(ix), err)
}
}
// Check application's authorization policy.
if err := authorize(ctx, fs, auth); err != nil {
+ tr.LazyPrintf("%s\n", err)
+ tr.SetError()
return ctx, nil, err
}
+ defer func() {
+ switch ctx.Err() {
+ case context.DeadlineExceeded:
+ deadline, _ := ctx.Deadline()
+ tr.LazyPrintf("RPC exceeded deadline. Should have ended at %v", deadline)
+ case context.Canceled:
+ tr.LazyPrintf("RPC was cancelled.")
+ }
+ if ctx.Err() != nil {
+ tr.SetError()
+ }
+ }()
+
// Invoke the method.
results, err := invoker.Invoke(ctx, fs, strippedMethod, argptrs)
fs.server.stats.record(fs.method, time.Since(fs.starttime))
diff --git a/runtime/internal/rpc/test/debug_test.go b/runtime/internal/rpc/test/debug_test.go
index 50ab773..b7965b9 100644
--- a/runtime/internal/rpc/test/debug_test.go
+++ b/runtime/internal/rpc/test/debug_test.go
@@ -74,8 +74,8 @@
}{
{"", "*", []string{}},
{"", "__*", []string{"__debug"}},
- {"", "__*/*", []string{"__debug/logs", "__debug/pprof", "__debug/stats", "__debug/vtrace"}},
- {"__debug", "*", []string{"logs", "pprof", "stats", "vtrace"}},
+ {"", "__*/*", []string{"__debug/http", "__debug/logs", "__debug/pprof", "__debug/stats", "__debug/vtrace"}},
+ {"__debug", "*", []string{"http", "logs", "pprof", "stats", "vtrace"}},
}
for _, tc := range testcases {
fullname := naming.Join(name, tc.name)
diff --git a/services/debug/debug/debug_v23_test.go b/services/debug/debug/debug_v23_test.go
index d9d897a..1e78fb3 100644
--- a/services/debug/debug/debug_v23_test.go
+++ b/services/debug/debug/debug_v23_test.go
@@ -29,7 +29,7 @@
stdout := sh.Cmd(binary, "glob", "__debug/*").Stdout()
var want string
- for _, entry := range []string{"logs", "pprof", "stats", "vtrace"} {
+ for _, entry := range []string{"logs", "pprof", "stats", "vtrace", "http"} {
want += "__debug/" + entry + "\n"
}
if got := stdout; got != want {
diff --git a/services/debug/debuglib/dispatcher.go b/services/debug/debuglib/dispatcher.go
index 7c3557a..41fd51f 100644
--- a/services/debug/debuglib/dispatcher.go
+++ b/services/debug/debuglib/dispatcher.go
@@ -13,6 +13,7 @@
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/x/ref/internal/logger"
+ "v.io/x/ref/services/internal/httplib"
"v.io/x/ref/services/internal/logreaderlib"
"v.io/x/ref/services/internal/pproflib"
"v.io/x/ref/services/internal/statslib"
@@ -44,7 +45,7 @@
suffix = strings.TrimLeft(suffix, "/")
if suffix == "" {
- return rpc.ChildrenGlobberInvoker("logs", "pprof", "stats", "vtrace"), d.auth, nil
+ return rpc.ChildrenGlobberInvoker("logs", "pprof", "stats", "vtrace", "http"), d.auth, nil
}
parts := strings.SplitN(suffix, "/", 2)
if len(parts) == 2 {
@@ -61,6 +62,8 @@
return statslib.NewStatsService(suffix, 10*time.Second), d.auth, nil
case "vtrace":
return vtracelib.NewVtraceService(), d.auth, nil
+ case "http":
+ return httplib.NewHttpService(), d.auth, nil
}
return nil, d.auth, nil
}
diff --git a/services/debug/debuglib/dispatcher_test.go b/services/debug/debuglib/dispatcher_test.go
index de17dca..99787af 100644
--- a/services/debug/debuglib/dispatcher_test.go
+++ b/services/debug/debuglib/dispatcher_test.go
@@ -208,6 +208,7 @@
}
sort.Strings(results)
expected = []string{
+ "http",
"logs",
"pprof",
"stats",
diff --git a/services/device/deviced/internal/impl/perms/debug_perms_test.go b/services/device/deviced/internal/impl/perms/debug_perms_test.go
index 9cdedd6..bb95718 100644
--- a/services/device/deviced/internal/impl/perms/debug_perms_test.go
+++ b/services/device/deviced/internal/impl/perms/debug_perms_test.go
@@ -98,7 +98,7 @@
}
appGlobtests := []utiltest.GlobTestVector{
{naming.Join("appV1", "__debug"), "*",
- []string{"logs", "pprof", "stats", "vtrace"},
+ []string{"http", "logs", "pprof", "stats", "vtrace"},
},
{naming.Join("appV1", "__debug", "stats", "system"),
"start-time*",
@@ -216,7 +216,7 @@
// Create some globbing test vectors.
dmGlobtests := []utiltest.GlobTestVector{
{naming.Join("dm", "__debug"), "*",
- []string{"logs", "pprof", "stats", "vtrace"},
+ []string{"http", "logs", "pprof", "stats", "vtrace"},
},
{naming.Join("dm", "__debug", "stats", "system"),
"start-time*",
diff --git a/services/http/http/http.go b/services/http/http/http.go
new file mode 100644
index 0000000..ed83467
--- /dev/null
+++ b/services/http/http/http.go
@@ -0,0 +1,74 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "v.io/x/ref/services/http/httplib"
+
+ "flag"
+ "fmt"
+ "log"
+ "net"
+ "net/http"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ _ "v.io/x/ref/runtime/factories/roaming"
+)
+
+var (
+ server = flag.String("server", "", "Name of the server to connect to")
+ endpoint string
+ ctx *context.T
+)
+
+func traceDataHandler(w http.ResponseWriter, req *http.Request) {
+ var data []byte
+
+ vdl_req := httplib.VDLRequestFromHTTPRequest(req)
+ client := v23.GetClient(ctx)
+ err := client.Call(ctx, endpoint, "RawDo", []interface{}{vdl_req}, []interface{}{&data})
+
+ if err != nil {
+ log.Println(err)
+ w.Write([]byte(fmt.Sprintf("%T", err)))
+ } else {
+ w.Write(data)
+ }
+}
+
+func findPortAndListen(mux *http.ServeMux) {
+ fmt_port := func(port int) string { return fmt.Sprintf("localhost:%d", port) }
+ curr_port := 3000
+
+ for {
+ ln, err := net.Listen("tcp", fmt_port(curr_port))
+ if err == nil {
+ log.Println("Monitoring on " + fmt_port(curr_port) + "/debug/requests...")
+ defer ln.Close()
+ http.Serve(ln, mux)
+ break
+ }
+ curr_port += 1
+ }
+}
+
+func main() {
+ var shutdown v23.Shutdown
+ ctx, shutdown = v23.Init()
+ defer shutdown()
+
+ if len(*server) == 0 {
+ fmt.Println("usage: http --server endpoint [--v23.credentials cred_dir]")
+ return
+ }
+
+ endpoint = *server + "/__debug/http"
+
+ mux := http.NewServeMux()
+ mux.Handle("/debug/events", http.HandlerFunc(traceDataHandler))
+ mux.Handle("/debug/requests", http.HandlerFunc(traceDataHandler))
+ findPortAndListen(mux)
+}
diff --git a/services/http/httplib/httplib.go b/services/http/httplib/httplib.go
new file mode 100644
index 0000000..bb177e8
--- /dev/null
+++ b/services/http/httplib/httplib.go
@@ -0,0 +1,83 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package httplib
+
+import (
+ "bytes"
+ "io/ioutil"
+ "net/url"
+
+ go_http "net/http"
+ v23_http "v.io/v23/services/http"
+)
+
+func VDLRequestFromHTTPRequest(req *go_http.Request) v23_http.Request {
+ url := v23_http.Url{
+ Scheme: req.URL.Scheme,
+ Opaque: req.URL.Opaque,
+ Host: req.URL.Host,
+ Path: req.URL.Path,
+ RawPath: req.URL.RawPath,
+ RawQuery: req.URL.RawQuery,
+ Fragment: req.URL.Fragment,
+ }
+
+ body_buf := new(bytes.Buffer)
+ body_buf.ReadFrom(req.Body)
+
+ ifc_req := v23_http.Request{
+ Method: req.Method,
+ Url: url,
+ Proto: req.Proto,
+ ProtoMajor: int16(req.ProtoMajor),
+ ProtoMinor: int16(req.ProtoMinor),
+ Header: req.Header,
+ Body: body_buf.Bytes(),
+ ContentLength: req.ContentLength,
+ TransferEncoding: req.TransferEncoding,
+ Close: req.Close,
+ Host: req.Host,
+ Form: req.Form,
+ PostForm: req.PostForm,
+ Trailer: req.Trailer,
+ RemoteAddr: req.RemoteAddr,
+ RequestUri: req.RequestURI,
+ }
+ return ifc_req
+}
+
+func HTTPRequestFromVDLRequest(req v23_http.Request) *go_http.Request {
+ url := &url.URL{
+ Scheme: req.Url.Scheme,
+ Opaque: req.Url.Opaque,
+ User: nil,
+ Host: req.Url.Host,
+ Path: req.Url.Path,
+ RawPath: req.Url.RawPath,
+ RawQuery: req.Url.RawQuery,
+ Fragment: req.Url.Fragment,
+ }
+
+ http_req := &go_http.Request{
+ Method: req.Method,
+ URL: url,
+ Proto: req.Proto,
+ ProtoMajor: int(req.ProtoMajor),
+ ProtoMinor: int(req.ProtoMinor),
+ Header: req.Header,
+ Body: ioutil.NopCloser(bytes.NewReader(req.Body)),
+ ContentLength: req.ContentLength,
+ TransferEncoding: req.TransferEncoding,
+ Close: req.Close,
+ Host: req.Host,
+ Form: req.Form,
+ PostForm: req.PostForm,
+ MultipartForm: nil,
+ Trailer: req.Trailer,
+ RemoteAddr: req.RemoteAddr,
+ RequestURI: req.RequestUri,
+ }
+ return http_req
+}
diff --git a/services/internal/httplib/http.go b/services/internal/httplib/http.go
new file mode 100644
index 0000000..9dc8d06
--- /dev/null
+++ b/services/internal/httplib/http.go
@@ -0,0 +1,32 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package httplib
+
+import (
+ "bytes"
+ "golang.org/x/net/trace"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ v23_http "v.io/v23/services/http"
+ "v.io/x/ref/services/http/httplib"
+)
+
+type httpService struct{}
+
+func (f *httpService) RawDo(_ *context.T, _ rpc.ServerCall, req v23_http.Request) (
+ data []byte, err error) {
+
+ buf := bytes.NewBuffer(data)
+ http_req := httplib.HTTPRequestFromVDLRequest(req)
+
+ trace.Render(buf, http_req, false)
+
+ return buf.Bytes(), nil
+}
+
+func NewHttpService() interface{} {
+ return v23_http.HttpServer(&httpService{})
+}