veyron/runtimes/google/lib/dependency: Factor the dependency graph logic
out of the runtime implementation into its own package and give it proper
comments and tests.
Change-Id: Ic6d3752481f288c82fbf1aa16bb10b031347e372
diff --git a/runtimes/google/lib/dependency/dependency.go b/runtimes/google/lib/dependency/dependency.go
new file mode 100644
index 0000000..b777bef
--- /dev/null
+++ b/runtimes/google/lib/dependency/dependency.go
@@ -0,0 +1,123 @@
+// Package dependency keeps track of a dependency graph.
+// You add edges to the graph by specifying an object and the objects it depends on.
+// You can then call CloseAndWait when the object is finished to wait until
+// all of its dependents are also finished.
+package dependency
+
+import (
+ "fmt"
+ "sync"
+)
+
+var NotFoundError = errors.New(
+ "attempting to create an object whose dependency has already been terminated")
+
+// Every object in a Graph depends on the all key. We can wait on this key
+// to know when all objects have been closed.
+type all struct{}
+
+type node struct {
+ dependents int
+ cond *sync.Cond
+ dependsOn []*node
+}
+
+// Graph keeps track of a number of objects and their dependents.
+// Typical usage looks like:
+//
+// g := NewGraph()
+//
+// // Instruct the graph that A depends on B and C.
+// if err := g.Depend(A, B, C); err != nil {
+// // Oops, B or C is already terminating, clean up A immediately.
+// }
+// // D depends on A (you should check the error as above).
+// g.Depend(D, A)
+// ...
+// // At some point we want to mark A as closed to new users and
+// // wait for all the objects that depend on it to finish
+// // (in this case D).
+// finish := g.CloseAndWait(A)
+// // Now we know D (and any other dependents) are finished, so we
+// // can clean up A.
+// A.CleanUp()
+// // Now notify the objects A depended on that they have one less
+// // dependent.
+// finish()
+type Graph struct {
+ mu sync.Mutex
+ nodes map[interface{}]*node
+}
+
+// NewGraph returns a new Graph ready to be used.
+func NewGraph() *Graph {
+ graph := &Graph{nodes: map[interface{}]*node{}}
+ graph.nodes[all{}] = &node{cond: sync.NewCond(&graph.mu)}
+ return graph
+}
+
+// Depend adds obj as a node in the dependency graph and notes its
+// dependencies on all the objects in 'on'. If any of the
+// dependencies are already closed (or are not in the graph at all)
+// then Depend returns NotFoundError and does not add any edges.
+func (g *Graph) Depend(obj interface{}, on ...interface{}) error {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+
+ nodes := make([]*node, len(on)+1)
+ for i := range on {
+ if nodes[i] = g.nodes[on[i]]; nodes[i] == nil {
+ return NotFoundError
+ }
+ }
+ alln := g.nodes[all{}]
+ if alln == nil {
+ return NotFoundError
+ }
+ nodes[len(on)] = alln
+ for _, n := range nodes {
+ n.dependents++
+ }
+ if n := g.nodes[obj]; n != nil {
+ n.dependsOn = append(n.dependsOn, nodes...)
+ } else {
+ g.nodes[obj] = &node{
+ cond: sync.NewCond(&g.mu),
+ dependsOn: nodes,
+ }
+ }
+ return nil
+}
+
+// CloseAndWait closes an object to new dependents and waits for all
+// existing dependents to complete. When this function returns you can safely
+// clean up obj knowing that no users remain. Once obj is finished
+// with the objects it depends on, you should call the returned function.
+func (g *Graph) CloseAndWait(obj interface{}) func() {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ n := g.nodes[obj]
+ if n == nil {
+ return func() {}
+ }
+ delete(g.nodes, obj)
+ for n.dependents > 0 {
+ n.cond.Wait()
+ }
+ return func() {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ for _, dn := range n.dependsOn {
+ if dn.dependents--; dn.dependents == 0 {
+ dn.cond.Broadcast()
+ }
+ }
+ }
+}
+
+// CloseAndWaitForAll closes the graph. No new objects or dependencies can be added
+// and this function returns only after all existing objects have been
+// closed and their finish functions called.
+func (g *Graph) CloseAndWaitForAll() {
+ g.CloseAndWait(all{})
+}
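
To make the intended call pattern concrete outside of doc comments, here is a minimal sketch that drives the API end to end. The conn and server types are hypothetical stand-ins invented for this example, not part of the change:

package main

import (
	"fmt"

	"v.io/core/veyron/runtimes/google/lib/dependency"
)

// conn and server are hypothetical resources; server uses conn, so it
// must be torn down first.
type conn struct{ name string }
type server struct{ c *conn }

func main() {
	g := dependency.NewGraph()

	c := &conn{name: "tcp"}
	if err := g.Depend(c); err != nil {
		panic(err) // the graph itself has already been closed
	}
	s := &server{c: c}
	if err := g.Depend(s, c); err != nil {
		panic(err) // c is already terminating; s must clean up immediately
	}

	// Tear down in dependency order: close s to new dependents, release
	// its own resources, then notify the objects it depended on.
	finish := g.CloseAndWait(s)
	fmt.Println("server stopped, was using", s.c.name)
	finish()

	finish = g.CloseAndWait(c)
	fmt.Println("conn closed:", c.name)
	finish()

	// Returns once every object in the graph has been finished.
	g.CloseAndWaitForAll()
}

Note that each returned finish function must be called exactly once: skipping it leaves a dependent count permanently above zero, and CloseAndWait on the parent (and CloseAndWaitForAll) would block forever.
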
diff --git a/runtimes/google/lib/dependency/dependency_test.go b/runtimes/google/lib/dependency/dependency_test.go
new file mode 100644
index 0000000..6ac6940
--- /dev/null
+++ b/runtimes/google/lib/dependency/dependency_test.go
@@ -0,0 +1,94 @@
+package dependency
+
+import (
+ "testing"
+ "time"
+)
+
+var nextId = 0
+
+type Dep struct {
+ deps []*Dep
+ stopped bool
+ id int
+}
+
+func NewDep(deps ...*Dep) *Dep {
+ d := &Dep{deps: deps, id: nextId}
+ nextId++
+ return d
+}
+
+func (d *Dep) Use(t *testing.T, by *Dep) {
+ if d.stopped {
+ t.Errorf("Object %d using %d after stop.", by.id, d.id)
+ }
+}
+
+func (d *Dep) Stop(t *testing.T) {
+ d.Use(t, d)
+ d.stopped = true
+ for _, dd := range d.deps {
+ dd.Use(t, d)
+ }
+}
+
+func TestGraph(t *testing.T) {
+ a := NewDep()
+ b, c := NewDep(a), NewDep(a)
+ d := NewDep(c)
+
+ g := NewGraph()
+ if err := g.Depend(a); err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+ if err := g.Depend(b, a); err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+ if err := g.Depend(c, a); err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+ if err := g.Depend(d, c); err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+
+ alldone := make(chan struct{})
+ go func() {
+ g.CloseAndWaitForAll()
+ close(alldone)
+ }()
+
+ // Close d, which is a leaf.
+ finish := g.CloseAndWait(d)
+ d.Stop(t)
+ finish()
+
+ // Start closing a, which should wait for b and c.
+ done := make(chan struct{})
+ go func() {
+ finish := g.CloseAndWait(a)
+ a.Stop(t)
+ finish()
+ close(done)
+ }()
+
+ // done and alldone shouldn't be finished yet.
+ select {
+ case <-time.After(time.Second):
+ case <-done:
+ t.Errorf("done is finished before it's time")
+ case <-alldone:
+ t.Errorf("alldone is finished before it's time")
+ }
+
+ // Now close b and c.
+ finish = g.CloseAndWait(b)
+ b.Stop(t)
+ finish()
+ finish = g.CloseAndWait(c)
+ c.Stop(t)
+ finish()
+
+ <-done
+ <-alldone
+}
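
One path the test above does not exercise is the error return. Below is a minimal sketch, using the same package, of Depend refusing an edge to an object that has already been closed (resource is a hypothetical type for the example):

package main

import (
	"fmt"

	"v.io/core/veyron/runtimes/google/lib/dependency"
)

func main() {
	g := dependency.NewGraph()

	type resource struct{ name string }
	parent := &resource{name: "parent"}
	if err := g.Depend(parent); err != nil {
		panic(err)
	}

	// parent has no dependents yet, so CloseAndWait returns immediately.
	finish := g.CloseAndWait(parent)
	finish()

	// parent has been removed from the graph, so new dependents are
	// refused and no edges are added.
	child := &resource{name: "child"}
	if err := g.Depend(child, parent); err != nil {
		fmt.Println(err) // NotFoundError
	}
}
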
diff --git a/runtimes/google/rt/runtimex.go b/runtimes/google/rt/runtimex.go
index 6a707cc..56a7bd8 100644
--- a/runtimes/google/rt/runtimex.go
+++ b/runtimes/google/rt/runtimex.go
@@ -6,7 +6,6 @@
"os/signal"
"path/filepath"
"strings"
- "sync"
"syscall"
"time"
@@ -28,6 +27,7 @@
iipc "v.io/core/veyron/runtimes/google/ipc"
imanager "v.io/core/veyron/runtimes/google/ipc/stream/manager"
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
+ "v.io/core/veyron/runtimes/google/lib/dependency"
inaming "v.io/core/veyron/runtimes/google/naming"
"v.io/core/veyron/runtimes/google/naming/namespace"
ivtrace "v.io/core/veyron/runtimes/google/vtrace"
@@ -51,17 +51,11 @@
type vtraceDependency struct{}
-type depSet struct {
- count int
- cond *sync.Cond
-}
-
-// RuntimeX implements the veyron2.RuntimeX interface. It is stateless.
+// RuntimeX implements the veyron2.RuntimeX interface.
// Please see the interface definition for documentation of the
// individual methods.
type RuntimeX struct {
- mu sync.Mutex
- deps map[interface{}]*depSet
+ deps *dependency.Graph
}
type reservedNameDispatcher struct {
@@ -72,8 +66,7 @@
// TODO(mattr,suharshs): Decide if ROpts would be better than this.
func Init(ctx *context.T, appCycle veyron2.AppCycle, protocols []string, listenSpec *ipc.ListenSpec, flags flags.RuntimeFlags,
reservedDispatcher ipc.Dispatcher, dispatcherOpts ...ipc.ServerOpt) (*RuntimeX, *context.T, veyron2.Shutdown, error) {
- r := &RuntimeX{deps: make(map[interface{}]*depSet)}
- r.newDepSetLocked(r)
+ r := &RuntimeX{deps: dependency.NewGraph()}
handle, err := exec.GetChildHandle()
switch err {
@@ -181,84 +174,26 @@
handle.SetReady()
}
- return r, ctx, r.cancel, nil
+ return r, ctx, r.shutdown, nil
}
func (r *RuntimeX) addChild(ctx *context.T, me interface{}, stop func(), dependsOn ...interface{}) error {
- // Note that we keep a depSet for the runtime itself
- // (which we say every child depends on) and we use that to determine
- // when the runtime can be cleaned up.
- dependsOn = append(dependsOn, r)
-
- r.mu.Lock()
- r.newDepSetLocked(me)
- deps, err := r.getDepsLocked(dependsOn)
- if err != nil {
- r.mu.Unlock()
+ if err := r.deps.Depend(me, dependsOn...); err != nil {
stop()
return err
- }
- r.incrLocked(deps)
- r.mu.Unlock()
-
- if done := ctx.Done(); done != nil {
+ } else if done := ctx.Done(); done != nil {
go func() {
<-done
- r.wait(me)
+ finish := r.deps.CloseAndWait(me)
stop()
- r.decr(deps)
+ finish()
}()
}
return nil
}
-func (r *RuntimeX) newDepSetLocked(key interface{}) {
- r.deps[key] = &depSet{cond: sync.NewCond(&r.mu)}
-}
-
-func (r *RuntimeX) getDepsLocked(keys []interface{}) ([]*depSet, error) {
- out := make([]*depSet, len(keys))
- for i := range keys {
- out[i] = r.deps[keys[i]]
- if out[i] == nil {
- return nil, fmt.Errorf("You are creating an object but it depends on something that is already shutdown: %v.", keys[i])
- }
- }
- return out, nil
-}
-
-func (r *RuntimeX) incrLocked(sets []*depSet) {
- for _, ds := range sets {
- ds.count++
- }
-}
-
-func (r *RuntimeX) decr(sets []*depSet) {
- r.mu.Lock()
- for _, ds := range sets {
- ds.count--
- if ds.count == 0 {
- ds.cond.Broadcast()
- }
- }
- r.mu.Unlock()
-}
-
-func (r *RuntimeX) wait(key interface{}) {
- r.mu.Lock()
- ds := r.deps[key]
- if ds == nil {
- panic(fmt.Sprintf("ds is gone for %#v", key))
- }
- delete(r.deps, key)
- for ds.count > 0 {
- ds.cond.Wait()
- }
- r.mu.Unlock()
-}
-
-func (r *RuntimeX) cancel() {
- r.wait(r)
+func (r *RuntimeX) shutdown() {
+ r.deps.CloseAndWaitForAll()
vlog.FlushLog()
}
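
The rewritten addChild ties an object's cleanup to its context. Here is a standalone sketch of the same shape, substituting the standard library's context package for the runtime's own context.T and using a hypothetical child value and stop function:

package main

import (
	"context"
	"fmt"

	"v.io/core/veyron/runtimes/google/lib/dependency"
)

// addChild mirrors the pattern in runtimex.go: register the child in the
// graph and, once its context is cancelled, wait out its dependents, stop
// it, and only then release the objects it depends on.
func addChild(ctx context.Context, g *dependency.Graph, child interface{}, stop func(), dependsOn ...interface{}) error {
	if err := g.Depend(child, dependsOn...); err != nil {
		stop()
		return err
	}
	go func() {
		<-ctx.Done()
		finish := g.CloseAndWait(child)
		stop()
		finish()
	}()
	return nil
}

func main() {
	g := dependency.NewGraph()
	ctx, cancel := context.WithCancel(context.Background())

	stopped := make(chan struct{})
	child := &struct{ name string }{name: "child"}
	if err := addChild(ctx, g, child, func() { close(stopped) }); err != nil {
		panic(err)
	}

	cancel()
	<-stopped
	// Blocks until child's finish function has run and the all node's
	// dependent count reaches zero.
	g.CloseAndWaitForAll()
	fmt.Println("all children stopped")
}

The split between CloseAndWait and the returned finish function is what guarantees the ordering here: stop runs only after every dependent of the child has finished, and the child's parents learn of its departure only after stop completes.
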