veyron/runtimes/google/lib/follow: Fix initialization race in the follow watcher.

Sometimes, a watcher would not detect modifications to the file immediately
after the watcher was created.
This caused a race condition in Read:
1) Routine #1 creates a reader, which in turn creates a watcher.
2) Routine #1 calls reader.Read(). The Read doesn't see enough bytes, so
   Routine #1 awaits an event from watcher.
3) Routine #2 writes to the file.
4) The watcher's watch routine is initialized. However, it missed the write,
   and does not send an event.
As a result, Read hangs.

The watcher now waits until the watch routine is initialized before it returns.

Change-Id: Idd03b7d6a7cf7560315a8a6661c60f55dd7a06a1
diff --git a/runtimes/google/lib/follow/notify_watcher.go b/runtimes/google/lib/follow/notify_watcher.go
index d897f75..7669421 100644
--- a/runtimes/google/lib/follow/notify_watcher.go
+++ b/runtimes/google/lib/follow/notify_watcher.go
@@ -12,8 +12,8 @@
 // events channel. Further fsnotify events are not received until the nil
 // value is received from the events channel.
 // The function sends any errors on the events channel.
-func newFSNotifyWatch(filename string) func(chan<- error, <-chan bool, chan<- bool) {
-	return func(events chan<- error, stop <-chan bool, done chan<- bool) {
+func newFSNotifyWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
+	return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
 		defer close(done)
 		defer close(events)
 		source, err := fsnotify.NewWatcher()
@@ -26,6 +26,9 @@
 			events <- err
 			return
 		}
+
+		close(initialized)
+
 		for {
 			// Receive:
 			//  (A) An fsnotify modification event. Send nil.
diff --git a/runtimes/google/lib/follow/stat_watcher.go b/runtimes/google/lib/follow/stat_watcher.go
index 1f052bc..40e2bbe 100644
--- a/runtimes/google/lib/follow/stat_watcher.go
+++ b/runtimes/google/lib/follow/stat_watcher.go
@@ -21,12 +21,12 @@
 // frequent modification but conserves resources during periods of inactivity.
 // The default values of minSleep and maxSleep can be overriden using the
 // newCustomFSStatWatcher() constructor.
-func newFSStatWatch(filename string) func(chan<- error, <-chan bool, chan<- bool) {
+func newFSStatWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
 	return newCustomFSStatWatch(filename, defaultMinSleep, defaultMaxSleep)
 }
 
-func newCustomFSStatWatch(filename string, minSleep, maxSleep time.Duration) func(chan<- error, <-chan bool, chan<- bool) {
-	return func(events chan<- error, stop <-chan bool, done chan<- bool) {
+func newCustomFSStatWatch(filename string, minSleep, maxSleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
+	return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
 		defer close(done)
 		defer close(events)
 		file, err := os.Open(filename)
@@ -40,7 +40,11 @@
 			events <- err
 			return
 		}
-		lastFileSize := fileInfo.Size()
+		initialFileSize := fileInfo.Size()
+
+		close(initialized)
+
+		lastFileSize := initialFileSize
 		sleep := minSleep
 		for {
 			// Look for a stop command.
diff --git a/runtimes/google/lib/follow/timed_watcher.go b/runtimes/google/lib/follow/timed_watcher.go
index ae39716..aea4b50 100644
--- a/runtimes/google/lib/follow/timed_watcher.go
+++ b/runtimes/google/lib/follow/timed_watcher.go
@@ -10,14 +10,17 @@
 // channel at an interval specified by sleep.
 // The event receiver must determine whether the event is spurious, or
 // corresponds to a modification of the file.
-func newFSTimedWatch(filename string) func(chan<- error, <-chan bool, chan<- bool) {
+func newFSTimedWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
 	return newCustomFSTimedWatch(filename, defaultSleep)
 }
 
-func newCustomFSTimedWatch(filename string, sleep time.Duration) func(chan<- error, <-chan bool, chan<- bool) {
-	return func(events chan<- error, stop <-chan bool, done chan<- bool) {
+func newCustomFSTimedWatch(filename string, sleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
+	return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
 		defer close(done)
 		defer close(events)
+
+		close(initialized)
+
 		for {
 			// Look for a stop command.
 			select {
diff --git a/runtimes/google/lib/follow/watcher.go b/runtimes/google/lib/follow/watcher.go
index 678526e..eae1353 100644
--- a/runtimes/google/lib/follow/watcher.go
+++ b/runtimes/google/lib/follow/watcher.go
@@ -15,36 +15,39 @@
 type fsWatcher struct {
 	// watch runs on the event routine, and detects modifications and sends
 	// corresponding events. watch runs until it is asked to stop.
-	watch func(events chan<- error, stop <-chan bool, done chan<- bool)
+	watch func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{})
 	// events is the channel on which events and errors are sent.
 	events <-chan error
 	// stop is the channel on which the event routine is told to stop.
-	stop chan<- bool
+	stop chan<- struct{}
 	// done is the channel on which the event routine announces that it is done.
-	done <-chan bool
+	done <-chan struct{}
 	// mu guards closed
 	mu sync.Mutex
 	// closed is true iff the watcher has been closed.
 	closed bool // GUARDED_BY(mu)
 }
 
-// newFSWatcher spawns an event routine that runs watch, and returns a new
+// newCustomFSWatcher spawns an event routine that runs watch, and returns a new
 // fsWatcher.
 // watch is a function that detects file modifications and sends a corresponding
 // nil value on the returned watcher's events channel. If an error occurs in
 // watch, it is sent on the events channel. However, watch may keep running.
 // To halt watch, the receiver should call Close().
 //
+// Watch guarantees that an event will be received for any modification after
+// newCustomFSWatcher returns.
+//
 // A sequence of modifications may correspond to one sent event. Watch guarantees
 // that at least one event is received after the most recent modification.
 //
 // The frequency at which events are generated is implementation-specific.
 // Implementations may generate events even if the file has not been modified -
 // the receiver should determine whether these events are spurious.
-func newCustomFSWatcher(watch func(chan<- error, <-chan bool, chan<- bool)) (*fsWatcher, error) {
+func newCustomFSWatcher(watch func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{})) (*fsWatcher, error) {
 	events := make(chan error, 1)
-	stop := make(chan bool)
-	done := make(chan bool)
+	stop := make(chan struct{})
+	done := make(chan struct{})
 	watcher := &fsWatcher{
 		watch:  watch,
 		events: events,
@@ -52,7 +55,11 @@
 		done:   done,
 		closed: false,
 	}
-	go watch(events, stop, done)
+	initialized := make(chan struct{})
+	go watch(events, initialized, stop, done)
+	// Wait until the watch routine has been initialized. This ensures that an
+	// event is sent for any modification after this function returns.
+	<-initialized
 	return watcher, nil
 }
 
@@ -87,7 +94,7 @@
 // process them.
 // sendEvent can be preempted by a stop request, and returns true iff the event
 // was sent or coalesced with an existing event.
-func sendEvent(events chan<- error, event error, stop <-chan bool) bool {
+func sendEvent(events chan<- error, event error, stop <-chan struct{}) bool {
 	if event == nil {
 		select {
 		case <-stop:
diff --git a/runtimes/google/lib/follow/watcher_test.go b/runtimes/google/lib/follow/watcher_test.go
index b8c0e2a..2732238 100644
--- a/runtimes/google/lib/follow/watcher_test.go
+++ b/runtimes/google/lib/follow/watcher_test.go
@@ -10,7 +10,7 @@
 
 	// Test that 1) a nil can be sent 2) an error can be sent after a nil.
 	events := make(chan error, 1)
-	stop := make(chan bool)
+	stop := make(chan struct{})
 	if !sendEvent(events, nil, stop) {
 		t.Fatal("Expected that the event will be sent")
 	}
@@ -26,7 +26,7 @@
 
 	// Test that nils are coalesced.
 	events = make(chan error, 1)
-	stop = make(chan bool)
+	stop = make(chan struct{})
 	if !sendEvent(events, nil, stop) {
 		t.Fatal("Expected that the event will be sent")
 	}
@@ -39,7 +39,7 @@
 
 	// Test that an error is not sent if stop is closed.
 	events = make(chan error, 1)
-	stop = make(chan bool)
+	stop = make(chan struct{})
 	close(stop)
 	// The stop signal may not be handled immediately.
 	for sendEvent(events, event, stop) {
@@ -47,7 +47,7 @@
 
 	// Test that a nil is not sent if stop is closed.
 	events = make(chan error, 1)
-	stop = make(chan bool)
+	stop = make(chan struct{})
 	close(stop)
 	// The stop signal may not be handled immediately.
 	for sendEvent(events, nil, stop) {
@@ -55,7 +55,7 @@
 
 	// Test that an error is not sent if stop is closed while send is blocked.
 	events = make(chan error, 1)
-	stop = make(chan bool)
+	stop = make(chan struct{})
 	if !sendEvent(events, nil, stop) {
 		t.Fatal("Expected that the event will be sent")
 	}