veyron/runtimes/google/vtrace: Add a store for collecting vtrace traces.
Also optionally allow users to dump all stored traces when the runtime is shut down.
Change-Id: I16e1a19d3bbc21653718445ae00c8a3496189a53
diff --git a/lib/flags/flags.go b/lib/flags/flags.go
index bff6c4b..76e1baf 100644
--- a/lib/flags/flags.go
+++ b/lib/flags/flags.go
@@ -93,9 +93,25 @@
// environment variable. The command line will override the environment.
Credentials string // TODO(cnicolaou): provide flag.Value impl
+ // Vtrace flags control various aspects of Vtrace.
+ Vtrace VtraceFlags
+
namespaceRootsFlag namespaceRootFlagVar
}
+type VtraceFlags struct {
+ // VtraceSampleRate is the rate (from 0.0 - 1.0) at which
+ // vtrace traces started by this process are sampled for collection.
+ SampleRate float64
+
+ // VtraceDumpOnShutdown tells the runtime to dump all stored traces
+ // to Stderr at shutdown if true.
+ DumpOnShutdown bool
+
+ // VtraceCacheSize the number of traces to cache in memory.
+ CacheSize int
+}
+
// ACLFlags contains the values of the ACLFlags flag group.
type ACLFlags struct {
flag aclFlagVar
@@ -127,6 +143,11 @@
fs.Var(&f.namespaceRootsFlag, "veyron.namespace.root", "local namespace root; can be repeated to provided multiple roots")
fs.StringVar(&f.Credentials, "veyron.credentials", creds, "directory to use for storing security credentials")
+
+ fs.Float64Var(&f.Vtrace.SampleRate, "veyron.vtrace.sample_rate", 0.0, "Rate (from 0.0 to 1.0) to sample vtrace traces.")
+ fs.BoolVar(&f.Vtrace.DumpOnShutdown, "veyron.vtrace.dump_on_shutdown", false, "If true, dump all stored traces on runtime shutdown.")
+ fs.IntVar(&f.Vtrace.CacheSize, "veyron.vtrace.cache_size", 1024, "The number of vtrace traces to store in memory.")
+
return f
}
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index 0ad572c..0ece305 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -55,7 +55,7 @@
func makeCanceld(ns naming.Namespace, name, child string) (*canceld, error) {
sm := manager.InternalNew(naming.FixedRoutingID(0x111111111))
ctx := testContext()
- s, err := InternalNewServer(ctx, sm, ns)
+ s, err := InternalNewServer(ctx, sm, ns, nil)
if err != nil {
return nil, err
}
diff --git a/runtimes/google/ipc/context_test.go b/runtimes/google/ipc/context_test.go
index a752156..7b9137c 100644
--- a/runtimes/google/ipc/context_test.go
+++ b/runtimes/google/ipc/context_test.go
@@ -17,7 +17,7 @@
// implementation should not ever use the Runtime from a context.
func testContext() context.T {
ctx := InternalNewContext(&runtime.PanicRuntime{})
- ctx, _ = vtrace.WithNewSpan(ctx, "")
+ ctx, _ = vtrace.WithNewRootSpan(ctx, nil, false)
ctx, _ = ctx.WithDeadline(time.Now().Add(20 * time.Second))
return ctx
}
diff --git a/runtimes/google/ipc/debug_test.go b/runtimes/google/ipc/debug_test.go
index 8286463..054696d 100644
--- a/runtimes/google/ipc/debug_test.go
+++ b/runtimes/google/ipc/debug_test.go
@@ -35,8 +35,7 @@
sm := manager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
-
- server, err := InternalNewServer(testContext(), sm, ns, options.ReservedNameDispatcher{debugDisp}, vc.LocalPrincipal{pserver})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, options.ReservedNameDispatcher{debugDisp}, vc.LocalPrincipal{pserver})
if err != nil {
t.Fatalf("InternalNewServer failed: %v", err)
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index bdbdc1f..0cb0709 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -169,7 +169,7 @@
func startServer(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, ts interface{}) (naming.Endpoint, ipc.Server) {
vlog.VI(1).Info("InternalNewServer")
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{principal})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{principal})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -270,7 +270,7 @@
func TestMultipleCallsToServeAndName(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal()})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal()})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -610,7 +610,7 @@
return vc.LocalPrincipal{pclient}
}
)
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{pserver})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{pserver})
if err != nil {
t.Fatal(err)
}
@@ -995,7 +995,7 @@
a.IP = net.ParseIP("1.1.1.1")
return []ipc.Address{&netstate.AddrIfc{Addr: a}}, nil
}
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -1031,7 +1031,7 @@
paerr := func(_ string, a []ipc.Address) ([]ipc.Address, error) {
return nil, fmt.Errorf("oops")
}
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -1058,7 +1058,7 @@
sm := imanager.InternalNew(naming.FixedRoutingID(0x66666666))
defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), sm, ns, options.VCSecurityNone)
+ server, err := InternalNewServer(testContext(), sm, ns, nil, options.VCSecurityNone)
if err != nil {
t.Fatalf("InternalNewServer failed: %v", err)
}
@@ -1171,6 +1171,7 @@
testContext(),
sm,
ns,
+ nil,
opts...)
if err != nil {
t.Fatal(err)
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 42172dd..454ca88 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -50,7 +50,8 @@
names map[string]struct{}
reservedOpt options.ReservedNameDispatcher
// TODO(cnicolaou): add roaming stats to ipcStats
- stats *ipcStats // stats for this server.
+ stats *ipcStats // stats for this server.
+ traceStore *ivtrace.Store // store for vtrace traces.
}
var _ ipc.Server = (*server)(nil)
@@ -64,7 +65,8 @@
ch chan config.Setting // channel to receive settings over
}
-func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
+func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, store *ivtrace.Store, opts ...ipc.ServerOpt) (ipc.Server, error) {
+ ctx, _ = ivtrace.WithNewSpan(ctx, "NewServer")
s := &server{
ctx: ctx,
streamMgr: streamMgr,
@@ -73,6 +75,7 @@
stoppedChan: make(chan struct{}),
ns: ns,
stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
+ traceStore: store,
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -542,6 +545,8 @@
func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
s.Lock()
defer s.Unlock()
+ ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
+
if s.stopped {
return errServerStopped
}
@@ -563,6 +568,7 @@
func (s *server) AddName(name string) error {
s.Lock()
defer s.Unlock()
+ ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
if s.stopped {
return errServerStopped
}
@@ -579,6 +585,7 @@
func (s *server) RemoveName(name string) error {
s.Lock()
defer s.Unlock()
+ ivtrace.FromContext(s.ctx).Annotate("Removed name: " + name)
if s.stopped {
return errServerStopped
}
@@ -813,7 +820,7 @@
// on the server even if they will not be allowed to collect the
// results later. This might be considered a DOS vector.
spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
- fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
+ fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest, fs.server.traceStore)
var cancel context.CancelFunc
if req.Timeout != ipc.NoTimeout {
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 11959f0..f4e5a80 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -121,7 +121,7 @@
t.Fatal(err)
}
defer client.Close()
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
t.Fatal(err)
}
@@ -195,7 +195,7 @@
func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), mgr, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), mgr, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
return fmt.Errorf("InternalNewServer failed: %v", err)
}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 5c115f8..6f3de3b 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ "math/rand"
iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
@@ -47,7 +48,11 @@
ctx := iipc.InternalNewContext(rt)
ctx = i18n.ContextWithLangID(ctx, rt.lang)
ctx = verror2.ContextWithComponentName(ctx, rt.program)
- ctx, _ = ivtrace.WithNewSpan(ctx, "") // Initial span has empty name.
+
+ sr := rt.flags.Vtrace.SampleRate
+ forceCollect := sr > 0.0 && (sr >= 1.0 || rand.Float64() < sr)
+ ctx, _ = ivtrace.WithNewRootSpan(ctx, rt.traceStore, forceCollect)
+
return ctx
}
@@ -98,8 +103,7 @@
otherOpts = append(otherOpts, ropts)
otherOpts = append(otherOpts, rt.reservedOpts...)
}
- ctx := rt.NewContext()
- return iipc.InternalNewServer(ctx, sm, ns, otherOpts...)
+ return iipc.InternalNewServer(rt.NewContext(), sm, ns, rt.traceStore, otherOpts...)
}
func (rt *vrt) NewStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) {
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 8e199b5..2cff75d 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -19,9 +19,11 @@
"veyron.io/veyron/veyron/lib/flags"
_ "veyron.io/veyron/veyron/lib/stats/sysstats"
"veyron.io/veyron/veyron/runtimes/google/naming/namespace"
+ ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vtrace"
)
// TODO(caprita): Verrorize this, and maybe move it in the API.
@@ -44,8 +46,9 @@
nServers int // GUARDED_BY(mu)
cleaningUp bool // GUARDED_BY(mu)
- lang i18n.LangID // Language, from environment variables.
- program string // Program name, from os.Args[0].
+ lang i18n.LangID // Language, from environment variables.
+ program string // Program name, from os.Args[0].
+ traceStore *ivtrace.Store // Storage of sampled vtrace traces.
}
var _ veyron2.Runtime = (*vrt)(nil)
@@ -60,11 +63,18 @@
// Implements veyron2/rt.New
func New(opts ...veyron2.ROpt) (veyron2.Runtime, error) {
- rt := &vrt{mgmt: new(mgmtImpl), lang: i18n.LangIDFromEnv(), program: filepath.Base(os.Args[0])}
flagsOnce.Do(func() {
runtimeFlags.Parse(os.Args[1:])
})
- rt.flags = runtimeFlags.RuntimeFlags()
+ flags := runtimeFlags.RuntimeFlags()
+ rt := &vrt{
+ mgmt: new(mgmtImpl),
+ lang: i18n.LangIDFromEnv(),
+ program: filepath.Base(os.Args[0]),
+ flags: flags,
+ traceStore: ivtrace.NewStore(flags.Vtrace.CacheSize),
+ }
+
for _, o := range opts {
switch v := o.(type) {
case options.RuntimePrincipal:
@@ -149,6 +159,10 @@
}
func (rt *vrt) Cleanup() {
+ if rt.flags.Vtrace.DumpOnShutdown {
+ vtrace.FormatTraces(os.Stderr, rt.traceStore.TraceRecords(), nil)
+ }
+
rt.mu.Lock()
if rt.cleaningUp {
rt.mu.Unlock()
diff --git a/runtimes/google/vtrace/collector.go b/runtimes/google/vtrace/collector.go
index 3313384..6c883cc 100644
--- a/runtimes/google/vtrace/collector.go
+++ b/runtimes/google/vtrace/collector.go
@@ -26,16 +26,19 @@
// as well as in-memory collection.
type collector struct {
traceID uniqueid.ID
- method vtrace.TraceMethod
- spans map[uniqueid.ID]*vtrace.SpanRecord
- mu sync.Mutex
+ store *Store
+
+ mu sync.Mutex
+ method vtrace.TraceMethod // GUARDED_BY(mu)
+ spans map[uniqueid.ID]*vtrace.SpanRecord // GUARDED_BY(mu)
}
// newCollector returns a new collector for the given traceID.
-func newCollector(traceID uniqueid.ID) *collector {
+func newCollector(traceID uniqueid.ID, store *Store) *collector {
return &collector{
traceID: traceID,
method: vtrace.None,
+ store: store,
}
}
@@ -53,6 +56,9 @@
c.method = vtrace.InMemory
c.spans = make(map[uniqueid.ID]*vtrace.SpanRecord)
}
+ if c.store != nil {
+ c.store.Consider(c)
+ }
}
func (c *collector) spanRecordLocked(s *span) *vtrace.SpanRecord {
@@ -67,6 +73,9 @@
}
c.spans[sid] = record
}
+ if c.store != nil {
+ c.store.Consider(c)
+ }
return record
}
diff --git a/runtimes/google/vtrace/store.go b/runtimes/google/vtrace/store.go
new file mode 100644
index 0000000..182dea5
--- /dev/null
+++ b/runtimes/google/vtrace/store.go
@@ -0,0 +1,160 @@
+package vtrace
+
+import (
+ "sync"
+
+ "veyron.io/veyron/veyron2/uniqueid"
+ "veyron.io/veyron/veyron2/vtrace"
+)
+
+// Store implements a store for traces. The idea is to keep all the
+// information we have about some subset of traces that pass through
+// the server. For now we just implement an LRU cache, so the least
+// recently started/finished/annotated traces expire after some
+// maximum trace count is reached.
+// TODO(mattr): LRU is the wrong policy in the long term, we should
+// try to keep some diverse set of traces and allow users to
+// specifically tell us to capture a specific trace. LRU will work OK
+// for many testing scenarios and low volume applications.
+type Store struct {
+ size int
+
+ // traces and head together implement a linked-hash-map.
+ // head points to the head and tail of the doubly-linked-list
+ // of recently used items (the tail is the LRU traceSet).
+ mu sync.Mutex
+ traces map[uniqueid.ID]*traceSet // GUARDED_BY(mu)
+ head *traceSet // GUARDED_BY(mu)
+}
+
+// NewStore creates a new store that will keep a maximum of size
+// traces in memory.
+// TODO(mattr): Traces can be of widely varying size, we should have
+// some better measurement then just number of traces.
+func NewStore(size int) *Store {
+ head := &traceSet{}
+ head.next, head.prev = head, head
+
+ return &Store{
+ size: size,
+ traces: make(map[uniqueid.ID]*traceSet),
+ head: head,
+ }
+}
+
+// Consider should be called whenever an interesting change happens to
+// a trace the store will decide whether to keep it or not.
+func (s *Store) Consider(trace vtrace.Trace) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ set := s.traces[trace.ID()]
+ if set == nil {
+ set = newTraceSet()
+ s.traces[trace.ID()] = set
+ }
+ set.add(trace)
+ set.moveAfter(s.head)
+ s.trimLocked()
+}
+
+// TraceRecords returns TraceRecords for all traces saved in the store.
+func (s *Store) TraceRecords() []*vtrace.TraceRecord {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ out := make([]*vtrace.TraceRecord, s.size)
+ i := 0
+ for _, ts := range s.traces {
+ out[i] = ts.traceRecord()
+ i++
+ }
+ return out
+}
+
+// TraceRecord returns a TraceRecord for a given uniqueid. Returns
+// nil if the given id is not present.
+func (s *Store) TraceRecord(id uniqueid.ID) *vtrace.TraceRecord {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ ts := s.traces[id]
+ if ts == nil {
+ return nil
+ }
+ return ts.traceRecord()
+}
+
+// trimLocked removes elements from the store LRU first until we are
+// below Store.size.
+func (s *Store) trimLocked() {
+ for len(s.traces) > s.size {
+ el := s.head.prev
+ el.removeFromList()
+ delete(s.traces, el.id())
+ }
+}
+
+// We need to capture traceSets because a single trace can reach this
+// server along multiple paths. Consider a client that calls this
+// server twice in the same operation.
+type traceSet struct {
+ elts map[vtrace.Trace]bool
+ prev, next *traceSet
+}
+
+func newTraceSet() *traceSet {
+ return &traceSet{elts: make(map[vtrace.Trace]bool)}
+}
+
+func (ts *traceSet) add(trace vtrace.Trace) {
+ ts.elts[trace] = true
+}
+
+func (ts *traceSet) removeFromList() {
+ if ts.prev != nil {
+ ts.prev.next = ts.next
+ }
+ if ts.next != nil {
+ ts.next.prev = ts.prev
+ }
+ ts.next = nil
+ ts.prev = nil
+}
+
+func (ts *traceSet) moveAfter(prev *traceSet) {
+ ts.removeFromList()
+ ts.prev = prev
+ ts.next = prev.next
+ prev.next.prev = ts
+ prev.next = ts
+}
+
+func (ts *traceSet) id() uniqueid.ID {
+ for e := range ts.elts {
+ return e.ID()
+ }
+ panic("unreachable")
+}
+
+func (ts *traceSet) traceRecord() *vtrace.TraceRecord {
+ var out vtrace.TraceRecord
+
+ // It is possible to have duplicate copies of spans. Consider the
+ // case where a server calls itself (even indirectly) we'll have one
+ // Trace in the set for the parent call and one Trace in the set for
+ // the decendant. The two records will be exactly the same we
+ // therefore de-dup here.
+ spans := make(map[uniqueid.ID]bool)
+
+ for e, _ := range ts.elts {
+ record := e.Record()
+ out.ID = record.ID
+ for _, span := range record.Spans {
+ if spans[span.ID] {
+ continue
+ }
+ spans[span.ID] = true
+ out.Spans = append(out.Spans, span)
+ }
+ }
+ return &out
+}
diff --git a/runtimes/google/vtrace/store_test.go b/runtimes/google/vtrace/store_test.go
new file mode 100644
index 0000000..9fdc06b
--- /dev/null
+++ b/runtimes/google/vtrace/store_test.go
@@ -0,0 +1,82 @@
+package vtrace
+
+import (
+ "encoding/binary"
+ "reflect"
+ "testing"
+
+ "veyron.io/veyron/veyron2/uniqueid"
+ "veyron.io/veyron/veyron2/vtrace"
+)
+
+var nextid = uint64(1)
+
+func id() uniqueid.ID {
+ var out uniqueid.ID
+ binary.BigEndian.PutUint64(out[8:], nextid)
+ nextid++
+ return out
+}
+
+func makeTraces(n int, st *Store) []vtrace.Trace {
+ traces := make([]vtrace.Trace, n)
+ for i := range traces {
+ traces[i] = newCollector(id(), st)
+ traces[i].ForceCollect()
+ }
+ return traces
+}
+
+func recordids(records ...*vtrace.TraceRecord) map[uniqueid.ID]bool {
+ out := make(map[uniqueid.ID]bool)
+ for _, trace := range records {
+ out[trace.ID] = true
+ }
+ return out
+}
+
+func traceids(traces ...vtrace.Trace) map[uniqueid.ID]bool {
+ out := make(map[uniqueid.ID]bool)
+ for _, trace := range traces {
+ out[trace.ID()] = true
+ }
+ return out
+}
+
+func TestConsiderAndTrim(t *testing.T) {
+ st := NewStore(5)
+ traces := makeTraces(10, st)
+ records := st.TraceRecords()
+
+ if want, got := traceids(traces[5:]...), recordids(records...); !reflect.DeepEqual(want, got) {
+ t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+ }
+
+ // Starting a new span on one of the traces should bring it back into the stored set.
+ traces[2].(*collector).start(&span{id: id()})
+ records = st.TraceRecords()
+ if want, got := traceids(traces[2], traces[6], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+ t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+ }
+
+ // Starting a new span on one of the traces should bring it back into the stored set.
+ traces[2].(*collector).start(&span{id: id()})
+ records = st.TraceRecords()
+ if want, got := traceids(traces[2], traces[6], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+ t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+ }
+
+ // Finishing a span on one of the traces should bring it back into the stored set.
+ traces[3].(*collector).finish(&span{id: id()})
+ records = st.TraceRecords()
+ if want, got := traceids(traces[3], traces[2], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+ t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+ }
+
+ // Annotating a span on one of the traces should bring it back into the stored set.
+ traces[4].(*collector).annotate(&span{id: id()}, "hello")
+ records = st.TraceRecords()
+ if want, got := traceids(traces[4], traces[3], traces[2], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+ t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+ }
+}
diff --git a/runtimes/google/vtrace/vtrace.go b/runtimes/google/vtrace/vtrace.go
index 3a41e41..04da3e3 100644
--- a/runtimes/google/vtrace/vtrace.go
+++ b/runtimes/google/vtrace/vtrace.go
@@ -70,27 +70,37 @@
// ContinuedSpan creates a span that represents a continuation of a trace from
// a remote server. name is a user readable string that describes the context
// and req contains the parameters needed to connect this span with it's trace.
-func WithContinuedSpan(ctx context.T, name string, req vtrace.Request) (context.T, vtrace.Span) {
- newSpan := newSpan(req.SpanID, name, newCollector(req.TraceID))
+func WithContinuedSpan(ctx context.T, name string, req vtrace.Request, store *Store) (context.T, vtrace.Span) {
+ newSpan := newSpan(req.SpanID, name, newCollector(req.TraceID, store))
if req.Method == vtrace.InMemory {
newSpan.collector.ForceCollect()
}
return ctx.WithValue(spanKey{}, newSpan), newSpan
}
+func WithNewRootSpan(ctx context.T, store *Store, forceCollect bool) (context.T, vtrace.Span) {
+ id, err := uniqueid.Random()
+ if err != nil {
+ vlog.Errorf("vtrace: Couldn't generate Trace ID, debug data may be lost: %v", err)
+ }
+ col := newCollector(id, store)
+ if forceCollect {
+ col.ForceCollect()
+ }
+ s := newSpan(id, "", col)
+
+ return ctx.WithValue(spanKey{}, s), s
+}
+
// NewSpan creates a new span.
func WithNewSpan(parent context.T, name string) (context.T, vtrace.Span) {
- var s *span
if curSpan := getSpan(parent); curSpan != nil {
- s = newSpan(curSpan.ID(), name, curSpan.collector)
- } else {
- id, err := uniqueid.Random()
- if err != nil {
- vlog.Errorf("vtrace: Couldn't generate Trace ID, debug data may be lost: %v", err)
- }
- s = newSpan(id, name, newCollector(id))
+ s := newSpan(curSpan.ID(), name, curSpan.collector)
+ return parent.WithValue(spanKey{}, s), s
}
- return parent.WithValue(spanKey{}, s), s
+
+ vlog.Error("vtrace: Creating a new child span from context with no existing span.")
+ return WithNewRootSpan(parent, nil, false)
}
func getSpan(ctx context.T) *span {
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index f8af7f9..81b046b 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -97,7 +97,7 @@
func makeTestServer(ns naming.Namespace, name, child string, forceCollect bool) (*testServer, error) {
sm := manager.InternalNew(naming.FixedRoutingID(0x111111111))
ctx := testContext()
- s, err := iipc.InternalNewServer(ctx, sm, ns)
+ s, err := iipc.InternalNewServer(ctx, sm, ns, nil)
if err != nil {
return nil, err
}