veyron/runtimes/google/lib/dependency:  Factor the dependency graph logic
out of the runtime implementation into its own package and give it proper
comments and tests.

Change-Id: Ic6d3752481f288c82fbf1aa16bb10b031347e372
diff --git a/runtimes/google/lib/dependency/dependency.go b/runtimes/google/lib/dependency/dependency.go
new file mode 100644
index 0000000..b777bef
--- /dev/null
+++ b/runtimes/google/lib/dependency/dependency.go
@@ -0,0 +1,123 @@
+// Package dependency keeps track of a dependency graph.
+// You add edges to the graph by specifying an object and the objects it depends on.
+// You can then call FinsihAndWait when the object is finished to wait until
+// all the dependents are also finished.
+package dependency
+
+import (
+	"fmt"
+	"sync"
+)
+
+var NotFoundError = fmt.Errorf(
+	"Attempting to create an object whose dependency has already been terminated.")
+
+// Every object in a Graph depends on the all key.  We can wait on this key
+// to know when all objects have been closed.
+type all struct{}
+
+type node struct {
+	dependents int
+	cond       *sync.Cond
+	dependsOn  []*node
+}
+
+// Graph keeps track of a number of objects and their dependents.
+// Typical usage looks like:
+//
+// g := NewGraph()
+//
+// // Instruct the graph that A depends on B and C.
+// if err := g.Depend(A, B, C); err != nil {
+//   // Oops, B or C is already terminating, clean up A immediately.
+// }
+// // D depends on A (You should check the error as above).
+// g.Depend(D, A)
+// ...
+// // At some point we want to mark A as closed to new users and
+// // wait for all the objects that depend on it to finish
+// // (in this case D).
+// finish := g.CloseAndWait(A)
+// // Now we know D (and any other depdendents) are finished, so we
+// // can clean up A.
+// A.CleanUp()
+// // Now notify the objects A depended on that they have one less
+// // dependent.
+// finish()
+type Graph struct {
+	mu    sync.Mutex
+	nodes map[interface{}]*node
+}
+
+// NewGraph returns a new Graph ready to be used.
+func NewGraph() *Graph {
+	graph := &Graph{nodes: map[interface{}]*node{}}
+	graph.nodes[all{}] = &node{cond: sync.NewCond(&graph.mu)}
+	return graph
+}
+
+// Depend adds obj as a node in the dependency graph and notes it's
+// dependencies on all the objects in 'on'.  If any of the
+// dependencies are already closed (or are not in the graph at all)
+// then Depend returns NotFoundError and does not add any edges.
+func (g *Graph) Depend(obj interface{}, on ...interface{}) error {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
+	nodes := make([]*node, len(on)+1)
+	for i := range on {
+		if nodes[i] = g.nodes[on[i]]; nodes[i] == nil {
+			return NotFoundError
+		}
+	}
+	if alln := g.nodes[all{}]; alln == nil {
+		return NotFoundError
+	} else {
+		nodes[len(on)] = alln
+	}
+	for _, n := range nodes {
+		n.dependents++
+	}
+	if n := g.nodes[obj]; n != nil {
+		n.dependsOn = append(n.dependsOn, nodes...)
+	} else {
+		g.nodes[obj] = &node{
+			cond:      sync.NewCond(&g.mu),
+			dependsOn: nodes,
+		}
+	}
+	return nil
+}
+
+// CloseAndWait closes an object to new dependents and waits for all
+// dependants to complete.  When this function returns you can safely
+// clean up Obj knowing that no users remain.  Once obj is finished
+// with the objects it depends on, you should call the returned function.
+func (g *Graph) CloseAndWait(obj interface{}) func() {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	n := g.nodes[obj]
+	if n == nil {
+		return func() {}
+	}
+	delete(g.nodes, obj)
+	for n.dependents > 0 {
+		n.cond.Wait()
+	}
+	return func() {
+		g.mu.Lock()
+		defer g.mu.Unlock()
+		for _, dn := range n.dependsOn {
+			if dn.dependents--; dn.dependents == 0 {
+				dn.cond.Broadcast()
+			}
+		}
+	}
+}
+
+// CloseAndWaitForAll closes the graph.  No new objects or dependencies can be added
+// and this function returns only after all existing objects have called
+// Finish on their finishers.
+func (g *Graph) CloseAndWaitForAll() {
+	g.CloseAndWait(all{})
+}
diff --git a/runtimes/google/lib/dependency/dependency_test.go b/runtimes/google/lib/dependency/dependency_test.go
new file mode 100644
index 0000000..6ac6940
--- /dev/null
+++ b/runtimes/google/lib/dependency/dependency_test.go
@@ -0,0 +1,94 @@
+package dependency
+
+import (
+	"testing"
+	"time"
+)
+
+var nextId = 0
+
+type Dep struct {
+	deps    []*Dep
+	stopped bool
+	id      int
+}
+
+func NewDep(deps ...*Dep) *Dep {
+	d := &Dep{deps: deps, id: nextId}
+	nextId++
+	return d
+}
+
+func (d *Dep) Use(t *testing.T, by *Dep) {
+	if d.stopped {
+		t.Errorf("Object %d using %d after stop.", by.id, d.id)
+	}
+}
+
+func (d *Dep) Stop(t *testing.T) {
+	d.Use(t, d)
+	d.stopped = true
+	for _, dd := range d.deps {
+		dd.Use(t, d)
+	}
+}
+
+func TestGraph(t *testing.T) {
+	a := NewDep()
+	b, c := NewDep(a), NewDep(a)
+	d := NewDep(c)
+
+	g := NewGraph()
+	if err := g.Depend(a); err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	if err := g.Depend(b, a); err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	if err := g.Depend(c, a); err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	if err := g.Depend(d, c); err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	alldone := make(chan struct{})
+	go func() {
+		g.CloseAndWaitForAll()
+		close(alldone)
+	}()
+
+	// Close d, which is a leaf.
+	finish := g.CloseAndWait(d)
+	d.Stop(t)
+	finish()
+
+	// Set a to close and wait which should wait for b and c.
+	done := make(chan struct{})
+	go func() {
+		finish := g.CloseAndWait(a)
+		a.Stop(t)
+		finish()
+		close(done)
+	}()
+
+	// done and alldone shouldn't be finished yet.
+	select {
+	case <-time.After(time.Second):
+	case <-done:
+		t.Errorf("done is finished before it's time")
+	case <-alldone:
+		t.Errorf("alldone is finished before it's time")
+	}
+
+	// Now close b and c.
+	finish = g.CloseAndWait(b)
+	b.Stop(t)
+	finish()
+	finish = g.CloseAndWait(c)
+	c.Stop(t)
+	finish()
+
+	<-done
+	<-alldone
+}
diff --git a/runtimes/google/rt/runtimex.go b/runtimes/google/rt/runtimex.go
index 6a707cc..56a7bd8 100644
--- a/runtimes/google/rt/runtimex.go
+++ b/runtimes/google/rt/runtimex.go
@@ -6,7 +6,6 @@
 	"os/signal"
 	"path/filepath"
 	"strings"
