core: Refactor vtrace to allow starting new traces from user code.
Also update to follow the new Set/Get naming convention.
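
For reviewers, a minimal sketch of the intended usage from user code (the API names are taken from this change; the wrapper function, span names, and setup are illustrative only and assume ctx comes from an initialized runtime with imports of "v.io/core/veyron2/context" and "v.io/core/veyron2/vtrace"):

    func doBackgroundWork(ctx *context.T) {
        // Start a brand new trace, disconnected from any span already in ctx.
        ctx, span := vtrace.SetNewTrace(ctx)
        vtrace.ForceCollect(ctx) // opt this trace into in-memory collection
        defer span.Finish()

        // Derive a child span within the same trace and annotate it.
        ctx, child := vtrace.SetNewSpan(ctx, "background-task")
        child.Annotate("started")
        child.Finish()

        // Fetch everything collected so far for this trace.
        record := vtrace.GetStore(ctx).TraceRecord(span.Trace())
        _ = record
    }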
Change-Id: Idd906af0af50168c7184ad2373725f12497e7f92
MultiPart: 2/2
diff --git a/lib/flags/flags.go b/lib/flags/flags.go
index 4e2631a..cc199c7 100644
--- a/lib/flags/flags.go
+++ b/lib/flags/flags.go
@@ -119,7 +119,9 @@
// to Stderr at shutdown if true.
DumpOnShutdown bool
- // VtraceCacheSize the number of traces to cache in memory.
+ // VtraceCacheSize is the number of traces to cache in memory.
+ // TODO(mattr): Traces can be of widely varying size; we should have
+ // a better measurement than just the number of traces.
CacheSize int
}
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index 9ad9ab1..c3352eb 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -80,7 +80,7 @@
func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) (veyron2.AppCycle, error) {
log := rt.Logger()
- rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
+ rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie()))
lf := commonFlags.ListenFlags()
ListenSpec = ipc.ListenSpec{
diff --git a/profiles/static/static.go b/profiles/static/static.go
index ef1837c..38dc172 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -60,7 +60,7 @@
func (p *static) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
log := rt.Logger()
- rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
+ rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie()))
lf := commonFlags.ListenFlags()
ListenSpec = ipc.ListenSpec{
diff --git a/profiles/static/staticinit.go b/profiles/static/staticinit.go
index 4a48b6f..02bf67b 100644
--- a/profiles/static/staticinit.go
+++ b/profiles/static/staticinit.go
@@ -35,7 +35,7 @@
ctx = runtime.Init(ctx)
log := runtime.GetLogger(ctx)
- ctx = runtime.SetReservedNameDispatcher(ctx, debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), runtime.GetVtraceStore(ctx)))
+ ctx = runtime.SetReservedNameDispatcher(ctx, debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie()))
lf := commonFlags.ListenFlags()
listenSpec := ipc.ListenSpec{
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 1d807fe..1569270 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -289,7 +289,7 @@
return nil, verror.ExplicitMake(verror.BadArg, i18n.NoLangID, "ipc.Client", "StartCall")
}
- ctx, span := ivtrace.WithNewSpan(ctx, fmt.Sprintf("<client>%q.%s", name, method))
+ ctx, span := vtrace.SetNewSpan(ctx, fmt.Sprintf("<client>%q.%s", name, method))
ctx = verror.ContextWithComponentName(ctx, "ipc.Client")
// Context specified deadline.
@@ -350,7 +350,7 @@
status := &serverStatus{index: index}
var err verror.E
var span vtrace.Span
- ctx, span = ivtrace.WithNewSpan(ctx, "<client>connectFlow")
+ ctx, span = vtrace.SetNewSpan(ctx, "<client>connectFlow")
span.Annotatef("address:%v", server)
defer span.Finish()
if status.flow, status.suffix, err = c.connectFlow(ctx, server, noDischarges); err != nil {
@@ -502,7 +502,7 @@
go func() {
select {
case <-ctx.Done():
- ivtrace.FromContext(fc.ctx).Annotate("Cancelled")
+ vtrace.GetSpan(fc.ctx).Annotate("Cancelled")
fc.flow.Cancel()
case <-fc.flow.Closed():
}
@@ -845,7 +845,7 @@
func (fc *flowClient) Finish(resultptrs ...interface{}) error {
defer vlog.LogCall()()
err := fc.finish(resultptrs...)
- ivtrace.FromContext(fc.ctx).Finish()
+ vtrace.GetSpan(fc.ctx).Finish()
return err
}
@@ -888,7 +888,7 @@
clientAckBlessings(fc.flow.VCDataCache(), fc.blessings)
}
// Incorporate any VTrace info that was returned.
- ivtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
+ ivtrace.Merge(fc.ctx, fc.response.TraceResponse)
if fc.response.Error != nil {
// TODO(cnicolaou): remove verror.NoAccess with verror version
// when ipc.Server is converted.
diff --git a/runtimes/google/ipc/debug_test.go b/runtimes/google/ipc/debug_test.go
index 449b0fd..e8c3b2f 100644
--- a/runtimes/google/ipc/debug_test.go
+++ b/runtimes/google/ipc/debug_test.go
@@ -16,7 +16,6 @@
"v.io/core/veyron/runtimes/google/ipc/stream/manager"
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
tnaming "v.io/core/veyron/runtimes/google/testing/mocks/naming"
- "v.io/core/veyron/runtimes/google/vtrace"
"v.io/core/veyron/services/mgmt/debug"
)
@@ -31,8 +30,7 @@
pclient.AddToRoots(bclient) // Client recognizes "server" as a root of blessings.
pclient.BlessingStore().Set(bclient, "server") // Client presents bclient to server
- store := vtrace.NewStore(10)
- debugDisp := debug.NewDispatcher(vlog.Log.LogDir(), nil, store)
+ debugDisp := debug.NewDispatcher(vlog.Log.LogDir(), nil)
sm := manager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
diff --git a/runtimes/google/ipc/discharges.go b/runtimes/google/ipc/discharges.go
index 09a1abc..07b9e48 100644
--- a/runtimes/google/ipc/discharges.go
+++ b/runtimes/google/ipc/discharges.go
@@ -4,7 +4,6 @@
"sync"
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
- ivtrace "v.io/core/veyron/runtimes/google/vtrace"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
@@ -65,7 +64,7 @@
}
if ctx != nil {
var span vtrace.Span
- ctx, span = ivtrace.WithNewSpan(ctx, "Fetching Discharges")
+ ctx, span = vtrace.SetNewSpan(ctx, "Fetching Discharges")
defer span.Finish()
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 572f007..ff3f950 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -26,7 +26,9 @@
"v.io/core/veyron2/vdl/vdlutil"
verror "v.io/core/veyron2/verror2"
"v.io/core/veyron2/vlog"
+ "v.io/core/veyron2/vtrace"
+ "v.io/core/veyron/lib/flags"
"v.io/core/veyron/lib/netstate"
"v.io/core/veyron/lib/stats"
"v.io/core/veyron/lib/testutil"
@@ -88,7 +90,8 @@
func testContextWithoutDeadline() *context.T {
ctx := context.NewUninitializedContext(&truntime.PanicRuntime{})
- ctx, _ = ivtrace.WithNewRootSpan(ctx, nil, false)
+ ctx = ivtrace.Init(ctx, flags.VtraceFlags{})
+ ctx, _ = vtrace.SetNewTrace(ctx)
return ctx
}
@@ -199,7 +202,7 @@
func startServerWS(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, name string, disp ipc.Dispatcher, shouldUseWebsocket websocketMode, opts ...ipc.ServerOpt) (naming.Endpoint, ipc.Server) {
vlog.VI(1).Info("InternalNewServer")
opts = append(opts, vc.LocalPrincipal{principal})
- server, err := InternalNewServer(testContext(), sm, ns, nil, opts...)
+ server, err := InternalNewServer(testContext(), sm, ns, opts...)
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -705,7 +708,7 @@
func (s *dischargeTestServer) Discharge(ctx ipc.ServerContext, cav vdlutil.Any, impetus security.DischargeImpetus) (vdlutil.Any, error) {
s.impetus = append(s.impetus, impetus)
- s.traceid = append(s.traceid, ivtrace.FromContext(ctx.Context()).Trace().ID())
+ s.traceid = append(s.traceid, vtrace.GetSpan(ctx.Context()).Trace())
return nil, fmt.Errorf("discharges not issued")
}
@@ -824,7 +827,7 @@
}
defer client.Close()
ctx := testContext()
- tid := ivtrace.FromContext(ctx).Trace().ID()
+ tid := vtrace.GetSpan(ctx).Trace()
// StartCall should fetch the discharge, do not worry about finishing the RPC - do not care about that for this test.
if _, err := client.StartCall(ctx, object, "Method", []interface{}{"argument"}); err != nil {
t.Errorf("StartCall(%+v) failed: %v", test.Requirements, err)
@@ -1368,7 +1371,6 @@
testContext(),
sm,
ns,
- nil,
opts...)
if err != nil {
t.Fatal(err)
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 25cbdec..804225c 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -64,8 +64,7 @@
// wasn't 'Added' for this server.
names map[string]struct{}
// TODO(cnicolaou): add roaming stats to ipcStats
- stats *ipcStats // stats for this server.
- traceStore *ivtrace.Store // store for vtrace traces.
+ stats *ipcStats // stats for this server.
}
var _ ipc.Server = (*server)(nil)
@@ -86,8 +85,8 @@
func (PreferredServerResolveProtocols) IPCServerOpt() {}
-func InternalNewServer(ctx *context.T, streamMgr stream.Manager, ns naming.Namespace, store *ivtrace.Store, opts ...ipc.ServerOpt) (ipc.Server, error) {
- ctx, _ = ivtrace.WithNewSpan(ctx, "NewServer")
+func InternalNewServer(ctx *context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
+ ctx, _ = vtrace.SetNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
ctx: ctx,
@@ -98,7 +97,6 @@
stoppedChan: make(chan struct{}),
ns: ns,
stats: newIPCStats(statsPrefix),
- traceStore: store,
}
var (
principal security.Principal
@@ -655,7 +653,7 @@
func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
s.Lock()
defer s.Unlock()
- ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
+ vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
if s.stopped {
return s.newBadState("ipc.Server.Stop already called")
@@ -678,7 +676,7 @@
func (s *server) AddName(name string) error {
s.Lock()
defer s.Unlock()
- ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
+ vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
if len(name) == 0 {
return s.newBadArg("name is empty")
}
@@ -698,7 +696,7 @@
func (s *server) RemoveName(name string) error {
s.Lock()
defer s.Unlock()
- ivtrace.FromContext(s.ctx).Annotate("Removed name: " + name)
+ vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
if s.stopped {
return s.newBadState("ipc.Server.Stop already called")
}
@@ -892,7 +890,7 @@
results, err := fs.processRequest()
- ivtrace.FromContext(fs.T).Finish()
+ vtrace.GetSpan(fs.T).Finish()
var traceResponse vtrace.Response
if fs.allowDebug {
@@ -962,7 +960,7 @@
if verr != nil {
// We don't know what the ipc call was supposed to be, but we'll create
// a placeholder span so we can capture annotations.
- fs.T, _ = ivtrace.WithNewSpan(fs.T, fmt.Sprintf("\"%s\".UNKNOWN", fs.Name()))
+ fs.T, _ = vtrace.SetNewSpan(fs.T, fmt.Sprintf("\"%s\".UNKNOWN", fs.Name()))
return nil, verr
}
fs.method = req.Method
@@ -972,7 +970,7 @@
// on the server even if they will not be allowed to collect the
// results later. This might be considered a DOS vector.
spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
- fs.T, _ = ivtrace.WithContinuedSpan(fs.T, spanName, req.TraceRequest, fs.server.traceStore)
+ fs.T, _ = ivtrace.SetContinuedSpan(fs.T, spanName, req.TraceRequest)
var cancel context.CancelFunc
if req.Timeout != ipc.NoTimeout {
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index e9521b5..d273830 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -16,7 +16,6 @@
"v.io/core/veyron/runtimes/google/lib/bqueue"
"v.io/core/veyron/runtimes/google/lib/iobuf"
vsync "v.io/core/veyron/runtimes/google/lib/sync"
- ivtrace "v.io/core/veyron/runtimes/google/vtrace"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc/stream"
@@ -399,7 +398,7 @@
}
if ctx != nil {
var span vtrace.Span
- ctx, span = ivtrace.WithNewSpan(ctx, "vc.HandshakeDialedVC")
+ ctx, span = vtrace.SetNewSpan(ctx, "vc.HandshakeDialedVC")
defer span.Finish()
}
// If noDischarge is provided, disable the dischargeClient.
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 0d68a45..3b51d61 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -25,7 +25,6 @@
"v.io/core/veyron/runtimes/google/lib/pcqueue"
vsync "v.io/core/veyron/runtimes/google/lib/sync"
"v.io/core/veyron/runtimes/google/lib/upcqueue"
- ivtrace "v.io/core/veyron/runtimes/google/vtrace"
"v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/verror"
@@ -115,7 +114,7 @@
}
if ctx != nil {
var span vtrace.Span
- ctx, span = ivtrace.WithNewSpan(ctx, "InternalNewDialedVIF")
+ ctx, span = vtrace.SetNewSpan(ctx, "InternalNewDialedVIF")
span.Annotatef("(%v, %v)", conn.RemoteAddr().Network(), conn.RemoteAddr())
defer span.Finish()
}
diff --git a/runtimes/google/lib/publisher/publisher_test.go b/runtimes/google/lib/publisher/publisher_test.go
index a82e4ca..15f4655 100644
--- a/runtimes/google/lib/publisher/publisher_test.go
+++ b/runtimes/google/lib/publisher/publisher_test.go
@@ -8,16 +8,19 @@
"v.io/core/veyron2/context"
"v.io/core/veyron2/naming"
+ "v.io/core/veyron2/vtrace"
+ "v.io/core/veyron/lib/flags"
"v.io/core/veyron/runtimes/google/lib/publisher"
tnaming "v.io/core/veyron/runtimes/google/testing/mocks/naming"
"v.io/core/veyron/runtimes/google/testing/mocks/runtime"
- "v.io/core/veyron/runtimes/google/vtrace"
+ ivtrace "v.io/core/veyron/runtimes/google/vtrace"
)
func testContext() *context.T {
ctx := context.NewUninitializedContext(&runtime.PanicRuntime{})
- ctx, _ = vtrace.WithNewSpan(ctx, "")
+ ctx = ivtrace.Init(ctx, flags.VtraceFlags{})
+ ctx, _ = vtrace.SetNewSpan(ctx, "")
ctx, _ = context.WithDeadline(ctx, time.Now().Add(20*time.Second))
return ctx
}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 5c2d452..c7ac058 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "math/rand"
"time"
iipc "v.io/core/veyron/runtimes/google/ipc"
@@ -48,22 +47,11 @@
ctx := context.NewUninitializedContext(rt)
ctx = i18n.ContextWithLangID(ctx, rt.lang)
ctx = verror2.ContextWithComponentName(ctx, rt.program)
-
- sr := rt.flags.Vtrace.SampleRate
- forceCollect := sr > 0.0 && (sr >= 1.0 || rand.Float64() < sr)
- ctx, _ = ivtrace.WithNewRootSpan(ctx, rt.traceStore, forceCollect)
-
+ ctx = ivtrace.DeprecatedInit(ctx, rt.traceStore)
+ ctx, _ = vtrace.SetNewTrace(ctx)
return rt.initRuntimeXContext(ctx)
}
-func (rt *vrt) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) {
- return ivtrace.WithNewSpan(ctx, name)
-}
-
-func (rt *vrt) SpanFromContext(ctx *context.T) vtrace.Span {
- return ivtrace.FromContext(ctx)
-}
-
func (rt *vrt) NewServer(opts ...ipc.ServerOpt) (ipc.Server, error) {
rt.mu.Lock()
// Create a new RoutingID (and StreamManager) for each server, except
@@ -109,7 +97,7 @@
otherOpts = append(otherOpts, iipc.PreferredServerResolveProtocols(rt.preferredProtocols))
}
ctx := rt.NewContext()
- return iipc.InternalNewServer(ctx, sm, ns, rt.traceStore, otherOpts...)
+ return iipc.InternalNewServer(ctx, sm, ns, otherOpts...)
}
func (rt *vrt) NewStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) {
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index cea3656..b616b79 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -73,7 +73,7 @@
lang: i18n.LangIDFromEnv(),
program: filepath.Base(os.Args[0]),
flags: flags,
- traceStore: ivtrace.NewStore(flags.Vtrace.CacheSize),
+ traceStore: ivtrace.NewStore(flags.Vtrace),
}
for _, o := range opts {
diff --git a/runtimes/google/rt/runtimex.go b/runtimes/google/rt/runtimex.go
index 6af8e88..350a93b 100644
--- a/runtimes/google/rt/runtimex.go
+++ b/runtimes/google/rt/runtimex.go
@@ -14,7 +14,6 @@
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vlog"
- "v.io/core/veyron2/vtrace"
//iipc "v.io/core/veyron/runtimes/google/ipc"
iipc "v.io/core/veyron/runtimes/google/ipc"
@@ -22,7 +21,6 @@
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
inaming "v.io/core/veyron/runtimes/google/naming"
"v.io/core/veyron/runtimes/google/naming/namespace"
- ivtrace "v.io/core/veyron/runtimes/google/vtrace"
)
type contextKey int
@@ -33,7 +31,6 @@
namespaceKey
loggerKey
principalKey
- vtraceKey
reservedNameKey
profileKey
appCycleKey
@@ -58,7 +55,6 @@
ctx = context.WithValue(ctx, namespaceKey, rt.ns)
ctx = context.WithValue(ctx, loggerKey, vlog.Log)
ctx = context.WithValue(ctx, principalKey, rt.principal)
- ctx = context.WithValue(ctx, vtraceKey, rt.traceStore)
ctx = context.WithValue(ctx, publisherKey, rt.publisher)
ctx = context.WithValue(ctx, profileKey, rt.profile)
ctx = context.WithValue(ctx, appCycleKey, rt.ac)
@@ -104,8 +100,7 @@
// TODO(mattr): We used to get rt.preferredprotocols here, should we
// attach these to the context directly?
- traceStore, _ := ctx.Value(vtraceKey).(*ivtrace.Store)
- server, err := iipc.InternalNewServer(ctx, sm, ns, traceStore, otherOpts...)
+ server, err := iipc.InternalNewServer(ctx, sm, ns, otherOpts...)
if done := ctx.Done(); err == nil && done != nil {
// Arrange to clean up the server when the parent context is canceled.
// TODO(mattr): Should we actually do this? Or just have users clean
@@ -218,14 +213,6 @@
return cl
}
-func (*RuntimeX) SetNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) {
- return ivtrace.WithNewSpan(ctx, name)
-}
-
-func (*RuntimeX) GetSpan(ctx *context.T) vtrace.Span {
- return ivtrace.FromContext(ctx)
-}
-
func (*RuntimeX) setNewNamespace(ctx *context.T, roots ...string) (*context.T, naming.Namespace, error) {
ns, err := namespace.New(roots...)
if err == nil {
@@ -267,11 +254,6 @@
return logger
}
-func (*RuntimeX) GetVtraceStore(ctx *context.T) vtrace.Store {
- traceStore, _ := ctx.Value(vtraceKey).(vtrace.Store)
- return traceStore
-}
-
type reservedNameDispatcher struct {
dispatcher ipc.Dispatcher
opts []ipc.ServerOpt
diff --git a/runtimes/google/testing/mocks/runtime/panic_runtime.go b/runtimes/google/testing/mocks/runtime/panic_runtime.go
index 84b8d8b..03a6ec9 100644
--- a/runtimes/google/testing/mocks/runtime/panic_runtime.go
+++ b/runtimes/google/testing/mocks/runtime/panic_runtime.go
@@ -32,7 +32,7 @@
func (*PanicRuntime) Client() ipc.Client { panic(badRuntime) }
func (*PanicRuntime) NewContext() *context.T { panic(badRuntime) }
-func (PanicRuntime) WithNewSpan(c *context.T, m string) (*context.T, vtrace.Span) { return c, &span{m} }
+func (PanicRuntime) SetNewSpan(c *context.T, m string) (*context.T, vtrace.Span) { return c, &span{m} }
func (*PanicRuntime) SpanFromContext(*context.T) vtrace.Span { return &span{} }
func (*PanicRuntime) NewStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) {
@@ -58,4 +58,5 @@
func (*span) Annotate(string) {}
func (*span) Annotatef(string, ...interface{}) {}
func (*span) Finish() {}
-func (*span) Trace() vtrace.Trace { return nil }
+func (*span) Trace() uniqueid.ID { return uniqueid.ID{} }
+func (*span) ForceCollect() {}
diff --git a/runtimes/google/vtrace/collector.go b/runtimes/google/vtrace/collector.go
deleted file mode 100644
index ab7000c..0000000
--- a/runtimes/google/vtrace/collector.go
+++ /dev/null
@@ -1,171 +0,0 @@
-package vtrace
-
-import (
- "sync"
- "time"
-
- "v.io/core/veyron2/context"
- "v.io/core/veyron2/uniqueid"
- "v.io/core/veyron2/vtrace"
-)
-
-func copySpanRecord(in *vtrace.SpanRecord) *vtrace.SpanRecord {
- return &vtrace.SpanRecord{
- ID: in.ID,
- Parent: in.Parent,
- Name: in.Name,
- Start: in.Start,
- End: in.End,
- Annotations: append([]vtrace.Annotation{}, in.Annotations...),
- }
-}
-
-// collectors collect spans and annotations for output or analysis.
-// collectors are safe to use from multiple goroutines simultaneously.
-// TODO(mattr): collector should support log-based collection
-// as well as in-memory collection.
-type collector struct {
- traceID uniqueid.ID
- store *Store
- mu sync.Mutex
- method vtrace.TraceMethod // GUARDED_BY(mu)
- spans map[uniqueid.ID]*vtrace.SpanRecord // GUARDED_BY(mu)
-}
-
-// newCollector returns a new collector for the given traceID.
-func newCollector(traceID uniqueid.ID, store *Store) *collector {
- return &collector{
- traceID: traceID,
- method: vtrace.None,
- store: store,
- }
-}
-
-// ID returns the ID of the trace this collector is collecting for.
-func (c *collector) ID() uniqueid.ID {
- return c.traceID
-}
-
-// ForceCollect turns on collection for this trace. If collection
-// is already active, this does nothing.
-func (c *collector) ForceCollect() {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.method != vtrace.InMemory {
- c.method = vtrace.InMemory
- c.spans = make(map[uniqueid.ID]*vtrace.SpanRecord)
- }
- if c.store != nil {
- c.store.Consider(c)
- }
-}
-
-func (c *collector) spanRecordLocked(s *span) *vtrace.SpanRecord {
- sid := s.ID()
- record, ok := c.spans[sid]
- if !ok {
- record = &vtrace.SpanRecord{
- ID: sid,
- Parent: s.parent,
- Name: s.name,
- Start: s.start.UnixNano(),
- }
- c.spans[sid] = record
- }
- if c.store != nil {
- c.store.Consider(c)
- }
- return record
-}
-
-// start records the fact that a given span has begun.
-func (c *collector) start(s *span) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.method == vtrace.InMemory {
- // Note that simply fetching the record is enough since
- // if the record does not exist we will created it according
- // to the start time in s.
- c.spanRecordLocked(s)
- }
-}
-
-// finish records the time that a span finished.
-func (c *collector) finish(s *span) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.method == vtrace.InMemory {
- record := c.spanRecordLocked(s)
- // TODO(mattr): Perhaps we should log an error if we have already been finished?
- record.End = time.Now().UnixNano()
- }
-}
-
-// annotate adds a span annotation to the collection.
-func (c *collector) annotate(s *span, msg string) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.method == vtrace.InMemory {
- record := c.spanRecordLocked(s)
- record.Annotations = append(record.Annotations, vtrace.Annotation{
- When: time.Now().UnixNano(),
- Message: msg,
- })
- }
-}
-
-// response computes a vtrace.Response for the current trace.
-func (c *collector) response() vtrace.Response {
- c.mu.Lock()
- defer c.mu.Unlock()
- return vtrace.Response{
- Method: c.method,
- Trace: c.traceRecordLocked(),
- }
-}
-
-// Record computes a vtrace.TraceRecord containing all annotations
-// collected so far.
-func (c *collector) Record() vtrace.TraceRecord {
- c.mu.Lock()
- defer c.mu.Unlock()
- return c.traceRecordLocked()
-}
-
-func (c *collector) traceRecordLocked() vtrace.TraceRecord {
- spans := make([]vtrace.SpanRecord, 0, len(c.spans))
- for _, span := range c.spans {
- spans = append(spans, *copySpanRecord(span))
- }
- return vtrace.TraceRecord{
- ID: c.traceID,
- Spans: spans,
- }
-}
-
-// merge merges a vtrace.Response into the current trace.
-func (c *collector) merge(parent vtrace.Span, t *vtrace.Response) {
- if t.Method == vtrace.InMemory {
- c.ForceCollect()
- }
- c.mu.Lock()
- defer c.mu.Unlock()
-
- // TODO(mattr): We need to carefully merge here to correct for
- // clock skew and ordering. We should estimate the clock skew
- // by assuming that children of parent need to start after parent
- // and end before now.
- for _, span := range t.Trace.Spans {
- c.spans[span.ID] = copySpanRecord(&span)
- }
-}
-
-// MergeResponse merges a vtrace.Response into the current trace.
-func MergeResponse(ctx *context.T, response *vtrace.Response) {
- if span := getSpan(ctx); span != nil {
- span.collector.merge(span, response)
- }
-}
diff --git a/runtimes/google/vtrace/store.go b/runtimes/google/vtrace/store.go
index 5d06804..682b532 100644
--- a/runtimes/google/vtrace/store.go
+++ b/runtimes/google/vtrace/store.go
@@ -1,10 +1,15 @@
package vtrace
import (
+ "math/rand"
"sync"
+ "time"
+ "v.io/core/veyron2/context"
"v.io/core/veyron2/uniqueid"
"v.io/core/veyron2/vtrace"
+
+ "v.io/core/veyron/lib/flags"
)
// Store implements a store for traces. The idea is to keep all the
@@ -17,44 +22,114 @@
// specifically tell us to capture a specific trace. LRU will work OK
// for many testing scenarios and low volume applications.
type Store struct {
- size int
+ opts flags.VtraceFlags
// traces and head together implement a linked-hash-map.
// head points to the head and tail of the doubly-linked-list
- // of recently used items (the tail is the LRU traceSet).
+ // of recently used items (the tail is the LRU traceStore).
+ // TODO(mattr): Use rwmutex.
mu sync.Mutex
- traces map[uniqueid.ID]*traceSet // GUARDED_BY(mu)
- head *traceSet // GUARDED_BY(mu)
+ traces map[uniqueid.ID]*traceStore // GUARDED_BY(mu)
+ head *traceStore // GUARDED_BY(mu)
}
-// NewStore creates a new store that will keep a maximum of size
-// traces in memory.
-// TODO(mattr): Traces can be of widely varying size, we should have
-// some better measurement then just number of traces.
-func NewStore(size int) *Store {
- head := &traceSet{}
+// NewStore creates a new store according to the passed in opts.
+func NewStore(opts flags.VtraceFlags) *Store {
+ head := &traceStore{}
head.next, head.prev = head, head
return &Store{
- size: size,
- traces: make(map[uniqueid.ID]*traceSet),
+ opts: opts,
+ traces: make(map[uniqueid.ID]*traceStore),
head: head,
}
}
-// Consider should be called whenever an interesting change happens to
-// a trace the store will decide whether to keep it or not.
-func (s *Store) Consider(trace vtrace.Trace) {
+func (s *Store) ForceCollect(id uniqueid.ID) {
+ s.mu.Lock()
+ s.forceCollectLocked(id)
+ s.mu.Unlock()
+}
+
+func (s *Store) forceCollectLocked(id uniqueid.ID) *traceStore {
+ ts := s.traces[id]
+ if ts == nil {
+ ts = newTraceStore(id)
+ s.traces[id] = ts
+ ts.moveAfter(s.head)
+ // Trim elements beyond our size limit.
+ for len(s.traces) > s.opts.CacheSize {
+ el := s.head.prev
+ el.removeFromList()
+ delete(s.traces, el.id)
+ }
+ }
+ return ts
+}
+
+// merge merges a vtrace.Response into the current store.
+func (s *Store) merge(t vtrace.Response) {
s.mu.Lock()
defer s.mu.Unlock()
- set := s.traces[trace.ID()]
- if set == nil {
- set = newTraceSet()
- s.traces[trace.ID()] = set
+
+ var ts *traceStore
+ if t.Method == vtrace.InMemory {
+ ts = s.forceCollectLocked(t.Trace.ID)
+ } else {
+ ts = s.traces[t.Trace.ID]
}
- set.add(trace)
- set.moveAfter(s.head)
- s.trimLocked()
+ if ts != nil {
+ ts.merge(t.Trace.Spans)
+ }
+}
+
+// annotate stores an annotation for the trace if it is being collected.
+func (s *Store) annotate(span *span, msg string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if ts := s.traces[span.Trace()]; ts != nil {
+ ts.annotate(span, msg)
+ ts.moveAfter(s.head)
+ }
+}
+
+// start stores data about a starting span if the trace is being collected.
+func (s *Store) start(span *span) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ var ts *traceStore
+ sr := s.opts.SampleRate
+ // If this is a root span, we may automatically sample it for collection.
+ if span.trace == span.parent && sr > 0.0 && (sr >= 1.0 || rand.Float64() < sr) {
+ ts = s.forceCollectLocked(span.Trace())
+ } else {
+ ts = s.traces[span.Trace()]
+ }
+ if ts != nil {
+ ts.start(span)
+ ts.moveAfter(s.head)
+ }
+}
+
+// finish stores data about a finished span if the trace is being collected.
+func (s *Store) finish(span *span) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if ts := s.traces[span.Trace()]; ts != nil {
+ ts.finish(span)
+ ts.moveAfter(s.head)
+ }
+}
+
+// method returns the collection method for the given trace.
+func (s *Store) method(id uniqueid.ID) vtrace.TraceMethod {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if ts := s.traces[id]; ts != nil {
+ return vtrace.InMemory
+ }
+ return vtrace.None
}
// TraceRecords returns TraceRecords for all traces saved in the store.
@@ -76,42 +151,70 @@
func (s *Store) TraceRecord(id uniqueid.ID) *vtrace.TraceRecord {
s.mu.Lock()
defer s.mu.Unlock()
+ out := &vtrace.TraceRecord{}
ts := s.traces[id]
- if ts == nil {
- return nil
+ if ts != nil {
+ ts.traceRecord(out)
}
- out := vtrace.TraceRecord{}
- ts.traceRecord(&out)
- return &out
+ return out
}
-// trimLocked removes elements from the store LRU first until we are
-// below Store.size.
-func (s *Store) trimLocked() {
- for len(s.traces) > s.size {
- el := s.head.prev
- el.removeFromList()
- delete(s.traces, el.id())
+type traceStore struct {
+ id uniqueid.ID
+ spans map[uniqueid.ID]*vtrace.SpanRecord
+ prev, next *traceStore
+}
+
+func newTraceStore(id uniqueid.ID) *traceStore {
+ return &traceStore{
+ id: id,
+ spans: make(map[uniqueid.ID]*vtrace.SpanRecord),
}
}
-// We need to capture traceSets because a single trace can reach this
-// server along multiple paths. Consider a client that calls this
-// server twice in the same operation.
-type traceSet struct {
- elts map[vtrace.Trace]bool
- prev, next *traceSet
+func (ts *traceStore) record(s *span) *vtrace.SpanRecord {
+ record, ok := ts.spans[s.id]
+ if !ok {
+ record = &vtrace.SpanRecord{
+ ID: s.id,
+ Parent: s.parent,
+ Name: s.name,
+ Start: s.start.UnixNano(),
+ }
+ ts.spans[s.id] = record
+ }
+ return record
}
-func newTraceSet() *traceSet {
- return &traceSet{elts: make(map[vtrace.Trace]bool)}
+func (ts *traceStore) annotate(s *span, msg string) {
+ record := ts.record(s)
+ record.Annotations = append(record.Annotations, vtrace.Annotation{
+ When: time.Now().UnixNano(),
+ Message: msg,
+ })
}
-func (ts *traceSet) add(trace vtrace.Trace) {
- ts.elts[trace] = true
+func (ts *traceStore) start(s *span) {
+ ts.record(s)
}
-func (ts *traceSet) removeFromList() {
+func (ts *traceStore) finish(s *span) {
+ ts.record(s).End = time.Now().UnixNano()
+}
+
+func (ts *traceStore) merge(spans []vtrace.SpanRecord) {
+ // TODO(mattr): We need to carefully merge here to correct for
+ // clock skew and ordering. We should estimate the clock skew
+ // by assuming that children of parent need to start after parent
+ // and end before now.
+ for _, span := range spans {
+ if ts.spans[span.ID] == nil {
+ ts.spans[span.ID] = copySpanRecord(&span)
+ }
+ }
+}
+
+func (ts *traceStore) removeFromList() {
if ts.prev != nil {
ts.prev.next = ts.next
}
@@ -122,7 +225,7 @@
ts.prev = nil
}
-func (ts *traceSet) moveAfter(prev *traceSet) {
+func (ts *traceStore) moveAfter(prev *traceStore) {
ts.removeFromList()
ts.prev = prev
ts.next = prev.next
@@ -130,30 +233,27 @@
prev.next = ts
}
-func (ts *traceSet) id() uniqueid.ID {
- for e := range ts.elts {
- return e.ID()
+func copySpanRecord(in *vtrace.SpanRecord) *vtrace.SpanRecord {
+ return &vtrace.SpanRecord{
+ ID: in.ID,
+ Parent: in.Parent,
+ Name: in.Name,
+ Start: in.Start,
+ End: in.End,
+ Annotations: append([]vtrace.Annotation{}, in.Annotations...),
}
- panic("unreachable")
}
-func (ts *traceSet) traceRecord(out *vtrace.TraceRecord) {
- // It is possible to have duplicate copies of spans. Consider the
- // case where a server calls itself (even indirectly) we'll have one
- // Trace in the set for the parent call and one Trace in the set for
- // the decendant. The two records will be exactly the same we
- // therefore de-dup here.
- spans := make(map[uniqueid.ID]bool)
-
- for e, _ := range ts.elts {
- record := e.Record()
- out.ID = record.ID
- for _, span := range record.Spans {
- if spans[span.ID] {
- continue
- }
- spans[span.ID] = true
- out.Spans = append(out.Spans, span)
- }
+func (ts *traceStore) traceRecord(out *vtrace.TraceRecord) {
+ spans := make([]vtrace.SpanRecord, 0, len(ts.spans))
+ for _, span := range ts.spans {
+ spans = append(spans, *copySpanRecord(span))
}
+ out.ID = ts.id
+ out.Spans = spans
+}
+
+// Merge merges a vtrace.Response into the current store.
+func Merge(ctx *context.T, t vtrace.Response) {
+ getStore(ctx).merge(t)
}
diff --git a/runtimes/google/vtrace/store_test.go b/runtimes/google/vtrace/store_test.go
index 6d76557..3aac93a 100644
--- a/runtimes/google/vtrace/store_test.go
+++ b/runtimes/google/vtrace/store_test.go
@@ -3,10 +3,13 @@
import (
"encoding/binary"
"reflect"
+ "sort"
"testing"
"v.io/core/veyron2/uniqueid"
"v.io/core/veyron2/vtrace"
+
+ "v.io/core/veyron/lib/flags"
)
var nextid = uint64(1)
@@ -18,11 +21,12 @@
return out
}
-func makeTraces(n int, st *Store) []vtrace.Trace {
- traces := make([]vtrace.Trace, n)
+func makeTraces(n int, st *Store) []uniqueid.ID {
+ traces := make([]uniqueid.ID, n)
for i := range traces {
- traces[i] = newCollector(id(), st)
- traces[i].ForceCollect()
+ curid := id()
+ traces[i] = curid
+ st.ForceCollect(curid)
}
return traces
}
@@ -35,48 +39,51 @@
return out
}
-func traceids(traces ...vtrace.Trace) map[uniqueid.ID]bool {
+func traceids(traces ...uniqueid.ID) map[uniqueid.ID]bool {
out := make(map[uniqueid.ID]bool)
for _, trace := range traces {
- out[trace.ID()] = true
+ out[trace] = true
}
return out
}
-func TestConsiderAndTrim(t *testing.T) {
- st := NewStore(5)
+func pretty(in map[uniqueid.ID]bool) []int {
+ out := make([]int, 0, len(in))
+ for k, _ := range in {
+ out = append(out, int(k[15]))
+ }
+ sort.Ints(out)
+ return out
+}
+
+func compare(t *testing.T, want map[uniqueid.ID]bool, records []vtrace.TraceRecord) {
+ got := recordids(records...)
+ if !reflect.DeepEqual(want, got) {
+ t.Errorf("Got wrong traces. Got %v, want %v.", pretty(got), pretty(want))
+ }
+}
+
+func TestTrimming(t *testing.T) {
+ st := NewStore(flags.VtraceFlags{CacheSize: 5})
traces := makeTraces(10, st)
- records := st.TraceRecords()
- if want, got := traceids(traces[5:]...), recordids(records...); !reflect.DeepEqual(want, got) {
- t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
- }
+ compare(t, traceids(traces[5:]...), st.TraceRecords())
- // Starting a new span on one of the traces should bring it back into the stored set.
- traces[2].(*collector).start(&span{id: id()})
- records = st.TraceRecords()
- if want, got := traceids(traces[2], traces[6], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
- t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
- }
+ traces = append(traces, id(), id(), id())
- // Starting a new span on one of the traces should bring it back into the stored set.
- traces[2].(*collector).start(&span{id: id()})
- records = st.TraceRecords()
- if want, got := traceids(traces[2], traces[6], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
- t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
- }
+ // Starting a span on an existing trace brings it to the front of the queue
+ // and prevents it from being removed when a new trace begins.
+ st.start(&span{trace: traces[5], id: id()})
+ st.ForceCollect(traces[10])
+ compare(t, traceids(traces[10], traces[5], traces[7], traces[8], traces[9]), st.TraceRecords())
// Finishing a span on one of the traces should bring it back into the stored set.
- traces[3].(*collector).finish(&span{id: id()})
- records = st.TraceRecords()
- if want, got := traceids(traces[3], traces[2], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
- t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
- }
+ st.finish(&span{trace: traces[7], id: id()})
+ st.ForceCollect(traces[11])
+ compare(t, traceids(traces[10], traces[11], traces[5], traces[7], traces[9]), st.TraceRecords())
// Annotating a span on one of the traces should bring it back into the stored set.
- traces[4].(*collector).annotate(&span{id: id()}, "hello")
- records = st.TraceRecords()
- if want, got := traceids(traces[4], traces[3], traces[2], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
- t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
- }
+ st.annotate(&span{trace: traces[9], id: id()}, "hello")
+ st.ForceCollect(traces[12])
+ compare(t, traceids(traces[10], traces[11], traces[12], traces[7], traces[9]), st.TraceRecords())
}
diff --git a/runtimes/google/vtrace/vtrace.go b/runtimes/google/vtrace/vtrace.go
index 7493f56..5eef4c7 100644
--- a/runtimes/google/vtrace/vtrace.go
+++ b/runtimes/google/vtrace/vtrace.go
@@ -11,52 +11,61 @@
"v.io/core/veyron2/uniqueid"
"v.io/core/veyron2/vlog"
"v.io/core/veyron2/vtrace"
+
+ "v.io/core/veyron/lib/flags"
)
// A span represents an annotated period of time.
type span struct {
- id uniqueid.ID
- parent uniqueid.ID
- name string
- collector *collector
- start time.Time
+ id uniqueid.ID
+ parent uniqueid.ID
+ name string
+ trace uniqueid.ID
+ start time.Time
+ store *Store
}
-func newSpan(parent uniqueid.ID, name string, collector *collector) *span {
+func newSpan(parent uniqueid.ID, name string, trace uniqueid.ID, store *Store) *span {
id, err := uniqueid.Random()
if err != nil {
vlog.Errorf("vtrace: Couldn't generate Span ID, debug data may be lost: %v", err)
}
s := &span{
- id: id,
- parent: parent,
- name: name,
- collector: collector,
- start: time.Now(),
+ id: id,
+ parent: parent,
+ name: name,
+ trace: trace,
+ start: time.Now(),
+ store: store,
}
- collector.start(s)
+ store.start(s)
return s
}
-func (c *span) ID() uniqueid.ID { return c.id }
-func (c *span) Parent() uniqueid.ID { return c.parent }
-func (c *span) Name() string { return c.name }
-func (c *span) Trace() vtrace.Trace { return c.collector }
-func (c *span) Annotate(s string) {
- c.collector.annotate(c, s)
+func (s *span) ID() uniqueid.ID { return s.id }
+func (s *span) Parent() uniqueid.ID { return s.parent }
+func (s *span) Name() string { return s.name }
+func (s *span) Trace() uniqueid.ID { return s.trace }
+func (s *span) Annotate(msg string) {
+ s.store.annotate(s, msg)
}
-func (c *span) Annotatef(format string, a ...interface{}) {
- c.collector.annotate(c, fmt.Sprintf(format, a...))
+func (s *span) Annotatef(format string, a ...interface{}) {
+ s.store.annotate(s, fmt.Sprintf(format, a...))
}
-func (c *span) Finish() { c.collector.finish(c) }
+func (s *span) Finish() {
+ s.store.finish(s)
+}
+func (s *span) method() vtrace.TraceMethod {
+ return s.store.method(s.trace)
+}
// Request generates a vtrace.Request from the active Span.
func Request(ctx *context.T) vtrace.Request {
if span := getSpan(ctx); span != nil {
return vtrace.Request{
SpanID: span.id,
- TraceID: span.collector.traceID,
- Method: span.collector.method,
+ TraceID: span.trace,
+ Method: span.method(),
}
}
return vtrace.Request{}
@@ -65,57 +74,105 @@
// Response captures the vtrace.Response for the active Span.
func Response(ctx *context.T) vtrace.Response {
if span := getSpan(ctx); span != nil {
- return span.collector.response()
+ return vtrace.Response{
+ Method: span.method(),
+ Trace: *span.store.TraceRecord(span.trace),
+ }
}
return vtrace.Response{}
}
-// spanKey is uses to store and retrieve spans inside a context.T objects.
-type spanKey struct{}
-
// ContinuedSpan creates a span that represents a continuation of a trace from
// a remote server. name is a user readable string that describes the context
// and req contains the parameters needed to connect this span with it's trace.
-func WithContinuedSpan(ctx *context.T, name string, req vtrace.Request, store *Store) (*context.T, vtrace.Span) {
- newSpan := newSpan(req.SpanID, name, newCollector(req.TraceID, store))
+func SetContinuedSpan(ctx *context.T, name string, req vtrace.Request) (*context.T, vtrace.Span) {
+ st := getStore(ctx)
if req.Method == vtrace.InMemory {
- newSpan.collector.ForceCollect()
+ st.ForceCollect(req.TraceID)
}
- return context.WithValue(ctx, spanKey{}, newSpan), newSpan
+ newSpan := newSpan(req.SpanID, name, req.TraceID, st)
+ return context.WithValue(ctx, spanKey, newSpan), newSpan
}
-func WithNewRootSpan(ctx *context.T, store *Store, forceCollect bool) (*context.T, vtrace.Span) {
+type contextKey int
+
+const (
+ storeKey = contextKey(iota)
+ spanKey
+)
+
+// manager allows you to create new traces and spans and access the
+// vtrace store.
+type manager struct{}
+
+// SetNewTrace creates a new vtrace context that is not the child of any
+// other span. This is useful when starting operations that are
+// disconnected from the activity ctx is performing. For example,
+// this might be used to start background tasks.
+func (m manager) SetNewTrace(ctx *context.T) (*context.T, vtrace.Span) {
id, err := uniqueid.Random()
if err != nil {
vlog.Errorf("vtrace: Couldn't generate Trace ID, debug data may be lost: %v", err)
}
- col := newCollector(id, store)
- if forceCollect {
- col.ForceCollect()
- }
- s := newSpan(id, "", col)
+ s := newSpan(id, "", id, getStore(ctx))
- return context.WithValue(ctx, spanKey{}, s), s
+ return context.WithValue(ctx, spanKey, s), s
}
-// NewSpan creates a new span.
-func WithNewSpan(parent *context.T, name string) (*context.T, vtrace.Span) {
- if curSpan := getSpan(parent); curSpan != nil {
- s := newSpan(curSpan.ID(), name, curSpan.collector)
- return context.WithValue(parent, spanKey{}, s), s
+// SetNewSpan derives a context with a new Span that can be used to
+// trace and annotate operations across process boundaries.
+func (m manager) SetNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) {
+ if curSpan := getSpan(ctx); curSpan != nil {
+ if curSpan.store == nil {
+ panic("nil store")
+ }
+ s := newSpan(curSpan.ID(), name, curSpan.trace, curSpan.store)
+ return context.WithValue(ctx, spanKey, s), s
}
vlog.Error("vtrace: Creating a new child span from context with no existing span.")
- return WithNewRootSpan(parent, nil, false)
+ return m.SetNewTrace(ctx)
}
+// GetSpan finds the currently active span.
+func (m manager) GetSpan(ctx *context.T) vtrace.Span {
+ if span := getSpan(ctx); span != nil {
+ return span
+ }
+ return nil
+}
+
+// GetStore returns the current vtrace.Store.
+func (m manager) GetStore(ctx *context.T) vtrace.Store {
+ if store := getStore(ctx); store != nil {
+ return store
+ }
+ return nil
+}
+
+// getSpan returns the internal span type.
func getSpan(ctx *context.T) *span {
- span, _ := ctx.Value(spanKey{}).(*span)
+ span, _ := ctx.Value(spanKey).(*span)
return span
}
-// GetSpan returns the active span from the context.
-func FromContext(ctx *context.T) vtrace.Span {
- span, _ := ctx.Value(spanKey{}).(vtrace.Span)
- return span
+// getStore returns the *Store attached to the context.
+func getStore(ctx *context.T) *Store {
+ store, _ := ctx.Value(storeKey).(*Store)
+ return store
+}
+
+// Init initializes vtrace and attaches some state to the context.
+// This should be called once during runtime initialization.
+func Init(ctx *context.T, opts flags.VtraceFlags) *context.T {
+ ctx = vtrace.WithManager(ctx, manager{})
+ ctx = context.WithValue(ctx, storeKey, NewStore(opts))
+ return ctx
+}
+
+// TODO(mattr): Remove this function once the old Runtime type is deprecated.
+func DeprecatedInit(ctx *context.T, store *Store) *context.T {
+ ctx = vtrace.WithManager(ctx, manager{})
+ ctx = context.WithValue(ctx, storeKey, store)
+ return ctx
}
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index 398a3ed..f8da2f5 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -13,6 +13,7 @@
"v.io/core/veyron2/vlog"
"v.io/core/veyron2/vtrace"
+ "v.io/core/veyron/lib/flags"
"v.io/core/veyron/profiles"
iipc "v.io/core/veyron/runtimes/google/ipc"
"v.io/core/veyron/runtimes/google/ipc/stream/manager"
@@ -26,14 +27,16 @@
// so we use a fake one that panics if used. The runtime
// implementation should not ever use the Runtime from a context.
func testContext() *context.T {
- return context.NewUninitializedContext(&truntime.PanicRuntime{})
+ ctx := context.NewUninitializedContext(&truntime.PanicRuntime{})
+ ctx = ivtrace.Init(ctx, flags.VtraceFlags{CacheSize: 100})
+ return ctx
}
func TestNewFromContext(t *testing.T) {
c0 := testContext()
- c1, s1 := ivtrace.WithNewSpan(c0, "s1")
- c2, s2 := ivtrace.WithNewSpan(c1, "s2")
- c3, s3 := ivtrace.WithNewSpan(c2, "s3")
+ c1, s1 := vtrace.SetNewSpan(c0, "s1")
+ c2, s2 := vtrace.SetNewSpan(c1, "s2")
+ c3, s3 := vtrace.SetNewSpan(c2, "s3")
expected := map[*context.T]vtrace.Span{
c0: nil,
c1: s1,
@@ -41,7 +44,7 @@
c3: s3,
}
for ctx, expectedSpan := range expected {
- if s := ivtrace.FromContext(ctx); s != expectedSpan {
+ if s := vtrace.GetSpan(ctx); s != expectedSpan {
t.Errorf("Wrong span for ctx %v. Got %v, want %v", c0, s, expectedSpan)
}
}
@@ -64,7 +67,7 @@
func (c *testServer) Run(ctx ipc.ServerContext) error {
if c.forceCollect {
- ivtrace.FromContext(ctx.Context()).Trace().ForceCollect()
+ vtrace.ForceCollect(ctx.Context())
}
client, err := iipc.InternalNewClient(c.sm, c.ns)
@@ -73,7 +76,7 @@
return err
}
- ivtrace.FromContext(ctx.Context()).Annotate(c.name + "-begin")
+ vtrace.GetSpan(ctx.Context()).Annotate(c.name + "-begin")
if c.child != "" {
var call ipc.Call
@@ -91,7 +94,7 @@
return outerr
}
}
- ivtrace.FromContext(ctx.Context()).Annotate(c.name + "-end")
+ vtrace.GetSpan(ctx.Context()).Annotate(c.name + "-end")
return nil
}
@@ -236,8 +239,8 @@
// TestCancellationPropagation tests that cancellation propogates along an
// RPC call chain without user intervention.
func TestTraceAcrossRPCs(t *testing.T) {
- ctx, span := ivtrace.WithNewSpan(testContext(), "")
- span.Trace().ForceCollect()
+ ctx, span := vtrace.SetNewSpan(testContext(), "")
+ vtrace.ForceCollect(ctx)
span.Annotate("c0-begin")
runCallChain(t, ctx, false, false)
@@ -251,13 +254,14 @@
"<client>\"c2\".Run",
"\"\".Run: c2-begin, c2-end",
}
- expectSequence(t, span.Trace().Record(), expectedSpans)
+ record := vtrace.GetStore(ctx).TraceRecord(span.Trace())
+ expectSequence(t, *record, expectedSpans)
}
// TestCancellationPropagationLateForce tests that cancellation propogates along an
// RPC call chain when tracing is initiated by someone deep in the call chain.
func TestTraceAcrossRPCsLateForce(t *testing.T) {
- ctx, span := ivtrace.WithNewSpan(testContext(), "")
+ ctx, span := vtrace.SetNewSpan(testContext(), "")
span.Annotate("c0-begin")
runCallChain(t, ctx, false, true)
@@ -271,5 +275,6 @@
"<client>\"c2\".Run",
"\"\".Run: c2-begin, c2-end",
}
- expectSequence(t, span.Trace().Record(), expectedSpans)
+ record := vtrace.GetStore(ctx).TraceRecord(span.Trace())
+ expectSequence(t, *record, expectedSpans)
}
diff --git a/services/mgmt/debug/dispatcher.go b/services/mgmt/debug/dispatcher.go
index fd85e5c..02d10bb 100644
--- a/services/mgmt/debug/dispatcher.go
+++ b/services/mgmt/debug/dispatcher.go
@@ -6,7 +6,6 @@
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/security"
- "v.io/core/veyron2/vtrace"
logreaderimpl "v.io/core/veyron/services/mgmt/logreader/impl"
pprofimpl "v.io/core/veyron/services/mgmt/pprof/impl"
@@ -18,13 +17,12 @@
type dispatcher struct {
logsDir string // The root of the logs directory.
auth security.Authorizer
- store vtrace.Store
}
var _ ipc.Dispatcher = (*dispatcher)(nil)
-func NewDispatcher(logsDir string, authorizer security.Authorizer, store vtrace.Store) *dispatcher {
- return &dispatcher{logsDir, authorizer, store}
+func NewDispatcher(logsDir string, authorizer security.Authorizer) *dispatcher {
+ return &dispatcher{logsDir, authorizer}
}
// The first part of the names of the objects served by this dispatcher.
@@ -57,7 +55,7 @@
case "stats":
return statsimpl.NewStatsService(suffix, 10*time.Second), d.auth, nil
case "vtrace":
- return vtraceimpl.NewVtraceService(d.store), d.auth, nil
+ return vtraceimpl.NewVtraceService(), d.auth, nil
}
return nil, d.auth, nil
}
diff --git a/services/mgmt/debug/dispatcher_test.go b/services/mgmt/debug/dispatcher_test.go
index dac56ab..3c2c3a7 100644
--- a/services/mgmt/debug/dispatcher_test.go
+++ b/services/mgmt/debug/dispatcher_test.go
@@ -33,7 +33,7 @@
if len(logsDir) == 0 {
return "", nil, fmt.Errorf("logs directory missing")
}
- disp := NewDispatcher(logsDir, nil, rt.VtraceStore())
+ disp := NewDispatcher(logsDir, nil)
server, err := rt.NewServer()
if err != nil {
return "", nil, fmt.Errorf("failed to start debug server: %v", err)
@@ -58,7 +58,7 @@
tracedContext := func() *context.T {
ctx := runtime.NewContext()
- vtrace.FromContext(ctx).Trace().ForceCollect()
+ vtrace.ForceCollect(ctx)
return ctx
}
rootName = "debug"
diff --git a/services/mgmt/vtrace/impl/vtrace.go b/services/mgmt/vtrace/impl/vtrace.go
index 9c56e7e..3aa22dd 100644
--- a/services/mgmt/vtrace/impl/vtrace.go
+++ b/services/mgmt/vtrace/impl/vtrace.go
@@ -8,12 +8,11 @@
"v.io/core/veyron2/vtrace"
)
-type vtraceService struct {
- store vtrace.Store
-}
+type vtraceService struct{}
func (v *vtraceService) Trace(ctx ipc.ServerContext, id uniqueid.ID) (vtrace.TraceRecord, error) {
- tr := v.store.TraceRecord(id)
+ store := vtrace.GetStore(ctx.Context())
+ tr := store.TraceRecord(id)
if tr == nil {
return vtrace.TraceRecord{}, verror2.Make(verror2.NoExist, ctx.Context(), "No trace with id %x", id)
}
@@ -23,7 +22,8 @@
func (v *vtraceService) AllTraces(ctx svtrace.StoreAllTracesContext) error {
// TODO(mattr): Consider changing the store to allow us to iterate through traces
// when there are many.
- traces := v.store.TraceRecords()
+ store := vtrace.GetStore(ctx.Context())
+ traces := store.TraceRecords()
for i := range traces {
if err := ctx.SendStream().Send(traces[i]); err != nil {
return err
@@ -32,6 +32,6 @@
return nil
}
-func NewVtraceService(store vtrace.Store) interface{} {
- return svtrace.StoreServer(&vtraceService{store})
+func NewVtraceService() interface{} {
+ return svtrace.StoreServer(&vtraceService{})
}
diff --git a/services/mgmt/vtrace/impl/vtrace_test.go b/services/mgmt/vtrace/impl/vtrace_test.go
index a7e21c8..bf77a95 100644
--- a/services/mgmt/vtrace/impl/vtrace_test.go
+++ b/services/mgmt/vtrace/impl/vtrace_test.go
@@ -29,7 +29,7 @@
if err != nil {
t.Fatalf("Listen failed: %s", err)
}
- if err := server.Serve("", impl.NewVtraceService(runtime.VtraceStore()), nil); err != nil {
+ if err := server.Serve("", impl.NewVtraceService(), nil); err != nil {
t.Fatalf("Serve failed: %s", err)
}
return endpoints[0].String(), server, runtime
@@ -40,10 +40,10 @@
defer server.Stop()
sctx := runtime.NewContext()
- sctx, span := runtime.WithNewSpan(sctx, "The Span")
- span.Trace().ForceCollect()
+ sctx, span := vtrace.SetNewSpan(sctx, "The Span")
+ vtrace.ForceCollect(sctx)
span.Finish()
- id := span.Trace().ID()
+ id := span.Trace()
client := service.StoreClient(naming.JoinAddressName(endpoint, ""))