veyron/runtimes/google/lib/follow: Refactor watcher api and implementation.

Previously, the watcher would send file modifications events on a channel. Now,
the watcher provides a Recv method that blocks until the file is modified.
This simplifies the watcher implementation:
1) we no longer spawn go routines that fill the events channel.
2) delegate watchers (i.e. fsnotify-based, fs.Stat-based, timed, etc.) implement
   an interface, not a function type.

Change-Id: Ia6b27c53f4e1a7e123cd2e738ac621331d177b95
diff --git a/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go b/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go
index 5dacaa8..09a9cf9 100644
--- a/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go
+++ b/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go
@@ -4,7 +4,6 @@
 
 // newFSWatcher starts and returns a new fsnotify-based fsWatcher.
 // filename specifies the file to watch.
-func newFSWatcher(filename string) (*fsWatcher, error) {
-	watch := newFSNotifyWatch(filename)
-	return newCustomFSWatcher(watch)
+func newFSWatcher(filename string) (fsWatcher, error) {
+	return newFSNotifyWatcher(filename)
 }
diff --git a/runtimes/google/lib/follow/notify_reader_test.go b/runtimes/google/lib/follow/notify_reader_test.go
index 862e04c..04f8827 100644
--- a/runtimes/google/lib/follow/notify_reader_test.go
+++ b/runtimes/google/lib/follow/notify_reader_test.go
@@ -21,8 +21,7 @@
 	defer os.Remove(testFileName)
 
 	// Create the fsnotify-based fsWatcher.
-	watch := newFSNotifyWatch(testFileName)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newFSNotifyWatcher(testFileName)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcher() failed: %v", err)
 	}
@@ -46,8 +45,7 @@
 	defer os.Remove(testFileName)
 
 	// Create the fsnotify-based fsWatcher.
-	watch := newFSNotifyWatch(testFileName)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newFSNotifyWatcher(testFileName)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcher() failed: %v", err)
 	}
@@ -71,8 +69,7 @@
 	defer os.Remove(testFileName)
 
 	// Create the fsnotify-based fsWatcher.
-	watch := newFSNotifyWatch(testFileName)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newFSNotifyWatcher(testFileName)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcher() failed: %v", err)
 	}
diff --git a/runtimes/google/lib/follow/notify_watcher.go b/runtimes/google/lib/follow/notify_watcher.go
index 7669421..c5279d8 100644
--- a/runtimes/google/lib/follow/notify_watcher.go
+++ b/runtimes/google/lib/follow/notify_watcher.go
@@ -3,51 +3,83 @@
 package follow
 
 import (
+	"fmt"
 	"github.com/howeyc/fsnotify"
+	"io"
+	"sync"
+
+	vsync "veyron/runtimes/google/lib/sync"
 )
 
-// newFSNotifyWatch returns a function that listens on fsnotify and sends
-// corresponding modification events.
-// For each fsnotify modification event, a single nil value is sent on the
-// events channel. Further fsnotify events are not received until the nil
-// value is received from the events channel.
-// The function sends any errors on the events channel.
-func newFSNotifyWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
-	return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
-		defer close(done)
-		defer close(events)
-		source, err := fsnotify.NewWatcher()
-		if err != nil {
-			events <- err
-			return
-		}
-		defer source.Close()
-		if err := source.Watch(filename); err != nil {
-			events <- err
-			return
-		}
+type fsNotifyWatcher struct {
+	filename string
+	source   *fsnotify.Watcher
+	// cancel signals Wait to terminate.
+	cancel chan struct{}
+	// pending allows Close to block till ongoing calls to Wait terminate.
+	pending vsync.WaitGroup
+	// mu and closed ensure that Close is idempotent.
+	mu     sync.Mutex
+	closed bool // GUARDED_BY(mu)
+}
 
-		close(initialized)
+// newFSNotifyWatcher returns an fsnotify-based fsWatcher.
+// Wait() blocks until it receives a file modification event from fsnotify.
+func newFSNotifyWatcher(filename string) (fsWatcher, error) {
+	source, err := fsnotify.NewWatcher()
+	if err != nil {
+		return nil, err
+	}
+	if err := source.Watch(filename); err != nil {
+		source.Close()
+		return nil, err
+	}
+	return &fsNotifyWatcher{
+		source: source,
+		cancel: make(chan struct{}),
+	}, nil
+}
 
