Added x/net/trace support to VRPC

The RPCs are now also monitored by the x/net/trace package.  This
exports an HTTP interface at the /debug/requests endpoint. In order to
access this interface, an RPC endpoint __debug/http has been created
to serve HTTP requests over RPC. A tool has been added to send these
type of RPCs and serve the HTML responses over a local HTTP server.

MultiPart: 2/2
Change-Id: I794882fd588f1c38eebd5c396ce1b67d3f979132
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index aae918d..1a4a685 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -27,6 +27,8 @@
 	slib "v.io/x/ref/lib/security"
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/manager"
+
+	"golang.org/x/net/trace"
 )
 
 const pkgPath = "v.io/x/ref/runtime/internal/rpc"
@@ -153,12 +155,18 @@
 
 func (c *client) Call(ctx *context.T, name, method string, inArgs, outArgs []interface{}, opts ...rpc.CallOpt) error {
 	defer apilog.LogCallf(ctx, "name=%.10s...,method=%.10s...,inArgs=,outArgs=,opts...=%v", name, method, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+
+	tr := trace.New("Sent."+name, method)
+	defer tr.Finish()
+
 	connOpts := getConnectionOptions(ctx, opts)
 	var prevErr error
 	for retries := uint(0); ; retries++ {
 		call, err := c.startCall(ctx, name, method, inArgs, connOpts, opts)
 		if err != nil {
 			// See explanation in connectToName.
+			tr.LazyPrintf("%s\n", err)
+			tr.SetError()
 			return preferNonTimeout(err, prevErr)
 		}
 		switch err := call.Finish(outArgs...); {
@@ -167,11 +175,17 @@
 		case !shouldRetryBackoff(verror.Action(err), connOpts):
 			ctx.VI(4).Infof("Cannot retry after error: %s", err)
 			// See explanation in connectToName.
+			tr.LazyPrintf("%s\n", err)
+			tr.SetError()
 			return preferNonTimeout(err, prevErr)
 		case !backoff(retries, connOpts.connDeadline):
+			tr.LazyPrintf("%s\n", err)
+			tr.SetError()
 			return err
 		default:
 			ctx.VI(4).Infof("Retrying due to error: %s", err)
+			tr.LazyPrintf("%s\n", err)
+			tr.SetError()
 		}
 		prevErr = err
 	}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index f694f16..ef1f321 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"golang.org/x/net/trace"
 	"io"
 	"net"
 	"reflect"
@@ -495,7 +496,7 @@
 		sep := s.createEndpoint(ep)
 		endpoints[sep.String()] = sep
 	}
-	// Endpoints to add and remaove.
+	// Endpoints to add and remove.
 	rmEps := setDiff(s.endpoints, endpoints)
 	addEps := setDiff(endpoints, s.endpoints)
 	for k := range rmEps {
@@ -763,6 +764,19 @@
 		ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
 		return ctx, nil, err
 	}
+
+	tid := req.TraceRequest.TraceId.String()
+	sid := req.TraceRequest.SpanId.String()
+	title := fmt.Sprintf("(trace_id: %s span_id: %s)", tid, sid)
+
+	tr := trace.New("Recv."+req.Suffix, req.Method+" "+title)
+
+	if deadline, ok := ctx.Deadline(); ok {
+		tr.LazyPrintf("RPC has a deadline: %v", deadline)
+	}
+
+	defer tr.Finish()
+
 	fs.removeStat = fs.server.outstanding.start(req.Method, fs.flow.RemoteEndpoint())
 
 	// Start building up a new context for the request now that we know
@@ -786,12 +800,16 @@
 
 	if err := fs.readGrantedBlessings(ctx, req); err != nil {
 		fs.drainDecoderArgs(int(req.NumPosArgs))
+		tr.LazyPrintf("%s\n", err)
+		tr.SetError()
 		return ctx, nil, err
 	}
 	// Lookup the invoker.
 	invoker, auth, err := fs.lookup(ctx, fs.suffix, fs.method)
 	if err != nil {
 		fs.drainDecoderArgs(int(req.NumPosArgs))
+		tr.LazyPrintf("%s\n", err)
+		tr.SetError()
 		return ctx, nil, err
 	}
 
@@ -806,23 +824,45 @@
 	fs.tags = tags
 	if err != nil {
 		fs.drainDecoderArgs(numArgs)
+		tr.LazyPrintf("%s\n", err)
+		tr.SetError()
 		return ctx, nil, err
 	}
 	if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
 		fs.drainDecoderArgs(numArgs)
+		tr.LazyPrintf("%s\n", err)
+		tr.SetError()
 		return ctx, nil, newErrBadNumInputArgs(ctx, fs.suffix, fs.method, called, want)
 	}
 	for ix, argptr := range argptrs {
 		if err := fs.dec.Decode(argptr); err != nil {
+			tr.LazyPrintf("%s\n", err)
+			tr.SetError()
+
 			return ctx, nil, newErrBadInputArg(ctx, fs.suffix, fs.method, uint64(ix), err)
 		}
 	}
 
 	// Check application's authorization policy.
 	if err := authorize(ctx, fs, auth); err != nil {
+		tr.LazyPrintf("%s\n", err)
+		tr.SetError()
 		return ctx, nil, err
 	}
 
