blob: ab7000cd39ed84f952670b69e5ce22ea0148fd75 [file] [log] [blame]
package vtrace
import (
"sync"
"time"
"v.io/core/veyron2/context"
"v.io/core/veyron2/uniqueid"
"v.io/core/veyron2/vtrace"
)
func copySpanRecord(in *vtrace.SpanRecord) *vtrace.SpanRecord {
return &vtrace.SpanRecord{
ID: in.ID,
Parent: in.Parent,
Name: in.Name,
Start: in.Start,
End: in.End,
Annotations: append([]vtrace.Annotation{}, in.Annotations...),
}
}
// collectors collect spans and annotations for output or analysis.
// collectors are safe to use from multiple goroutines simultaneously.
// TODO(mattr): collector should support log-based collection
// as well as in-memory collection.
type collector struct {
traceID uniqueid.ID
store *Store
mu sync.Mutex
method vtrace.TraceMethod // GUARDED_BY(mu)
spans map[uniqueid.ID]*vtrace.SpanRecord // GUARDED_BY(mu)
}
// newCollector returns a new collector for the given traceID.
func newCollector(traceID uniqueid.ID, store *Store) *collector {
return &collector{
traceID: traceID,
method: vtrace.None,
store: store,
}
}
// ID returns the ID of the trace this collector is collecting for.
func (c *collector) ID() uniqueid.ID {
return c.traceID
}
// ForceCollect turns on collection for this trace. If collection
// is already active, this does nothing.
func (c *collector) ForceCollect() {
c.mu.Lock()
defer c.mu.Unlock()
if c.method != vtrace.InMemory {
c.method = vtrace.InMemory
c.spans = make(map[uniqueid.ID]*vtrace.SpanRecord)
}
if c.store != nil {
c.store.Consider(c)
}
}
func (c *collector) spanRecordLocked(s *span) *vtrace.SpanRecord {
sid := s.ID()
record, ok := c.spans[sid]
if !ok {
record = &vtrace.SpanRecord{
ID: sid,
Parent: s.parent,
Name: s.name,
Start: s.start.UnixNano(),
}
c.spans[sid] = record
}
if c.store != nil {
c.store.Consider(c)
}
return record
}
// start records the fact that a given span has begun.
func (c *collector) start(s *span) {
c.mu.Lock()
defer c.mu.Unlock()
if c.method == vtrace.InMemory {
// Note that simply fetching the record is enough since
// if the record does not exist we will created it according
// to the start time in s.
c.spanRecordLocked(s)
}
}
// finish records the time that a span finished.
func (c *collector) finish(s *span) {
c.mu.Lock()
defer c.mu.Unlock()
if c.method == vtrace.InMemory {
record := c.spanRecordLocked(s)
// TODO(mattr): Perhaps we should log an error if we have already been finished?
record.End = time.Now().UnixNano()
}
}
// annotate adds a span annotation to the collection.
func (c *collector) annotate(s *span, msg string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.method == vtrace.InMemory {
record := c.spanRecordLocked(s)
record.Annotations = append(record.Annotations, vtrace.Annotation{
When: time.Now().UnixNano(),
Message: msg,
})
}
}
// response computes a vtrace.Response for the current trace.
func (c *collector) response() vtrace.Response {
c.mu.Lock()
defer c.mu.Unlock()
return vtrace.Response{
Method: c.method,
Trace: c.traceRecordLocked(),
}
}
// Record computes a vtrace.TraceRecord containing all annotations
// collected so far.
func (c *collector) Record() vtrace.TraceRecord {
c.mu.Lock()
defer c.mu.Unlock()
return c.traceRecordLocked()
}
func (c *collector) traceRecordLocked() vtrace.TraceRecord {
spans := make([]vtrace.SpanRecord, 0, len(c.spans))
for _, span := range c.spans {
spans = append(spans, *copySpanRecord(span))
}
return vtrace.TraceRecord{
ID: c.traceID,
Spans: spans,
}
}
// merge merges a vtrace.Response into the current trace.
func (c *collector) merge(parent vtrace.Span, t *vtrace.Response) {
if t.Method == vtrace.InMemory {
c.ForceCollect()
}
c.mu.Lock()
defer c.mu.Unlock()
// TODO(mattr): We need to carefully merge here to correct for
// clock skew and ordering. We should estimate the clock skew
// by assuming that children of parent need to start after parent
// and end before now.
for _, span := range t.Trace.Spans {
c.spans[span.ID] = copySpanRecord(&span)
}
}
// MergeResponse merges a vtrace.Response into the current trace.
func MergeResponse(ctx *context.T, response *vtrace.Response) {
if span := getSpan(ctx); span != nil {
span.collector.merge(span, response)
}
}