Merge "veyron/tools/mgmt/node: node manager cmd-line tool (in progress)"
diff --git a/lib/flags/flags.go b/lib/flags/flags.go
index cb9dde9..76e1baf 100644
--- a/lib/flags/flags.go
+++ b/lib/flags/flags.go
@@ -93,9 +93,25 @@
// environment variable. The command line will override the environment.
Credentials string // TODO(cnicolaou): provide flag.Value impl
+ // Vtrace flags control various aspects of Vtrace.
+ Vtrace VtraceFlags
+
namespaceRootsFlag namespaceRootFlagVar
}
+// VtraceFlags holds the settings registered under the veyron.vtrace.*
+// command-line flags.
+type VtraceFlags struct {
+	// SampleRate is the rate (from 0.0 - 1.0) at which
+	// vtrace traces started by this process are sampled for collection.
+	SampleRate float64
+
+	// DumpOnShutdown tells the runtime to dump all stored traces
+	// to Stderr at shutdown if true.
+	DumpOnShutdown bool
+
+	// CacheSize is the number of traces to cache in memory.
+	CacheSize int
+}
+
// ACLFlags contains the values of the ACLFlags flag group.
type ACLFlags struct {
flag aclFlagVar
@@ -127,6 +143,11 @@
fs.Var(&f.namespaceRootsFlag, "veyron.namespace.root", "local namespace root; can be repeated to provided multiple roots")
fs.StringVar(&f.Credentials, "veyron.credentials", creds, "directory to use for storing security credentials")
+
+ fs.Float64Var(&f.Vtrace.SampleRate, "veyron.vtrace.sample_rate", 0.0, "Rate (from 0.0 to 1.0) to sample vtrace traces.")
+ fs.BoolVar(&f.Vtrace.DumpOnShutdown, "veyron.vtrace.dump_on_shutdown", false, "If true, dump all stored traces on runtime shutdown.")
+ fs.IntVar(&f.Vtrace.CacheSize, "veyron.vtrace.cache_size", 1024, "The number of vtrace traces to store in memory.")
+
return f
}
@@ -232,7 +253,7 @@
return roots, os.Getenv(consts.VeyronCredentials)
}
-// Parse parses the supplied args, as per flag.Parse
+// Parse parses the supplied args, as per flag.Parse.
func (f *Flags) Parse(args []string) error {
// TODO(cnicolaou): implement a single env var 'VANADIUM_OPTS'
// that can be used to specify any command line.
diff --git a/lib/flags/listen.go b/lib/flags/listen.go
index de2855f..929dd56 100644
--- a/lib/flags/listen.go
+++ b/lib/flags/listen.go
@@ -52,6 +52,10 @@
// Implements flag.Value.Set
func (ip *IPHostPortFlag) Set(s string) error {
+ if len(s) == 0 {
+ ip.Address, ip.Port, ip.Host = "", "", ""
+ return nil
+ }
ip.Address = s
host, port, err := net.SplitHostPort(s)
if err != nil {
@@ -87,6 +91,9 @@
// Implements flag.Value.String
func (ip IPHostPortFlag) String() string {
+ if len(ip.Address) == 0 && len(ip.Port) == 0 {
+ return ""
+ }
host := ip.Host
if len(ip.Host) == 0 && ip.IP != nil && len(ip.IP) > 0 {
// We don't have a hostname, so there should be at most one IP address.
diff --git a/lib/flags/listen_test.go b/lib/flags/listen_test.go
index 6c1fc6c..1ec6472 100644
--- a/lib/flags/listen_test.go
+++ b/lib/flags/listen_test.go
@@ -43,7 +43,7 @@
want flags.IPHostPortFlag
str string
}{
- {"", flags.IPHostPortFlag{Port: "0"}, ":0"},
+ {"", flags.IPHostPortFlag{Port: ""}, ""},
{":0", flags.IPHostPortFlag{Port: "0"}, ":0"},
{":22", flags.IPHostPortFlag{Port: "22"}, ":22"},
{"127.0.0.1", flags.IPHostPortFlag{IP: lh, Port: "0"}, "127.0.0.1:0"},
diff --git a/lib/flags/main.go b/lib/flags/main.go
new file mode 100644
index 0000000..cae781a
--- /dev/null
+++ b/lib/flags/main.go
@@ -0,0 +1,29 @@
+// +build ignore
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+
+ "veyron.io/veyron/veyron/lib/flags"
+)
+
+// main registers the Runtime, ACL and Listen flag groups on the default
+// command line, prints the flag defaults and the raw arguments, parses
+// os.Args[1:] and then prints the parsed values of each group. If parsing
+// fails it prints the error and returns without printing any values.
+func main() {
+	fl := flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.ACL, flags.Listen)
+	flag.PrintDefaults()
+	fmt.Printf("Args: %v\n", os.Args)
+	if err := fl.Parse(os.Args[1:]); err != nil {
+		// Printf, not Println: the message contains a formatting verb that
+		// Println would print literally.
+		fmt.Printf("ERROR: %s\n", err)
+		return
+	}
+	rtf := fl.RuntimeFlags()
+	fmt.Printf("Runtime: Credentials: %s\n", rtf.Credentials)
+	fmt.Printf("Runtime: Namespace Roots: %s\n", rtf.NamespaceRoots)
+	lf := fl.ListenFlags()
+	fmt.Printf("Listen: Protocol %q\n", lf.ListenProtocol)
+	fmt.Printf("Listen: Address %q\n", lf.ListenAddress)
+	fmt.Printf("Listen: Proxy %q\n", lf.ListenProxy)
+	fmt.Printf("ACL: %v\n", fl.ACLFlags())
+}
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index 0ad572c..0ece305 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -55,7 +55,7 @@
func makeCanceld(ns naming.Namespace, name, child string) (*canceld, error) {
sm := manager.InternalNew(naming.FixedRoutingID(0x111111111))
ctx := testContext()
- s, err := InternalNewServer(ctx, sm, ns)
+ s, err := InternalNewServer(ctx, sm, ns, nil)
if err != nil {
return nil, err
}
diff --git a/runtimes/google/ipc/context_test.go b/runtimes/google/ipc/context_test.go
index a752156..7b9137c 100644
--- a/runtimes/google/ipc/context_test.go
+++ b/runtimes/google/ipc/context_test.go
@@ -17,7 +17,7 @@
// implementation should not ever use the Runtime from a context.
func testContext() context.T {
ctx := InternalNewContext(&runtime.PanicRuntime{})
- ctx, _ = vtrace.WithNewSpan(ctx, "")
+ ctx, _ = vtrace.WithNewRootSpan(ctx, nil, false)
ctx, _ = ctx.WithDeadline(time.Now().Add(20 * time.Second))
return ctx
}
diff --git a/runtimes/google/ipc/debug_test.go b/runtimes/google/ipc/debug_test.go
index 8286463..054696d 100644
--- a/runtimes/google/ipc/debug_test.go
+++ b/runtimes/google/ipc/debug_test.go
@@ -35,8 +35,7 @@
sm := manager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
-
- server, err := InternalNewServer(testContext(), sm, ns, options.ReservedNameDispatcher{debugDisp}, vc.LocalPrincipal{pserver})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, options.ReservedNameDispatcher{debugDisp}, vc.LocalPrincipal{pserver})
if err != nil {
t.Fatalf("InternalNewServer failed: %v", err)
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index bdbdc1f..0cb0709 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -169,7 +169,7 @@
func startServer(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, ts interface{}) (naming.Endpoint, ipc.Server) {
vlog.VI(1).Info("InternalNewServer")
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{principal})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{principal})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -270,7 +270,7 @@
func TestMultipleCallsToServeAndName(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal()})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal()})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -610,7 +610,7 @@
return vc.LocalPrincipal{pclient}
}
)
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{pserver})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{pserver})
if err != nil {
t.Fatal(err)
}
@@ -995,7 +995,7 @@
a.IP = net.ParseIP("1.1.1.1")
return []ipc.Address{&netstate.AddrIfc{Addr: a}}, nil
}
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -1031,7 +1031,7 @@
paerr := func(_ string, a []ipc.Address) ([]ipc.Address, error) {
return nil, fmt.Errorf("oops")
}
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -1058,7 +1058,7 @@
sm := imanager.InternalNew(naming.FixedRoutingID(0x66666666))
defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), sm, ns, options.VCSecurityNone)
+ server, err := InternalNewServer(testContext(), sm, ns, nil, options.VCSecurityNone)
if err != nil {
t.Fatalf("InternalNewServer failed: %v", err)
}
@@ -1171,6 +1171,7 @@
testContext(),
sm,
ns,
+ nil,
opts...)
if err != nil {
t.Fatal(err)
diff --git a/runtimes/google/ipc/glob.go b/runtimes/google/ipc/glob.go
index 3dfaa3b..d28fb8f 100644
--- a/runtimes/google/ipc/glob.go
+++ b/runtimes/google/ipc/glob.go
@@ -94,12 +94,18 @@
// If the object implements both VAllGlobber and VChildrenGlobber, we'll
// use VAllGlobber.
gs := invoker.VGlob()
- if gs != nil && gs.VAllGlobber != nil {
+ if gs == nil || (gs.VAllGlobber == nil && gs.VChildrenGlobber == nil) {
+ if g.Len() == 0 {
+ call.Send(types.MountEntry{Name: name})
+ }
+ return nil
+ }
+ if gs.VAllGlobber != nil {
vlog.VI(3).Infof("ipc Glob: %q implements VAllGlobber", suffix)
childCall := &localServerCall{ServerCall: call, basename: name}
return gs.VAllGlobber.Glob(childCall, g.String())
}
- if gs != nil && gs.VChildrenGlobber != nil {
+ if gs.VChildrenGlobber != nil {
vlog.VI(3).Infof("ipc Glob: %q implements VChildrenGlobber", suffix)
children, err := gs.VChildrenGlobber.VGlobChildren()
if err != nil {
@@ -130,8 +136,7 @@
return nil
}
- vlog.VI(3).Infof("ipc Glob: %q implements neither VAllGlobber nor VChildrenGlobber", suffix)
- return verror.NoExistf("ipc: Glob is not implemented by %q", suffix)
+ return nil // Unreachable
}
// An ipc.ServerCall that prepends a name to all the names in the streamed
diff --git a/runtimes/google/ipc/glob_test.go b/runtimes/google/ipc/glob_test.go
index 5af6586..654709d 100644
--- a/runtimes/google/ipc/glob_test.go
+++ b/runtimes/google/ipc/glob_test.go
@@ -45,6 +45,7 @@
"a/b/c2/d1",
"a/b/c2/d2",
"a/x/y/z",
+ "leaf",
}
tree := newNode()
for _, p := range namespace {
@@ -74,6 +75,7 @@
"a/x",
"a/x/y",
"a/x/y/z",
+ "leaf",
}},
{"a", "...", []string{
"",
@@ -118,7 +120,7 @@
"",
}},
{"", "", []string{""}},
- {"", "*", []string{"a"}},
+ {"", "*", []string{"a", "leaf"}},
{"a", "", []string{""}},
{"a", "*", []string{"b", "x"}},
{"a/b", "", []string{""}},
@@ -137,6 +139,7 @@
{"a/b/c1/bad", "", []string{}},
{"a/x/bad", "", []string{}},
{"a/x/y/bad", "", []string{}},
+ {"leaf", "", []string{""}},
// muah is an infinite space to test rescursion limit.
{"muah", "*", []string{"ha"}},
{"muah", "*/*", []string{"ha/ha"}},
@@ -191,6 +194,9 @@
return ipc.VChildrenGlobberInvoker("ha"), nil, nil
}
+ if len(elems) != 0 && elems[0] == "leaf" {
+ return leafObject{}, nil, nil
+ }
if len(elems) <= 2 || (elems[0] == "a" && elems[1] == "x") {
return &vChildrenObject{d.tree, elems}, nil, nil
}
@@ -279,3 +285,9 @@
return nil
}
}
+
+// leafObject is the object the test dispatcher returns for names whose
+// first element is "leaf". It exposes a single method and no glob methods.
+// NOTE(review): presumably this is what makes it a leaf for the ipc Glob
+// code path (no VAllGlobber/VChildrenGlobber) - confirm against the invoker.
+type leafObject struct{}
+
+// Func is a no-op method; it always returns nil.
+func (l leafObject) Func(call ipc.ServerCall) error {
+	return nil
+}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 42172dd..454ca88 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -50,7 +50,8 @@
names map[string]struct{}
reservedOpt options.ReservedNameDispatcher
// TODO(cnicolaou): add roaming stats to ipcStats
- stats *ipcStats // stats for this server.
+ stats *ipcStats // stats for this server.
+ traceStore *ivtrace.Store // store for vtrace traces.
}
var _ ipc.Server = (*server)(nil)
@@ -64,7 +65,8 @@
ch chan config.Setting // channel to receive settings over
}
-func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
+func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, store *ivtrace.Store, opts ...ipc.ServerOpt) (ipc.Server, error) {
+ ctx, _ = ivtrace.WithNewSpan(ctx, "NewServer")
s := &server{
ctx: ctx,
streamMgr: streamMgr,
@@ -73,6 +75,7 @@
stoppedChan: make(chan struct{}),
ns: ns,
stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
+ traceStore: store,
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -542,6 +545,8 @@
func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
s.Lock()
defer s.Unlock()
+ ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
+
if s.stopped {
return errServerStopped
}
@@ -563,6 +568,7 @@
func (s *server) AddName(name string) error {
s.Lock()
defer s.Unlock()
+ ivtrace.FromContext(s.ctx).Annotate("Serving under name: " + name)
if s.stopped {
return errServerStopped
}
@@ -579,6 +585,7 @@
func (s *server) RemoveName(name string) error {
s.Lock()
defer s.Unlock()
+ ivtrace.FromContext(s.ctx).Annotate("Removed name: " + name)
if s.stopped {
return errServerStopped
}
@@ -813,7 +820,7 @@
// on the server even if they will not be allowed to collect the
// results later. This might be considered a DOS vector.
spanName := fmt.Sprintf("\"%s\".%s", fs.Name(), fs.Method())
- fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
+ fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest, fs.server.traceStore)
var cancel context.CancelFunc
if req.Timeout != ipc.NoTimeout {
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 11959f0..f4e5a80 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -121,7 +121,7 @@
t.Fatal(err)
}
defer client.Close()
- server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), sm, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
t.Fatal(err)
}
@@ -195,7 +195,7 @@
func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), mgr, ns, vc.LocalPrincipal{sectest.NewPrincipal("server")})
+ server, err := InternalNewServer(testContext(), mgr, ns, nil, vc.LocalPrincipal{sectest.NewPrincipal("server")})
if err != nil {
return fmt.Errorf("InternalNewServer failed: %v", err)
}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 5c115f8..6f3de3b 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ "math/rand"
iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
@@ -47,7 +48,11 @@
ctx := iipc.InternalNewContext(rt)
ctx = i18n.ContextWithLangID(ctx, rt.lang)
ctx = verror2.ContextWithComponentName(ctx, rt.program)
- ctx, _ = ivtrace.WithNewSpan(ctx, "") // Initial span has empty name.
+
+ sr := rt.flags.Vtrace.SampleRate
+ forceCollect := sr > 0.0 && (sr >= 1.0 || rand.Float64() < sr)
+ ctx, _ = ivtrace.WithNewRootSpan(ctx, rt.traceStore, forceCollect)
+
return ctx
}
@@ -98,8 +103,7 @@
otherOpts = append(otherOpts, ropts)
otherOpts = append(otherOpts, rt.reservedOpts...)
}
- ctx := rt.NewContext()
- return iipc.InternalNewServer(ctx, sm, ns, otherOpts...)
+ return iipc.InternalNewServer(rt.NewContext(), sm, ns, rt.traceStore, otherOpts...)
}
func (rt *vrt) NewStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) {
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 8e199b5..2cff75d 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -19,9 +19,11 @@
"veyron.io/veyron/veyron/lib/flags"
_ "veyron.io/veyron/veyron/lib/stats/sysstats"
"veyron.io/veyron/veyron/runtimes/google/naming/namespace"
+ ivtrace "veyron.io/veyron/veyron/runtimes/google/vtrace"
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vtrace"
)
// TODO(caprita): Verrorize this, and maybe move it in the API.
@@ -44,8 +46,9 @@
nServers int // GUARDED_BY(mu)
cleaningUp bool // GUARDED_BY(mu)
- lang i18n.LangID // Language, from environment variables.
- program string // Program name, from os.Args[0].
+ lang i18n.LangID // Language, from environment variables.
+ program string // Program name, from os.Args[0].
+ traceStore *ivtrace.Store // Storage of sampled vtrace traces.
}
var _ veyron2.Runtime = (*vrt)(nil)
@@ -60,11 +63,18 @@
// Implements veyron2/rt.New
func New(opts ...veyron2.ROpt) (veyron2.Runtime, error) {
- rt := &vrt{mgmt: new(mgmtImpl), lang: i18n.LangIDFromEnv(), program: filepath.Base(os.Args[0])}
flagsOnce.Do(func() {
runtimeFlags.Parse(os.Args[1:])
})
- rt.flags = runtimeFlags.RuntimeFlags()
+ flags := runtimeFlags.RuntimeFlags()
+ rt := &vrt{
+ mgmt: new(mgmtImpl),
+ lang: i18n.LangIDFromEnv(),
+ program: filepath.Base(os.Args[0]),
+ flags: flags,
+ traceStore: ivtrace.NewStore(flags.Vtrace.CacheSize),
+ }
+
for _, o := range opts {
switch v := o.(type) {
case options.RuntimePrincipal:
@@ -149,6 +159,10 @@
}
func (rt *vrt) Cleanup() {
+ if rt.flags.Vtrace.DumpOnShutdown {
+ vtrace.FormatTraces(os.Stderr, rt.traceStore.TraceRecords(), nil)
+ }
+
rt.mu.Lock()
if rt.cleaningUp {
rt.mu.Unlock()
diff --git a/runtimes/google/vtrace/collector.go b/runtimes/google/vtrace/collector.go
index 3313384..6c883cc 100644
--- a/runtimes/google/vtrace/collector.go
+++ b/runtimes/google/vtrace/collector.go
@@ -26,16 +26,19 @@
// as well as in-memory collection.
type collector struct {
traceID uniqueid.ID
- method vtrace.TraceMethod
- spans map[uniqueid.ID]*vtrace.SpanRecord
- mu sync.Mutex
+ store *Store
+
+ mu sync.Mutex
+ method vtrace.TraceMethod // GUARDED_BY(mu)
+ spans map[uniqueid.ID]*vtrace.SpanRecord // GUARDED_BY(mu)
}
// newCollector returns a new collector for the given traceID.
-func newCollector(traceID uniqueid.ID) *collector {
+func newCollector(traceID uniqueid.ID, store *Store) *collector {
return &collector{
traceID: traceID,
method: vtrace.None,
+ store: store,
}
}
@@ -53,6 +56,9 @@
c.method = vtrace.InMemory
c.spans = make(map[uniqueid.ID]*vtrace.SpanRecord)
}
+ if c.store != nil {
+ c.store.Consider(c)
+ }
}
func (c *collector) spanRecordLocked(s *span) *vtrace.SpanRecord {
@@ -67,6 +73,9 @@
}
c.spans[sid] = record
}
+ if c.store != nil {
+ c.store.Consider(c)
+ }
return record
}
diff --git a/runtimes/google/vtrace/store.go b/runtimes/google/vtrace/store.go
new file mode 100644
index 0000000..182dea5
--- /dev/null
+++ b/runtimes/google/vtrace/store.go
@@ -0,0 +1,160 @@
+package vtrace
+
+import (
+ "sync"
+
+ "veyron.io/veyron/veyron2/uniqueid"
+ "veyron.io/veyron/veyron2/vtrace"
+)
+
+// Store implements a store for traces. The idea is to keep all the
+// information we have about some subset of traces that pass through
+// the server. For now we just implement an LRU cache, so the least
+// recently started/finished/annotated traces expire after some
+// maximum trace count is reached.
+// TODO(mattr): LRU is the wrong policy in the long term, we should
+// try to keep some diverse set of traces and allow users to
+// specifically tell us to capture a specific trace. LRU will work OK
+// for many testing scenarios and low volume applications.
+type Store struct {
+	// size is the maximum number of traces kept; trimLocked evicts
+	// entries while len(traces) exceeds it.
+	size int
+
+	// traces and head together implement a linked-hash-map.
+	// head is the sentinel node of a circular doubly-linked list of
+	// recently used items: head.next is the most recently used traceSet
+	// and head.prev (the tail) is the LRU traceSet.
+	mu     sync.Mutex
+	traces map[uniqueid.ID]*traceSet // GUARDED_BY(mu)
+	head   *traceSet                 // GUARDED_BY(mu)
+}
+
+// NewStore creates a new store that will keep a maximum of size
+// traces in memory. The returned store starts out empty: its list
+// consists only of a sentinel node linked to itself.
+// TODO(mattr): Traces can be of widely varying size, we should have
+// some better measurement than just number of traces.
+func NewStore(size int) *Store {
+	sentinel := new(traceSet)
+	sentinel.prev = sentinel
+	sentinel.next = sentinel
+
+	st := &Store{size: size, head: sentinel}
+	st.traces = make(map[uniqueid.ID]*traceSet)
+	return st
+}
+
+// Consider should be called whenever an interesting change happens to
+// a trace; the store will decide whether to keep it or not.
+//
+// The trace is added to the traceSet for its ID (created on first use),
+// that set becomes the most recently used entry, and the store is then
+// trimmed back to its size limit.
+func (s *Store) Consider(trace vtrace.Trace) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	tid := trace.ID()
+	entry := s.traces[tid]
+	if entry == nil {
+		entry = newTraceSet()
+		s.traces[tid] = entry
+	}
+	entry.add(trace)
+	// Moving the entry directly after the sentinel marks it most recent.
+	entry.moveAfter(s.head)
+	s.trimLocked()
+}
+
+// TraceRecords returns TraceRecords for all traces saved in the store.
+// The result holds exactly one non-nil record per stored trace, in no
+// particular order (it follows map iteration order).
+//
+// NOTE(review): Record() is called on each trace while s.mu is held, and
+// the collector calls Consider from what look like locked sections
+// (e.g. spanRecordLocked) - confirm the two lock orders cannot deadlock.
+func (s *Store) TraceRecords() []*vtrace.TraceRecord {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Size the result by what is actually stored rather than by the
+	// store's capacity: a partially filled store must not yield trailing
+	// nil entries, and a negative capacity must not panic in make.
+	out := make([]*vtrace.TraceRecord, 0, len(s.traces))
+	for _, ts := range s.traces {
+		out = append(out, ts.traceRecord())
+	}
+	return out
+}
+
+// TraceRecord looks up the trace with the given uniqueid and returns its
+// TraceRecord, or nil when the store holds no trace with that id.
+func (s *Store) TraceRecord(id uniqueid.ID) *vtrace.TraceRecord {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if found := s.traces[id]; found != nil {
+		return found.traceRecord()
+	}
+	return nil
+}
+
+// trimLocked removes elements from the store, least recently used first,
+// until the number of stored traces no longer exceeds Store.size. The
+// caller must hold s.mu.
+//
+// A negative size can never be satisfied; in that case trimming stops once
+// the list is empty instead of trying to evict the sentinel node.
+func (s *Store) trimLocked() {
+	for len(s.traces) > s.size {
+		el := s.head.prev
+		if el == s.head {
+			// Only the sentinel is left: there is nothing more to evict.
+			// The sentinel has no traces, so el.id() would panic.
+			return
+		}
+		el.removeFromList()
+		delete(s.traces, el.id())
+	}
+}
+
+// We need to capture traceSets because a single trace can reach this
+// server along multiple paths. Consider a client that calls this
+// server twice in the same operation.
+//
+// A traceSet is also a node of the Store's doubly-linked recency list.
+type traceSet struct {
+	// elts is the set of Trace objects added to this node; the Store adds
+	// only traces that share one trace ID.
+	elts map[vtrace.Trace]bool
+	// prev and next link this node into the Store's list; both are nil
+	// while the node is not in the list.
+	prev, next *traceSet
+}
+
+// newTraceSet returns an empty traceSet that is not linked into any list.
+func newTraceSet() *traceSet {
+	ts := new(traceSet)
+	ts.elts = make(map[vtrace.Trace]bool)
+	return ts
+}
+
+// add records trace as a member of the set; adding the same trace again
+// has no further effect.
+func (ts *traceSet) add(trace vtrace.Trace) {
+	ts.elts[trace] = true
+}
+
+// removeFromList unlinks ts from the list it is in, joining its two
+// neighbours to each other, and clears its own links. It is a no-op for
+// a node that is not linked.
+func (ts *traceSet) removeFromList() {
+	before, after := ts.prev, ts.next
+	if before != nil {
+		before.next = after
+	}
+	if after != nil {
+		after.prev = before
+	}
+	ts.prev, ts.next = nil, nil
+}
+
+// moveAfter unlinks ts from its current position (if any) and re-inserts
+// it immediately after prev. prev must be linked into a list.
+func (ts *traceSet) moveAfter(prev *traceSet) {
+	ts.removeFromList()
+
+	// Read the successor only after unlinking: ts itself may have been
+	// prev's successor.
+	following := prev.next
+	ts.prev, ts.next = prev, following
+	following.prev = ts
+	prev.next = ts
+}
+
+// id returns the trace ID of an arbitrary member of the set. It panics
+// when the set has no members.
+func (ts *traceSet) id() uniqueid.ID {
+	for member := range ts.elts {
+		return member.ID()
+	}
+	panic("unreachable")
+}
+
+func (ts *traceSet) traceRecord() *vtrace.TraceRecord {
+ var out vtrace.TraceRecord
+
+ // It is possible to have duplicate copies of spans. Consider the
+ // case where a server calls itself (even indirectly) we'll have one
+ // Trace in the set for the parent call and one Trace in the set for
+ // the decendant. The two records will be exactly the same we
+ // therefore de-dup here.
+ spans := make(map[uniqueid.ID]bool)
+
+ for e, _ := range ts.elts {
+ record := e.Record()
+ out.ID = record.ID
+ for _, span := range record.Spans {
+ if spans[span.ID] {
+ continue
+ }
+ spans[span.ID] = true
+ out.Spans = append(out.Spans, span)
+ }
+ }
+ return &out
+}
diff --git a/runtimes/google/vtrace/store_test.go b/runtimes/google/vtrace/store_test.go
new file mode 100644
index 0000000..9fdc06b
--- /dev/null
+++ b/runtimes/google/vtrace/store_test.go
@@ -0,0 +1,82 @@
+package vtrace
+
+import (
+ "encoding/binary"
+ "reflect"
+ "testing"
+
+ "veyron.io/veyron/veyron2/uniqueid"
+ "veyron.io/veyron/veyron2/vtrace"
+)
+
+var nextid = uint64(1)
+
+// id returns a fresh ID for tests: the current value of nextid written
+// big-endian into the bytes from offset 8 on, after which nextid is
+// incremented.
+func id() uniqueid.ID {
+	var fresh uniqueid.ID
+	binary.BigEndian.PutUint64(fresh[8:], nextid)
+	nextid++
+	return fresh
+}
+
+// makeTraces creates n collectors with fresh IDs, all attached to st, and
+// forces collection on each one in creation order.
+func makeTraces(n int, st *Store) []vtrace.Trace {
+	out := make([]vtrace.Trace, 0, n)
+	for len(out) < n {
+		c := newCollector(id(), st)
+		c.ForceCollect()
+		out = append(out, c)
+	}
+	return out
+}
+
+// recordids returns the set of IDs of the given trace records.
+func recordids(records ...*vtrace.TraceRecord) map[uniqueid.ID]bool {
+	ids := make(map[uniqueid.ID]bool)
+	for i := range records {
+		ids[records[i].ID] = true
+	}
+	return ids
+}
+
+// traceids returns the set of IDs of the given traces.
+func traceids(traces ...vtrace.Trace) map[uniqueid.ID]bool {
+	ids := make(map[uniqueid.ID]bool)
+	for i := range traces {
+		ids[traces[i].ID()] = true
+	}
+	return ids
+}
+
+// TestConsiderAndTrim checks that a store of size 5 keeps only the five
+// most recently touched traces, and that starting, finishing or annotating
+// a span counts as touching the trace.
+func TestConsiderAndTrim(t *testing.T) {
+	st := NewStore(5)
+	traces := makeTraces(10, st)
+	records := st.TraceRecords()
+
+	// Ten traces were created in order, so only the last five remain.
+	if want, got := traceids(traces[5:]...), recordids(records...); !reflect.DeepEqual(want, got) {
+		t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+	}
+
+	// Starting a new span on one of the traces should bring it back into the stored set.
+	traces[2].(*collector).start(&span{id: id()})
+	records = st.TraceRecords()
+	if want, got := traceids(traces[2], traces[6], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+		t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+	}
+
+	// Starting another span on the same, already stored, trace should
+	// leave the stored set unchanged.
+	traces[2].(*collector).start(&span{id: id()})
+	records = st.TraceRecords()
+	if want, got := traceids(traces[2], traces[6], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+		t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+	}
+
+	// Finishing a span on one of the traces should bring it back into the stored set.
+	traces[3].(*collector).finish(&span{id: id()})
+	records = st.TraceRecords()
+	if want, got := traceids(traces[3], traces[2], traces[7], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+		t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+	}
+
+	// Annotating a span on one of the traces should bring it back into the stored set.
+	traces[4].(*collector).annotate(&span{id: id()}, "hello")
+	records = st.TraceRecords()
+	if want, got := traceids(traces[4], traces[3], traces[2], traces[8], traces[9]), recordids(records...); !reflect.DeepEqual(want, got) {
+		t.Errorf("Got wrong traces. Want %#v, got %#v", want, got)
+	}
+}
diff --git a/runtimes/google/vtrace/vtrace.go b/runtimes/google/vtrace/vtrace.go
index 3a41e41..04da3e3 100644
--- a/runtimes/google/vtrace/vtrace.go
+++ b/runtimes/google/vtrace/vtrace.go
@@ -70,27 +70,37 @@
// ContinuedSpan creates a span that represents a continuation of a trace from
// a remote server. name is a user readable string that describes the context
// and req contains the parameters needed to connect this span with it's trace.
-func WithContinuedSpan(ctx context.T, name string, req vtrace.Request) (context.T, vtrace.Span) {
- newSpan := newSpan(req.SpanID, name, newCollector(req.TraceID))
+func WithContinuedSpan(ctx context.T, name string, req vtrace.Request, store *Store) (context.T, vtrace.Span) {
+ newSpan := newSpan(req.SpanID, name, newCollector(req.TraceID, store))
if req.Method == vtrace.InMemory {
newSpan.collector.ForceCollect()
}
return ctx.WithValue(spanKey{}, newSpan), newSpan
}
+// WithNewRootSpan creates a span with an empty name that starts a new
+// trace, and returns it together with a context carrying it. The span and
+// the trace share one freshly generated random ID. The trace's collector
+// is attached to store, and collection is forced when forceCollect is true.
+// A failure to generate the ID is logged and otherwise ignored.
+func WithNewRootSpan(ctx context.T, store *Store, forceCollect bool) (context.T, vtrace.Span) {
+	id, err := uniqueid.Random()
+	if err != nil {
+		vlog.Errorf("vtrace: Couldn't generate Trace ID, debug data may be lost: %v", err)
+	}
+
+	c := newCollector(id, store)
+	if forceCollect {
+		c.ForceCollect()
+	}
+
+	root := newSpan(id, "", c)
+	return ctx.WithValue(spanKey{}, root), root
+}
+
// NewSpan creates a new span.
func WithNewSpan(parent context.T, name string) (context.T, vtrace.Span) {
- var s *span
if curSpan := getSpan(parent); curSpan != nil {
- s = newSpan(curSpan.ID(), name, curSpan.collector)
- } else {
- id, err := uniqueid.Random()
- if err != nil {
- vlog.Errorf("vtrace: Couldn't generate Trace ID, debug data may be lost: %v", err)
- }
- s = newSpan(id, name, newCollector(id))
+ s := newSpan(curSpan.ID(), name, curSpan.collector)
+ return parent.WithValue(spanKey{}, s), s
}
- return parent.WithValue(spanKey{}, s), s
+
+ vlog.Error("vtrace: Creating a new child span from context with no existing span.")
+ return WithNewRootSpan(parent, nil, false)
}
func getSpan(ctx context.T) *span {
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index f8af7f9..81b046b 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -97,7 +97,7 @@
func makeTestServer(ns naming.Namespace, name, child string, forceCollect bool) (*testServer, error) {
sm := manager.InternalNew(naming.FixedRoutingID(0x111111111))
ctx := testContext()
- s, err := iipc.InternalNewServer(ctx, sm, ns)
+ s, err := iipc.InternalNewServer(ctx, sm, ns, nil)
if err != nil {
return nil, err
}
diff --git a/security/audit/principal.go b/security/audit/principal.go
index c7c2e50..efd1323 100644
--- a/security/audit/principal.go
+++ b/security/audit/principal.go
@@ -79,7 +79,10 @@
func addCaveats(args args, caveats ...security.Caveat) args {
for _, c := range caveats {
- args = append(args, c)
+ // TODO(ashankar,suharshs): Should isUnconstrainedCaveat in veyron2/security be exported and used here?
+ if len(c.ValidatorVOM) > 0 {
+ args = append(args, c)
+ }
}
return args
}
diff --git a/security/util.go b/security/util.go
index 6e1f848..e7ed33f 100644
--- a/security/util.go
+++ b/security/util.go
@@ -123,6 +123,8 @@
//
// It is an error if any of the provided caveat bytes cannot
// be decoded into a security.CaveatValidator.
+// TODO(suharshs,ashankar,ataly): Rather than quitting on non-decodable caveats, just skip
+// them and return on caveats that we can decode.
func CaveatValidators(caveats ...security.Caveat) ([]security.CaveatValidator, error) {
if len(caveats) == 0 {
return nil, nil
diff --git a/services/identity/auditor/blessing_auditor.go b/services/identity/auditor/blessing_auditor.go
new file mode 100644
index 0000000..a76bc89
--- /dev/null
+++ b/services/identity/auditor/blessing_auditor.go
@@ -0,0 +1,147 @@
+package auditor
+
+import (
+ "bytes"
+ "fmt"
+ _ "github.com/go-sql-driver/mysql"
+ "strings"
+ "time"
+
+ vsecurity "veyron.io/veyron/veyron/security"
+ "veyron.io/veyron/veyron/security/audit"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vom"
+)
+
+// BlessingLogReader provides the Read method to read audit logs.
+// Read returns a channel of BlessingEntrys whose extension matches the provided email.
+// The implementation in this file closes the channel once every entry has
+// been sent, so callers can range over it.
+type BlessingLogReader interface {
+	Read(email string) <-chan BlessingEntry
+}
+
+// BlessingEntry contains important logged information about a blessed principal.
+type BlessingEntry struct {
+	Email              string            // Part of the blessing extension before the first "/".
+	Caveats            []security.Caveat // Caveats passed to the Bless call.
+	Timestamp          time.Time         // Time when the blessings were created.
+	RevocationCaveatID string            // ID of the first third-party caveat, or "" if there is none.
+	Blessings          security.Blessings
+}
+
+// NewSQLBlessingAuditor opens the SQL database described by config and
+// returns an auditor to wrap a principal with, plus a BlessingLogReader
+// for reading back the audits made by that auditor. Both share the same
+// database. An error is returned if the database cannot be created.
+func NewSQLBlessingAuditor(config SQLConfig) (audit.Auditor, BlessingLogReader, error) {
+	db, err := newSQLDatabase(config)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to create sql db: %v", err)
+	}
+	return &blessingAuditor{db}, &blessingLogReader{db}, nil
+}
+
+// blessingAuditor is an auditor that stores "Bless" calls in a database.
+type blessingAuditor struct {
+	db database
+}
+
+// Audit inserts a database row for entry when its method is "Bless";
+// entries for any other method are ignored and nil is returned. Conversion
+// and insertion errors are returned to the caller.
+func (a *blessingAuditor) Audit(entry audit.Entry) error {
+	if entry.Method == "Bless" {
+		row, err := newDatabaseEntry(entry)
+		if err != nil {
+			return err
+		}
+		return a.db.Insert(row)
+	}
+	return nil
+}
+
+// blessingLogReader reads audited blessings back from a database.
+type blessingLogReader struct {
+	db database
+}
+
+// Read starts a goroutine that queries the database for email and sends
+// each decoded entry on the returned unbuffered channel, so the goroutine
+// blocks until the caller receives.
+func (r *blessingLogReader) Read(email string) <-chan BlessingEntry {
+	entries := make(chan BlessingEntry)
+	go r.sendAuditEvents(entries, email)
+	return entries
+}
+
+// sendAuditEvents queries the database for email, decodes every row and
+// sends the result on dst, which it closes when the query is exhausted.
+// Rows that fail to decode are logged and skipped.
+func (r *blessingLogReader) sendAuditEvents(dst chan<- BlessingEntry, email string) {
+	defer close(dst)
+	for row := range r.db.Query(email) {
+		var decoded BlessingEntry
+		err := decoded.fromDatabaseEntry(row)
+		if err == nil {
+			dst <- decoded
+			continue
+		}
+		vlog.Errorf("Corrupt database data? %#v, %v", row, err)
+	}
+}
+
+// newDatabaseEntry converts the audit entry of a "Bless" call into the row
+// that is stored in the database.
+//
+// It reads the extension string from entry.Arguments[2], treats every
+// argument after it as a security.Caveat, and takes the resulting blessings
+// from entry.Results[0]. The email is the part of the extension before the
+// first "/". Blessings and caveats are stored VOM-encoded.
+//
+// An error is returned, instead of panicking on an out-of-range index, when
+// the entry has fewer than three arguments or no results. An error is also
+// returned when any of those values has an unexpected type or when
+// encoding fails.
+func newDatabaseEntry(entry audit.Entry) (databaseEntry, error) {
+	d := databaseEntry{timestamp: entry.Timestamp}
+	if len(entry.Arguments) < 3 {
+		return d, fmt.Errorf("failed to extract extension: got %d arguments, want at least 3", len(entry.Arguments))
+	}
+	if len(entry.Results) < 1 {
+		return d, fmt.Errorf("failed to extract result blessing: no results")
+	}
+
+	extension, ok := entry.Arguments[2].(string)
+	if !ok {
+		return d, fmt.Errorf("failed to extract extension")
+	}
+	d.email = strings.Split(extension, "/")[0]
+
+	var caveats []security.Caveat
+	for _, arg := range entry.Arguments[3:] {
+		cav, ok := arg.(security.Caveat)
+		if !ok {
+			return d, fmt.Errorf("failed to extract Caveat")
+		}
+		caveats = append(caveats, cav)
+	}
+
+	blessings, ok := entry.Results[0].(security.Blessings)
+	if !ok {
+		return d, fmt.Errorf("failed to extract result blessing")
+	}
+	{
+		var buf bytes.Buffer
+		if err := vom.NewEncoder(&buf).Encode(security.MarshalBlessings(blessings)); err != nil {
+			return d, err
+		}
+		d.blessings = buf.Bytes()
+	}
+	{
+		var buf bytes.Buffer
+		if err := vom.NewEncoder(&buf).Encode(caveats); err != nil {
+			return d, err
+		}
+		d.caveats = buf.Bytes()
+	}
+	return d, nil
+}
+
+// fromDatabaseEntry fills b from a database row: it copies the email and
+// timestamp, VOM-decodes the blessings and the caveats, and derives the
+// revocation caveat ID from the decoded caveats. It returns the first error
+// encountered; fields assigned before the failure keep their new values.
+func (b *BlessingEntry) fromDatabaseEntry(dbentry databaseEntry) error {
+	b.Email, b.Timestamp = dbentry.email, dbentry.timestamp
+
+	var wire security.WireBlessings
+	blessingsDec := vom.NewDecoder(bytes.NewBuffer(dbentry.blessings))
+	if err := blessingsDec.Decode(&wire); err != nil {
+		return err
+	}
+
+	var err error
+	b.Blessings, err = security.NewBlessings(wire)
+	if err != nil {
+		return err
+	}
+
+	caveatsDec := vom.NewDecoder(bytes.NewBuffer(dbentry.caveats))
+	if err := caveatsDec.Decode(&b.Caveats); err != nil {
+		return err
+	}
+
+	b.RevocationCaveatID, err = revocationCaveatID(b.Caveats)
+	return err
+}
+
+// revocationCaveatID decodes the validators of caveats and returns the ID
+// of the first one that is a security.ThirdPartyCaveat. It returns "" when
+// there is no such caveat, and an error when the caveats cannot be decoded.
+func revocationCaveatID(caveats []security.Caveat) (string, error) {
+	validators, err := vsecurity.CaveatValidators(caveats...)
+	if err != nil {
+		return "", err
+	}
+	for i := range validators {
+		tp, isThirdParty := validators[i].(security.ThirdPartyCaveat)
+		if !isThirdParty {
+			continue
+		}
+		return tp.ID(), nil
+	}
+	return "", nil
+}
diff --git a/services/identity/auditor/blessing_auditor_test.go b/services/identity/auditor/blessing_auditor_test.go
new file mode 100644
index 0000000..946d976
--- /dev/null
+++ b/services/identity/auditor/blessing_auditor_test.go
@@ -0,0 +1,128 @@
+package auditor
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ vsecurity "veyron.io/veyron/veyron/security"
+ "veyron.io/veyron/veyron/security/audit"
+ "veyron.io/veyron/veyron2/security"
+)
+
+// TestBlessingAuditor audits a series of "Bless" entries through a
+// blessingAuditor backed by mockDatabase and checks that blessingLogReader
+// returns exactly one entry per audit, carrying the email, caveats, revocation
+// caveat ID and blessings that were audited.
+func TestBlessingAuditor(t *testing.T) {
+	db := &mockDatabase{}
+	auditor := &blessingAuditor{db}
+	reader := &blessingLogReader{db}
+
+	p, err := vsecurity.NewPrincipal()
+	if err != nil {
+		t.Fatalf("failed to create principal: %v", err)
+	}
+	expiryCaveat := newCaveat(security.ExpiryCaveat(time.Now().Add(time.Hour)))
+	revocationCaveat := newThirdPartyCaveat(t, p)
+
+	// Fields omitted from a case keep their zero value: no caveats and an
+	// empty revocation caveat ID.
+	tests := []struct {
+		Extension          string
+		Email              string
+		Caveats            []security.Caveat
+		RevocationCaveatID string
+		Blessings          security.Blessings
+	}{
+		{
+			Extension: "email/nocaveats",
+			Email:     "email",
+			Blessings: newBlessing(t, p, "test/email/nocaveats"),
+		},
+		{
+			Extension: "email/caveat",
+			Email:     "email",
+			Caveats:   []security.Caveat{expiryCaveat},
+			Blessings: newBlessing(t, p, "test/email/caveat"),
+		},
+		{
+			Extension:          "email/caveatAndRevocation",
+			Email:              "email",
+			Caveats:            []security.Caveat{expiryCaveat, newCaveat(security.NewCaveat(revocationCaveat))},
+			RevocationCaveatID: revocationCaveat.ID(),
+			Blessings:          newBlessing(t, p, "test/email/caveatAndRevocation"),
+		},
+	}
+
+	for _, test := range tests {
+		// The auditor reads the extension from Arguments[2] and the caveats
+		// from Arguments[3:]; the first two slots are left nil here.
+		args := []interface{}{nil, nil, test.Extension}
+		for _, cav := range test.Caveats {
+			args = append(args, cav)
+		}
+		entry := audit.Entry{
+			Method:    "Bless",
+			Arguments: args,
+			Results:   []interface{}{test.Blessings},
+		}
+		if err := auditor.Audit(entry); err != nil {
+			t.Errorf("Failed to audit Blessing %v: %v", test.Blessings, err)
+		}
+
+		entries := reader.Read("query")
+		got := <-entries
+		if got.Email != test.Email {
+			t.Errorf("got %v, want %v", got.Email, test.Email)
+		}
+		if !reflect.DeepEqual(got.Caveats, test.Caveats) {
+			t.Errorf("got %#v, want %#v", got.Caveats, test.Caveats)
+		}
+		if got.RevocationCaveatID != test.RevocationCaveatID {
+			t.Errorf("got %v, want %v", got.RevocationCaveatID, test.RevocationCaveatID)
+		}
+		if !reflect.DeepEqual(got.Blessings, test.Blessings) {
+			t.Errorf("got %v, want %v", got.Blessings, test.Blessings)
+		}
+
+		// Drain the channel to prevent the producer goroutines from being
+		// leaked; anything received here is one entry too many.
+		surplus := false
+		for _ = range entries {
+			surplus = true
+		}
+		if surplus {
+			t.Errorf("Got more entries that expected for test %+v", test)
+		}
+	}
+}
+
+// mockDatabase is an in-memory database that remembers only the most recently
+// inserted entry.
+type mockDatabase struct {
+	NextEntry databaseEntry
+}
+
+// Insert replaces the remembered entry with entry. It never fails.
+func (db *mockDatabase) Insert(entry databaseEntry) error {
+	db.NextEntry = entry
+	return nil
+}
+
+// Query ignores email and returns a channel that yields the remembered entry
+// once and is then closed.
+func (db *mockDatabase) Query(email string) <-chan databaseEntry {
+	out := make(chan databaseEntry)
+	go func() {
+		defer close(out)
+		out <- db.NextEntry
+	}()
+	return out
+}
+
+// newThirdPartyCaveat builds a public-key third-party caveat keyed on p's
+// public key and restricted by a method caveat. It fails the test immediately
+// if the caveat cannot be created.
+func newThirdPartyCaveat(t *testing.T, p security.Principal) security.ThirdPartyCaveat {
+	restriction := newCaveat(security.MethodCaveat("method"))
+	caveat, err := security.NewPublicKeyCaveat(p.PublicKey(), "location", security.ThirdPartyRequirements{}, restriction)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return caveat
+}
+
+// newBlessing returns the result of p.BlessSelf(name), failing the test
+// immediately if BlessSelf returns an error.
+func newBlessing(t *testing.T, p security.Principal, name string) security.Blessings {
+	blessing, err := p.BlessSelf(name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return blessing
+}
+
+// newCaveat unwraps a (Caveat, error) pair so that caveat constructors can be
+// used inline in test fixtures. It panics if err is non-nil.
+func newCaveat(caveat security.Caveat, err error) security.Caveat {
+	if err == nil {
+		return caveat
+	}
+	panic(err)
+}
diff --git a/services/identity/auditor/sql_database.go b/services/identity/auditor/sql_database.go
new file mode 100644
index 0000000..7b601b6
--- /dev/null
+++ b/services/identity/auditor/sql_database.go
@@ -0,0 +1,85 @@
+package auditor
+
+import (
+ "database/sql"
+ "fmt"
+ _ "github.com/go-sql-driver/mysql"
+
+ "time"
+ "veyron.io/veyron/veyron2/vlog"
+)
+
+// SQLConfig contains the information to create a connection to a sql database.
+type SQLConfig struct {
+	// Database is a driver specific string specifying how to connect to the database.
+	Database string `json:"database"`
+	// Table is the name of the table that holds the audit entries.
+	// newSQLDatabase splices it verbatim into its SQL statements, so it must
+	// come from a trusted source.
+	Table string `json:"table"`
+}
+
+// database is the storage backend behind the blessing auditor and log reader.
+type database interface {
+	// Insert stores entry.
+	Insert(entry databaseEntry) error
+	// Query returns a channel on which stored entries are delivered. The SQL
+	// implementation in this file filters by email and closes the channel
+	// once it has nothing more to send.
+	Query(email string) <-chan databaseEntry
+}
+
+// databaseEntry is one audit record in the form in which it is stored.
+type databaseEntry struct {
+	email, revocationCaveatID string
+	// caveats and blessings hold serialized bytes (the SQL backend stores
+	// them in BLOB columns).
+	caveats, blessings []byte
+	timestamp          time.Time
+}
+
+// newSQLDatabase returns a SQL implementation of the database interface.
+// If the table does not exist it creates it.
+//
+// config.Table is spliced verbatim into the SQL statements, so it must come
+// from a trusted source. On any failure the connection and any statement
+// prepared so far are released, and the returned database is nil.
+func newSQLDatabase(config SQLConfig) (database, error) {
+	db, err := sql.Open("mysql", config.Database)
+	if err != nil {
+		// The connection string is kept out of the message because it may
+		// embed credentials.
+		return nil, fmt.Errorf("failed to create database for table %q: %v", config.Table, err)
+	}
+	if err := db.Ping(); err != nil {
+		db.Close()
+		return nil, err
+	}
+	// Exec is used directly: the statement runs once, so there is no
+	// prepared statement to keep around (or to forget to close).
+	createSQL := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s ( Email NVARCHAR(256), Caveats BLOB, Timestamp DATETIME, RevocationCaveatID NVARCHAR(1000), Blessings BLOB );", config.Table)
+	if _, err := db.Exec(createSQL); err != nil {
+		db.Close()
+		return nil, err
+	}
+	insertStmt, err := db.Prepare(fmt.Sprintf("INSERT INTO %s (Email, Caveats, RevocationCaveatID, Timestamp, Blessings) VALUES (?, ?, ?, ?, ?)", config.Table))
+	if err != nil {
+		db.Close()
+		return nil, err
+	}
+	queryStmt, err := db.Prepare(fmt.Sprintf("SELECT Email, Caveats, RevocationCaveatID, Timestamp, Blessings from %s WHERE Email=?", config.Table))
+	if err != nil {
+		insertStmt.Close()
+		db.Close()
+		// Return a nil database rather than a half-initialized one.
+		return nil, err
+	}
+	return sqlDatabase{insertStmt, queryStmt}, nil
+}
+
+// sqlDatabase implements the database interface with two prepared statements:
+// one that inserts an entry and one that selects entries by email.
+type sqlDatabase struct {
+	insertStmt, queryStmt *sql.Stmt
+}
+
+// Insert writes entry as a new row using the prepared insert statement.
+func (s sqlDatabase) Insert(entry databaseEntry) error {
+	if _, err := s.insertStmt.Exec(entry.email, entry.caveats, entry.revocationCaveatID, entry.timestamp, entry.blessings); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Query starts a goroutine that looks up the rows stored for email and returns
+// the channel on which that goroutine delivers them. The channel is
+// unbuffered, so the goroutine blocks until each entry is received.
+func (s sqlDatabase) Query(email string) <-chan databaseEntry {
+	entries := make(chan databaseEntry)
+	go s.sendDatabaseEntries(email, entries)
+	return entries
+}
+
+// sendDatabaseEntries runs the prepared query for email and sends every row
+// to dst, closing dst when it returns. Query, scan and iteration errors are
+// logged and end the stream early; they are not reported to the receiver.
+//
+// dst is expected to be drained by the receiver: each send blocks until the
+// entry is received.
+func (s sqlDatabase) sendDatabaseEntries(email string, dst chan<- databaseEntry) {
+	defer close(dst)
+	rows, err := s.queryStmt.Query(email)
+	if err != nil {
+		vlog.Errorf("query failed %v", err)
+		return
+	}
+	// Release the result set even when a Scan error ends the loop before
+	// rows.Next has returned false.
+	defer rows.Close()
+	for rows.Next() {
+		var dbentry databaseEntry
+		if err := rows.Scan(&dbentry.email, &dbentry.caveats, &dbentry.revocationCaveatID, &dbentry.timestamp, &dbentry.blessings); err != nil {
+			vlog.Errorf("scan of row failed %v", err)
+			return
+		}
+		dst <- dbentry
+	}
+	// rows.Next also returns false on failure; without this check a broken
+	// iteration would look like a clean end of results.
+	if err := rows.Err(); err != nil {
+		vlog.Errorf("iterating over rows failed %v", err)
+	}
+}
diff --git a/services/identity/googleoauth/handler.go b/services/identity/googleoauth/handler.go
index 11dea0b..255c9b1 100644
--- a/services/identity/googleoauth/handler.go
+++ b/services/identity/googleoauth/handler.go
@@ -39,6 +39,7 @@
"code.google.com/p/goauth2/oauth"
+ "veyron.io/veyron/veyron/services/identity/auditor"
"veyron.io/veyron/veyron/services/identity/blesser"
"veyron.io/veyron/veyron/services/identity/revocation"
"veyron.io/veyron/veyron/services/identity/util"
@@ -73,9 +74,8 @@
// client_id and client_secret registered with the Google Developer
// Console for API access.
ClientID, ClientSecret string
- // Prefix for the audit log from which data will be sourced.
- // (auditor.ReadAuditLog).
- Auditor string
+ // BlessingLogReader is needed for reading audit logs.
+ BlessingLogReader auditor.BlessingLogReader
// The RevocationManager is used to revoke blessings granted with a revocation caveat.
// If nil, then revocation caveats cannot be added to blessings and an expiration caveat
// will be used instead.
@@ -171,6 +171,24 @@
util.HTTPBadRequest(w, r, err)
return
}
+
+ type tmplentry struct {
+ Timestamp time.Time
+ Caveats []security.Caveat
+ RevocationTime time.Time
+ Blessed security.Blessings
+ Token string
+ }
+ tmplargs := struct {
+ Log chan tmplentry
+ Email, RevokeRoute string
+ }{
+ Log: make(chan tmplentry),
+ Email: email,
+ RevokeRoute: revokeRoute,
+ }
+ entrych := h.args.BlessingLogReader.Read(email)
+
w.Header().Set("Context-Type", "text/html")
// This MaybeSetCookie call is needed to ensure that a cookie is created. Since the
// header cannot be changed once the body is written to, this needs to be called first.
@@ -179,16 +197,31 @@
util.HTTPServerError(w, err)
return
}
- w.Write([]byte(fmt.Sprintf(`
-<html>
-<head>
- <title>DISABLED FUNCTIONALITY</title>
- </head>
- <body>
- <h1>Attention %s</h1>
- <h2>This functionality has been temporarily disabled. ashankar@ and suharshs@ will know more</h2>
- </body>
- </html>`, email)))
+ go func(ch chan tmplentry) {
+ defer close(ch)
+ for entry := range entrych {
+ tmplEntry := tmplentry{
+ Timestamp: entry.Timestamp,
+ Caveats: entry.Caveats,
+ Blessed: entry.Blessings,
+ }
+ if len(entry.RevocationCaveatID) > 0 && h.args.RevocationManager != nil {
+ if revocationTime := h.args.RevocationManager.GetRevocationTime(entry.RevocationCaveatID); revocationTime != nil {
+ tmplEntry.RevocationTime = *revocationTime
+ } else {
+ caveatID := base64.URLEncoding.EncodeToString([]byte(entry.RevocationCaveatID))
+ if tmplEntry.Token, err = h.csrfCop.NewToken(w, r, clientIDCookie, caveatID); err != nil {
+ vlog.Errorf("Failed to create CSRF token[%v] for request %#v", err, r)
+ }
+ }
+ }
+ ch <- tmplEntry
+ }
+ }(tmplargs.Log)
+ if err := tmplViewBlessings.Execute(w, tmplargs); err != nil {
+ vlog.Errorf("Unable to execute audit page template: %v", err)
+ util.HTTPServerError(w, err)
+ }
}
func (h *handler) revoke(w http.ResponseWriter, r *http.Request) {
diff --git a/services/identity/googleoauth/template.go b/services/identity/googleoauth/template.go
index a01f1e1..6bac1f7 100644
--- a/services/identity/googleoauth/template.go
+++ b/services/identity/googleoauth/template.go
@@ -73,22 +73,24 @@
<table class="table table-bordered table-hover table-responsive">
<thead>
<tr>
- <th>Blessing sought as</th>
<th>Blessed as</th>
+ <th>Public Key</th>
<th>Issued</th>
- <th>Expires</th>
- <th>PublicKey</th>
+ <th>Caveats</th>
<th>Revoked</th>
</tr>
</thead>
<tbody>
{{range .Log}}
<tr>
-<td>{{.Blessee}}</td>
<td>{{.Blessed}}</td>
-<td><div class="unixtime" data-unixtime={{.Start.Unix}}>{{.Start.String}}</div></td>
-<td><div class="unixtime" data-unixtime={{.End.Unix}}>{{.End.String}}</div></td>
-<td>{{.Blessee.PublicKey}}</td>
+<td>{{.Blessed.PublicKey}}</td>
+<td><div class="unixtime" data-unixtime={{.Timestamp.Unix}}>{{.Timestamp.String}}</div></td>
+<td>
+{{range .Caveats}}
+ {{.}}</br>
+{{end}}
+</td>
<td>
{{ if .Token }}
<button class="revoke" value="{{.Token}}">Revoke</button>
diff --git a/services/identity/identityd/main.go b/services/identity/identityd/main.go
index 6665804..84a2bac 100644
--- a/services/identity/identityd/main.go
+++ b/services/identity/identityd/main.go
@@ -3,9 +3,11 @@
import (
"crypto/rand"
+ "encoding/json"
"flag"
"fmt"
"html/template"
+ "io/ioutil"
"net"
"net/http"
"os"
@@ -22,6 +24,8 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron/lib/signals"
+ "veyron.io/veyron/veyron/security/audit"
+ "veyron.io/veyron/veyron/services/identity/auditor"
"veyron.io/veyron/veyron/services/identity/blesser"
"veyron.io/veyron/veyron/services/identity/googleoauth"
"veyron.io/veyron/veyron/services/identity/handlers"
@@ -38,9 +42,8 @@
tlsconfig = flag.String("tlsconfig", "", "Comma-separated list of TLS certificate and private key files. This must be provided.")
host = flag.String("host", defaultHost(), "Hostname the HTTP server listens on. This can be the name of the host running the webserver, but if running behind a NAT or load balancer, this should be the host name that clients will connect to. For example, if set to 'x.com', Veyron identities will have the IssuerName set to 'x.com' and clients can expect to find the public key of the signer at 'x.com/pubkey/'.")
- // Flags controlling auditing of Blessing operations.
- auditprefix = flag.String("audit", "", "File prefix to files where auditing information will be written.")
- auditfilter = flag.String("audit_filter", "", "If non-empty, instead of starting the server the audit log will be dumped to STDOUT (with the filter set to the value of this flag. '/' can be used to dump all events).")
+ // Flag controlling auditing of Blessing operations.
+ auditConfig = flag.String("audit_config", "", "A JSON-encoded file with sql server configuration information for auditing. The file must have an entry for user, host, password, database, and table.")
// Configuration for various Google OAuth-based clients.
googleConfigWeb = flag.String("google_config_web", "", "Path to JSON-encoded OAuth client configuration for the web application that renders the audit log for blessings provided by this provider.")
@@ -61,14 +64,10 @@
func main() {
flag.Usage = usage
- r := rt.Init(providerPrincipal())
+ p, blessingLogReader := providerPrincipal()
+ r := rt.Init(options.RuntimePrincipal{p})
defer r.Cleanup()
- if len(*auditfilter) > 0 {
- dumpAuditLog()
- return
- }
-
// Calling with empty string returns a empty RevocationManager
revocationManager, err := revocation.NewRevocationManager(*revocationDir)
if err != nil {
@@ -95,7 +94,7 @@
Addr: fmt.Sprintf("%s%s", httpaddress(), n),
ClientID: clientID,
ClientSecret: clientSecret,
- Auditor: *auditprefix,
+ BlessingLogReader: blessingLogReader,
RevocationManager: revocationManager,
MacaroonBlessingService: naming.JoinAddressName(published[0], macaroonService),
})
@@ -120,7 +119,7 @@
if len(*googleConfigChrome) > 0 || len(*googleConfigAndroid) > 0 {
args.GoogleServers = appendSuffixTo(published, googleService)
}
- if len(*auditprefix) > 0 && len(*googleConfigWeb) > 0 {
+ if len(*auditConfig) > 0 && len(*googleConfigWeb) > 0 {
args.ListBlessingsRoute = googleoauth.ListBlessingsRoute
}
if err := tmpl.Execute(w, args); err != nil {
@@ -280,8 +279,9 @@
return host
}
-// providerPrincipal returns the Principal to use for the identity provider (i.e., this program).
-func providerPrincipal() veyron2.ROpt {
+// providerPrincipal returns the Principal to use for the identity provider (i.e., this program) and
+// the reader for the log where audits will be stored. If no audit config is given, the reader is nil.
+func providerPrincipal() (security.Principal, auditor.BlessingLogReader) {
// TODO(ashankar): Somewhat silly to have to create a runtime, but oh-well.
r, err := rt.New()
if err != nil {
@@ -289,12 +289,33 @@
}
defer r.Cleanup()
p := r.Principal()
- // TODO(ashankar): Hook this up with Suharsh's new auditor implementation.
- if len(*auditprefix) == 0 {
- return options.RuntimePrincipal{p}
+ if len(*auditConfig) == 0 {
+ return p, nil
}
- vlog.Fatalf("--auditprefix is not supported just yet!")
- return nil
+ config, err := readSQLConfigFromFile(*auditConfig)
+ if err != nil {
+ vlog.Fatalf("Failed to read sql config: %v", err)
+ }
+ auditor, reader, err := auditor.NewSQLBlessingAuditor(config)
+ if err != nil {
+ vlog.Fatalf("Failed to create sql auditor from config: %v", err)
+ }
+ return audit.NewPrincipal(p, auditor), reader
+}
+
+// readSQLConfigFromFile reads the JSON-encoded auditor.SQLConfig stored in
+// file. It returns an error if the file cannot be read or parsed, or if the
+// table name is empty or contains whitespace or ';'.
+func readSQLConfigFromFile(file string) (auditor.SQLConfig, error) {
+	var config auditor.SQLConfig
+	content, err := ioutil.ReadFile(file)
+	if err != nil {
+		return config, err
+	}
+	if err := json.Unmarshal(content, &config); err != nil {
+		return config, err
+	}
+	// The table name presumably ends up in SQL text built by the auditor
+	// package, so require a single non-empty word. This is a sanity check on
+	// operator-supplied configuration, not a defence against hostile input.
+	if config.Table == "" || strings.ContainsAny(config.Table, " \t\r\n\v\f;") {
+		return config, fmt.Errorf("sql config table value must be 1 word long")
+	}
+	return config, nil
 }
func httpaddress() string {
@@ -305,13 +326,6 @@
return fmt.Sprintf("https://%s:%v", *host, port)
}
-func dumpAuditLog() {
- if len(*auditprefix) == 0 {
- vlog.Fatalf("Must set --audit")
- }
- vlog.Fatalf("Auditing support disabled. Please contact ashankar@ or suharshs@ for restoration timeline")
-}
-
var tmpl = template.Must(template.New("main").Parse(`<!doctype html>
<html>
<head>
diff --git a/services/mgmt/node/impl/node_installer.go b/services/mgmt/node/impl/node_installer.go
index b7696b3..e0deac2 100644
--- a/services/mgmt/node/impl/node_installer.go
+++ b/services/mgmt/node/impl/node_installer.go
@@ -84,7 +84,7 @@
if err := linkSelf(nmDir, "noded"); err != nil {
return err
}
- // We don't pass in the config state setting, since they're already
+ // We don't pass in the config state settings, since they're already
// contained in the environment.
if err := generateScript(nmDir, nil, envelope); err != nil {
return err
diff --git a/services/mgmt/node/noded/main.go b/services/mgmt/node/noded/main.go
index f1b9a2d..eea0702 100644
--- a/services/mgmt/node/noded/main.go
+++ b/services/mgmt/node/noded/main.go
@@ -34,7 +34,11 @@
}
if *installSelf {
- if err := impl.SelfInstall(flag.Args(), os.Environ()); err != nil {
+ // If the user specified a name to publish as, pass that through
+ // to the installed node manager script.
+ // TODO(caprita): Make the flag survive updates.
+ args := append([]string{"--name=" + *publishAs}, flag.Args()...)
+ if err := impl.SelfInstall(args, os.Environ()); err != nil {
vlog.Errorf("SelfInstall failed: %v", err)
os.Exit(1)
}
@@ -68,6 +72,7 @@
if err := server.ServeDispatcher(*publishAs, dispatcher); err != nil {
vlog.Fatalf("Serve(%v) failed: %v", *publishAs, err)
}
+ vlog.VI(0).Infof("Node manager published as: %v", *publishAs)
impl.InvokeCallback(name)
// Wait until shutdown.
diff --git a/tools/mgmt/nminstall b/tools/mgmt/nminstall
index 449e38d..858ade7 100755
--- a/tools/mgmt/nminstall
+++ b/tools/mgmt/nminstall
@@ -31,8 +31,7 @@
echo "./nminstall <install parent dir> [<binary source>]"
}
-# TODO(caprita): Also agent.
-readonly BIN_NAMES=(noded suidhelper)
+readonly BIN_NAMES=(noded suidhelper agentd)
###############################################################################
# Copies one binary from source to destination.
@@ -163,7 +162,8 @@
# Tell the node manager to install itself.
local -r NM_ROOT="${INSTALL_DIR}/nmroot"
echo "Installing node manager under ${NM_ROOT} ..."
- VEYRON_NM_CURRENT="${INSTALL_DIR}/curr" VEYRON_NM_ROOT="${NM_ROOT}" VEYRON_NM_HELPER="${SETUID_SCRIPT}" "${BIN_INSTALL}/noded" --install_self
+ local -r PUBLISH=$(hostname)
+ VEYRON_NM_CURRENT="${INSTALL_DIR}/curr" VEYRON_NM_ROOT="${NM_ROOT}" VEYRON_NM_HELPER="${SETUID_SCRIPT}" "${BIN_INSTALL}/noded" --install_self --name="${PUBLISH}"
echo "Node manager installed."
}
diff --git a/tools/mounttable/impl.go b/tools/mounttable/impl.go
index 1cb8aac..8778b54 100644
--- a/tools/mounttable/impl.go
+++ b/tools/mounttable/impl.go
@@ -35,15 +35,18 @@
Name: "glob",
Short: "returns all matching entries in the mount table",
Long: "returns all matching entries in the mount table",
- ArgsName: "<mount name> <pattern>",
+ ArgsName: "[<mount name>] <pattern>",
ArgsLong: `
-<mount name> is a mount name on a mount table.
+<mount name> is a mount name on a mount table. Defaults to namespace root.
<pattern> is a glob pattern that is matched against all the entries below the
specified mount name.
`,
}
func runGlob(cmd *cmdline.Command, args []string) error {
+ if len(args) == 1 {
+ args = append([]string{""}, args...)
+ }
if expected, got := 2, len(args); expected != got {
return cmd.UsageErrorf("glob: incorrect number of arguments, expected %d, got %d", expected, got)
}