+	defer func() {
+		switch ctx.Err() {
+		case context.DeadlineExceeded:
+			deadline, _ := ctx.Deadline()
+			tr.LazyPrintf("RPC exceeded deadline. Should have ended at %v", deadline)
+		case context.Canceled:
+			tr.LazyPrintf("RPC was cancelled.")
+		}
+		if ctx.Err() != nil {
+			tr.SetError()
+		}
+	}()
+
 	// Invoke the method.
 	results, err := invoker.Invoke(ctx, fs, strippedMethod, argptrs)
 	fs.server.stats.record(fs.method, time.Since(fs.starttime))
diff --git a/runtime/internal/rpc/test/debug_test.go b/runtime/internal/rpc/test/debug_test.go
index 50ab773..b7965b9 100644
--- a/runtime/internal/rpc/test/debug_test.go
+++ b/runtime/internal/rpc/test/debug_test.go
@@ -74,8 +74,8 @@
 	}{
 		{"", "*", []string{}},
 		{"", "__*", []string{"__debug"}},
-		{"", "__*/*", []string{"__debug/logs", "__debug/pprof", "__debug/stats", "__debug/vtrace"}},
-		{"__debug", "*", []string{"logs", "pprof", "stats", "vtrace"}},
+		{"", "__*/*", []string{"__debug/http", "__debug/logs", "__debug/pprof", "__debug/stats", "__debug/vtrace"}},
+		{"__debug", "*", []string{"http", "logs", "pprof", "stats", "vtrace"}},
 	}
 	for _, tc := range testcases {
 		fullname := naming.Join(name, tc.name)
diff --git a/services/debug/debug/debug_v23_test.go b/services/debug/debug/debug_v23_test.go
index d9d897a..1e78fb3 100644
--- a/services/debug/debug/debug_v23_test.go
+++ b/services/debug/debug/debug_v23_test.go
@@ -29,7 +29,7 @@
 	stdout := sh.Cmd(binary, "glob", "__debug/*").Stdout()
 
 	var want string
-	for _, entry := range []string{"logs", "pprof", "stats", "vtrace"} {
+	for _, entry := range []string{"logs", "pprof", "stats", "vtrace", "http"} {
 		want += "__debug/" + entry + "\n"
 	}
 	if got := stdout; got != want {
diff --git a/services/debug/debuglib/dispatcher.go b/services/debug/debuglib/dispatcher.go
index 7c3557a..41fd51f 100644
--- a/services/debug/debuglib/dispatcher.go
+++ b/services/debug/debuglib/dispatcher.go
@@ -13,6 +13,7 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/x/ref/internal/logger"
+	"v.io/x/ref/services/internal/httplib"
 	"v.io/x/ref/services/internal/logreaderlib"
 	"v.io/x/ref/services/internal/pproflib"
 	"v.io/x/ref/services/internal/statslib"
@@ -44,7 +45,7 @@
 	suffix = strings.TrimLeft(suffix, "/")
 
 	if suffix == "" {
-		return rpc.ChildrenGlobberInvoker("logs", "pprof", "stats", "vtrace"), d.auth, nil
+		return rpc.ChildrenGlobberInvoker("logs", "pprof", "stats", "vtrace", "http"), d.auth, nil
 	}
 	parts := strings.SplitN(suffix, "/", 2)
 	if len(parts) == 2 {
@@ -61,6 +62,8 @@
 		return statslib.NewStatsService(suffix, 10*time.Second), d.auth, nil
 	case "vtrace":
 		return vtracelib.NewVtraceService(), d.auth, nil
+	case "http":
+		return httplib.NewHttpService(), d.auth, nil
 	}
 	return nil, d.auth, nil
 }
diff --git a/services/debug/debuglib/dispatcher_test.go b/services/debug/debuglib/dispatcher_test.go
index de17dca..99787af 100644
--- a/services/debug/debuglib/dispatcher_test.go
+++ b/services/debug/debuglib/dispatcher_test.go
@@ -208,6 +208,7 @@
 		}
 		sort.Strings(results)
 		expected = []string{
+			"http",
 			"logs",
 			"pprof",
 			"stats",
diff --git a/services/device/deviced/internal/impl/perms/debug_perms_test.go b/services/device/deviced/internal/impl/perms/debug_perms_test.go
index 9cdedd6..bb95718 100644
--- a/services/device/deviced/internal/impl/perms/debug_perms_test.go
+++ b/services/device/deviced/internal/impl/perms/debug_perms_test.go
@@ -98,7 +98,7 @@
 	}
 	appGlobtests := []utiltest.GlobTestVector{
 		{naming.Join("appV1", "__debug"), "*",
-			[]string{"logs", "pprof", "stats", "vtrace"},
+			[]string{"http", "logs", "pprof", "stats", "vtrace"},
 		},
 		{naming.Join("appV1", "__debug", "stats", "system"),
 			"start-time*",
@@ -216,7 +216,7 @@
 	// Create some globbing test vectors.
 	dmGlobtests := []utiltest.GlobTestVector{
 		{naming.Join("dm", "__debug"), "*",
-			[]string{"logs", "pprof", "stats", "vtrace"},
+			[]string{"http", "logs", "pprof", "stats", "vtrace"},
 		},
 		{naming.Join("dm", "__debug", "stats", "system"),
 			"start-time*",
diff --git a/services/http/http/http.go b/services/http/http/http.go
new file mode 100644
index 0000000..ed83467
--- /dev/null
+++ b/services/http/http/http.go
@@ -0,0 +1,74 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"v.io/x/ref/services/http/httplib"
+
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	_ "v.io/x/ref/runtime/factories/roaming"
+)
+
+var (
+	server   = flag.String("server", "", "Name of the server to connect to")
+	endpoint string
+	ctx      *context.T
+)
+
+func traceDataHandler(w http.ResponseWriter, req *http.Request) {
+	var data []byte
+
+	vdl_req := httplib.VDLRequestFromHTTPRequest(req)
+	client := v23.GetClient(ctx)
+	err := client.Call(ctx, endpoint, "RawDo", []interface{}{vdl_req}, []interface{}{&data})
+
+	if err != nil {
+		log.Println(err)
+		w.Write([]byte(fmt.Sprintf("%T", err)))
+	} else {
+		w.Write(data)
+	}
+}
+
+func findPortAndListen(mux *http.ServeMux) {
+	fmt_port := func(port int) string { return fmt.Sprintf("localhost:%d", port) }
+	curr_port := 3000
+
+	for {
+		ln, err := net.Listen("tcp", fmt_port(curr_port))
+		if err == nil {
+			log.Println("Monitoring on " + fmt_port(curr_port) + "/debug/requests...")
+			defer ln.Close()
+			http.Serve(ln, mux)
+			break
+		}
+		curr_port += 1
+	}
+}
+
+func main() {
+	var shutdown v23.Shutdown
+	ctx, shutdown = v23.Init()
+	defer shutdown()
+
+	if len(*server) == 0 {
+		fmt.Println("usage: http --server endpoint [--v23.credentials cred_dir]")
+		return
+	}
+
+	endpoint = *server + "/__debug/http"
+
+	mux := http.NewServeMux()
+	mux.Handle("/debug/events", http.HandlerFunc(traceDataHandler))
+	mux.Handle("/debug/requests", http.HandlerFunc(traceDataHandler))
+	findPortAndListen(mux)
+}
diff --git a/services/http/httplib/httplib.go b/services/http/httplib/httplib.go
new file mode 100644
index 0000000..bb177e8
--- /dev/null
+++ b/services/http/httplib/httplib.go
@@ -0,0 +1,83 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package httplib
+
+import (
+	"bytes"
+	"io/ioutil"
+	"net/url"
+
+	go_http "net/http"
+	v23_http "v.io/v23/services/http"
+)
+
+func VDLRequestFromHTTPRequest(req *go_http.Request) v23_http.Request {
+	url := v23_http.Url{
+		Scheme:   req.URL.Scheme,
+		Opaque:   req.URL.Opaque,
+		Host:     req.URL.Host,
+		Path:     req.URL.Path,
+		RawPath:  req.URL.RawPath,
+		RawQuery: req.URL.RawQuery,
+		Fragment: req.URL.Fragment,
+	}
+
+	body_buf := new(bytes.Buffer)
+	body_buf.ReadFrom(req.Body)
+
+	ifc_req := v23_http.Request{
+		Method:           req.Method,
+		Url:              url,
+		Proto:            req.Proto,
+		ProtoMajor:       int16(req.ProtoMajor),
+		ProtoMinor:       int16(req.ProtoMinor),
+		Header:           req.Header,
+		Body:             body_buf.Bytes(),
+		ContentLength:    req.ContentLength,
+		TransferEncoding: req.TransferEncoding,
+		Close:            req.Close,
+		Host:             req.Host,
+		Form:             req.Form,
+		PostForm:         req.PostForm,
+		Trailer:          req.Trailer,
+		RemoteAddr:       req.RemoteAddr,
+		RequestUri:       req.RequestURI,
+	}
+	return ifc_req
+}
+
+func HTTPRequestFromVDLRequest(req v23_http.Request) *go_http.Request {
+	url := &url.URL{
+		Scheme:   req.Url.Scheme,
+		Opaque:   req.Url.Opaque,
+		User:     nil,
+		Host:     req.Url.Host,
+		Path:     req.Url.Path,
+		RawPath:  req.Url.RawPath,
+		RawQuery: req.Url.RawQuery,
+		Fragment: req.Url.Fragment,
+	}
+
+	http_req := &go_http.Request{
+		Method:           req.Method,
+		URL:              url,
+		Proto:            req.Proto,
+		ProtoMajor:       int(req.ProtoMajor),
+		ProtoMinor:       int(req.ProtoMinor),
+		Header:           req.Header,
+		Body:             ioutil.NopCloser(bytes.NewReader(req.Body)),
+		ContentLength:    req.ContentLength,
+		TransferEncoding: req.TransferEncoding,
+		Close:            req.Close,
+		Host:             req.Host,
+		Form:             req.Form,
+		PostForm:         req.PostForm,
+		MultipartForm:    nil,
+		Trailer:          req.Trailer,
+		RemoteAddr:       req.RemoteAddr,
+		RequestURI:       req.RequestUri,
+	}
+	return http_req
+}
diff --git a/services/internal/httplib/http.go b/services/internal/httplib/http.go
new file mode 100644
index 0000000..9dc8d06
--- /dev/null
+++ b/services/internal/httplib/http.go
@@ -0,0 +1,32 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package httplib
+
+import (
+	"bytes"
+	"golang.org/x/net/trace"
+
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	v23_http "v.io/v23/services/http"
+	"v.io/x/ref/services/http/httplib"
+)
+
+type httpService struct{}
+
+func (f *httpService) RawDo(_ *context.T, _ rpc.ServerCall, req v23_http.Request) (
+	data []byte, err error) {
+
+	buf := bytes.NewBuffer(data)
+	http_req := httplib.HTTPRequestFromVDLRequest(req)
+
+	trace.Render(buf, http_req, false)
+
+	return buf.Bytes(), nil
+}
+
+func NewHttpService() interface{} {
+	return v23_http.HttpServer(&httpService{})
+}