-	"sync"
 	"syscall"
 	"time"
 
@@ -28,6 +27,7 @@
 	iipc "v.io/core/veyron/runtimes/google/ipc"
 	imanager "v.io/core/veyron/runtimes/google/ipc/stream/manager"
 	"v.io/core/veyron/runtimes/google/ipc/stream/vc"
+	"v.io/core/veyron/runtimes/google/lib/dependency"
 	inaming "v.io/core/veyron/runtimes/google/naming"
 	"v.io/core/veyron/runtimes/google/naming/namespace"
 	ivtrace "v.io/core/veyron/runtimes/google/vtrace"
@@ -51,17 +51,11 @@
 
 type vtraceDependency struct{}
 
-type depSet struct {
-	count int
-	cond  *sync.Cond
-}
-
-// RuntimeX implements the veyron2.RuntimeX interface.  It is stateless.
+// RuntimeX implements the veyron2.RuntimeX interface.
 // Please see the interface definition for documentation of the
 // individiual methods.
 type RuntimeX struct {
-	mu   sync.Mutex
-	deps map[interface{}]*depSet
+	deps *dependency.Graph
 }
 
 type reservedNameDispatcher struct {
@@ -72,8 +66,7 @@
 // TODO(mattr,suharshs): Decide if ROpts would be better than this.
 func Init(ctx *context.T, appCycle veyron2.AppCycle, protocols []string, listenSpec *ipc.ListenSpec, flags flags.RuntimeFlags,
 	reservedDispatcher ipc.Dispatcher, dispatcherOpts ...ipc.ServerOpt) (*RuntimeX, *context.T, veyron2.Shutdown, error) {
-	r := &RuntimeX{deps: make(map[interface{}]*depSet)}
-	r.newDepSetLocked(r)
+	r := &RuntimeX{deps: dependency.NewGraph()}
 
 	handle, err := exec.GetChildHandle()
 	switch err {
@@ -181,84 +174,26 @@
 		handle.SetReady()
 	}
 
-	return r, ctx, r.cancel, nil
+	return r, ctx, r.shutdown, nil
 }
 
 func (r *RuntimeX) addChild(ctx *context.T, me interface{}, stop func(), dependsOn ...interface{}) error {
-	// Note that we keep a depSet for the runtime itself
-	// (which we say every child depends on) and we use that to determine
-	// when the runtime can be cleaned up.
-	dependsOn = append(dependsOn, r)
-
-	r.mu.Lock()
-	r.newDepSetLocked(me)
-	deps, err := r.getDepsLocked(dependsOn)
-	if err != nil {
-		r.mu.Unlock()
+	if err := r.deps.Depend(me, dependsOn...); err != nil {
 		stop()
 		return err
-	}
-	r.incrLocked(deps)
-	r.mu.Unlock()
-
-	if done := ctx.Done(); done != nil {
+	} else if done := ctx.Done(); done != nil {
 		go func() {
 			<-done
-			r.wait(me)
+			finish := r.deps.CloseAndWait(me)
 			stop()
-			r.decr(deps)
+			finish()
 		}()
 	}
 	return nil
 }
 
-func (r *RuntimeX) newDepSetLocked(key interface{}) {
-	r.deps[key] = &depSet{cond: sync.NewCond(&r.mu)}
-}
-
-func (r *RuntimeX) getDepsLocked(keys []interface{}) ([]*depSet, error) {
-	out := make([]*depSet, len(keys))
-	for i := range keys {
-		out[i] = r.deps[keys[i]]
-		if out[i] == nil {
-			return nil, fmt.Errorf("You are creating an object but it depends on something that is already shutdown: %v.", keys[i])
-		}
-	}
-	return out, nil
-}
-
-func (r *RuntimeX) incrLocked(sets []*depSet) {
-	for _, ds := range sets {
-		ds.count++
-	}
-}
-
-func (r *RuntimeX) decr(sets []*depSet) {
-	r.mu.Lock()
-	for _, ds := range sets {
-		ds.count--
-		if ds.count == 0 {
-			ds.cond.Broadcast()
-		}
-	}
-	r.mu.Unlock()
-}
-
-func (r *RuntimeX) wait(key interface{}) {
-	r.mu.Lock()
-	ds := r.deps[key]
-	if ds == nil {
-		panic(fmt.Sprintf("ds is gone for %#v", key))
-	}
-	delete(r.deps, key)
-	for ds.count > 0 {
-		ds.cond.Wait()
-	}
-	r.mu.Unlock()
-}
-
-func (r *RuntimeX) cancel() {
-	r.wait(r)
+func (r *RuntimeX) shutdown() {
+	r.deps.CloseAndWaitForAll()
 	vlog.FlushLog()
 }