wspr: Implement vtrace (part 1 of many)
In this CL I add a basic implementation of vtrace spans and the vtrace
store. I also add enough functionality to collect trace data when
JavaScript is acting as a client.
There are many shortcomings, which will be addressed by future CLs:
- The store always collects; there is no pattern-based collection as in Go.
- JS servers never return trace information.
- There is no interface to dump traces from the console, etc.
MultiPart: 3/4
Change-Id: Id2d335aa195cd375b289ff4720ab446953eaef3d
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 97414a2..1b69988 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -19,6 +19,7 @@
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
+ "v.io/core/veyron2/vdl"
"v.io/core/veyron2/vdl/vdlroot/src/signature"
"v.io/core/veyron2/verror"
"v.io/core/veyron2/vlog"
@@ -143,7 +144,7 @@
}
// finishCall waits for the call to finish and write out the response to w.
-func (c *Controller) finishCall(ctx *context.T, w lib.ClientWriter, clientCall ipc.Call, msg *VeyronRPC) {
+func (c *Controller) finishCall(ctx *context.T, w lib.ClientWriter, clientCall ipc.Call, msg *VeyronRPCRequest, span vtrace.Span) {
if msg.IsStreaming {
for {
var item interface{}
@@ -182,10 +183,10 @@
w.Error(err)
return
}
- c.sendRPCResponse(ctx, w, results)
+ c.sendRPCResponse(ctx, w, span, results)
}
-func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, results []interface{}) {
+func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, span vtrace.Span, results []interface{}) {
// for now we assume last out argument is always error
if err, ok := results[len(results)-1].(error); ok {
// return the call Application error as is
@@ -193,17 +194,32 @@
return
}
- vomResults, err := lib.VomEncode(results[:len(results)-1])
+ outargs := make([]vdl.AnyRep, len(results)-1)
+ for i := range outargs {
+ outargs[i] = results[i]
+ }
+
+ span.Finish()
+ traceRecord := vtrace.GetStore(ctx).TraceRecord(span.Trace())
+
+ response := VeyronRPCResponse{
+ OutArgs: outargs,
+ TraceResponse: vtrace.Response{
+ Method: vtrace.InMemory,
+ Trace: *traceRecord,
+ },
+ }
+ encoded, err := lib.VomEncode(response)
if err != nil {
w.Error(err)
return
}
- if err := w.Send(lib.ResponseFinal, vomResults); err != nil {
+ if err := w.Send(lib.ResponseFinal, encoded); err != nil {
w.Error(verror.Convert(marshallingError, ctx, err))
}
}
-func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPC, inArgs []interface{}) (ipc.Call, error) {
+func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *VeyronRPCRequest, inArgs []interface{}) (ipc.Call, error) {
methodName := lib.UppercaseFirstCharacter(msg.Method)
retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
clientCall, err := veyron2.GetClient(ctx).StartCall(ctx, msg.Name, methodName, inArgs, retryTimeoutOpt)
@@ -310,7 +326,7 @@
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
// the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPC, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream) {
+func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *VeyronRPCRequest, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream, span vtrace.Span) {
sig, err := c.getSignature(ctx, msg.Name)
if err != nil {
w.Error(err)
@@ -339,7 +355,7 @@
stream.init(call)
}
- c.finishCall(ctx, w, call, msg)
+ c.finishCall(ctx, w, call, msg, span)
c.Lock()
if request, ok := c.outstandingRequests[id]; ok {
delete(c.outstandingRequests, id)
@@ -355,7 +371,7 @@
// any of this context information.
type localCall struct {
ctx *context.T
- vrpc *VeyronRPC
+ vrpc *VeyronRPCRequest
tags []interface{}
}
@@ -376,7 +392,7 @@
func (l *localCall) LocalEndpoint() naming.Endpoint { return nil }
func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
-func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPC, decoder *vom.Decoder, w lib.ClientWriter) {
+func (c *Controller) handleInternalCall(ctx *context.T, msg *VeyronRPCRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
invoker, err := ipc.ReflectInvoker(ControllerServer(c))
if err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
@@ -398,7 +414,7 @@
w.Error(verror.Convert(verror.Internal, ctx, err))
return
}
- c.sendRPCResponse(ctx, w, results)
+ c.sendRPCResponse(ctx, w, span, results)
}
// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
@@ -414,12 +430,14 @@
return
}
- var msg VeyronRPC
+ var msg VeyronRPCRequest
if err := decoder.Decode(&msg); err != nil {
w.Error(verror.Convert(verror.Internal, ctx, err))
return
}
vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+ spanName := fmt.Sprintf("<wspr>%q.%s", msg.Name, msg.Method)
+ ctx, span := vtrace.SetContinuedTrace(ctx, spanName, msg.TraceRequest)
var cctx *context.T
var cancel context.CancelFunc
@@ -435,7 +453,7 @@
// If this message is for an internal service, do a short-circuit dispatch here.
if msg.Name == "controller" {
- c.handleInternalCall(ctx, &msg, decoder, w)
+ c.handleInternalCall(ctx, &msg, decoder, w, span)
return
}
@@ -460,7 +478,7 @@
}
c.Lock()
c.outstandingRequests[id] = request
- go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream)
+ go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream, span)
c.Unlock()
}
@@ -644,13 +662,13 @@
server.HandleServerResponse(id, data)
}
-// parseVeyronRequest parses a json rpc request into a VeyronRPC object.
-func (c *Controller) parseVeyronRequest(data string) (*VeyronRPC, error) {
- var msg VeyronRPC
+// parseVeyronRequest parses a json rpc request into a VeyronRPCRequest object.
+func (c *Controller) parseVeyronRequest(data string) (*VeyronRPCRequest, error) {
+ var msg VeyronRPCRequest
if err := lib.VomDecode(data, &msg); err != nil {
return nil, err
}
- vlog.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+ vlog.VI(2).Infof("VeyronRPCRequest: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
return &msg, nil
}
diff --git a/services/wsprd/app/app.vdl b/services/wsprd/app/app.vdl
index f9629cb..f83c7ca 100644
--- a/services/wsprd/app/app.vdl
+++ b/services/wsprd/app/app.vdl
@@ -2,22 +2,29 @@
// javascript requests to veyron requests and vice versa.
package app
-import(
- "v.io/core/veyron2/security"
+import (
+ "v.io/core/veyron2/security"
+ "v.io/core/veyron2/vtrace"
)
-type VeyronRPC struct {
- Name string
- Method string
- NumInArgs int32
- NumOutArgs int32
- IsStreaming bool
- Timeout int64
+type VeyronRPCRequest struct {
+ Name string
+ Method string
+ NumInArgs int32
+ NumOutArgs int32
+ IsStreaming bool
+ Timeout int64
+ TraceRequest vtrace.Request
+}
+
+type VeyronRPCResponse struct {
+ OutArgs []any
+ TraceResponse vtrace.Response
}
type BlessingRequest struct {
- Handle int32
- Caveats []security.Caveat
- DurationMs int32
- Extension string
-}
\ No newline at end of file
+ Handle int32
+ Caveats []security.Caveat
+ DurationMs int32
+ Extension string
+}
diff --git a/services/wsprd/app/app.vdl.go b/services/wsprd/app/app.vdl.go
index edbfab7..0140423 100644
--- a/services/wsprd/app/app.vdl.go
+++ b/services/wsprd/app/app.vdl.go
@@ -11,19 +11,31 @@
// VDL user imports
"v.io/core/veyron2/security"
+ "v.io/core/veyron2/vtrace"
)
-type VeyronRPC struct {
- Name string
- Method string
- NumInArgs int32
- NumOutArgs int32
- IsStreaming bool
- Timeout int64
+type VeyronRPCRequest struct {
+ Name string
+ Method string
+ NumInArgs int32
+ NumOutArgs int32
+ IsStreaming bool
+ Timeout int64
+ TraceRequest vtrace.Request
}
-func (VeyronRPC) __VDLReflect(struct {
- Name string "v.io/wspr/veyron/services/wsprd/app.VeyronRPC"
+func (VeyronRPCRequest) __VDLReflect(struct {
+ Name string "v.io/wspr/veyron/services/wsprd/app.VeyronRPCRequest"
+}) {
+}
+
+type VeyronRPCResponse struct {
+ OutArgs []vdl.AnyRep
+ TraceResponse vtrace.Response
+}
+
+func (VeyronRPCResponse) __VDLReflect(struct {
+ Name string "v.io/wspr/veyron/services/wsprd/app.VeyronRPCResponse"
}) {
}
@@ -40,6 +52,7 @@
}
func init() {
- vdl.Register((*VeyronRPC)(nil))
+ vdl.Register((*VeyronRPCRequest)(nil))
+ vdl.Register((*VeyronRPCResponse)(nil))
vdl.Register((*BlessingRequest)(nil))
}
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 6ae9d6d..db3986c 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -17,6 +17,7 @@
"v.io/core/veyron2/vdl/vdlroot/src/signature"
"v.io/core/veyron2/verror"
"v.io/core/veyron2/vom"
+ "v.io/core/veyron2/vtrace"
"v.io/wspr/veyron/services/wsprd/ipc/server"
"v.io/wspr/veyron/services/wsprd/lib"
"v.io/wspr/veyron/services/wsprd/lib/testwriter"
@@ -230,20 +231,29 @@
}()
}
- request := VeyronRPC{
+ request := VeyronRPCRequest{
Name: "/" + endpoint.String(),
Method: test.method,
NumInArgs: int32(len(test.inArgs)),
NumOutArgs: test.numOutArgs,
IsStreaming: stream != nil,
}
- controller.sendVeyronRequest(ctx, 0, &request, test.inArgs, &writer, stream)
+ controller.sendVeyronRequest(ctx, 0, &request, test.inArgs, &writer, stream, vtrace.GetSpan(ctx))
if err := testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError); err != nil {
t.Error(err)
}
}
+func makeRPCResponse(outArgs ...vdl.AnyRep) string {
+ return lib.VomEncodeOrDie(VeyronRPCResponse{
+ OutArgs: outArgs,
+ TraceResponse: vtrace.Response{
+ Method: vtrace.InMemory,
+ },
+ })
+}
+
func TestCallingGoServer(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
method: "Add",
@@ -251,7 +261,7 @@
numOutArgs: 1,
expectedStream: []lib.Response{
lib.Response{
- Message: lib.VomEncodeOrDie([]interface{}{int32(5)}),
+ Message: makeRPCResponse(int32(5)),
Type: lib.ResponseFinal,
},
},
@@ -294,7 +304,7 @@
Type: lib.ResponseStreamClose,
},
lib.Response{
- Message: lib.VomEncodeOrDie([]interface{}{int32(10)}),
+ Message: makeRPCResponse(int32(10)),
Type: lib.ResponseFinal,
},
},
@@ -308,7 +318,7 @@
proxyServer *proxy.Proxy
}
-func makeRequest(rpc VeyronRPC, args ...interface{}) (string, error) {
+func makeRequest(rpc VeyronRPCRequest, args ...interface{}) (string, error) {
var buf bytes.Buffer
encoder, err := vom.NewBinaryEncoder(&buf)
if err != nil {
@@ -352,7 +362,7 @@
veyron2.GetNamespace(controller.Context()).SetRoots("/" + endpoint.String())
- req, err := makeRequest(VeyronRPC{
+ req, err := makeRequest(VeyronRPCRequest{
Name: "controller",
Method: "Serve",
NumInArgs: 2,
diff --git a/services/wsprd/browspr/browspr_test.go b/services/wsprd/browspr/browspr_test.go
index c0a1600..a55f521 100644
--- a/services/wsprd/browspr/browspr_test.go
+++ b/services/wsprd/browspr/browspr_test.go
@@ -176,7 +176,7 @@
t.Fatalf("Failed to associate account: %v")
}
- rpc := app.VeyronRPC{
+ rpc := app.VeyronRPCRequest{
Name: mockServerName,
Method: "BasicCall",
NumInArgs: 1,
@@ -244,11 +244,11 @@
if outArg, ok = responseMsg.Message.(string); !ok {
t.Errorf("Got unexpected response message body of type %T, expected type string", responseMsg.Message)
}
- var result []string
+ var result app.VeyronRPCResponse
if err := lib.VomDecode(outArg, &result); err != nil {
t.Errorf("Failed to vom decode args from %v: %v", outArg, err)
}
- if got, want := result, []string{"[InputValue]"}; !reflect.DeepEqual(got, want) {
+ if got, want := result.OutArgs[0], "[InputValue]"; !reflect.DeepEqual(got, want) {
t.Errorf("Result got %v, want %v", got, want)
}
}