veyron2/vtrace: Add an initial vtrace skelleton.
There is still a lot to be done, but this gets things started.
Change-Id: Iaeb12d08d5e910766dda7aa3921e128847c1040e
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index b82a748..09b9e7b 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -4,6 +4,7 @@
"testing"
"veyron/runtimes/google/ipc/stream/manager"
+ tnaming "veyron/runtimes/google/testing/mocks/naming"
"veyron2/ipc"
"veyron2/ipc/stream"
@@ -83,7 +84,7 @@
// RPC call chain without user intervention.
func TestCancellationPropagation(t *testing.T) {
sm := manager.InternalNew(naming.FixedRoutingID(0x555555555))
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
client, err := InternalNewClient(sm, ns)
if err != nil {
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 3dd736a..2c078c3 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -10,6 +10,7 @@
"veyron/runtimes/google/ipc/version"
inaming "veyron/runtimes/google/naming"
+ "veyron/runtimes/google/vtrace"
"veyron2"
"veyron2/context"
@@ -147,6 +148,8 @@
if ctx == nil {
return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
}
+ ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("Client Call: %s.%s", name, method))
+
servers, err := c.ns.Resolve(ctx, name)
if err != nil {
return nil, verror.NotFoundf("ipc: Resolve(%q) failed: %v", name, err)
@@ -181,7 +184,7 @@
discharges := c.prepareDischarges(ctx, flow.LocalID(), flow.RemoteID(), method, args, opts)
lastErr = nil
- fc := newFlowClient(flow, &c.dischargeCache, discharges)
+ fc := newFlowClient(ctx, flow, &c.dischargeCache, discharges)
go func() {
<-ctx.Done()
@@ -262,6 +265,7 @@
// flowClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server.
type flowClient struct {
+ ctx context.T // context to annotate with call details
dec *vom.Decoder // to decode responses and results from the server
enc *vom.Encoder // to encode requests and args to the server
flow stream.Flow // the underlying flow
@@ -276,9 +280,10 @@
finished bool // has Finish() already been called?
}
-func newFlowClient(flow stream.Flow, dischargeCache *dischargeCache, discharges []security.Discharge) *flowClient {
+func newFlowClient(ctx context.T, flow stream.Flow, dischargeCache *dischargeCache, discharges []security.Discharge) *flowClient {
return &flowClient{
// TODO(toddw): Support different codecs
+ ctx: ctx,
dec: vom.NewDecoder(flow),
enc: vom.NewEncoder(flow),
flow: flow,
@@ -302,6 +307,7 @@
Timeout: int64(timeout),
HasBlessing: blessing != nil,
NumDischarges: uint64(len(fc.discharges)),
+ TraceRequest: vtrace.Request(fc.ctx),
}
if err := fc.enc.Encode(req); err != nil {
return fc.close(verror.BadProtocolf("ipc: request encoding failed: %v", err))
@@ -386,7 +392,9 @@
}
func (fc *flowClient) Finish(resultptrs ...interface{}) error {
- return fc.finish(resultptrs...)
+ err := fc.finish(resultptrs...)
+ vtrace.FromContext(fc.ctx).Annotate("Finished")
+ return err
}
// finish ensures Finish always returns verror.E.
@@ -421,6 +429,10 @@
return fc.close(errRemainingStreamResults)
}
}
+
+ // Incorporate any VTrace info that was returned.
+ vtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
+
if fc.response.Error != nil {
if verror.Is(fc.response.Error, verror.NotAuthorized) && fc.dischargeCache != nil {
// In case the error was caused by a bad discharge, we do not want to get stuck
@@ -444,5 +456,6 @@
}
func (fc *flowClient) Cancel() {
+ vtrace.FromContext(fc.ctx).Annotate("Cancelled")
fc.flow.Cancel()
}
diff --git a/runtimes/google/ipc/context_test.go b/runtimes/google/ipc/context_test.go
index e5ac557..dd2b051 100644
--- a/runtimes/google/ipc/context_test.go
+++ b/runtimes/google/ipc/context_test.go
@@ -5,14 +5,10 @@
"testing"
"time"
- "veyron2"
- "veyron2/config"
+ "veyron/runtimes/google/testing/mocks/runtime"
+ "veyron/runtimes/google/vtrace"
+
"veyron2/context"
- "veyron2/ipc"
- "veyron2/ipc/stream"
- "veyron2/naming"
- "veyron2/security"
- "veyron2/vlog"
)
// We need a special way to create contexts for tests. We
@@ -20,39 +16,11 @@
// so we use a fake one that panics if used. The runtime
// implementation should not ever use the Runtime from a context.
func testContext() context.T {
- return InternalNewContext(&rt{})
+ ctx := InternalNewContext(&runtime.PanicRuntime{})
+ ctx, _ = vtrace.WithNewSpan(ctx, "Root")
+ return ctx
}
-// rt is a dummy implementation of veyron2.Runtime that panics on every
-// operation. See the comment for testContext.
-type rt struct {
- unique int // Make non-empty to ensure pointer instances are unique.
-}
-
-const badRuntime = "The runtime implmentation should not call methods on runtime intances."
-
-func (*rt) Profile() veyron2.Profile { panic(badRuntime) }
-func (*rt) Publisher() *config.Publisher { panic(badRuntime) }
-func (*rt) NewIdentity(name string) (security.PrivateID, error) { panic(badRuntime) }
-func (*rt) PublicIDStore() security.PublicIDStore { panic(badRuntime) }
-func (*rt) Identity() security.PrivateID { panic(badRuntime) }
-func (*rt) NewClient(opts ...ipc.ClientOpt) (ipc.Client, error) { panic(badRuntime) }
-func (*rt) NewServer(opts ...ipc.ServerOpt) (ipc.Server, error) { panic(badRuntime) }
-func (*rt) Client() ipc.Client { panic(badRuntime) }
-func (*rt) NewContext() context.T { panic(badRuntime) }
-func (*rt) NewStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) { panic(badRuntime) }
-func (*rt) NewEndpoint(ep string) (naming.Endpoint, error) { panic(badRuntime) }
-func (*rt) Namespace() naming.Namespace { panic(badRuntime) }
-func (*rt) Logger() vlog.Logger { panic(badRuntime) }
-func (*rt) NewLogger(name string, opts ...vlog.LoggingOpts) (vlog.Logger, error) { panic(badRuntime) }
-func (*rt) Stop() { panic(badRuntime) }
-func (*rt) ForceStop() { panic(badRuntime) }
-func (*rt) WaitForStop(chan<- string) { panic(badRuntime) }
-func (*rt) AdvanceGoal(delta int) { panic(badRuntime) }
-func (*rt) AdvanceProgress(delta int) { panic(badRuntime) }
-func (*rt) TrackTask(chan<- veyron2.Task) { panic(badRuntime) }
-func (*rt) Cleanup() { panic(badRuntime) }
-
func testCancel(t *testing.T, ctx context.T, cancel context.CancelFunc) {
select {
case <-ctx.Done():
diff --git a/runtimes/google/ipc/flow_test.go b/runtimes/google/ipc/flow_test.go
index b097171..9271dd4 100644
--- a/runtimes/google/ipc/flow_test.go
+++ b/runtimes/google/ipc/flow_test.go
@@ -125,7 +125,7 @@
}
for _, test := range tests {
clientFlow, serverFlow := newTestFlows()
- client := newFlowClient(clientFlow, nil, nil)
+ client := newFlowClient(testContext(), clientFlow, nil, nil)
server := newFlowServer(serverFlow, ipcServer)
err := client.start(test.suffix, test.method, test.args, 0, nil)
if err != nil {
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 20252a6..05e94ea 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -22,11 +22,11 @@
"veyron/runtimes/google/lib/publisher"
inaming "veyron/runtimes/google/naming"
isecurity "veyron/runtimes/google/security"
+ tnaming "veyron/runtimes/google/testing/mocks/naming"
vsecurity "veyron/security"
"veyron/security/caveat"
"veyron2"
- "veyron2/context"
"veyron2/ipc"
"veyron2/ipc/stream"
"veyron2/naming"
@@ -171,99 +171,6 @@
return ipc.ReflectInvoker(t.server), authorizer, nil
}
-// namespace is a simple partial implementation of naming.Namespace. In
-// particular, it ignores TTLs and not allow fully overlapping mount names.
-type namespace struct {
- sync.Mutex
- mounts map[string][]string
-}
-
-func newNamespace() naming.Namespace {
- return &namespace{mounts: make(map[string][]string)}
-}
-
-func (ns *namespace) Mount(ctx context.T, name, server string, _ time.Duration) error {
- ns.Lock()
- defer ns.Unlock()
- for n, _ := range ns.mounts {
- if n != name && (strings.HasPrefix(name, n) || strings.HasPrefix(n, name)) {
- return fmt.Errorf("simple mount table does not allow names that are a prefix of each other")
- }
- }
- ns.mounts[name] = append(ns.mounts[name], server)
- return nil
-}
-
-func (ns *namespace) Unmount(ctx context.T, name, server string) error {
- var servers []string
- ns.Lock()
- defer ns.Unlock()
- for _, s := range ns.mounts[name] {
- // When server is "", we remove all servers under name.
- if len(server) > 0 && s != server {
- servers = append(servers, s)
- }
- }
- if len(servers) > 0 {
- ns.mounts[name] = servers
- } else {
- delete(ns.mounts, name)
- }
- return nil
-}
-
-func (ns *namespace) Resolve(ctx context.T, name string) ([]string, error) {
- if address, _ := naming.SplitAddressName(name); len(address) > 0 {
- return []string{name}, nil
- }
- ns.Lock()
- defer ns.Unlock()
- for prefix, servers := range ns.mounts {
- if strings.HasPrefix(name, prefix) {
- suffix := strings.TrimLeft(strings.TrimPrefix(name, prefix), "/")
- var ret []string
- for _, s := range servers {
- ret = append(ret, naming.Join(s, suffix))
- }
- return ret, nil
- }
- }
- return nil, verror.NotFoundf("Resolve name %q not found in %v", name, ns.mounts)
-}
-
-func (ns *namespace) ResolveToMountTable(ctx context.T, name string) ([]string, error) {
- panic("ResolveToMountTable not implemented")
- return nil, nil
-}
-
-func (ns *namespace) Unresolve(ctx context.T, name string) ([]string, error) {
- panic("Unresolve not implemented")
- return nil, nil
-}
-
-func (ns *namespace) FlushCacheEntry(name string) bool {
- return false
-}
-
-func (ns *namespace) CacheCtl(ctls ...naming.CacheCtl) []naming.CacheCtl {
- return nil
-}
-
-func (ns *namespace) Glob(ctx context.T, pattern string) (chan naming.MountEntry, error) {
- panic("Glob not implemented")
- return nil, nil
-}
-
-func (ns *namespace) SetRoots(...string) error {
- panic("SetRoots not implemented")
- return nil
-}
-
-func (ns *namespace) Roots() []string {
- panic("Roots not implemented")
- return nil
-}
-
func startServer(t *testing.T, serverID security.PrivateID, sm stream.Manager, ns naming.Namespace, ts interface{}) (naming.Endpoint, ipc.Server) {
vlog.VI(1).Info("InternalNewServer")
server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID))
@@ -344,7 +251,7 @@
func createBundle(t *testing.T, clientID, serverID security.PrivateID, ts interface{}) (b bundle) {
b.sm = imanager.InternalNew(naming.FixedRoutingID(0x555555555))
- b.ns = newNamespace()
+ b.ns = tnaming.NewSimpleNamespace()
if serverID != nil {
b.ep, b.server = startServer(t, serverID, b.sm, b.ns, ts)
}
@@ -404,7 +311,7 @@
func TestMultipleCallsToServe(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID))
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
@@ -494,7 +401,7 @@
}
// Servers and clients will be created per-test, use the same stream manager and mounttable.
mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
for _, test := range tests {
name := fmt.Sprintf("(clientID:%q serverID:%q)", test.clientID, test.serverID)
_, server := startServer(t, test.serverID, mgr, ns, &testServer{})
@@ -700,7 +607,7 @@
}
)
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID))
if err != nil {
t.Fatal(err)
@@ -1120,7 +1027,7 @@
func TestPreferredAddress(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
pa := func(string, []net.Addr) (net.Addr, error) {
a := &net.IPAddr{}
a.IP = net.ParseIP("1.1.1.1")
@@ -1155,7 +1062,7 @@
func TestPreferredAddressErrors(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
paerr := func(string, []net.Addr) (net.Addr, error) {
return nil, fmt.Errorf("oops")
}
@@ -1208,7 +1115,7 @@
func TestProxy(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
client, err := InternalNewClient(sm, ns, vc.FixedLocalID(clientID))
if err != nil {
t.Fatal(err)
@@ -1293,7 +1200,7 @@
func runServer(argv []string) {
mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
- ns := newNamespace()
+ ns := tnaming.NewSimpleNamespace()
id := loadIdentityFromFile(argv[1])
isecurity.TrustIdentityProviders(id)
server, err := InternalNewServer(testContext(), mgr, ns, vc.FixedLocalID(id))
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index a7aa184..951369e 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -13,6 +13,7 @@
"veyron/runtimes/google/lib/publisher"
inaming "veyron/runtimes/google/naming"
isecurity "veyron/runtimes/google/security"
+ ivtrace "veyron/runtimes/google/vtrace"
vsecurity "veyron/security"
"veyron2"
@@ -25,6 +26,7 @@
"veyron2/verror"
"veyron2/vlog"
"veyron2/vom"
+ "veyron2/vtrace"
)
var (
@@ -546,13 +548,17 @@
discharges map[string]security.Discharge
deadline time.Time
endStreamArgs bool // are the stream args at EOF?
+ allowDebug bool // true if the caller is permitted to view debug information.
}
func newFlowServer(flow stream.Flow, server *server) *flowServer {
server.Lock()
disp := server.disp
+ runtime := veyron2.RuntimeFromContext(server.ctx)
server.Unlock()
+
return &flowServer{
+ T: InternalNewContext(runtime),
server: server,
disp: disp,
// TODO(toddw): Support different codecs
@@ -604,11 +610,18 @@
func (fs *flowServer) serve() error {
defer fs.flow.Close()
results, err := fs.processRequest()
+
+ var traceResponse vtrace.Response
+ if fs.allowDebug {
+ traceResponse = ivtrace.Response(fs)
+ }
+
// Respond to the client with the response header and positional results.
response := ipc.Response{
Error: err,
EndStreamResults: true,
NumPosResults: uint64(len(results)),
+ TraceResponse: traceResponse,
}
if err := fs.enc.Encode(response); err != nil {
return verror.BadProtocolf("ipc: response encoding failed: %v", err)
@@ -652,6 +665,11 @@
return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
}
fs.method = req.Method
+ // TODO(mattr): Currently this allows users to trigger trace collection
+ // on the server even if they will not be allowed to collect the
+ // results later. This might be consider a DOS vector.
+ spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
+ fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
// Set the appropriate deadline, if specified.
if req.Timeout == ipc.NoTimeout {
@@ -663,12 +681,11 @@
return nil, verr
}
- runtime := veyron2.RuntimeFromContext(fs.server.ctx)
var cancel context.CancelFunc
if !deadline.IsZero() {
- fs.T, cancel = InternalNewContext(runtime).WithDeadline(deadline)
+ fs.T, cancel = fs.WithDeadline(deadline)
} else {
- fs.T, cancel = InternalNewContext(runtime).WithCancel()
+ fs.T, cancel = fs.WithCancel()
}
// Notify the context when the channel is closed.
@@ -738,6 +755,9 @@
// TODO(ataly, ashankar): For privacy reasons, should we hide the authorizer error (err)?
return nil, errNotAuthorized(fmt.Errorf("%q not authorized for method %q: %v", fs.RemoteID(), fs.Method(), err))
}
+ // Check if the caller is permitted to view debug information.
+ fs.allowDebug = fs.authorizeForDebug(auth) == nil
+
results, err := invoker.Invoke(req.Method, fs, argptrs)
fs.server.stats.record(req.Method, time.Since(start))
return results, verror.Convert(err)
@@ -772,6 +792,27 @@
return vsecurity.NewACLAuthorizer(defaultACL(fs.flow.LocalID())).Authorize(fs)
}
+// debugContext is a context which wraps another context but always returns
+// the debug label.
+type debugContext struct {
+ security.Context
+}
+
+func (debugContext) Label() security.Label { return security.DebugLabel }
+
+// TODO(mattr): Is DebugLabel the right thing to check?
+func (fs *flowServer) authorizeForDebug(auth security.Authorizer) error {
+ dc := debugContext{fs}
+ if auth != nil {
+ return auth.Authorize(dc)
+ }
+ // Since the provided authorizer is nil we create a default IDAuthorizer
+ // for the local identity of the flow. This authorizer only authorizes
+ // remote identities that have either been blessed by the local identity
+ // or have blessed the local identity. (See vsecurity.NewACLAuthorizer)
+ return vsecurity.NewACLAuthorizer(defaultACL(dc.LocalID())).Authorize(dc)
+}
+
// setDeadline sets a deadline on the flow. The flow will be cancelled if it
// is not closed by the specified deadline.
// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index f3884f0..02a382d 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -7,6 +7,7 @@
iipc "veyron/runtimes/google/ipc"
imanager "veyron/runtimes/google/ipc/stream/manager"
"veyron/runtimes/google/ipc/stream/vc"
+ ivtrace "veyron/runtimes/google/vtrace"
"veyron2"
"veyron2/context"
@@ -14,6 +15,7 @@
"veyron2/ipc/stream"
"veyron2/naming"
"veyron2/security"
+ "veyron2/vtrace"
)
// fixedPublicIDStore implements security.PublicIDStore. It embeds a (fixed) PublicID that
@@ -102,7 +104,16 @@
}
func (rt *vrt) NewContext() context.T {
- return iipc.InternalNewContext(rt)
+ ctx, _ := ivtrace.WithNewSpan(iipc.InternalNewContext(rt), "Root")
+ return ctx
+}
+
+func (rt *vrt) WithNewSpan(ctx context.T, name string) (context.T, vtrace.Span) {
+ return ivtrace.WithNewSpan(ctx, name)
+}
+
+func (rt *vrt) SpanFromContext(ctx context.T) vtrace.Span {
+ return ivtrace.FromContext(ctx)
}
func (rt *vrt) NewServer(opts ...ipc.ServerOpt) (ipc.Server, error) {
diff --git a/runtimes/google/testing/mocks/naming/namespace.go b/runtimes/google/testing/mocks/naming/namespace.go
new file mode 100644
index 0000000..951ab52
--- /dev/null
+++ b/runtimes/google/testing/mocks/naming/namespace.go
@@ -0,0 +1,110 @@
+package naming
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "veyron2/context"
+ "veyron2/naming"
+ "veyron2/verror"
+)
+
+// NewSimpleNamespace returns a simple implementation of a Namespace
+// server for use in tests. In particular, it ignores TTLs and not
+// allow fully overlapping mount names.
+func NewSimpleNamespace() naming.Namespace {
+ return &namespace{mounts: make(map[string][]string)}
+}
+
+// namespace is a simple partial implementation of naming.Namespace.
+type namespace struct {
+ sync.Mutex
+ mounts map[string][]string
+}
+
+func (ns *namespace) Mount(ctx context.T, name, server string, _ time.Duration) error {
+ ns.Lock()
+ defer ns.Unlock()
+ for n, _ := range ns.mounts {
+ if n != name && (strings.HasPrefix(name, n) || strings.HasPrefix(n, name)) {
+ return fmt.Errorf("simple mount table does not allow names that are a prefix of each other")
+ }
+ }
+ ns.mounts[name] = append(ns.mounts[name], server)
+ return nil
+}
+
+func (ns *namespace) Unmount(ctx context.T, name, server string) error {
+ var servers []string
+ ns.Lock()
+ defer ns.Unlock()
+ for _, s := range ns.mounts[name] {
+ // When server is "", we remove all servers under name.
+ if len(server) > 0 && s != server {
+ servers = append(servers, s)
+ }
+ }
+ if len(servers) > 0 {
+ ns.mounts[name] = servers
+ } else {
+ delete(ns.mounts, name)
+ }
+ return nil
+}
+
+func (ns *namespace) Resolve(ctx context.T, name string) ([]string, error) {
+ if address, _ := naming.SplitAddressName(name); len(address) > 0 {
+ return []string{name}, nil
+ }
+ ns.Lock()
+ defer ns.Unlock()
+ for prefix, servers := range ns.mounts {
+ if strings.HasPrefix(name, prefix) {
+ suffix := strings.TrimLeft(strings.TrimPrefix(name, prefix), "/")
+ var ret []string
+ for _, s := range servers {
+ ret = append(ret, naming.Join(s, suffix))
+ }
+ return ret, nil
+ }
+ }
+ return nil, verror.NotFoundf("Resolve name %q not found in %v", name, ns.mounts)
+}
+
+func (ns *namespace) ResolveToMountTable(ctx context.T, name string) ([]string, error) {
+ // TODO(mattr): Implement this method for tests that might need it.
+ panic("ResolveToMountTable not implemented")
+ return nil, nil
+}
+
+func (ns *namespace) Unresolve(ctx context.T, name string) ([]string, error) {
+ // TODO(mattr): Implement this method for tests that might need it.
+ panic("Unresolve not implemented")
+ return nil, nil
+}
+
+func (ns *namespace) FlushCacheEntry(name string) bool {
+ return false
+}
+
+func (ns *namespace) CacheCtl(ctls ...naming.CacheCtl) []naming.CacheCtl {
+ return nil
+}
+
+func (ns *namespace) Glob(ctx context.T, pattern string) (chan naming.MountEntry, error) {
+ // TODO(mattr): Implement this method for tests that might need it.
+ panic("Glob not implemented")
+ return nil, nil
+}
+
+func (ns *namespace) SetRoots(...string) error {
+ panic("Calling SetRoots on a mock namespace. This is not supported.")
+ return nil
+}
+
+func (ns *namespace) Roots() []string {
+ panic("Calling Roots on a mock namespace. This is not supported.")
+ return nil
+}
diff --git a/runtimes/google/testing/mocks/runtime/panic_runtime.go b/runtimes/google/testing/mocks/runtime/panic_runtime.go
new file mode 100644
index 0000000..87a9ee3
--- /dev/null
+++ b/runtimes/google/testing/mocks/runtime/panic_runtime.go
@@ -0,0 +1,50 @@
+package runtime
+
+import (
+ "veyron2"
+ "veyron2/config"
+ "veyron2/context"
+ "veyron2/ipc"
+ "veyron2/ipc/stream"
+ "veyron2/naming"
+ "veyron2/security"
+ "veyron2/vlog"
+ "veyron2/vtrace"
+)
+
+// PanicRuntime is a dummy implementation of veyron2.Runtime that panics on every
+// operation. This is useful when you want to pass around a non-nil runtime
+// implementation but you don't want it to be used.
+type PanicRuntime struct {
+ unique int // Make non-empty to ensure pointer instances are unique.
+}
+
+const badRuntime = "The runtime implmentation should not call methods on runtime intances."
+
+func (*PanicRuntime) Profile() veyron2.Profile { panic(badRuntime) }
+func (*PanicRuntime) Publisher() *config.Publisher { panic(badRuntime) }
+func (*PanicRuntime) NewIdentity(name string) (security.PrivateID, error) { panic(badRuntime) }
+func (*PanicRuntime) PublicIDStore() security.PublicIDStore { panic(badRuntime) }
+func (*PanicRuntime) Identity() security.PrivateID { panic(badRuntime) }
+func (*PanicRuntime) NewClient(opts ...ipc.ClientOpt) (ipc.Client, error) { panic(badRuntime) }
+func (*PanicRuntime) NewServer(opts ...ipc.ServerOpt) (ipc.Server, error) { panic(badRuntime) }
+func (*PanicRuntime) Client() ipc.Client { panic(badRuntime) }
+func (*PanicRuntime) NewContext() context.T { panic(badRuntime) }
+func (*PanicRuntime) WithNewSpan(context.T, string) (context.T, vtrace.Span) { panic(badRuntime) }
+func (*PanicRuntime) SpanFromContext(context.T) vtrace.Span { panic(badRuntime) }
+func (*PanicRuntime) NewStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) {
+ panic(badRuntime)
+}
+func (*PanicRuntime) NewEndpoint(ep string) (naming.Endpoint, error) { panic(badRuntime) }
+func (*PanicRuntime) Namespace() naming.Namespace { panic(badRuntime) }
+func (*PanicRuntime) Logger() vlog.Logger { panic(badRuntime) }
+func (*PanicRuntime) NewLogger(name string, opts ...vlog.LoggingOpts) (vlog.Logger, error) {
+ panic(badRuntime)
+}
+func (*PanicRuntime) Stop() { panic(badRuntime) }
+func (*PanicRuntime) ForceStop() { panic(badRuntime) }
+func (*PanicRuntime) WaitForStop(chan<- string) { panic(badRuntime) }
+func (*PanicRuntime) AdvanceGoal(delta int) { panic(badRuntime) }
+func (*PanicRuntime) AdvanceProgress(delta int) { panic(badRuntime) }
+func (*PanicRuntime) TrackTask(chan<- veyron2.Task) { panic(badRuntime) }
+func (*PanicRuntime) Cleanup() { panic(badRuntime) }
diff --git a/runtimes/google/vtrace/collector.go b/runtimes/google/vtrace/collector.go
new file mode 100644
index 0000000..9352ff2
--- /dev/null
+++ b/runtimes/google/vtrace/collector.go
@@ -0,0 +1,127 @@
+package vtrace
+
+import (
+ "sync"
+ "time"
+
+ "veyron2/context"
+ "veyron2/uniqueid"
+ "veyron2/vtrace"
+)
+
+// collectors collect spans and annotations for output or analysis.
+// collectors are safe to use from multiple goroutines simultaneously.
+// TODO(mattr): collector should support log-based collection
+// as well as in-memory collection.
+type collector struct {
+ traceID uniqueid.ID
+ method vtrace.TraceMethod
+ spans map[uniqueid.ID]*vtrace.SpanRecord
+ mu sync.Mutex
+}
+
+// newCollector returns a new collector for the given traceID.
+func newCollector(traceID uniqueid.ID) *collector {
+ return &collector{
+ traceID: traceID,
+ method: vtrace.None,
+ }
+}
+
+// ID returns the ID of the trace this collector is collecting for.
+func (c *collector) ID() uniqueid.ID {
+ return c.traceID
+}
+
+// ForceCollect turns on collection for this trace. If collection
+// is already active, this does nothing.
+func (c *collector) ForceCollect() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.method != vtrace.InMemory {
+ c.method = vtrace.InMemory
+ c.spans = make(map[uniqueid.ID]*vtrace.SpanRecord)
+ }
+}
+
+// annotate adds a span annotation to the collection.
+func (c *collector) annotate(s vtrace.Span, msg string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.method == vtrace.InMemory {
+ sid := s.ID()
+ record, ok := c.spans[sid]
+ if !ok {
+ record = &vtrace.SpanRecord{ID: sid, Parent: s.Parent(), Name: s.Name()}
+ c.spans[sid] = record
+ }
+ record.Annotations = append(record.Annotations, vtrace.Annotation{
+ When: time.Now().UnixNano(),
+ Message: msg,
+ })
+ }
+}
+
+// response computes a vtrace.Response for the current trace.
+func (c *collector) response() vtrace.Response {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return vtrace.Response{
+ Method: c.method,
+ Trace: c.recordLocked(),
+ }
+}
+
+// Record computes a vtrace.TraceRecord containing all annotations
+// collected so far.
+func (c *collector) Record() vtrace.TraceRecord {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return c.recordLocked()
+}
+
+func (c *collector) recordLocked() vtrace.TraceRecord {
+ spans := make([]vtrace.SpanRecord, 0, len(c.spans))
+ for _, span := range c.spans {
+ spans = append(spans, vtrace.SpanRecord{
+ ID: span.ID,
+ Parent: span.Parent,
+ Name: span.Name,
+ Annotations: append([]vtrace.Annotation{}, span.Annotations...),
+ })
+ }
+ return vtrace.TraceRecord{
+ ID: c.traceID,
+ Spans: spans,
+ }
+}
+
+// merge merges a vtrace.Response into the current trace.
+func (c *collector) merge(parent vtrace.Span, t *vtrace.Response) {
+ if t.Method == vtrace.InMemory {
+ c.ForceCollect()
+ }
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ // TODO(mattr): We need to carefully merge here to correct for
+ // clock skew and ordering. We should estimate the clock skew
+ // by assuming that children of parent need to start after parent
+ // and end before now.
+ for _, span := range t.Trace.Spans {
+ c.spans[span.ID] = &vtrace.SpanRecord{
+ ID: span.ID,
+ Parent: span.Parent,
+ Name: span.Name,
+ Annotations: append([]vtrace.Annotation{}, span.Annotations...),
+ }
+ }
+}
+
+// MergeResponse merges a vtrace.Response into the current trace.
+func MergeResponse(ctx context.T, response *vtrace.Response) {
+ if span := getSpan(ctx); span != nil {
+ span.collector.merge(span, response)
+ }
+}
diff --git a/runtimes/google/vtrace/vtrace.go b/runtimes/google/vtrace/vtrace.go
new file mode 100644
index 0000000..d9cb1cf
--- /dev/null
+++ b/runtimes/google/vtrace/vtrace.go
@@ -0,0 +1,100 @@
+// Package vtrace implements the Trace and Span interfaces in veyron2/vtrace.
+// We also provide internal utilities for migrating trace information across
+// IPC calls.
+package vtrace
+
+import (
+ "veyron2/context"
+ "veyron2/uniqueid"
+ "veyron2/vlog"
+ "veyron2/vtrace"
+)
+
+// A span represents an annotated period of time.
+type span struct {
+ id uniqueid.ID
+ parent uniqueid.ID
+ name string
+ collector *collector
+}
+
+func newSpan(parent uniqueid.ID, name string, collector *collector) *span {
+ id, err := uniqueid.Random()
+ if err != nil {
+ vlog.Errorf("vtrace: Couldn't generate Span ID, debug data may be lost: %v", err)
+ }
+ s := &span{
+ id: id,
+ parent: parent,
+ name: name,
+ collector: collector,
+ }
+ s.Annotate("Started")
+ return s
+}
+
+func (c *span) ID() uniqueid.ID { return c.id }
+func (c *span) Parent() uniqueid.ID { return c.parent }
+func (c *span) Name() string { return c.name }
+func (c *span) Trace() vtrace.Trace { return c.collector }
+func (c *span) Annotate(msg string) { c.collector.annotate(c, msg) }
+
+// Request generates a vtrace.Request from the active Span.
+func Request(ctx context.T) vtrace.Request {
+ if span := getSpan(ctx); span != nil {
+ return vtrace.Request{
+ SpanID: span.id,
+ TraceID: span.collector.traceID,
+ Method: span.collector.method,
+ }
+ }
+ return vtrace.Request{}
+}
+
+// Response captures the vtrace.Response for the active Span.
+func Response(ctx context.T) vtrace.Response {
+ if span := getSpan(ctx); span != nil {
+ return span.collector.response()
+ }
+ return vtrace.Response{}
+}
+
+// spanKey is uses to store and retrieve spans inside a context.T objects.
+type spanKey struct{}
+
+// ContinuedSpan creates a span that represents a continuation of a trace from
+// a remote server. name is a user readable string that describes the context
+// and req contains the parameters needed to connect this span with it's trace.
+func WithContinuedSpan(ctx context.T, name string, req vtrace.Request) (context.T, vtrace.Span) {
+ newSpan := newSpan(req.SpanID, name, newCollector(req.TraceID))
+ if req.Method == vtrace.InMemory {
+ newSpan.collector.ForceCollect()
+ }
+ return ctx.WithValue(spanKey{}, newSpan), newSpan
+}
+
+// NewSpan creates a new span.
+func WithNewSpan(parent context.T, name string) (context.T, vtrace.Span) {
+ var s *span
+ if curSpan := getSpan(parent); curSpan != nil {
+ s = newSpan(curSpan.ID(), name, curSpan.collector)
+ } else {
+ id, err := uniqueid.Random()
+ if err != nil {
+ vlog.Errorf("vtrace: Couldn't generate Trace ID, debug data may be lost: %v", err)
+ }
+ s = newSpan(id, name, newCollector(id))
+ }
+ return parent.WithValue(spanKey{}, s), s
+}
+
+func getSpan(ctx context.T) *span {
+ span, _ := ctx.Value(spanKey{}).(*span)
+ return span
+}
+
+// GetSpan returns the active span from the context.
+func FromContext(ctx context.T) vtrace.Span {
+ span, _ := ctx.Value(spanKey{}).(vtrace.Span)
+ return span
+}
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
new file mode 100644
index 0000000..ecbd44d
--- /dev/null
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -0,0 +1,237 @@
+package vtrace_test
+
+import (
+ "strings"
+ "testing"
+
+ iipc "veyron/runtimes/google/ipc"
+ "veyron/runtimes/google/ipc/stream/manager"
+ tnaming "veyron/runtimes/google/testing/mocks/naming"
+ truntime "veyron/runtimes/google/testing/mocks/runtime"
+ ivtrace "veyron/runtimes/google/vtrace"
+
+ "veyron2/context"
+ "veyron2/ipc"
+ "veyron2/ipc/stream"
+ "veyron2/naming"
+ "veyron2/security"
+ "veyron2/vlog"
+ "veyron2/vtrace"
+)
+
+// We need a special way to create contexts for tests. We
+// can't create a real runtime in the runtime implementation
+// so we use a fake one that panics if used. The runtime
+// implementation should not ever use the Runtime from a context.
+func testContext() context.T {
+ return iipc.InternalNewContext(&truntime.PanicRuntime{})
+}
+
+func TestNewFromContext(t *testing.T) {
+ c0 := testContext()
+ c1, s1 := ivtrace.WithNewSpan(c0, "s1")
+ c2, s2 := ivtrace.WithNewSpan(c1, "s2")
+ c3, s3 := ivtrace.WithNewSpan(c2, "s3")
+ expected := map[context.T]vtrace.Span{
+ c0: nil,
+ c1: s1,
+ c2: s2,
+ c3: s3,
+ }
+ for ctx, expectedSpan := range expected {
+ if s := ivtrace.FromContext(ctx); s != expectedSpan {
+ t.Errorf("Wrong span for ctx %v. Got %v, want %v", c0, s, expectedSpan)
+ }
+ }
+}
+
+type fakeAuthorizer int
+
+func (fakeAuthorizer) Authorize(security.Context) error {
+ return nil
+}
+
+type testServer struct {
+ sm stream.Manager
+ ns naming.Namespace
+ name string
+ child string
+ stop func() error
+ forceCollect bool
+}
+
+func (c *testServer) Run(ctx ipc.ServerCall) error {
+ if c.forceCollect {
+ ivtrace.FromContext(ctx).Trace().ForceCollect()
+ }
+
+ client, err := iipc.InternalNewClient(c.sm, c.ns)
+ if err != nil {
+ vlog.Error(err)
+ return err
+ }
+
+ ivtrace.FromContext(ctx).Annotate(c.name + "-begin")
+
+ if c.child != "" {
+ var call ipc.Call
+ if call, err = client.StartCall(ctx, c.child, "Run", []interface{}{}); err != nil {
+ vlog.Error(err)
+ return err
+ }
+ var outerr error
+ if err = call.Finish(&outerr); err != nil {
+ vlog.Error(err)
+ return err
+ }
+ if outerr != nil {
+ vlog.Error(outerr)
+ return outerr
+ }
+ }
+ ivtrace.FromContext(ctx).Annotate(c.name + "-end")
+
+ return nil
+}
+
+func makeTestServer(ns naming.Namespace, name, child string, forceCollect bool) (*testServer, error) {
+ sm := manager.InternalNew(naming.FixedRoutingID(0x111111111))
+ ctx := testContext()
+ s, err := iipc.InternalNewServer(ctx, sm, ns)
+ if err != nil {
+ return nil, err
+ }
+ if _, err := s.Listen("tcp", "127.0.0.1:0"); err != nil {
+ return nil, err
+ }
+
+ c := &testServer{
+ sm: sm,
+ ns: ns,
+ name: name,
+ child: child,
+ stop: s.Stop,
+ forceCollect: forceCollect,
+ }
+
+ if err := s.Serve(name, ipc.LeafDispatcher(c, fakeAuthorizer(0))); err != nil {
+ return nil, err
+ }
+
+ return c, nil
+}
+
+func summary(span *vtrace.SpanRecord) string {
+ msgs := []string{}
+ for _, annotation := range span.Annotations {
+ msgs = append(msgs, annotation.Message)
+ }
+ return span.Name + ": " + strings.Join(msgs, ",")
+}
+
+func expectSequence(t *testing.T, trace vtrace.TraceRecord, expectedSpans []string) {
+ if got, want := len(trace.Spans), len(expectedSpans); got != want {
+ t.Errorf("Found %d spans, want %d", got, want)
+ }
+
+ spans := map[string]*vtrace.SpanRecord{}
+ for i := range trace.Spans {
+ span := &trace.Spans[i]
+ summary := summary(span)
+ spans[summary] = span
+ }
+
+ for i := range expectedSpans {
+ child, ok := spans[expectedSpans[i]]
+ if !ok {
+ t.Errorf("expected span not found: %s", expectedSpans[i])
+ continue
+ }
+ if i == 0 {
+ continue
+ }
+ parent, ok := spans[expectedSpans[i-1]]
+ if !ok {
+ t.Errorf("expected span not found: %s", expectedSpans[i-1])
+ continue
+ }
+ if child.Parent != parent.ID {
+ t.Errorf("%v should be a child of %v, but it's not.", child, parent)
+ }
+ }
+}
+
+func runCallChain(t *testing.T, ctx context.T, force1, force2 bool) {
+ sm := manager.InternalNew(naming.FixedRoutingID(0x555555555))
+ ns := tnaming.NewSimpleNamespace()
+
+ client, err := iipc.InternalNewClient(sm, ns)
+ if err != nil {
+ t.Error(err)
+ }
+
+ c1, err := makeTestServer(ns, "c1", "c2", force1)
+ if err != nil {
+ t.Fatal("Can't start server:", err)
+ }
+ defer c1.stop()
+
+ c2, err := makeTestServer(ns, "c2", "", force2)
+ if err != nil {
+ t.Fatal("Can't start server:", err)
+ }
+ defer c2.stop()
+
+ call, err := client.StartCall(ctx, "c1", "Run", []interface{}{})
+ if err != nil {
+ t.Fatal("can't call: ", err)
+ }
+ var outerr error
+ if err = call.Finish(&outerr); err != nil {
+ t.Error(err)
+ }
+ if outerr != nil {
+ t.Error(outerr)
+ }
+}
+
+// TestCancellationPropagation tests that cancellation propogates along an
+// RPC call chain without user intervention.
+func TestTraceAcrossRPCs(t *testing.T) {
+ ctx, span := ivtrace.WithNewSpan(testContext(), "Root")
+ span.Trace().ForceCollect()
+ span.Annotate("c0-begin")
+
+ runCallChain(t, ctx, false, false)
+
+ span.Annotate("c0-end")
+
+ expectedSpans := []string{
+ "Root: c0-begin,c0-end",
+ "Client Call: c1.Run: Started,Finished",
+ "Server Call: .Run: c1-begin,c1-end",
+ "Client Call: c2.Run: Started,Finished",
+ "Server Call: .Run: c2-begin,c2-end",
+ }
+ expectSequence(t, span.Trace().Record(), expectedSpans)
+}
+
+// TestCancellationPropagationLateForce tests that cancellation propogates along an
+// RPC call chain when tracing is initiated by someone deep in the call chain.
+func TestTraceAcrossRPCsLateForce(t *testing.T) {
+ ctx, span := ivtrace.WithNewSpan(testContext(), "Root")
+ span.Annotate("c0-begin")
+
+ runCallChain(t, ctx, false, true)
+
+ span.Annotate("c0-end")
+
+ expectedSpans := []string{
+ "Root: c0-end",
+ "Client Call: c1.Run: Finished",
+ "Server Call: .Run: c1-end",
+ "Client Call: c2.Run: Finished",
+ "Server Call: .Run: c2-begin,c2-end",
+ }
+ expectSequence(t, span.Trace().Record(), expectedSpans)
+}