wspr: Implement vtrace (part 1 of many)
This CL adds a basic implementation of vtrace spans and the vtrace
store. It also adds enough functionality to collect trace data when
JavaScript is acting as a client.
There are many shortcomings which will be addressed by future CLs.
- The store always collects; there is no pattern-based collection as in Go.
- JS servers never return trace information
- There is no interface to dump traces from the console, etc.
MultiPart: 3/4
Change-Id: Id2d335aa195cd375b289ff4720ab446953eaef3d
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 97414a2..1b69988 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -19,6 +19,7 @@
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
+ "v.io/core/veyron2/vdl"
"v.io/core/veyron2/vdl/vdlroot/src/signature"
"v.io/core/veyron2/verror"
"v.io/core/veyron2/vlog"
@@ -143,7 +144,7 @@
}
// finishCall waits for the call to finish and write out the response to w.
-func (c *Controller) finishCall(ctx *context.T, w lib.ClientWriter, clientCall ipc.Call, msg *VeyronRPC) {
+func (c *Controller) finishCall(ctx *context.T, w lib.ClientWriter, clientCall ipc.Call, msg *VeyronRPCRequest, span vtrace.Span) {
if msg.IsStreaming {
for {
var item interface{}
@@ -182,10 +183,10 @@
w.Error(err)
return
}
- c.sendRPCResponse(ctx, w, results)
+ c.sendRPCResponse(ctx, w, span, results)
}
-func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, results []interface{}) {
+func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, span vtrace.Span, results []interface{}) {
// for now we assume last out argument is always error
if err, ok := results[len(results)-1].(error); ok {
// return the call Application error as is
@@ -193,17 +194,32 @@
return
}
- vomResults, err := lib.VomEncode(results[:len(results)-1])
+ outargs := make([]vdl.AnyRep, len(results)-1)
+ for i := range outargs {
+ outargs[i] = results[i]
+ }
+
+ span.Finish()
+ traceRecord := vtrace.GetStore(ctx).TraceRecord(span.Trace())
+
+ response := VeyronRPCResponse{
+ OutArgs: outargs,
+ TraceResponse: vtrace.Response{
+ Method: vtrace.InMemory,
+ Trace: *traceRecord,
+ },
+ }
+ encoded, err := lib.VomEncode(response)
if err != nil {
w.Error(err)
return
}
- if err := w.Send(lib.ResponseFinal, vomResults); err != nil {
+ if err := w.Send(lib.ResponseFinal, encoded); err != nil {
w.Error(verror.Convert(marshallingError, ctx, err))
}
}
-func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC, inArgs []interface{}) (ipc.Call, error) {
+func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPCRequest, inArgs []interface{}) (ipc.Call, error) {
methodName := lib.UppercaseFirstCharacter(msg.Method)
retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
clientCall, err := veyron2.GetClient(ctx).StartCall(ctx, msg.Name, methodName, inArgs, retryTimeoutOpt)
@@ -310,7 +326,7 @@
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
// the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream) {
+func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPCRequest, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream, span vtrace.Span) {
sig, err := c.getSignature(ctx, msg.Name)
if err != nil {
w.Error(err)
@@ -339,7 +355,7 @@
stream.init(call)
}
- c.finishCall(ctx, w, call, msg)
+ c.finishCall(ctx, w, call, msg, span)
c.Lock()
if request, ok := c.outstandingRequests[id]; ok {
delete(c.outstandingRequests, id)
@@ -355,7 +371,7 @@
// any of this context information.
type localCall struct {
ctx *context.T
- vrpc *VeyronRPC
+ vrpc *VeyronRPCRequest
tags []interface{}
}
@@ -376,7 +392,7 @@
func (l *localCall) LocalEndpoint() naming.Endpoint { return nil }
func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
-func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPC, decoder *vom.Decoder, w lib.ClientWriter) {
+func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPCRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
invoker, err := ipc.ReflectInvoker(ControllerServer(c))
if err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
@@ -398,7 +414,7 @@
w.Error(verror.Convert(verror.Internal, ctx, err))
return
}
- c.sendRPCResponse(ctx, w, results)
+ c.sendRPCResponse(ctx, w, span, results)
}
// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
@@ -414,12 +430,14 @@
return
}
- var msg VeyronRPC
+ var msg VeyronRPCRequest
if err := decoder.Decode(&msg); err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
return
}
vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+ spanName := fmt.Sprintf("<wspr>%q.%s", msg.Name, msg.Method)
+ ctx, span := vtrace.SetContinuedTrace(ctx, spanName, msg.TraceRequest)
var cctx *context.T
var cancel context.CancelFunc
@@ -435,7 +453,7 @@
// If this message is for an internal service, do a short-circuit dispatch here.
if msg.Name == "controller" {
- c.handleInternalCall(ctx, &msg, decoder, w)
+ c.handleInternalCall(ctx, &msg, decoder, w, span)
return
}
@@ -460,7 +478,7 @@
}
c.Lock()
c.outstandingRequests[id] = request
- go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream)
+ go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream, span)
c.Unlock()
}
@@ -644,13 +662,13 @@
server.HandleServerResponse(id, data)
}
-// parseVeyronRequest parses a json rpc request into a VeyronRPC object.
-func (c *Controller) parseVeyronRequest(data string) (*VeyronRPC, error) {
- var msg VeyronRPC
+// parseVeyronRequest parses a json rpc request into a VeyronRPCRequest object.
+func (c *Controller) parseVeyronRequest(data string) (*VeyronRPCRequest, error) {
+ var msg VeyronRPCRequest
if err := lib.VomDecode(data, &msg); err != nil {
return nil, err
}
- vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+ vlog.VI(2).Infof("VeyronRPCRequest: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
return &msg, nil
}