veyron2, veyron/runtimes/google: adding support for supplying and streaming
ticks marking shutdown progress. This is all in-process for now.
Change-Id: Ibfd1149fdf3ee9a089e6cefddba795062920fc46
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index 9b4757f..b95ac37 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -9,7 +9,9 @@
type mgmtImpl struct {
sync.RWMutex
- waiters []chan<- string
+ waiters []chan<- string
+ taskTrackers []chan<- veyron2.Task
+ task veyron2.Task
}
func (rt *vrt) Stop() {
@@ -35,3 +37,36 @@
defer rt.mgmt.Unlock()
rt.mgmt.waiters = append(rt.mgmt.waiters, ch)
}
+
+func (rt *vrt) TrackTask(ch chan<- veyron2.Task) {
+ rt.mgmt.Lock()
+ defer rt.mgmt.Unlock()
+ rt.mgmt.taskTrackers = append(rt.mgmt.taskTrackers, ch)
+}
+
+func (rt *vrt) advanceTask(progress, goal int) {
+ rt.mgmt.Lock()
+ defer rt.mgmt.Unlock()
+ rt.mgmt.task.Goal += goal
+ rt.mgmt.task.Progress += progress
+ for _, pl := range rt.mgmt.taskTrackers {
+ select {
+ case pl <- rt.mgmt.task:
+ default:
+ }
+ }
+}
+
+func (rt *vrt) AdvanceGoal(delta int) {
+ if delta <= 0 {
+ return
+ }
+ rt.advanceTask(0, delta)
+}
+
+func (rt *vrt) AdvanceProgress(delta int) {
+ if delta <= 0 {
+ return
+ }
+ rt.advanceTask(delta, 0)
+}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index bcad19d..b3a48a9 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -3,6 +3,7 @@
import (
"fmt"
"os"
+ "reflect"
"testing"
"veyron2"
@@ -116,3 +117,71 @@
c.CloseStdin()
c.ExpectEOFAndWaitForExitCode(fmt.Errorf("exit status %d", veyron2.ForceStopExitCode))
}
+
+func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int) {
+ if want, got := (veyron2.Task{progress, goal}), <-ch; !reflect.DeepEqual(want, got) {
+ t.Errorf("Unexpected progress: want %+v, got %+v", want, got)
+ }
+}
+
+func checkNoProgress(t *testing.T, ch <-chan veyron2.Task) {
+ select {
+ case p := <-ch:
+ t.Errorf("channel expected to be empty, got %+v instead", p)
+ default:
+ }
+}
+
+// TestProgress verifies that the ticker update/track logic works for a single
+// tracker.
+func TestProgress(t *testing.T) {
+ m, _ := rt.New()
+ m.AdvanceGoal(50)
+ ch := make(chan veyron2.Task, 1)
+ m.TrackTask(ch)
+ checkNoProgress(t, ch)
+ m.AdvanceProgress(10)
+ checkProgress(t, ch, 10, 50)
+ checkNoProgress(t, ch)
+ m.AdvanceProgress(5)
+ checkProgress(t, ch, 15, 50)
+ m.AdvanceGoal(50)
+ checkProgress(t, ch, 15, 100)
+ m.AdvanceProgress(1)
+ checkProgress(t, ch, 16, 100)
+ m.AdvanceGoal(10)
+ checkProgress(t, ch, 16, 110)
+ m.AdvanceProgress(-13)
+ checkNoProgress(t, ch)
+ m.AdvanceGoal(0)
+ checkNoProgress(t, ch)
+}
+
+// TestProgressMultipleTrackers verifies that the ticker update/track logic
+// works for more than one tracker. It also ensures that the runtime doesn't
+// block when the tracker channels are full.
+func TestProgressMultipleTrackers(t *testing.T) {
+ m, _ := rt.New()
+ // ch1 is 1-buffered, ch2 is 2-buffered.
+ ch1, ch2 := make(chan veyron2.Task, 1), make(chan veyron2.Task, 2)
+ m.TrackTask(ch1)
+ m.TrackTask(ch2)
+ checkNoProgress(t, ch1)
+ checkNoProgress(t, ch2)
+ m.AdvanceProgress(1)
+ checkProgress(t, ch1, 1, 0)
+ checkNoProgress(t, ch1)
+ checkProgress(t, ch2, 1, 0)
+ checkNoProgress(t, ch2)
+ for i := 0; i < 10; i++ {
+ m.AdvanceProgress(1)
+ }
+ checkProgress(t, ch1, 2, 0)
+ checkNoProgress(t, ch1)
+ checkProgress(t, ch2, 2, 0)
+ checkProgress(t, ch2, 3, 0)
+ checkNoProgress(t, ch2)
+ m.AdvanceGoal(4)
+ checkProgress(t, ch1, 11, 4)
+ checkProgress(t, ch2, 11, 4)
+}