-		for {
-			// Receive:
-			//  (A) An fsnotify modification event. Send nil.
-			//  (B) An fsnotify error. Send the error.
-			//  (C) A stop command. Stop listening and clean up.
-			select {
-			case event := <-source.Event:
-				if event.IsModify() {
-					if !sendEvent(events, nil, stop) {
-						return
+func (w *fsNotifyWatcher) Wait() error {
+	// After Close returns, any call to Wait must return io.EOF.
+	if !w.pending.TryAdd() {
+		return io.EOF
+	}
+	defer w.pending.Done()
+
+	for {
+		select {
+		case event := <-w.source.Event:
+			if event.IsModify() {
+				// Drain the event queue.
+				drained := false
+				for !drained {
+					select {
+					case <-w.source.Event:
+					default:
+						drained = true
 					}
 				}
-			case err := <-source.Error:
-				if !sendEvent(events, err, stop) {
-					return
-				}
-			case <-stop:
-				return
+				return nil
 			}
+			return fmt.Errorf("Unexpected event %v", event)
+		case err := <-w.source.Error:
+			return err
+		case <-w.cancel:
+			// After Close returns, any call to Wait must return io.EOF.
+			return io.EOF
 		}
 	}
 }
+
+func (w *fsNotifyWatcher) Close() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.closed {
+		return nil
+	}
+	w.closed = true
+	close(w.cancel)
+	w.pending.Wait()
+	return w.source.Close()
+}
diff --git a/runtimes/google/lib/follow/notify_watcher_test.go b/runtimes/google/lib/follow/notify_watcher_test.go
index 143de22..058f5a1 100644
--- a/runtimes/google/lib/follow/notify_watcher_test.go
+++ b/runtimes/google/lib/follow/notify_watcher_test.go
@@ -17,8 +17,7 @@
 	defer testfile.Close()
 	defer os.Remove(testFileName)
 
-	watch := newFSNotifyWatch(testFileName)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newFSNotifyWatcher(testFileName)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcer() failed: %v", err)
 	}
diff --git a/runtimes/google/lib/follow/other_config.go b/runtimes/google/lib/follow/other_config.go
index 09debb5..8feb648 100644
--- a/runtimes/google/lib/follow/other_config.go
+++ b/runtimes/google/lib/follow/other_config.go
@@ -2,9 +2,8 @@
 
 package follow
 
-// newFSWatcher starts and returns a new timer-based fsWatcher.
+// newFSWatcher starts and returns a new os.Stat()-based fsWatcher.
 // filename specifies the file to watch.
-func newFSWatcher(filename string) (*fsWatcher, error) {
-	watch := newFSTimedWatch(filename)
-	return newCustomFSWatcher(watch)
+func newFSWatcher(filename string) (fsWatcher, error) {
+	return newFSStatWatcher(filename)
 }
diff --git a/runtimes/google/lib/follow/reader.go b/runtimes/google/lib/follow/reader.go
index e3b6474..ab488cd 100644
--- a/runtimes/google/lib/follow/reader.go
+++ b/runtimes/google/lib/follow/reader.go
@@ -9,13 +9,12 @@
 // fsReader is an implementation of io.ReadCloser that reads synchronously
 // from a file, blocking until at least one byte is written to the file and is
 // available for reading.
-// fsReader should not be accessed concurrently.
 type fsReader struct {
 	mu sync.Mutex
 	// The file to read.
 	file *os.File // GUARDED_BY(mu)
 	// The watcher of modifications to the file.
-	watcher *fsWatcher
+	watcher fsWatcher
 	// True if the reader is open for reading, false otherwise.
 	closed bool // GUARDED_BY(mu)
 }
@@ -23,10 +22,12 @@
 // NewReader creates a new reader that reads synchronously from a file,
 // blocking until at least one byte is written to the file and is available
 // for reading.
-// The returned ReadCloser should not be accessed concurrently.
+// The returned io.ReadCloser supports limited concurrency:
+// 1) Reads may not be called concurrently.
+// 2) Close may be called concurrently with Read, and will terminate Read.
 func NewReader(filename string) (reader io.ReadCloser, err error) {
 	var file *os.File
-	var watcher *fsWatcher
+	var watcher fsWatcher
 	defer func() {
 		if err == nil {
 			return
@@ -51,7 +52,7 @@
 	return newCustomReader(file, watcher)
 }
 
-func newCustomReader(file *os.File, watcher *fsWatcher) (io.ReadCloser, error) {
+func newCustomReader(file *os.File, watcher fsWatcher) (io.ReadCloser, error) {
 	reader := &fsReader{
 		file:    file,
 		watcher: watcher,
@@ -77,7 +78,7 @@
 		// Wait until the file is modified one or more times. The new
 		// bytes from each corresponding modification have been
 		// written to the file already, and therefore won't be skipped.
-		if err := receiveEvents(r.watcher.events); err != nil {
+		if err := r.watcher.Wait(); err != nil {
 			return 0, err
 		}
 
@@ -85,17 +86,6 @@
 	}
 }
 
-// receiveEvents receives events from an event channel, blocking until at
-// least one event is available..
-// io.EOF is returned if the event channel is closed (as a result of Close()).
-func receiveEvents(events <-chan error) error {
-	err, ok := <-events
-	if !ok {
-		return io.EOF
-	}
-	return err
-}
-
 // Close closes the reader synchronously.
 // 1) Terminates ongoing reads. (reads return io.EOF)
 // 2) Prevents future reads. (reads return io.EOF)
diff --git a/runtimes/google/lib/follow/stat_reader_test.go b/runtimes/google/lib/follow/stat_reader_test.go
index 2c34b0e..c67e8b7 100644
--- a/runtimes/google/lib/follow/stat_reader_test.go
+++ b/runtimes/google/lib/follow/stat_reader_test.go
@@ -21,8 +21,7 @@
 	// Create the os.Stat()-based fsWatcher.
 	minSleep := 10 * time.Millisecond
 	maxSleep := 100 * time.Millisecond
-	watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcher() failed: %v", err)
 	}
@@ -48,8 +47,7 @@
 	// Create the os.Stat()-based fsWatcher.
 	minSleep := 10 * time.Millisecond
 	maxSleep := 100 * time.Millisecond
-	watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcher() failed: %v", err)
 	}
@@ -75,8 +73,7 @@
 	// Create the os.Stat()-based fsWatcher.
 	minSleep := 10 * time.Millisecond
 	maxSleep := 100 * time.Millisecond
-	watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcher() failed: %v", err)
 	}
diff --git a/runtimes/google/lib/follow/stat_watcher.go b/runtimes/google/lib/follow/stat_watcher.go
index 40e2bbe..29c8208 100644
--- a/runtimes/google/lib/follow/stat_watcher.go
+++ b/runtimes/google/lib/follow/stat_watcher.go
@@ -1,72 +1,100 @@
 package follow
 
 import (
+	"io"
 	"os"
+	"sync"
 	"time"
+
+	vsync "veyron/runtimes/google/lib/sync"
 )
 
 const (
-	defaultMinSleep = time.Second
-	defaultMaxSleep = time.Minute
+	defaultMinSleep = 10 * time.Millisecond
+	defaultMaxSleep = 5 * time.Second
 )
 
-// newFSStatWatch returns a function that polls os.Stat(), observing file size.
-// If the file size is larger than the previously-recorded file size,
-// fsStatWatcher assumes the file has been modified and sends a nil value on
-// the events channel.
-// The function starts polling os.Stat() at an interval specified by
-// minSleep, doubling that interval as long the file is not modified, upto
-// a maximum interval specified by maxSleep. The interval is reset to minSleep
-// once the file is modified. This allows faster detection during periods of
-// frequent modification but conserves resources during periods of inactivity.
-// The default values of minSleep and maxSleep can be overriden using the
-// newCustomFSStatWatcher() constructor.
-func newFSStatWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
-	return newCustomFSStatWatch(filename, defaultMinSleep, defaultMaxSleep)
+type fsStatWatcher struct {
+	minSleep     time.Duration
+	maxSleep     time.Duration
+	file         *os.File
+	lastFileSize int64
+	// cancel signals Wait to terminate.
+	cancel chan struct{}
+	// pending allows Close to block till ongoing calls to Wait terminate.
+	pending vsync.WaitGroup
+	// mu and closed ensure that Close is idempotent.
+	mu     sync.Mutex
+	closed bool // GUARDED_BY(mu)
 }
 
-func newCustomFSStatWatch(filename string, minSleep, maxSleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
-	return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
-		defer close(done)
-		defer close(events)
-		file, err := os.Open(filename)
-		if err != nil {
-			events <- err
-			return
-		}
-		defer file.Close()
-		fileInfo, err := file.Stat()
-		if err != nil {
-			events <- err
-			return
-		}
-		initialFileSize := fileInfo.Size()
+// newFSStatWatcher returns an fsWatcher that polls os.Stat(), observing file
+// size. If the file size is larger than the previously-recorded file size,
+// the watcher assumes the file has been modified.
+// Wait() polls os.Stat() at an interval specified by minSleep, doubling that
+// interval as long the file is not modified, upto a maximum interval specified
+// by maxSleep. This allows faster detection during periods of frequent
+// modification but conserves resources during periods of inactivity.
+// The default values of minSleep and maxSleep can be overriden using the
+// newCustomFSStatWatcher() constructor.
+func newFSStatWatcher(filename string) (fsWatcher, error) {
+	return newCustomFSStatWatcher(filename, defaultMinSleep, defaultMaxSleep)
+}
 
-		close(initialized)
-
-		lastFileSize := initialFileSize
-		sleep := minSleep
-		for {
-			// Look for a stop command.
-			select {
-			case <-stop:
-				return
-			default:
-			}
-			fileInfo, err := file.Stat()
-			if err != nil {
-				if !sendEvent(events, err, stop) {
-					return
-				}
-			} else if fileSize := fileInfo.Size(); lastFileSize < fileSize {
-				lastFileSize = fileSize
-				if !sendEvent(events, nil, stop) {
-					return
-				}
-				sleep = minSleep
-			}
-			time.Sleep(sleep)
-			sleep = minDuration(sleep*2, maxSleep)
-		}
+func newCustomFSStatWatcher(filename string, minSleep, maxSleep time.Duration) (fsWatcher, error) {
+	file, err := os.Open(filename)
+	if err != nil {
+		return nil, err
 	}
+	fileInfo, err := file.Stat()
+	if err != nil {
+		file.Close()
+		return nil, err
+	}
+	return &fsStatWatcher{
+		minSleep:     minSleep,
+		maxSleep:     maxSleep,
+		file:         file,
+		lastFileSize: fileInfo.Size(),
+		cancel:       make(chan struct{}),
+	}, nil
+}
+
+func (w *fsStatWatcher) Wait() error {
+	// After Close returns, any call to Wait must return io.EOF.
+	if !w.pending.TryAdd() {
+		return io.EOF
+	}
+	defer w.pending.Done()
+
+	sleep := w.minSleep
+	for {
+		select {
+		case <-w.cancel:
+			// After Close returns, any call to Wait must return io.EOF.
+			return io.EOF
+		default:
+		}
+		fileInfo, err := w.file.Stat()
+		if err != nil {
+			return err
+		} else if fileSize := fileInfo.Size(); w.lastFileSize < fileSize {
+			w.lastFileSize = fileSize
+			return nil
+		}
+		time.Sleep(sleep)
+		sleep = minDuration(sleep*2, w.maxSleep)
+	}
+}
+
+func (w *fsStatWatcher) Close() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.closed {
+		return nil
+	}
+	w.closed = true
+	close(w.cancel)
+	w.pending.Wait()
+	return w.file.Close()
 }
diff --git a/runtimes/google/lib/follow/stat_watcher_test.go b/runtimes/google/lib/follow/stat_watcher_test.go
index 04843d3..5c12f00 100644
--- a/runtimes/google/lib/follow/stat_watcher_test.go
+++ b/runtimes/google/lib/follow/stat_watcher_test.go
@@ -17,8 +17,7 @@
 
 	minSleep := 10 * time.Millisecond
 	maxSleep := 100 * time.Millisecond
-	watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
-	watcher, err := newCustomFSWatcher(watch)
+	watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
 	if err != nil {
 		t.Fatalf("newCustomFSWatcher() failed : %v", err)
 	}
diff --git a/runtimes/google/lib/follow/test_util.go b/runtimes/google/lib/follow/test_util.go
index 48f0d56..0da3fac 100644
--- a/runtimes/google/lib/follow/test_util.go
+++ b/runtimes/google/lib/follow/test_util.go
@@ -14,6 +14,20 @@
 	errUnexpectedModification = errors.New("unexpected modification event")
 )
 
+func events(watcher fsWatcher) <-chan error {
+	events := make(chan error)
+	go func() {
+		for {
+			event := watcher.Wait()
+			events <- event
+			if event == io.EOF {
+				break
+			}
+		}
+	}()
+	return events
+}
+
 // readString reads a string of the specified length from the reader.
 // Returns an error if:
 //  (A) reader.Read returned an error
@@ -91,31 +105,32 @@
 
 // testModification tests that the watcher sends events when the file is
 // modified.
-func testModification(file *os.File, watcher *fsWatcher, timeout time.Duration) error {
+func testModification(file *os.File, watcher fsWatcher, timeout time.Duration) error {
+	events := events(watcher)
 	// no modifications, expect no events.
-	if err := expectSilence(watcher.events, timeout); err != nil {
+	if err := expectSilence(events, timeout); err != nil {
 		return errors.New(fmt.Sprintf("expectSilence() failed with no modifications: %v ", err))
 	}
 	// modify once, expect event.
 	if err := writeString(file, "modification one"); err != nil {
 		return errors.New(fmt.Sprintf("writeString() failed on modification one: %v ", err))
 	}
-	if err := expectModification(watcher.events, timeout); err != nil {
+	if err := expectModification(events, timeout); err != nil {
 		return errors.New(fmt.Sprintf("expectModication() failed on modification one: %v ", err))
 	}
 	// no further modifications, expect no events.
-	if err := expectSilence(watcher.events, timeout); err != nil {
+	if err := expectSilence(events, timeout); err != nil {
 		return errors.New(fmt.Sprintf("expectSilence() failed after modification one: %v ", err))
 	}
 	// modify again, expect event.
 	if err := writeString(file, "modification two"); err != nil {
 		return errors.New(fmt.Sprintf("writeString() failed on modification two: %v ", err))
 	}
-	if err := expectModification(watcher.events, timeout); err != nil {
+	if err := expectModification(events, time.Hour); err != nil {
 		return errors.New(fmt.Sprintf("expectModification() failed on modification two: %v ", err))
 	}
 	// no further modifications, expect no events.
-	if err := expectSilence(watcher.events, timeout); err != nil {
+	if err := expectSilence(events, timeout); err != nil {
 		return errors.New(fmt.Sprintf("expectSilence() failed after modification two: %v ", err))
 	}
 	return nil
@@ -124,7 +139,7 @@
 // testClose tests the implementation of fsReader.Read(). Specifically,
 // tests that Read() blocks if the requested bytes are not available for
 // reading in the underlying file.
-func testReadPartial(testFileName string, watcher *fsWatcher, timeout time.Duration) error {
+func testReadPartial(testFileName string, watcher fsWatcher, timeout time.Duration) error {
 	s0 := "part"
 
 	// Open the file for writing.
@@ -172,7 +187,7 @@
 // testClose tests the implementation of fsReader.Read(). Specifically,
 // tests that Read() returns the requested bytes when they are available
 // for reading in the underlying file.
-func testReadFull(testFileName string, watcher *fsWatcher, timeout time.Duration) error {
+func testReadFull(testFileName string, watcher fsWatcher, timeout time.Duration) error {
 	s0, s1, s2 := "partitioned", "parti", "tioned"
 
 	// Open the file for writing.
@@ -217,7 +232,7 @@
 }
 
 // testClose tests the implementation of fsReader.Close()
-func testClose(testFileName string, watcher *fsWatcher, timeout time.Duration) error {
+func testClose(testFileName string, watcher fsWatcher, timeout time.Duration) error {
 	s := "word"
 
 	// Open the file for writing.
diff --git a/runtimes/google/lib/follow/timed_reader_test.go b/runtimes/google/lib/follow/timed_reader_test.go
deleted file mode 100644
index a9b055d..0000000
--- a/runtimes/google/lib/follow/timed_reader_test.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package follow
-
-import (
-	"os"
-	"testing"
-	"time"
-)
-
-// TestTimedReadPartial tests partial reads with the timer-based fsReader
-func TestTimedReadPartial(t *testing.T) {
-	testFileName := os.TempDir() + "/follow.reader.timed.partial"
-
-	// Create the test file.
-	testfile, err := os.Create(testFileName)
-	if err != nil {
-		t.Fatalf("os.Create() failed: %v", err)
-	}
-	testfile.Close()
-	defer os.Remove(testFileName)
-
-	// Create the timer-based fsWatcher.
-	sleep := 10 * time.Millisecond
-	watch := newCustomFSTimedWatch(testFileName, sleep)
-	watcher, err := newCustomFSWatcher(watch)
-	if err != nil {
-		t.Fatalf("newCustomFSWatcher() failed: %v", err)
-	}
-
-	timeout := 100 * time.Millisecond
-	if err := testReadPartial(testFileName, watcher, timeout); err != nil {
-		t.Fatal("testReadPartial() failed: %v", err)
-	}
-}
-
-// TestTimedReadFull tests full reads with the timer-based fsReader
-func TestTimedReadFull(t *testing.T) {
-	testFileName := os.TempDir() + "/follow.reader.timed.full"
-
-	// Create the test file.
-	testfile, err := os.Create(testFileName)
-	if err != nil {
-		t.Fatalf("os.Create() failed: %v", err)
-	}
-	testfile.Close()
-	defer os.Remove(testFileName)
-
-	// Create the timer-based fsWatcher.
-	sleep := 10 * time.Millisecond
-	watch := newCustomFSTimedWatch(testFileName, sleep)
-	watcher, err := newCustomFSWatcher(watch)
-	if err != nil {
-		t.Fatalf("newCustomFSWatcher() failed: %v", err)
-	}
-
-	timeout := 100 * time.Millisecond
-	if err := testReadFull(testFileName, watcher, timeout); err != nil {
-		t.Fatal("testReadFull() failed: %v", err)
-	}
-}
-
-// TestTimedClose tests close with the timer-based fsReader
-func TestTimedClose(t *testing.T) {
-	testFileName := os.TempDir() + "/follow.reader.timed.close"
-
-	// Create the test file.
-	testfile, err := os.Create(testFileName)
-	if err != nil {
-		t.Fatalf("os.Create() failed: %v", err)
-	}
-	testfile.Close()
-	defer os.Remove(testFileName)
-
-	// Create the timer-based fsWatcher.
-	sleep := 10 * time.Millisecond
-	watch := newCustomFSTimedWatch(testFileName, sleep)
-	watcher, err := newCustomFSWatcher(watch)
-	if err != nil {
-		t.Fatalf("newCustomFSWatcher() failed: %v", err)
-	}
-
-	timeout := 100 * time.Millisecond
-	if err := testClose(testFileName, watcher, timeout); err != nil {
-		t.Fatal("testClose() failed: %v", err)
-	}
-}
diff --git a/runtimes/google/lib/follow/timed_watcher.go b/runtimes/google/lib/follow/timed_watcher.go
deleted file mode 100644
index aea4b50..0000000
--- a/runtimes/google/lib/follow/timed_watcher.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package follow
-
-import (
-	"time"
-)
-
-const defaultSleep = time.Second
-
-// newFSStatWatch returns a function that sends a nil value on the events
-// channel at an interval specified by sleep.
-// The event receiver must determine whether the event is spurious, or
-// corresponds to a modification of the file.
-func newFSTimedWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
-	return newCustomFSTimedWatch(filename, defaultSleep)
-}
-
-func newCustomFSTimedWatch(filename string, sleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
-	return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
-		defer close(done)
-		defer close(events)
-
-		close(initialized)
-
-		for {
-			// Look for a stop command.
-			select {
-			case <-stop:
-				return
-			default:
-			}
-			if !sendEvent(events, nil, stop) {
-				return
-			}
-			time.Sleep(sleep)
-		}
-	}
-}
diff --git a/runtimes/google/lib/follow/timed_watcher_test.go b/runtimes/google/lib/follow/timed_watcher_test.go
deleted file mode 100644
index 233b746..0000000
--- a/runtimes/google/lib/follow/timed_watcher_test.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package follow
-
-import (
-	"os"
-	"testing"
-	"time"
-)
-
-func TestModificationTimed(t *testing.T) {
-	testFileName := os.TempDir() + "/follow.modification.timed"
-	testfile, err := os.Create(testFileName)
-	if err != nil {
-		t.Fatalf("os.Create() failed: %v", err)
-	}
-	defer testfile.Close()
-	defer os.Remove(testFileName)
-
-	sleep := 10 * time.Millisecond
-	watch := newCustomFSTimedWatch(testFileName, sleep)
-	watcher, err := newCustomFSWatcher(watch)
-	if err != nil {
-		t.Fatalf("newCustomFSWatcher() failed : %v", err)
-	}
-	timeout := 100 * time.Millisecond
-	// don't modify file, expect event.
-	if err := expectModification(watcher.events, timeout); err != nil {
-		t.Fatalf("expectModification() failed without a file modification: %v", err)
-	}
-	// modify file, expect event.
-	if err := writeString(testfile, "modification"); err != nil {
-		t.Fatalf("writeString() failed: %v", err)
-	}
-	if err := expectModification(watcher.events, timeout); err != nil {
-		t.Fatalf("expectModification() failed after a file modification: %v", err)
-	}
-}
diff --git a/runtimes/google/lib/follow/watcher.go b/runtimes/google/lib/follow/watcher.go
index eae1353..d5ad863 100644
--- a/runtimes/google/lib/follow/watcher.go
+++ b/runtimes/google/lib/follow/watcher.go
@@ -1,113 +1,14 @@
 package follow
 
-import (
-	"errors"
-	"sync"
-)
-
-var errWatcherClosed = errors.New("watcher has already been closed")
-
 // fsWatcher is a tool for watching append-only modifications to a file.
-// Start() spawns an event routine that asynchronously detects modifications to
-// the specified file and sends corresponding events on the events channel.
-// Close() terminates the event routine and stops detecting any modifications.
-// However, the watcher may continue to send previously detected events.
-type fsWatcher struct {
-	// watch runs on the event routine, and detects modifications and sends
-	// corresponding events. watch runs until it is asked to stop.
-	watch func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{})
-	// events is the channel on which events and errors are sent.
-	events <-chan error
-	// stop is the channel on which the event routine is told to stop.
-	stop chan<- struct{}
-	// done is the channel on which the event routine announces that it is done.
-	done <-chan struct{}
-	// mu guards closed
-	mu sync.Mutex
-	// closed is true iff the watcher has been closed.
-	closed bool // GUARDED_BY(mu)
-}
+type fsWatcher interface {
+	// Wait blocks until the file is modified.
+	// Wait returns an io.EOF if the watcher is closed, and immediately returns
+	// any error it encounters while blocking.
+	Wait() error
 
-// newCustomFSWatcher spawns an event routine that runs watch, and returns a new
-// fsWatcher.
-// watch is a function that detects file modifications and sends a corresponding
-// nil value on the returned watcher's events channel. If an error occurs in
-// watch, it is sent on the events channel. However, watch may keep running.
-// To halt watch, the receiver should call Close().
-//
-// Watch guarantees that an event will be received for any modification after
-// newCustomFSWatcher returns.
-//
-// A sequence of modifications may correspond to one sent event. Watch guarantees
-// that at least one event is received after the most recent modification.
-//
-// The frequency at which events are generated is implementation-specific.
-// Implementations may generate events even if the file has not been modified -
-// the receiver should determine whether these events are spurious.
-func newCustomFSWatcher(watch func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{})) (*fsWatcher, error) {
-	events := make(chan error, 1)
-	stop := make(chan struct{})
-	done := make(chan struct{})
-	watcher := &fsWatcher{
-		watch:  watch,
-		events: events,
-		stop:   stop,
-		done:   done,
-		closed: false,
-	}
-	initialized := make(chan struct{})
-	go watch(events, initialized, stop, done)
-	// Wait until the watch routine has been initialized. This ensures that an
-	// event is sent for any modification after this function returns.
-	<-initialized
-	return watcher, nil
-}
-
-// Close closes the watcher synchronously.
-func (w *fsWatcher) Close() error {
-	// Mark the watcher closed.
-	if err := w.setClosed(); err != nil {
-		return err
-	}
-
-	close(w.stop)
-	<-w.done
-	return nil
-}
-
-func (w *fsWatcher) setClosed() error {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	if w.closed {
-		return errWatcherClosed
-	}
-	w.closed = true
-	return nil
-}
-
-// sendEvent sends the event on the events channel. sendEvent expects the buffer
-// size of the events channel to be exactly one.
-// If the event is not nil, it will always be sent, and sendEvent blocks on the
-// events channel.
-// Otherwise, if there is already an event in the channel, sendEvent won't send
-// the event. This coalesces events that happen faster than the receiver can
-// process them.
-// sendEvent can be preempted by a stop request, and returns true iff the event
-// was sent or coalesced with an existing event.
-func sendEvent(events chan<- error, event error, stop <-chan struct{}) bool {
-	if event == nil {
-		select {
-		case <-stop:
-			return false
-		case events <- nil:
-		default:
-		}
-		return true
-	}
-	select {
-	case <-stop:
-		return false
-	case events <- event:
-		return true
-	}
+	// Close closes the watcher synchronously. Any ongoing or following calls
+	// to Wait return io.EOF.
+	// Close is idempotent.
+	Close() error
 }
diff --git a/runtimes/google/lib/follow/watcher_test.go b/runtimes/google/lib/follow/watcher_test.go
deleted file mode 100644
index 2732238..0000000
--- a/runtimes/google/lib/follow/watcher_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package follow
-
-import (
-	"errors"
-	"testing"
-)
-
-func TestSendEvent(t *testing.T) {
-	event := errors.New("event")
-
-	// Test that 1) a nil can be sent 2) an error can be sent after a nil.
-	events := make(chan error, 1)
-	stop := make(chan struct{})
-	if !sendEvent(events, nil, stop) {
-		t.Fatal("Expected that the event will be sent")
-	}
-	if recv := <-events; recv != nil {
-		t.Fatalf("Expected to receive nil")
-	}
-	if !sendEvent(events, event, stop) {
-		t.Fatal("Expected that the event will be sent")
-	}
-	if recv := <-events; recv != event {
-		t.Fatalf("Expected to receive %v", event)
-	}
-
-	// Test that nils are coalesced.
-	events = make(chan error, 1)
-	stop = make(chan struct{})
-	if !sendEvent(events, nil, stop) {
-		t.Fatal("Expected that the event will be sent")
-	}
-	if !sendEvent(events, nil, stop) {
-		t.Fatal("Expected that the event will be sent")
-	}
-	if recv := <-events; recv != nil {
-		t.Fatalf("Expected to receive nil")
-	}
-
-	// Test that an error is not sent if stop is closed.
-	events = make(chan error, 1)
-	stop = make(chan struct{})
-	close(stop)
-	// The stop signal may not be handled immediately.
-	for sendEvent(events, event, stop) {
-	}
-
-	// Test that a nil is not sent if stop is closed.
-	events = make(chan error, 1)
-	stop = make(chan struct{})
-	close(stop)
-	// The stop signal may not be handled immediately.
-	for sendEvent(events, nil, stop) {
-	}
-
-	// Test that an error is not sent if stop is closed while send is blocked.
-	events = make(chan error, 1)
-	stop = make(chan struct{})
-	if !sendEvent(events, nil, stop) {
-		t.Fatal("Expected that the event will be sent")
-	}
-	go close(stop)
-	// The stop signal may not be handled immediately.
-	for sendEvent(events, event, stop) {
-	}
-}