veyron/runtimes/google/lib/follow: Fix initialization race in the follow watcher.
Sometimes, a watcher would not detect modifications to the file immediately
after the watcher was created.
This caused a race condition in Read:
1) Routine #1 creates a reader, which in turn creates a watcher.
2) Routine #1 calls reader.Read(). The Read doesn't see enough bytes, so
Routine #1 awaits an event from watcher.
3) Routine #2 writes to the file.
4) The watcher's watch routine is initialized. However, it missed the write,
and does not send an event.
As a result, Read hangs.
The watcher now waits until the watch routine is initialized before it returns.
Change-Id: Idd03b7d6a7cf7560315a8a6661c60f55dd7a06a1
diff --git a/runtimes/google/lib/follow/notify_watcher.go b/runtimes/google/lib/follow/notify_watcher.go
index d897f75..7669421 100644
--- a/runtimes/google/lib/follow/notify_watcher.go
+++ b/runtimes/google/lib/follow/notify_watcher.go
@@ -12,8 +12,8 @@
// events channel. Further fsnotify events are not received until the nil
// value is received from the events channel.
// The function sends any errors on the events channel.
-func newFSNotifyWatch(filename string) func(chan<- error, <-chan bool, chan<- bool) {
- return func(events chan<- error, stop <-chan bool, done chan<- bool) {
+func newFSNotifyWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
+ return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
defer close(done)
defer close(events)
source, err := fsnotify.NewWatcher()
@@ -26,6 +26,9 @@
events <- err
return
}
+
+ close(initialized)
+
for {
// Receive:
// (A) An fsnotify modification event. Send nil.
diff --git a/runtimes/google/lib/follow/stat_watcher.go b/runtimes/google/lib/follow/stat_watcher.go
index 1f052bc..40e2bbe 100644
--- a/runtimes/google/lib/follow/stat_watcher.go
+++ b/runtimes/google/lib/follow/stat_watcher.go
@@ -21,12 +21,12 @@
// frequent modification but conserves resources during periods of inactivity.
// The default values of minSleep and maxSleep can be overriden using the
// newCustomFSStatWatcher() constructor.
-func newFSStatWatch(filename string) func(chan<- error, <-chan bool, chan<- bool) {
+func newFSStatWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
return newCustomFSStatWatch(filename, defaultMinSleep, defaultMaxSleep)
}
-func newCustomFSStatWatch(filename string, minSleep, maxSleep time.Duration) func(chan<- error, <-chan bool, chan<- bool) {
- return func(events chan<- error, stop <-chan bool, done chan<- bool) {
+func newCustomFSStatWatch(filename string, minSleep, maxSleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
+ return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
defer close(done)
defer close(events)
file, err := os.Open(filename)
@@ -40,7 +40,11 @@
events <- err
return
}
- lastFileSize := fileInfo.Size()
+ initialFileSize := fileInfo.Size()
+
+ close(initialized)
+
+ lastFileSize := initialFileSize
sleep := minSleep
for {
// Look for a stop command.
diff --git a/runtimes/google/lib/follow/timed_watcher.go b/runtimes/google/lib/follow/timed_watcher.go
index ae39716..aea4b50 100644
--- a/runtimes/google/lib/follow/timed_watcher.go
+++ b/runtimes/google/lib/follow/timed_watcher.go
@@ -10,14 +10,17 @@
// channel at an interval specified by sleep.
// The event receiver must determine whether the event is spurious, or
// corresponds to a modification of the file.
-func newFSTimedWatch(filename string) func(chan<- error, <-chan bool, chan<- bool) {
+func newFSTimedWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
return newCustomFSTimedWatch(filename, defaultSleep)
}
-func newCustomFSTimedWatch(filename string, sleep time.Duration) func(chan<- error, <-chan bool, chan<- bool) {
- return func(events chan<- error, stop <-chan bool, done chan<- bool) {
+func newCustomFSTimedWatch(filename string, sleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
+ return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
defer close(done)
defer close(events)
+
+ close(initialized)
+
for {
// Look for a stop command.
select {
diff --git a/runtimes/google/lib/follow/watcher.go b/runtimes/google/lib/follow/watcher.go
index 678526e..eae1353 100644
--- a/runtimes/google/lib/follow/watcher.go
+++ b/runtimes/google/lib/follow/watcher.go
@@ -15,36 +15,39 @@
type fsWatcher struct {
// watch runs on the event routine, and detects modifications and sends
// corresponding events. watch runs until it is asked to stop.
- watch func(events chan<- error, stop <-chan bool, done chan<- bool)
+ watch func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{})
// events is the channel on which events and errors are sent.
events <-chan error
// stop is the channel on which the event routine is told to stop.
- stop chan<- bool
+ stop chan<- struct{}
// done is the channel on which the event routine announces that it is done.
- done <-chan bool
+ done <-chan struct{}
// mu guards closed
mu sync.Mutex
// closed is true iff the watcher has been closed.
closed bool // GUARDED_BY(mu)
}
-// newFSWatcher spawns an event routine that runs watch, and returns a new
+// newCustomFSWatcher spawns an event routine that runs watch, and returns a new
// fsWatcher.
// watch is a function that detects file modifications and sends a corresponding
// nil value on the returned watcher's events channel. If an error occurs in
// watch, it is sent on the events channel. However, watch may keep running.
// To halt watch, the receiver should call Close().
//
+// Watch guarantees that an event will be received for any modification after
+// newCustomFSWatcher returns.
+//
// A sequence of modifications may correspond to one sent event. Watch guarantees
// that at least one event is received after the most recent modification.
//
// The frequency at which events are generated is implementation-specific.
// Implementations may generate events even if the file has not been modified -
// the receiver should determine whether these events are spurious.
-func newCustomFSWatcher(watch func(chan<- error, <-chan bool, chan<- bool)) (*fsWatcher, error) {
+func newCustomFSWatcher(watch func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{})) (*fsWatcher, error) {
events := make(chan error, 1)
- stop := make(chan bool)
- done := make(chan bool)
+ stop := make(chan struct{})
+ done := make(chan struct{})
watcher := &fsWatcher{
watch: watch,
events: events,
@@ -52,7 +55,11 @@
done: done,
closed: false,
}
- go watch(events, stop, done)
+ initialized := make(chan struct{})
+ go watch(events, initialized, stop, done)
+ // Wait until the watch routine has been initialized. This ensures that an
+ // event is sent for any modification after this function returns.
+ <-initialized
return watcher, nil
}
@@ -87,7 +94,7 @@
// process them.
// sendEvent can be preempted by a stop request, and returns true iff the event
// was sent or coalesced with an existing event.
-func sendEvent(events chan<- error, event error, stop <-chan bool) bool {
+func sendEvent(events chan<- error, event error, stop <-chan struct{}) bool {
if event == nil {
select {
case <-stop:
diff --git a/runtimes/google/lib/follow/watcher_test.go b/runtimes/google/lib/follow/watcher_test.go
index b8c0e2a..2732238 100644
--- a/runtimes/google/lib/follow/watcher_test.go
+++ b/runtimes/google/lib/follow/watcher_test.go
@@ -10,7 +10,7 @@
// Test that 1) a nil can be sent 2) an error can be sent after a nil.
events := make(chan error, 1)
- stop := make(chan bool)
+ stop := make(chan struct{})
if !sendEvent(events, nil, stop) {
t.Fatal("Expected that the event will be sent")
}
@@ -26,7 +26,7 @@
// Test that nils are coalesced.
events = make(chan error, 1)
- stop = make(chan bool)
+ stop = make(chan struct{})
if !sendEvent(events, nil, stop) {
t.Fatal("Expected that the event will be sent")
}
@@ -39,7 +39,7 @@
// Test that an error is not sent if stop is closed.
events = make(chan error, 1)
- stop = make(chan bool)
+ stop = make(chan struct{})
close(stop)
// The stop signal may not be handled immediately.
for sendEvent(events, event, stop) {
@@ -47,7 +47,7 @@
// Test that a nil is not sent if stop is closed.
events = make(chan error, 1)
- stop = make(chan bool)
+ stop = make(chan struct{})
close(stop)
// The stop signal may not be handled immediately.
for sendEvent(events, nil, stop) {
@@ -55,7 +55,7 @@
// Test that an error is not sent if stop is closed while send is blocked.
events = make(chan error, 1)
- stop = make(chan bool)
+ stop = make(chan struct{})
if !sendEvent(events, nil, stop) {
t.Fatal("Expected that the event will be sent")
}