veyron/runtimes/google/lib/follow: Refactor watcher api and implementation.
Previously, the watcher would send file modifications events on a channel. Now,
the watcher provides a Recv method that blocks until the file is modified.
This simplifies the watcher implementation:
1) we no longer spawn go routines that fill the events channel.
2) delegate watchers (i.e. fsnotify-based, fs.Stat-based, timed, etc.) implement
an interface, not a function type.
Change-Id: Ia6b27c53f4e1a7e123cd2e738ac621331d177b95
diff --git a/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go b/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go
index 5dacaa8..09a9cf9 100644
--- a/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go
+++ b/runtimes/google/lib/follow/bsd_darwin_linux_windows_config.go
@@ -4,7 +4,6 @@
// newFSWatcher starts and returns a new fsnotify-based fsWatcher.
// filename specifies the file to watch.
-func newFSWatcher(filename string) (*fsWatcher, error) {
- watch := newFSNotifyWatch(filename)
- return newCustomFSWatcher(watch)
+func newFSWatcher(filename string) (fsWatcher, error) {
+ return newFSNotifyWatcher(filename)
}
diff --git a/runtimes/google/lib/follow/notify_reader_test.go b/runtimes/google/lib/follow/notify_reader_test.go
index 862e04c..04f8827 100644
--- a/runtimes/google/lib/follow/notify_reader_test.go
+++ b/runtimes/google/lib/follow/notify_reader_test.go
@@ -21,8 +21,7 @@
defer os.Remove(testFileName)
// Create the fsnotify-based fsWatcher.
- watch := newFSNotifyWatch(testFileName)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newFSNotifyWatcher(testFileName)
if err != nil {
t.Fatalf("newCustomFSWatcher() failed: %v", err)
}
@@ -46,8 +45,7 @@
defer os.Remove(testFileName)
// Create the fsnotify-based fsWatcher.
- watch := newFSNotifyWatch(testFileName)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newFSNotifyWatcher(testFileName)
if err != nil {
t.Fatalf("newCustomFSWatcher() failed: %v", err)
}
@@ -71,8 +69,7 @@
defer os.Remove(testFileName)
// Create the fsnotify-based fsWatcher.
- watch := newFSNotifyWatch(testFileName)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newFSNotifyWatcher(testFileName)
if err != nil {
t.Fatalf("newCustomFSWatcher() failed: %v", err)
}
diff --git a/runtimes/google/lib/follow/notify_watcher.go b/runtimes/google/lib/follow/notify_watcher.go
index 7669421..c5279d8 100644
--- a/runtimes/google/lib/follow/notify_watcher.go
+++ b/runtimes/google/lib/follow/notify_watcher.go
@@ -3,51 +3,83 @@
package follow
import (
+ "fmt"
"github.com/howeyc/fsnotify"
+ "io"
+ "sync"
+
+ vsync "veyron/runtimes/google/lib/sync"
)
-// newFSNotifyWatch returns a function that listens on fsnotify and sends
-// corresponding modification events.
-// For each fsnotify modification event, a single nil value is sent on the
-// events channel. Further fsnotify events are not received until the nil
-// value is received from the events channel.
-// The function sends any errors on the events channel.
-func newFSNotifyWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
- return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
- defer close(done)
- defer close(events)
- source, err := fsnotify.NewWatcher()
- if err != nil {
- events <- err
- return
- }
- defer source.Close()
- if err := source.Watch(filename); err != nil {
- events <- err
- return
- }
+type fsNotifyWatcher struct {
+ filename string
+ source *fsnotify.Watcher
+ // cancel signals Wait to terminate.
+ cancel chan struct{}
+ // pending allows Close to block till ongoing calls to Wait terminate.
+ pending vsync.WaitGroup
+ // mu and closed ensure that Close is idempotent.
+ mu sync.Mutex
+ closed bool // GUARDED_BY(mu)
+}
- close(initialized)
+// newFSNotifyWatcher returns an fsnotify-based fsWatcher.
+// Wait() blocks until it receives a file modification event from fsnotify.
+func newFSNotifyWatcher(filename string) (fsWatcher, error) {
+ source, err := fsnotify.NewWatcher()
+ if err != nil {
+ return nil, err
+ }
+ if err := source.Watch(filename); err != nil {
+ source.Close()
+ return nil, err
+ }
+ return &fsNotifyWatcher{
+ source: source,
+ cancel: make(chan struct{}),
+ }, nil
+}
- for {
- // Receive:
- // (A) An fsnotify modification event. Send nil.
- // (B) An fsnotify error. Send the error.
- // (C) A stop command. Stop listening and clean up.
- select {
- case event := <-source.Event:
- if event.IsModify() {
- if !sendEvent(events, nil, stop) {
- return
+func (w *fsNotifyWatcher) Wait() error {
+ // After Close returns, any call to Wait must return io.EOF.
+ if !w.pending.TryAdd() {
+ return io.EOF
+ }
+ defer w.pending.Done()
+
+ for {
+ select {
+ case event := <-w.source.Event:
+ if event.IsModify() {
+ // Drain the event queue.
+ drained := false
+ for !drained {
+ select {
+ case <-w.source.Event:
+ default:
+ drained = true
}
}
- case err := <-source.Error:
- if !sendEvent(events, err, stop) {
- return
- }
- case <-stop:
- return
+ return nil
}
+ return fmt.Errorf("Unexpected event %v", event)
+ case err := <-w.source.Error:
+ return err
+ case <-w.cancel:
+ // After Close returns, any call to Wait must return io.EOF.
+ return io.EOF
}
}
}
+
+func (w *fsNotifyWatcher) Close() error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.closed {
+ return nil
+ }
+ w.closed = true
+ close(w.cancel)
+ w.pending.Wait()
+ return w.source.Close()
+}
diff --git a/runtimes/google/lib/follow/notify_watcher_test.go b/runtimes/google/lib/follow/notify_watcher_test.go
index 143de22..058f5a1 100644
--- a/runtimes/google/lib/follow/notify_watcher_test.go
+++ b/runtimes/google/lib/follow/notify_watcher_test.go
@@ -17,8 +17,7 @@
defer testfile.Close()
defer os.Remove(testFileName)
- watch := newFSNotifyWatch(testFileName)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newFSNotifyWatcher(testFileName)
if err != nil {
t.Fatalf("newCustomFSWatcer() failed: %v", err)
}
diff --git a/runtimes/google/lib/follow/other_config.go b/runtimes/google/lib/follow/other_config.go
index 09debb5..8feb648 100644
--- a/runtimes/google/lib/follow/other_config.go
+++ b/runtimes/google/lib/follow/other_config.go
@@ -2,9 +2,8 @@
package follow
-// newFSWatcher starts and returns a new timer-based fsWatcher.
+// newFSWatcher starts and returns a new os.Stat()-based fsWatcher.
// filename specifies the file to watch.
-func newFSWatcher(filename string) (*fsWatcher, error) {
- watch := newFSTimedWatch(filename)
- return newCustomFSWatcher(watch)
+func newFSWatcher(filename string) (fsWatcher, error) {
+ return newFSStatWatcher(filename)
}
diff --git a/runtimes/google/lib/follow/reader.go b/runtimes/google/lib/follow/reader.go
index e3b6474..ab488cd 100644
--- a/runtimes/google/lib/follow/reader.go
+++ b/runtimes/google/lib/follow/reader.go
@@ -9,13 +9,12 @@
// fsReader is an implementation of io.ReadCloser that reads synchronously
// from a file, blocking until at least one byte is written to the file and is
// available for reading.
-// fsReader should not be accessed concurrently.
type fsReader struct {
mu sync.Mutex
// The file to read.
file *os.File // GUARDED_BY(mu)
// The watcher of modifications to the file.
- watcher *fsWatcher
+ watcher fsWatcher
// True if the reader is open for reading, false otherwise.
closed bool // GUARDED_BY(mu)
}
@@ -23,10 +22,12 @@
// NewReader creates a new reader that reads synchronously from a file,
// blocking until at least one byte is written to the file and is available
// for reading.
-// The returned ReadCloser should not be accessed concurrently.
+// The returned io.ReadCloser supports limited concurrency:
+// 1) Reads may not be called concurrently.
+// 2) Close may be called concurrently with Read, and will terminate Read.
func NewReader(filename string) (reader io.ReadCloser, err error) {
var file *os.File
- var watcher *fsWatcher
+ var watcher fsWatcher
defer func() {
if err == nil {
return
@@ -51,7 +52,7 @@
return newCustomReader(file, watcher)
}
-func newCustomReader(file *os.File, watcher *fsWatcher) (io.ReadCloser, error) {
+func newCustomReader(file *os.File, watcher fsWatcher) (io.ReadCloser, error) {
reader := &fsReader{
file: file,
watcher: watcher,
@@ -77,7 +78,7 @@
// Wait until the file is modified one or more times. The new
// bytes from each corresponding modification have been
// written to the file already, and therefore won't be skipped.
- if err := receiveEvents(r.watcher.events); err != nil {
+ if err := r.watcher.Wait(); err != nil {
return 0, err
}
@@ -85,17 +86,6 @@
}
}
-// receiveEvents receives events from an event channel, blocking until at
-// least one event is available..
-// io.EOF is returned if the event channel is closed (as a result of Close()).
-func receiveEvents(events <-chan error) error {
- err, ok := <-events
- if !ok {
- return io.EOF
- }
- return err
-}
-
// Close closes the reader synchronously.
// 1) Terminates ongoing reads. (reads return io.EOF)
// 2) Prevents future reads. (reads return io.EOF)
diff --git a/runtimes/google/lib/follow/stat_reader_test.go b/runtimes/google/lib/follow/stat_reader_test.go
index 2c34b0e..c67e8b7 100644
--- a/runtimes/google/lib/follow/stat_reader_test.go
+++ b/runtimes/google/lib/follow/stat_reader_test.go
@@ -21,8 +21,7 @@
// Create the os.Stat()-based fsWatcher.
minSleep := 10 * time.Millisecond
maxSleep := 100 * time.Millisecond
- watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
if err != nil {
t.Fatalf("newCustomFSWatcher() failed: %v", err)
}
@@ -48,8 +47,7 @@
// Create the os.Stat()-based fsWatcher.
minSleep := 10 * time.Millisecond
maxSleep := 100 * time.Millisecond
- watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
if err != nil {
t.Fatalf("newCustomFSWatcher() failed: %v", err)
}
@@ -75,8 +73,7 @@
// Create the os.Stat()-based fsWatcher.
minSleep := 10 * time.Millisecond
maxSleep := 100 * time.Millisecond
- watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
if err != nil {
t.Fatalf("newCustomFSWatcher() failed: %v", err)
}
diff --git a/runtimes/google/lib/follow/stat_watcher.go b/runtimes/google/lib/follow/stat_watcher.go
index 40e2bbe..29c8208 100644
--- a/runtimes/google/lib/follow/stat_watcher.go
+++ b/runtimes/google/lib/follow/stat_watcher.go
@@ -1,72 +1,100 @@
package follow
import (
+ "io"
"os"
+ "sync"
"time"
+
+ vsync "veyron/runtimes/google/lib/sync"
)
const (
- defaultMinSleep = time.Second
- defaultMaxSleep = time.Minute
+ defaultMinSleep = 10 * time.Millisecond
+ defaultMaxSleep = 5 * time.Second
)
-// newFSStatWatch returns a function that polls os.Stat(), observing file size.
-// If the file size is larger than the previously-recorded file size,
-// fsStatWatcher assumes the file has been modified and sends a nil value on
-// the events channel.
-// The function starts polling os.Stat() at an interval specified by
-// minSleep, doubling that interval as long the file is not modified, upto
-// a maximum interval specified by maxSleep. The interval is reset to minSleep
-// once the file is modified. This allows faster detection during periods of
-// frequent modification but conserves resources during periods of inactivity.
-// The default values of minSleep and maxSleep can be overriden using the
-// newCustomFSStatWatcher() constructor.
-func newFSStatWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
- return newCustomFSStatWatch(filename, defaultMinSleep, defaultMaxSleep)
+type fsStatWatcher struct {
+ minSleep time.Duration
+ maxSleep time.Duration
+ file *os.File
+ lastFileSize int64
+ // cancel signals Wait to terminate.
+ cancel chan struct{}
+ // pending allows Close to block till ongoing calls to Wait terminate.
+ pending vsync.WaitGroup
+ // mu and closed ensure that Close is idempotent.
+ mu sync.Mutex
+ closed bool // GUARDED_BY(mu)
}
-func newCustomFSStatWatch(filename string, minSleep, maxSleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
- return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
- defer close(done)
- defer close(events)
- file, err := os.Open(filename)
- if err != nil {
- events <- err
- return
- }
- defer file.Close()
- fileInfo, err := file.Stat()
- if err != nil {
- events <- err
- return
- }
- initialFileSize := fileInfo.Size()
+// newFSStatWatcher returns an fsWatcher that polls os.Stat(), observing file
+// size. If the file size is larger than the previously-recorded file size,
+// the watcher assumes the file has been modified.
+// Wait() polls os.Stat() at an interval specified by minSleep, doubling that
+// interval as long the file is not modified, upto a maximum interval specified
+// by maxSleep. This allows faster detection during periods of frequent
+// modification but conserves resources during periods of inactivity.
+// The default values of minSleep and maxSleep can be overriden using the
+// newCustomFSStatWatcher() constructor.
+func newFSStatWatcher(filename string) (fsWatcher, error) {
+ return newCustomFSStatWatcher(filename, defaultMinSleep, defaultMaxSleep)
+}
- close(initialized)
-
- lastFileSize := initialFileSize
- sleep := minSleep
- for {
- // Look for a stop command.
- select {
- case <-stop:
- return
- default:
- }
- fileInfo, err := file.Stat()
- if err != nil {
- if !sendEvent(events, err, stop) {
- return
- }
- } else if fileSize := fileInfo.Size(); lastFileSize < fileSize {
- lastFileSize = fileSize
- if !sendEvent(events, nil, stop) {
- return
- }
- sleep = minSleep
- }
- time.Sleep(sleep)
- sleep = minDuration(sleep*2, maxSleep)
- }
+func newCustomFSStatWatcher(filename string, minSleep, maxSleep time.Duration) (fsWatcher, error) {
+ file, err := os.Open(filename)
+ if err != nil {
+ return nil, err
}
+ fileInfo, err := file.Stat()
+ if err != nil {
+ file.Close()
+ return nil, err
+ }
+ return &fsStatWatcher{
+ minSleep: minSleep,
+ maxSleep: maxSleep,
+ file: file,
+ lastFileSize: fileInfo.Size(),
+ cancel: make(chan struct{}),
+ }, nil
+}
+
+func (w *fsStatWatcher) Wait() error {
+ // After Close returns, any call to Wait must return io.EOF.
+ if !w.pending.TryAdd() {
+ return io.EOF
+ }
+ defer w.pending.Done()
+
+ sleep := w.minSleep
+ for {
+ select {
+ case <-w.cancel:
+ // After Close returns, any call to Wait must return io.EOF.
+ return io.EOF
+ default:
+ }
+ fileInfo, err := w.file.Stat()
+ if err != nil {
+ return err
+ } else if fileSize := fileInfo.Size(); w.lastFileSize < fileSize {
+ w.lastFileSize = fileSize
+ return nil
+ }
+ time.Sleep(sleep)
+ sleep = minDuration(sleep*2, w.maxSleep)
+ }
+}
+
+func (w *fsStatWatcher) Close() error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.closed {
+ return nil
+ }
+ w.closed = true
+ close(w.cancel)
+ w.pending.Wait()
+ return w.file.Close()
}
diff --git a/runtimes/google/lib/follow/stat_watcher_test.go b/runtimes/google/lib/follow/stat_watcher_test.go
index 04843d3..5c12f00 100644
--- a/runtimes/google/lib/follow/stat_watcher_test.go
+++ b/runtimes/google/lib/follow/stat_watcher_test.go
@@ -17,8 +17,7 @@
minSleep := 10 * time.Millisecond
maxSleep := 100 * time.Millisecond
- watch := newCustomFSStatWatch(testFileName, minSleep, maxSleep)
- watcher, err := newCustomFSWatcher(watch)
+ watcher, err := newCustomFSStatWatcher(testFileName, minSleep, maxSleep)
if err != nil {
t.Fatalf("newCustomFSWatcher() failed : %v", err)
}
diff --git a/runtimes/google/lib/follow/test_util.go b/runtimes/google/lib/follow/test_util.go
index 48f0d56..0da3fac 100644
--- a/runtimes/google/lib/follow/test_util.go
+++ b/runtimes/google/lib/follow/test_util.go
@@ -14,6 +14,20 @@
errUnexpectedModification = errors.New("unexpected modification event")
)
+func events(watcher fsWatcher) <-chan error {
+ events := make(chan error)
+ go func() {
+ for {
+ event := watcher.Wait()
+ events <- event
+ if event == io.EOF {
+ break
+ }
+ }
+ }()
+ return events
+}
+
// readString reads a string of the specified length from the reader.
// Returns an error if:
// (A) reader.Read returned an error
@@ -91,31 +105,32 @@
// testModification tests that the watcher sends events when the file is
// modified.
-func testModification(file *os.File, watcher *fsWatcher, timeout time.Duration) error {
+func testModification(file *os.File, watcher fsWatcher, timeout time.Duration) error {
+ events := events(watcher)
// no modifications, expect no events.
- if err := expectSilence(watcher.events, timeout); err != nil {
+ if err := expectSilence(events, timeout); err != nil {
return errors.New(fmt.Sprintf("expectSilence() failed with no modifications: %v ", err))
}
// modify once, expect event.
if err := writeString(file, "modification one"); err != nil {
return errors.New(fmt.Sprintf("writeString() failed on modification one: %v ", err))
}
- if err := expectModification(watcher.events, timeout); err != nil {
+ if err := expectModification(events, timeout); err != nil {
return errors.New(fmt.Sprintf("expectModication() failed on modification one: %v ", err))
}
// no further modifications, expect no events.
- if err := expectSilence(watcher.events, timeout); err != nil {
+ if err := expectSilence(events, timeout); err != nil {
return errors.New(fmt.Sprintf("expectSilence() failed after modification one: %v ", err))
}
// modify again, expect event.
if err := writeString(file, "modification two"); err != nil {
return errors.New(fmt.Sprintf("writeString() failed on modification two: %v ", err))
}
- if err := expectModification(watcher.events, timeout); err != nil {
+ if err := expectModification(events, time.Hour); err != nil {
return errors.New(fmt.Sprintf("expectModification() failed on modification two: %v ", err))
}
// no further modifications, expect no events.
- if err := expectSilence(watcher.events, timeout); err != nil {
+ if err := expectSilence(events, timeout); err != nil {
return errors.New(fmt.Sprintf("expectSilence() failed after modification two: %v ", err))
}
return nil
@@ -124,7 +139,7 @@
// testClose tests the implementation of fsReader.Read(). Specifically,
// tests that Read() blocks if the requested bytes are not available for
// reading in the underlying file.
-func testReadPartial(testFileName string, watcher *fsWatcher, timeout time.Duration) error {
+func testReadPartial(testFileName string, watcher fsWatcher, timeout time.Duration) error {
s0 := "part"
// Open the file for writing.
@@ -172,7 +187,7 @@
// testClose tests the implementation of fsReader.Read(). Specifically,
// tests that Read() returns the requested bytes when they are available
// for reading in the underlying file.
-func testReadFull(testFileName string, watcher *fsWatcher, timeout time.Duration) error {
+func testReadFull(testFileName string, watcher fsWatcher, timeout time.Duration) error {
s0, s1, s2 := "partitioned", "parti", "tioned"
// Open the file for writing.
@@ -217,7 +232,7 @@
}
// testClose tests the implementation of fsReader.Close()
-func testClose(testFileName string, watcher *fsWatcher, timeout time.Duration) error {
+func testClose(testFileName string, watcher fsWatcher, timeout time.Duration) error {
s := "word"
// Open the file for writing.
diff --git a/runtimes/google/lib/follow/timed_reader_test.go b/runtimes/google/lib/follow/timed_reader_test.go
deleted file mode 100644
index a9b055d..0000000
--- a/runtimes/google/lib/follow/timed_reader_test.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package follow
-
-import (
- "os"
- "testing"
- "time"
-)
-
-// TestTimedReadPartial tests partial reads with the timer-based fsReader
-func TestTimedReadPartial(t *testing.T) {
- testFileName := os.TempDir() + "/follow.reader.timed.partial"
-
- // Create the test file.
- testfile, err := os.Create(testFileName)
- if err != nil {
- t.Fatalf("os.Create() failed: %v", err)
- }
- testfile.Close()
- defer os.Remove(testFileName)
-
- // Create the timer-based fsWatcher.
- sleep := 10 * time.Millisecond
- watch := newCustomFSTimedWatch(testFileName, sleep)
- watcher, err := newCustomFSWatcher(watch)
- if err != nil {
- t.Fatalf("newCustomFSWatcher() failed: %v", err)
- }
-
- timeout := 100 * time.Millisecond
- if err := testReadPartial(testFileName, watcher, timeout); err != nil {
- t.Fatal("testReadPartial() failed: %v", err)
- }
-}
-
-// TestTimedReadFull tests full reads with the timer-based fsReader
-func TestTimedReadFull(t *testing.T) {
- testFileName := os.TempDir() + "/follow.reader.timed.full"
-
- // Create the test file.
- testfile, err := os.Create(testFileName)
- if err != nil {
- t.Fatalf("os.Create() failed: %v", err)
- }
- testfile.Close()
- defer os.Remove(testFileName)
-
- // Create the timer-based fsWatcher.
- sleep := 10 * time.Millisecond
- watch := newCustomFSTimedWatch(testFileName, sleep)
- watcher, err := newCustomFSWatcher(watch)
- if err != nil {
- t.Fatalf("newCustomFSWatcher() failed: %v", err)
- }
-
- timeout := 100 * time.Millisecond
- if err := testReadFull(testFileName, watcher, timeout); err != nil {
- t.Fatal("testReadFull() failed: %v", err)
- }
-}
-
-// TestTimedClose tests close with the timer-based fsReader
-func TestTimedClose(t *testing.T) {
- testFileName := os.TempDir() + "/follow.reader.timed.close"
-
- // Create the test file.
- testfile, err := os.Create(testFileName)
- if err != nil {
- t.Fatalf("os.Create() failed: %v", err)
- }
- testfile.Close()
- defer os.Remove(testFileName)
-
- // Create the timer-based fsWatcher.
- sleep := 10 * time.Millisecond
- watch := newCustomFSTimedWatch(testFileName, sleep)
- watcher, err := newCustomFSWatcher(watch)
- if err != nil {
- t.Fatalf("newCustomFSWatcher() failed: %v", err)
- }
-
- timeout := 100 * time.Millisecond
- if err := testClose(testFileName, watcher, timeout); err != nil {
- t.Fatal("testClose() failed: %v", err)
- }
-}
diff --git a/runtimes/google/lib/follow/timed_watcher.go b/runtimes/google/lib/follow/timed_watcher.go
deleted file mode 100644
index aea4b50..0000000
--- a/runtimes/google/lib/follow/timed_watcher.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package follow
-
-import (
- "time"
-)
-
-const defaultSleep = time.Second
-
-// newFSStatWatch returns a function that sends a nil value on the events
-// channel at an interval specified by sleep.
-// The event receiver must determine whether the event is spurious, or
-// corresponds to a modification of the file.
-func newFSTimedWatch(filename string) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
- return newCustomFSTimedWatch(filename, defaultSleep)
-}
-
-func newCustomFSTimedWatch(filename string, sleep time.Duration) func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{}) {
- return func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{}) {
- defer close(done)
- defer close(events)
-
- close(initialized)
-
- for {
- // Look for a stop command.
- select {
- case <-stop:
- return
- default:
- }
- if !sendEvent(events, nil, stop) {
- return
- }
- time.Sleep(sleep)
- }
- }
-}
diff --git a/runtimes/google/lib/follow/timed_watcher_test.go b/runtimes/google/lib/follow/timed_watcher_test.go
deleted file mode 100644
index 233b746..0000000
--- a/runtimes/google/lib/follow/timed_watcher_test.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package follow
-
-import (
- "os"
- "testing"
- "time"
-)
-
-func TestModificationTimed(t *testing.T) {
- testFileName := os.TempDir() + "/follow.modification.timed"
- testfile, err := os.Create(testFileName)
- if err != nil {
- t.Fatalf("os.Create() failed: %v", err)
- }
- defer testfile.Close()
- defer os.Remove(testFileName)
-
- sleep := 10 * time.Millisecond
- watch := newCustomFSTimedWatch(testFileName, sleep)
- watcher, err := newCustomFSWatcher(watch)
- if err != nil {
- t.Fatalf("newCustomFSWatcher() failed : %v", err)
- }
- timeout := 100 * time.Millisecond
- // don't modify file, expect event.
- if err := expectModification(watcher.events, timeout); err != nil {
- t.Fatalf("expectModification() failed without a file modification: %v", err)
- }
- // modify file, expect event.
- if err := writeString(testfile, "modification"); err != nil {
- t.Fatalf("writeString() failed: %v", err)
- }
- if err := expectModification(watcher.events, timeout); err != nil {
- t.Fatalf("expectModification() failed after a file modification: %v", err)
- }
-}
diff --git a/runtimes/google/lib/follow/watcher.go b/runtimes/google/lib/follow/watcher.go
index eae1353..d5ad863 100644
--- a/runtimes/google/lib/follow/watcher.go
+++ b/runtimes/google/lib/follow/watcher.go
@@ -1,113 +1,14 @@
package follow
-import (
- "errors"
- "sync"
-)
-
-var errWatcherClosed = errors.New("watcher has already been closed")
-
// fsWatcher is a tool for watching append-only modifications to a file.
-// Start() spawns an event routine that asynchronously detects modifications to
-// the specified file and sends corresponding events on the events channel.
-// Close() terminates the event routine and stops detecting any modifications.
-// However, the watcher may continue to send previously detected events.
-type fsWatcher struct {
- // watch runs on the event routine, and detects modifications and sends
- // corresponding events. watch runs until it is asked to stop.
- watch func(events chan<- error, initialized chan<- struct{}, stop <-chan struct{}, done chan<- struct{})
- // events is the channel on which events and errors are sent.
- events <-chan error
- // stop is the channel on which the event routine is told to stop.
- stop chan<- struct{}
- // done is the channel on which the event routine announces that it is done.
- done <-chan struct{}
- // mu guards closed
- mu sync.Mutex
- // closed is true iff the watcher has been closed.
- closed bool // GUARDED_BY(mu)
-}
+type fsWatcher interface {
+ // Wait blocks until the file is modified.
+ // Wait returns an io.EOF if the watcher is closed, and immediately returns
+ // any error it encounters while blocking.
+ Wait() error
-// newCustomFSWatcher spawns an event routine that runs watch, and returns a new
-// fsWatcher.
-// watch is a function that detects file modifications and sends a corresponding
-// nil value on the returned watcher's events channel. If an error occurs in
-// watch, it is sent on the events channel. However, watch may keep running.
-// To halt watch, the receiver should call Close().
-//
-// Watch guarantees that an event will be received for any modification after
-// newCustomFSWatcher returns.
-//
-// A sequence of modifications may correspond to one sent event. Watch guarantees
-// that at least one event is received after the most recent modification.
-//
-// The frequency at which events are generated is implementation-specific.
-// Implementations may generate events even if the file has not been modified -
-// the receiver should determine whether these events are spurious.
-func newCustomFSWatcher(watch func(chan<- error, chan<- struct{}, <-chan struct{}, chan<- struct{})) (*fsWatcher, error) {
- events := make(chan error, 1)
- stop := make(chan struct{})
- done := make(chan struct{})
- watcher := &fsWatcher{
- watch: watch,
- events: events,
- stop: stop,
- done: done,
- closed: false,
- }
- initialized := make(chan struct{})
- go watch(events, initialized, stop, done)
- // Wait until the watch routine has been initialized. This ensures that an
- // event is sent for any modification after this function returns.
- <-initialized
- return watcher, nil
-}
-
-// Close closes the watcher synchronously.
-func (w *fsWatcher) Close() error {
- // Mark the watcher closed.
- if err := w.setClosed(); err != nil {
- return err
- }
-
- close(w.stop)
- <-w.done
- return nil
-}
-
-func (w *fsWatcher) setClosed() error {
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.closed {
- return errWatcherClosed
- }
- w.closed = true
- return nil
-}
-
-// sendEvent sends the event on the events channel. sendEvent expects the buffer
-// size of the events channel to be exactly one.
-// If the event is not nil, it will always be sent, and sendEvent blocks on the
-// events channel.
-// Otherwise, if there is already an event in the channel, sendEvent won't send
-// the event. This coalesces events that happen faster than the receiver can
-// process them.
-// sendEvent can be preempted by a stop request, and returns true iff the event
-// was sent or coalesced with an existing event.
-func sendEvent(events chan<- error, event error, stop <-chan struct{}) bool {
- if event == nil {
- select {
- case <-stop:
- return false
- case events <- nil:
- default:
- }
- return true
- }
- select {
- case <-stop:
- return false
- case events <- event:
- return true
- }
+ // Close closes the watcher synchronously. Any ongoing or following calls
+ // to Wait return io.EOF.
+ // Close is idempotent.
+ Close() error
}
diff --git a/runtimes/google/lib/follow/watcher_test.go b/runtimes/google/lib/follow/watcher_test.go
deleted file mode 100644
index 2732238..0000000
--- a/runtimes/google/lib/follow/watcher_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package follow
-
-import (
- "errors"
- "testing"
-)
-
-func TestSendEvent(t *testing.T) {
- event := errors.New("event")
-
- // Test that 1) a nil can be sent 2) an error can be sent after a nil.
- events := make(chan error, 1)
- stop := make(chan struct{})
- if !sendEvent(events, nil, stop) {
- t.Fatal("Expected that the event will be sent")
- }
- if recv := <-events; recv != nil {
- t.Fatalf("Expected to receive nil")
- }
- if !sendEvent(events, event, stop) {
- t.Fatal("Expected that the event will be sent")
- }
- if recv := <-events; recv != event {
- t.Fatalf("Expected to receive %v", event)
- }
-
- // Test that nils are coalesced.
- events = make(chan error, 1)
- stop = make(chan struct{})
- if !sendEvent(events, nil, stop) {
- t.Fatal("Expected that the event will be sent")
- }
- if !sendEvent(events, nil, stop) {
- t.Fatal("Expected that the event will be sent")
- }
- if recv := <-events; recv != nil {
- t.Fatalf("Expected to receive nil")
- }
-
- // Test that an error is not sent if stop is closed.
- events = make(chan error, 1)
- stop = make(chan struct{})
- close(stop)
- // The stop signal may not be handled immediately.
- for sendEvent(events, event, stop) {
- }
-
- // Test that a nil is not sent if stop is closed.
- events = make(chan error, 1)
- stop = make(chan struct{})
- close(stop)
- // The stop signal may not be handled immediately.
- for sendEvent(events, nil, stop) {
- }
-
- // Test that an error is not sent if stop is closed while send is blocked.
- events = make(chan error, 1)
- stop = make(chan struct{})
- if !sendEvent(events, nil, stop) {
- t.Fatal("Expected that the event will be sent")
- }
- go close(stop)
- // The stop signal may not be handled immediately.
- for sendEvent(events, event, stop) {
- }
-}