syncbase: Refactor watchable store clients to track resume markers.

Expanded watchable clients from update channels to structs encapsulating
update channels and sequence numbers (parsed resume markers). Clients
now act as iterators over the watch log.

This is in preparation for log garbage collection (watcher needs to know
when all clients are past a certain log record).

Also changed the sync watcher to stop watching when an error is
encountered instead of keep retrying since the watch client is
now an iterator. Watch should be restarted or the database
quarantined instead. Errors due to the store being closed (e.g.
database destruction) are still suppressed.

Change-Id: I78792fffeb3b85701860763c75b73326c86a51eb
diff --git a/services/syncbase/server/database_watch.go b/services/syncbase/server/database_watch.go
index 0d8a6cd..8df5782 100644
--- a/services/syncbase/server/database_watch.go
+++ b/services/syncbase/server/database_watch.go
@@ -228,7 +228,7 @@
 // - wait for one of two signals: new updates available or the call is canceled
 // The 'new updates' signal is sent by the watcher via a Go channel.
 func (d *database) watchUpdates(ctx *context.T, call rpc.ServerCall, sender *watchBatchSender, resumeMarker watch.ResumeMarker, watchFilter filter.CollectionRowFilter) error {
-	hasUpdates, cancelWatch := watchable.WatchUpdates(d.st)
+	watcher, cancelWatch := d.st.WatchUpdates(resumeMarker)
 	defer cancelWatch()
 	for {
 		// Drain the log queue.
@@ -237,7 +237,7 @@
 			// conflict resolution merge batches, very large batches may not be
 			// unrealistic. However, sync currently also processes an entire batch at
 			// a time, and would need to be updated as well.
-			logs, nextResumeMarker, err := watchable.ReadBatchFromLog(d.st, resumeMarker)
+			logs, nextResumeMarker, err := watcher.NextBatchFromLog(d.st)
 			if err != nil {
 				// TODO(ivanpi): Log all internal errors, especially ones not returned.
 				return verror.NewErrInternal(ctx) // no detailed error before access check
@@ -256,9 +256,9 @@
 		}
 		// Wait for new updates or cancel.
 		select {
-		case _, ok := <-hasUpdates:
+		case _, ok := <-watcher.Wait():
 			if !ok {
-				return verror.NewErrAborted(ctx)
+				return watcher.Err()
 			}
 		case <-ctx.Done():
 			return ctx.Err()
diff --git a/services/syncbase/server/watchlog_test.go b/services/syncbase/server/watchlog_test.go
index ed5f76c..3f46db0 100644
--- a/services/syncbase/server/watchlog_test.go
+++ b/services/syncbase/server/watchlog_test.go
@@ -75,6 +75,8 @@
 	call := &mockCall{p: v23.GetPrincipal(ctx), b: blessings}
 	var expected []interface{}
 	resumeMarker, _ := watchable.GetResumeMarker(st)
+	watcher, cancel := st.WatchUpdates(resumeMarker)
+	defer cancel()
 	// Generate Put/Delete events.
 	for i := 0; i < 5; i++ {
 		// Set initial collection permissions.
@@ -119,7 +121,7 @@
 	expectedIndex := 0
 	for {
 		var logs []*watchable.LogEntry
-		if logs, resumeMarker, _ = watchable.ReadBatchFromLog(st, resumeMarker); logs == nil {
+		if logs, resumeMarker, _ = watcher.NextBatchFromLog(st); logs == nil {
 			break
 		}
 		for _, logRecord := range logs {
diff --git a/services/syncbase/store/watchable/watcher.go b/services/syncbase/store/watchable/watcher.go
index e00cc24..c4ca18a 100644
--- a/services/syncbase/store/watchable/watcher.go
+++ b/services/syncbase/store/watchable/watcher.go
@@ -25,17 +25,19 @@
 	// Channel used by broadcastUpdates() to notify watcherLoop. When watcher is
 	// closed, updater is closed.
 	updater chan struct{}
+
 	// Protects the clients map.
 	mu sync.RWMutex
-	// Channels used by watcherLoop to notify currently registered clients. When
-	// watcher is closed, all client channels are closed and clients is set to nil.
-	clients map[chan struct{}]struct{}
+	// Currently registered clients, notified by watcherLoop via their channels.
+	// When watcher is closed, all clients are stopped (and their channels closed)
+	// with ErrAborted and clients is set to nil.
+	clients map[*Client]struct{}
 }
 
 func newWatcher() *watcher {
 	ret := &watcher{
 		updater: make(chan struct{}, 1),
-		clients: make(map[chan struct{}]struct{}),
+		clients: make(map[*Client]struct{}),
 	}
 	go ret.watcherLoop()
 	return ret
@@ -45,9 +47,9 @@
 func (w *watcher) close() {
 	w.mu.Lock()
 	if w.clients != nil {
-		// Close all client channels.
+		// Stop all clients and close their channels.
 		for c := range w.clients {
-			closeAndDrain(c)
+			c.stop(verror.NewErrAborted(nil))
 		}
 		// Set clients to nil to mark watcher as closed.
 		w.clients = nil
@@ -79,7 +81,7 @@
 		}
 		w.mu.RLock()
 		for c := range w.clients { // safe for w.clients == nil
-			ping(c)
+			ping(c.update)
 		}
 		w.mu.RUnlock()
 	}
@@ -104,40 +106,126 @@
 }
 
 // watchUpdates - see WatchUpdates.
-func (w *watcher) watchUpdates() (update <-chan struct{}, cancel func()) {
-	updateRW := make(chan struct{}, 1)
+func (w *watcher) watchUpdates(seq uint64) (_ *Client, cancel func()) {
 	w.mu.Lock()
 	defer w.mu.Unlock()
 	if w.clients == nil {
-		// watcher is closed, return a closed update channel and no-op cancel.
-		close(updateRW)
-		cancel = func() {}
-		return updateRW, cancel
+		// watcher is closed. Return stopped Client.
+		return newStoppedClient(verror.NewErrAborted(nil)), func() {}
 	}
-	// Register update channel.
-	w.clients[updateRW] = struct{}{}
-	// Cancel is idempotent. It unregisters and closes the update channel.
+	// Register and return client.
+	c := newClient(seq)
+	w.clients[c] = struct{}{}
 	cancel = func() {
 		w.mu.Lock()
-		if _, ok := w.clients[updateRW]; ok { // safe for w.clients == nil
-			delete(w.clients, updateRW)
-			closeAndDrain(updateRW)
+		if _, ok := w.clients[c]; ok { // safe for w.clients == nil
+			c.stop(verror.NewErrCanceled(nil))
+			delete(w.clients, c)
 		}
 		w.mu.Unlock()
 	}
-	return updateRW, cancel
+	return c, cancel
 }
 
-// WatchUpdates returns a channel that can be used to wait for changes of the
-// database, as well as a cancel function which MUST be called to release the
-// watch resources. If the update channel is closed, the store is closed and
-// no more updates will happen. Otherwise, the channel will have a value
-// available whenever the store has changed since the last receive on the
-// channel.
-func WatchUpdates(st store.Store) (update <-chan struct{}, cancel func()) {
-	// TODO(rogulenko): Remove dynamic type assertion here and in other places.
-	watcher := st.(*Store).watcher
-	return watcher.watchUpdates()
+// WatchUpdates returns a Client which supports waiting for changes and
+// iterating over the watch log starting from resumeMarker, as well as a
+// cancel function which MUST be called to release watch resources.
+func (st *Store) WatchUpdates(resumeMarker watch.ResumeMarker) (_ *Client, cancel func()) {
+	seq, err := parseResumeMarker(string(resumeMarker))
+	if err != nil {
+		// resumeMarker is invalid. Return stopped Client.
+		return newStoppedClient(err), func() {}
+	}
+	return st.watcher.watchUpdates(seq)
+}
+
+// Client encapsulates a channel used to notify watch clients of store updates
+// and an iterator over the watch log.
+type Client struct {
+	// Channel used by watcherLoop to notify the client. When the client is
+	// stopped, update is closed.
+	update chan struct{}
+
+	// Protects the fields below.
+	mu sync.Mutex
+	// Sequence number pointing to the start of the previously retrieved log
+	// batch. Equal to nextSeq if the retrieved batch was empty.
+	prevSeq uint64
+	// Sequence number pointing to the start of the next log batch to retrieve.
+	nextSeq uint64
+	// When the client is stopped, err is set to the reason for stopping.
+	err error
+}
+
+func newClient(seq uint64) *Client {
+	return &Client{
+		update:  make(chan struct{}, 1),
+		prevSeq: seq,
+		nextSeq: seq,
+	}
+}
+
+func newStoppedClient(err error) *Client {
+	c := newClient(0)
+	c.stop(err)
+	return c
+}
+
+// Wait returns the update channel that can be used to wait for new changes in
+// the store. If the update channel is closed, the client is stopped and no more
+// updates will happen. Otherwise, the channel will have a value available
+// whenever the store has changed since the last receive on the channel.
+func (c *Client) Wait() <-chan struct{} {
+	return c.update
+}
+
+// NextBatchFromLog returns the next batch of watch log records (transaction)
+// from the given database and the resume marker at the end of the batch. If
+// there is no batch available, it returns a nil slice and the same resume
+// marker as the previous NextBatchFromLog call. The returned log entries are
+// guaranteed to point to existing data versions until either the client is
+// stopped or NextBatchFromLog is called again. If the client is stopped,
+// NextBatchFromLog returns the same error as Err.
+func (c *Client) NextBatchFromLog(st store.Store) ([]*LogEntry, watch.ResumeMarker, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.err != nil {
+		return nil, nil, c.err
+	}
+	batch, batchEndSeq, err := readBatchFromLog(st, c.nextSeq)
+	if err != nil {
+		// We cannot call stop() here since c.mu is locked. However, we checked
+		// above that c.err is nil, so it is safe to set c.err anc close c.update.
+		c.err = err
+		closeAndDrain(c.update)
+		return nil, nil, err
+	}
+	c.prevSeq = c.nextSeq
+	c.nextSeq = batchEndSeq
+	return batch, MakeResumeMarker(batchEndSeq), nil
+}
+
+// Err returns the error that caused the client to stop watching. If the error
+// is nil, the client is active. Otherwise:
+// * ErrCanceled - watch was canceled by the client.
+// * ErrAborted - watcher was closed (store was closed, possibly destroyed).
+// * ErrUnknownResumeMarker - watch was started with an invalid resume marker.
+// * other errors - NextBatchFromLog encountered an error.
+func (c *Client) Err() error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.err
+}
+
+// stop closes the client update channel and sets the error returned by Err.
+// Idempotent (only the error from the first call to stop is kept).
+func (c *Client) stop(err error) {
+	c.mu.Lock()
+	if c.err == nil {
+		c.err = err
+		closeAndDrain(c.update)
+	}
+	c.mu.Unlock()
 }
 
 // GetResumeMarker returns the ResumeMarker that points to the current end
@@ -158,15 +246,11 @@
 	return join(common.LogPrefix, fmt.Sprintf("%016x", seq))
 }
 
-// ReadBatchFromLog returns a batch of watch log records (a transaction) from
-// the given database and the new resume marker at the end of the batch.
-func ReadBatchFromLog(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
-	seq, err := parseResumeMarker(string(resumeMarker))
-	if err != nil {
-		return nil, resumeMarker, err
-	}
+// readBatchFromLog returns a batch of watch log records (a transaction) from
+// the given database and the next sequence number at the end of the batch.
+func readBatchFromLog(st store.Store, seq uint64) ([]*LogEntry, uint64, error) {
 	_, scanLimit := common.ScanPrefixArgs(common.LogPrefix, "")
-	scanStart := resumeMarker
+	scanStart := MakeResumeMarker(seq)
 	endOfBatch := false
 
 	// Use the store directly to scan these read-only log entries, no need
@@ -179,7 +263,7 @@
 		var logEnt LogEntry
 		if err := vom.Decode(stream.Value(nil), &logEnt); err != nil {
 			stream.Cancel()
-			return nil, resumeMarker, err
+			return nil, seq, err
 		}
 
 		logs = append(logs, &logEnt)
@@ -193,15 +277,15 @@
 	}
 
 	if !endOfBatch {
-		if err = stream.Err(); err != nil {
-			return nil, resumeMarker, err
+		if err := stream.Err(); err != nil {
+			return nil, seq, err
 		}
 		if len(logs) > 0 {
 			vlog.Fatalf("end of batch not found after %d entries", len(logs))
 		}
-		return nil, resumeMarker, nil
+		return nil, seq, nil
 	}
-	return logs, watch.ResumeMarker(logEntryKey(seq)), nil
+	return logs, seq, nil
 }
 
 func parseResumeMarker(resumeMarker string) (uint64, error) {
diff --git a/services/syncbase/store/watchable/watcher_test.go b/services/syncbase/store/watchable/watcher_test.go
index 86985ae..f7005fb 100644
--- a/services/syncbase/store/watchable/watcher_test.go
+++ b/services/syncbase/store/watchable/watcher_test.go
@@ -10,6 +10,7 @@
 	"testing"
 	"time"
 
+	"v.io/v23/verror"
 	"v.io/x/ref/services/syncbase/common"
 	"v.io/x/ref/services/syncbase/store"
 )
@@ -42,11 +43,11 @@
 	}
 
 	// Fetch the batches and a few more empty fetches and verify them.
-	resmark := MakeResumeMarker(0)
-	var seq uint64
+	var seq uint64 = 0
+	var wantSeq uint64 = 0
 
 	for i := 0; i < (numTx + 3); i++ {
-		logs, newResmark, err := ReadBatchFromLog(st, resmark)
+		logs, newSeq, err := readBatchFromLog(st, seq)
 		if err != nil {
 			t.Fatalf("can't get watch log batch: %v", err)
 		}
@@ -56,11 +57,10 @@
 					i, len(logs), numPut)
 			}
 
-			seq += uint64(len(logs))
-			expResmark := MakeResumeMarker(seq)
-			if !bytes.Equal(newResmark, expResmark) {
-				t.Errorf("log fetch (i=%d) wrong resmark: %s instead of %s",
-					i, newResmark, expResmark)
+			wantSeq += uint64(len(logs))
+			if newSeq != wantSeq {
+				t.Errorf("log fetch (i=%d) wrong seq: %d instead of %d",
+					i, newSeq, wantSeq)
 			}
 
 			for j, log := range logs {
@@ -87,12 +87,12 @@
 				tx.Abort()
 			}
 		} else {
-			if logs != nil || !bytes.Equal(newResmark, resmark) {
-				t.Errorf("NOP log fetch (i=%d) had changes: %d logs, resmask %s",
-					i, len(logs), newResmark)
+			if logs != nil || newSeq != seq {
+				t.Errorf("NOP log fetch (i=%d) had changes: %d logs, seq %d",
+					i, len(logs), newSeq)
 			}
 		}
-		resmark = newResmark
+		seq = newSeq
 	}
 }
 
@@ -106,23 +106,26 @@
 	w.broadcastUpdates()
 
 	// Never-receiving client should not block watcher.
-	_, cancel1 := w.watchUpdates()
+	_, cancel1 := w.watchUpdates(0)
 	defer cancel1()
 
 	// Cancelled client should not affect watcher.
-	chan2, cancel2 := w.watchUpdates()
+	c2, cancel2 := w.watchUpdates(0)
 	cancel2()
 	// Cancel should be idempotent.
 	cancel2()
 
 	// Channel should be closed when client is cancelled.
 	select {
-	case _, ok := <-chan2:
+	case _, ok := <-c2.Wait():
 		if ok {
-			t.Fatalf("cancel2 was called, chan2 should be drained and closed")
+			t.Fatalf("cancel2 was called, c2 channel should be drained and closed")
 		}
 	default:
-		t.Fatalf("cancel2 was called, chan2 should be closed")
+		t.Fatalf("cancel2 was called, c2 channel should be closed")
+	}
+	if verror.ErrorID(c2.Err()) != verror.ErrCanceled.ID {
+		t.Fatalf("expected c2.Err() to return ErrCanceled, got: %v", c2.Err())
 	}
 
 	// Update broadcast should not block client registration or vice versa.
@@ -130,13 +133,13 @@
 	registerLoop1 := make(chan bool)
 	go func() {
 		for i := 0; i < 5000; i++ {
-			_, canceli := w.watchUpdates()
+			_, canceli := w.watchUpdates(0)
 			defer canceli()
 		}
 		registerLoop1 <- true
 	}()
 
-	chan3, cancel3 := w.watchUpdates()
+	c3, cancel3 := w.watchUpdates(0)
 
 	for i := 0; i < 5000; i++ {
 		w.broadcastUpdates()
@@ -154,19 +157,22 @@
 
 	// chan3 should have a single pending notification.
 	select {
-	case _, ok := <-chan3:
+	case _, ok := <-c3.Wait():
 		if !ok {
-			t.Fatalf("chan3 should not be closed")
+			t.Fatalf("c3 channel should not be closed")
 		}
 	default:
-		t.Fatalf("chan3 should have a notification")
+		t.Fatalf("c3 channel should have a notification")
 	}
 	select {
-	case <-chan3:
-		t.Fatalf("chan3 should not have another notification")
+	case <-c3.Wait():
+		t.Fatalf("c3 channel should not have another notification")
 	default:
 		// ok
 	}
+	if c3.Err() != nil {
+		t.Fatalf("expected c3.Err() to return nil, got: %v", c3.Err())
+	}
 
 	// After notification was read, chan3 still receives updates.
 	go func() {
@@ -175,12 +181,15 @@
 	}()
 
 	select {
-	case _, ok := <-chan3:
+	case _, ok := <-c3.Wait():
 		if !ok {
-			t.Fatalf("chan3 should not be closed")
+			t.Fatalf("c3 channel should not be closed")
 		}
 	case <-time.After(5 * time.Second):
-		t.Fatalf("chan3 didn't receive after 5s")
+		t.Fatalf("c3 channel didn't receive after 5s")
+	}
+	if c3.Err() != nil {
+		t.Fatalf("expected c3.Err() to return nil, got: %v", c3.Err())
 	}
 
 	// Closing the watcher.
@@ -190,28 +199,38 @@
 
 	// Client channels should be closed when watcher is closed.
 	select {
-	case _, ok := <-chan3:
+	case _, ok := <-c3.Wait():
 		if ok {
-			t.Fatalf("watcher was closed, chan3 should be drained and closed")
+			t.Fatalf("watcher was closed, c3 channel should be drained and closed")
 		}
 	default:
-		t.Fatalf("watcher was closed, chan3 should be closed")
+		t.Fatalf("watcher was closed, c3 channel should be closed")
+	}
+	if verror.ErrorID(c3.Err()) != verror.ErrAborted.ID {
+		t.Fatalf("expected c3.Err() to return ErrAborted, got: %v", c3.Err())
 	}
 
 	// Cancel is safe to call after the store is closed.
 	cancel3()
+	// ErrAborted should be preserved instead of being overridden by ErrCanceled.
+	if verror.ErrorID(c3.Err()) != verror.ErrAborted.ID {
+		t.Fatalf("expected c3.Err() to return ErrAborted, got: %v", c3.Err())
+	}
 
 	// watchUpdates is safe to call after the store is closed, returning closed
 	// channel.
-	chan4, cancel4 := w.watchUpdates()
+	c4, cancel4 := w.watchUpdates(0)
 
 	select {
-	case _, ok := <-chan4:
+	case _, ok := <-c4.Wait():
 		if ok {
-			t.Fatalf("watcher was closed, chan4 should be drained and closed")
+			t.Fatalf("watcher was closed, c4 channel should be drained and closed")
 		}
 	default:
-		t.Fatalf("watcher was closed, chan4 should be closed")
+		t.Fatalf("watcher was closed, c4 channel should be closed")
+	}
+	if verror.ErrorID(c4.Err()) != verror.ErrAborted.ID {
+		t.Fatalf("expected c4.Err() to return ErrAborted, got: %v", c4.Err())
 	}
 
 	cancel4()
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index b14fcbc..e512f94 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -71,62 +71,70 @@
 func (s *syncService) watchStore(ctx *context.T, dbId wire.Id, st *watchable.Store) {
 	vlog.VI(1).Infof("sync: watchStore: DB %v: start watching updates", dbId)
 
-	updatesChan, cancel := watchable.WatchUpdates(st)
+	resMark, err := getResMark(ctx, st)
+	if err != nil {
+		if verror.ErrorID(err) != verror.ErrNoExist.ID {
+			// TODO(rdaoud): Quarantine this database.
+			vlog.Errorf("sync: watchStore: %v: cannot get resMark, stop watching and exit: %v", dbId, err)
+			return
+		}
+		resMark = watchable.MakeResumeMarker(0)
+	}
+
+	watcher, cancel := st.WatchUpdates(resMark)
 	defer cancel()
 
 	moreWork := true
 	for moreWork && !s.isClosed() {
-		if s.processDatabase(ctx, dbId, st) {
+		if hadUpdate, err := s.processDatabase(ctx, dbId, st, watcher); err != nil {
+			// TODO(rdaoud): Quarantine this database.
+			vlog.Errorf("sync: watchStore: DB %v: processDatabase failed, stop watching and exit: %v", dbId, err)
+			return
+		} else if hadUpdate {
 			vlog.VI(2).Infof("sync: watchStore: DB %v: had updates", dbId)
 		} else {
 			vlog.VI(2).Infof("sync: watchStore: DB %v: idle, wait for updates", dbId)
 			select {
-			case _, moreWork = <-updatesChan:
+			case _, moreWork = <-watcher.Wait():
 
-			case <-s.closed:
-				moreWork = false
+			case _, moreWork = <-s.closed:
+
 			}
 		}
 	}
 
-	vlog.VI(1).Infof("sync: watchStore: DB %v: channel closed, stop watching and exit", dbId)
+	vlog.VI(1).Infof("sync: watchStore: DB %v: channel closed, stop watching and exit, watch status %v", dbId, watcher.Err())
 }
 
 // processDatabase fetches from the given database at most one new batch update
 // (transaction) and processes it.  A batch is stored as a contiguous set of log
 // records ending with one record having the "continued" flag set to false.  The
 // call returns true if a new batch update was processed.
-func (s *syncService) processDatabase(ctx *context.T, dbId wire.Id, st store.Store) bool {
+func (s *syncService) processDatabase(ctx *context.T, dbId wire.Id, st store.Store, watcher *watchable.Client) (bool, error) {
 	vlog.VI(2).Infof("sync: processDatabase: begin: %v", dbId)
 	defer vlog.VI(2).Infof("sync: processDatabase: end: %v", dbId)
 
-	resMark, err := getResMark(ctx, st)
-	if err != nil {
-		if verror.ErrorID(err) != verror.ErrNoExist.ID {
-			vlog.Errorf("sync: processDatabase: %v: cannot get resMark: %v", dbId, err)
-			return false
-		}
-		resMark = watchable.MakeResumeMarker(0)
-	}
-
 	// Initialize Database sync state if needed.
 	s.initSyncStateInMem(ctx, dbId, "")
 
-	// Get a batch of watch log entries, if any, after this resume marker.
-	logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)
+	// Get the next batch of watch log entries, if any.
+	logs, nextResmark, err := watcher.NextBatchFromLog(st)
 	if err != nil {
-		// An error here (scan stream cancelled) is possible when the watcher is in
-		// the middle of processing a database when it is destroyed. Hence, we just
-		// ignore this database and proceed.
 		vlog.Errorf("sync: processDatabase: %v: cannot get watch log batch: %v", dbId, verror.DebugString(err))
-		return false
+		if verror.ErrorID(err) == verror.ErrAborted.ID {
+			// Watch was aborted because the database was closed, probably destroyed.
+			// We suppress this error and let the watcher exit gracefully.
+			vlog.VI(1).Infof("sync: processDatabase: %v: watch aborted because database was closed", dbId)
+			return false, nil
+		}
+		return false, err
 	} else if logs == nil {
-		return false
+		return false, nil
 	}
 
-	if err = s.processWatchLogBatch(ctx, dbId, st, logs, nextResmark); err != nil {
-		// TODO(rdaoud): quarantine this database.
-		return false
+	if err = s.processWatchLogBatch(ctx, dbId, st, watcher.Err, logs, nextResmark); err != nil {
+		vlog.Errorf("sync: processDatabase: %v: cannot process watch log batch: %v", dbId, verror.DebugString(err))
+		return false, err
 	}
 
 	// The requirement for pause is that any write after the pause must not
@@ -143,23 +151,25 @@
 		// The database is online. Cut a gen.
 		if err := s.checkptLocalGen(ctx, dbId, nil); err != nil {
 			vlog.Errorf("sync: processDatabase: %v: cannot cut a generation: %v", dbId, verror.DebugString(err))
-			return false
+			// TODO(ivanpi): This may not be a fatal error, continue watching?
+			return false, err
 		}
 	} else {
 		vlog.VI(1).Infof("sync: processDatabase: %v database not allowed to sync, skipping cutting a gen", dbId)
 	}
-	return true
+	return true, nil
 }
 
 // processWatchLogBatch parses the given batch of watch log records, updates the
 // watchable syncgroup prefixes, uses the prefixes to filter the batch to the
 // subset of syncable records, and transactionally applies these updates to the
 // sync metadata (DAG & log records) and updates the watch resume marker.
-func (s *syncService) processWatchLogBatch(ctx *context.T, dbId wire.Id, st store.Store, logs []*watchable.LogEntry, resMark watch.ResumeMarker) error {
+func (s *syncService) processWatchLogBatch(ctx *context.T, dbId wire.Id, st store.Store, watchStatus func() error, logs []*watchable.LogEntry, resMark watch.ResumeMarker) error {
 	if len(logs) == 0 {
 		return nil
 	}
 
+	// TODO(ivanpi): Pipe through errors from here instead of calling Fatalf.
 	if processDbStateChangeLogRecord(ctx, s, st, dbId, logs[0], resMark) {
 		// A batch containing DbStateChange will not have any more records.
 		// This batch is done processing.
@@ -213,10 +223,17 @@
 	vlog.VI(3).Infof("sync: processWatchLogBatch: %v: sg snap %t, syncable %d, total %d", dbId, !appBatch, len(batch), totalCount)
 
 	if err := s.processWatchBlobRefs(ctx, dbId, st, batch); err != nil {
-		// There may be an error here if the database is recently
-		// destroyed.  Ignore the error and continue to another database.
 		vlog.Errorf("sync: processWatchLogBatch: %v: watcher cannot process blob refs: %v", dbId, err)
-		return nil
+		if verror.ErrorID(watchStatus()) == verror.ErrAborted.ID {
+			// Watch was aborted because the database was closed, probably destroyed.
+			// We suppress this error and let the watcher exit gracefully.
+			// Note, we use the watcher error for this because it is guaranteed to be
+			// set to ErrAborted when the database is closed gracefully, unlike the
+			// error returned from processWatchBlobRefs.
+			vlog.VI(1).Infof("sync: processWatchLogBatch: %v: watch aborted because database was closed", dbId)
+			return nil
+		}
+		return err
 	}
 
 	// Transactional processing of the batch: Fixup syncable log records to
@@ -241,10 +258,17 @@
 	})
 
 	if err != nil {
-		// There may be an error here if the database is recently
-		// destroyed. Ignore the error and continue to another database.
-		// TODO(rdaoud): quarantine this database for other errors.
 		vlog.Errorf("sync: processWatchLogBatch: %v: watcher cannot process batch: %v", dbId, err)
+		if verror.ErrorID(watchStatus()) == verror.ErrAborted.ID {
+			// Watch was aborted because the database was closed, probably destroyed.
+			// We suppress this error and let the watcher exit gracefully.
+			// Note, we use the watcher error for this because it is guaranteed to be
+			// set to ErrAborted when the database is closed gracefully, unlike the
+			// error returned from RunInTransaction.
+			vlog.VI(1).Infof("sync: processWatchLogBatch: %v: watch aborted because database was closed", dbId)
+			return nil
+		}
+		return err
 	}
 	return nil
 }
diff --git a/services/syncbase/vsync/watcher_test.go b/services/syncbase/vsync/watcher_test.go
index 82b396e..ac6c04b 100644
--- a/services/syncbase/vsync/watcher_test.go
+++ b/services/syncbase/vsync/watcher_test.go
@@ -207,8 +207,10 @@
 	barKey := makeRowKey("bar")
 	fooxyzKey := makeRowKey("fooxyz")
 
+	noError := func() error { return nil }
+
 	// Empty logs does not fail.
-	s.processWatchLogBatch(nil, mockDbId, st, nil, nil)
+	s.processWatchLogBatch(nil, mockDbId, st, noError, nil, nil)
 
 	// Non-syncable logs.
 	batch := []*watchable.LogEntry{
@@ -217,7 +219,7 @@
 	}
 
 	resmark := watchable.MakeResumeMarker(1234)
-	s.processWatchLogBatch(nil, mockDbId, st, batch, resmark)
+	s.processWatchLogBatch(nil, mockDbId, st, noError, batch, resmark)
 
 	if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
@@ -250,7 +252,7 @@
 	}
 
 	resmark = watchable.MakeResumeMarker(3456)
-	s.processWatchLogBatch(nil, mockDbId, st, batch, resmark)
+	s.processWatchLogBatch(nil, mockDbId, st, noError, batch, resmark)
 
 	if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
@@ -290,7 +292,7 @@
 	}
 
 	resmark = watchable.MakeResumeMarker(7890)
-	s.processWatchLogBatch(nil, mockDbId, st, batch, resmark)
+	s.processWatchLogBatch(nil, mockDbId, st, noError, batch, resmark)
 
 	if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
@@ -333,7 +335,7 @@
 	}
 
 	resmark = watchable.MakeResumeMarker(20212223)
-	s.processWatchLogBatch(nil, mockDbId, st, batch, resmark)
+	s.processWatchLogBatch(nil, mockDbId, st, noError, batch, resmark)
 
 	if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)