veyron2, veyron/runtimes/google: adding support for supplying and streaming
ticks marking shutdown progress. This is all in-process for now.

Change-Id: Ibfd1149fdf3ee9a089e6cefddba795062920fc46
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index 9b4757f..b95ac37 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -9,7 +9,9 @@
 
 type mgmtImpl struct {
 	sync.RWMutex
-	waiters []chan<- string
+	waiters      []chan<- string
+	taskTrackers []chan<- veyron2.Task
+	task         veyron2.Task
 }
 
 func (rt *vrt) Stop() {
@@ -35,3 +37,36 @@
 	defer rt.mgmt.Unlock()
 	rt.mgmt.waiters = append(rt.mgmt.waiters, ch)
 }
+
+func (rt *vrt) TrackTask(ch chan<- veyron2.Task) {
+	rt.mgmt.Lock()
+	defer rt.mgmt.Unlock()
+	rt.mgmt.taskTrackers = append(rt.mgmt.taskTrackers, ch)
+}
+
+func (rt *vrt) advanceTask(progress, goal int) {
+	rt.mgmt.Lock()
+	defer rt.mgmt.Unlock()
+	rt.mgmt.task.Goal += goal
+	rt.mgmt.task.Progress += progress
+	for _, pl := range rt.mgmt.taskTrackers {
+		select {
+		case pl <- rt.mgmt.task:
+		default:
+		}
+	}
+}
+
+func (rt *vrt) AdvanceGoal(delta int) {
+	if delta <= 0 {
+		return
+	}
+	rt.advanceTask(0, delta)
+}
+
+func (rt *vrt) AdvanceProgress(delta int) {
+	if delta <= 0 {
+		return
+	}
+	rt.advanceTask(delta, 0)
+}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index bcad19d..b3a48a9 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -3,6 +3,7 @@
 import (
 	"fmt"
 	"os"
+	"reflect"
 	"testing"
 
 	"veyron2"
@@ -116,3 +117,71 @@
 	c.CloseStdin()
 	c.ExpectEOFAndWaitForExitCode(fmt.Errorf("exit status %d", veyron2.ForceStopExitCode))
 }
+
+func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int) {
+	if want, got := (veyron2.Task{progress, goal}), <-ch; !reflect.DeepEqual(want, got) {
+		t.Errorf("Unexpected progress: want %+v, got %+v", want, got)
+	}
+}
+
+func checkNoProgress(t *testing.T, ch <-chan veyron2.Task) {
+	select {
+	case p := <-ch:
+		t.Errorf("channel expected to be empty, got %+v instead", p)
+	default:
+	}
+}
+
+// TestProgress verifies that the ticker update/track logic works for a single
+// tracker.
+func TestProgress(t *testing.T) {
+	m, _ := rt.New()
+	m.AdvanceGoal(50)
+	ch := make(chan veyron2.Task, 1)
+	m.TrackTask(ch)
+	checkNoProgress(t, ch)
+	m.AdvanceProgress(10)
+	checkProgress(t, ch, 10, 50)
+	checkNoProgress(t, ch)
+	m.AdvanceProgress(5)
+	checkProgress(t, ch, 15, 50)
+	m.AdvanceGoal(50)
+	checkProgress(t, ch, 15, 100)
+	m.AdvanceProgress(1)
+	checkProgress(t, ch, 16, 100)
+	m.AdvanceGoal(10)
+	checkProgress(t, ch, 16, 110)
+	m.AdvanceProgress(-13)
+	checkNoProgress(t, ch)
+	m.AdvanceGoal(0)
+	checkNoProgress(t, ch)
+}
+
+// TestProgressMultipleTrackers verifies that the ticker update/track logic
+// works for more than one tracker.  It also ensures that the runtime doesn't
+// block when the tracker channels are full.
+func TestProgressMultipleTrackers(t *testing.T) {
+	m, _ := rt.New()
+	// ch1 is 1-buffered, ch2 is 2-buffered.
+	ch1, ch2 := make(chan veyron2.Task, 1), make(chan veyron2.Task, 2)
+	m.TrackTask(ch1)
+	m.TrackTask(ch2)
+	checkNoProgress(t, ch1)
+	checkNoProgress(t, ch2)
+	m.AdvanceProgress(1)
+	checkProgress(t, ch1, 1, 0)
+	checkNoProgress(t, ch1)
+	checkProgress(t, ch2, 1, 0)
+	checkNoProgress(t, ch2)
+	for i := 0; i < 10; i++ {
+		m.AdvanceProgress(1)
+	}
+	checkProgress(t, ch1, 2, 0)
+	checkNoProgress(t, ch1)
+	checkProgress(t, ch2, 2, 0)
+	checkProgress(t, ch2, 3, 0)
+	checkNoProgress(t, ch2)
+	m.AdvanceGoal(4)
+	checkProgress(t, ch1, 11, 4)
+	checkProgress(t, ch2, 11, 4)
+}