syncbase: Implement log start marker and hook for GC.

Watchable store now keeps track of the log start via a persistent
marker. The watch log is now contiguous starting from the marker,
while entries before the marker may be sparse and partially garbage
collected. Clients attempting to start watching from a resume marker
earlier than the log start are rejected.

Added UpdateLogStart() method to the watchable store. It is intended
to be called by GC, passing in the sync resume marker. It persists and
returns the newly computed log start. The log start is computed as the
earliest resume marker out of all active watch clients - ephemeral
clients as well as the persistent sync watcher. This prevents garbage
collection from interrupting an active watch and ensures that the
sync watcher sees all data.

Added test for UpdateLogStart(), also covering NextBatchFromLog()
iteration from previous CL.

Change-Id: I5e38aef4414fce1495c8da80e907de13fce9ae14
diff --git a/services/syncbase/common/constants.go b/services/syncbase/common/constants.go
index 6b42d00..5632349 100644
--- a/services/syncbase/common/constants.go
+++ b/services/syncbase/common/constants.go
@@ -13,6 +13,7 @@
 	DbGCPrefix            = "g"
 	DbInfoPrefix          = "i"
 	LogPrefix             = "l"
+	LogMarkerPrefix       = "m"
 	VClockPrefix          = "q"
 	RowPrefix             = "r"
 	ServicePrefix         = "s"
diff --git a/services/syncbase/server/watchlog_test.go b/services/syncbase/server/watchlog_test.go
index 3f46db0..8e3c96c 100644
--- a/services/syncbase/server/watchlog_test.go
+++ b/services/syncbase/server/watchlog_test.go
@@ -74,7 +74,9 @@
 	blessings, _ := v23.GetPrincipal(ctx).BlessingStore().Default()
 	call := &mockCall{p: v23.GetPrincipal(ctx), b: blessings}
 	var expected []interface{}
-	resumeMarker, _ := watchable.GetResumeMarker(st)
+	sn := st.NewSnapshot()
+	resumeMarker, _ := watchable.GetResumeMarker(sn)
+	sn.Abort()
 	watcher, cancel := st.WatchUpdates(resumeMarker)
 	defer cancel()
 	// Generate Put/Delete events.
diff --git a/services/syncbase/store/watchable/store.go b/services/syncbase/store/watchable/store.go
index 406c7da..af63214 100644
--- a/services/syncbase/store/watchable/store.go
+++ b/services/syncbase/store/watchable/store.go
@@ -28,7 +28,7 @@
 
 // Options configures a Store.
 type Options struct {
-	// Key prefixes to version and log. If nil, all keys are managed.
+	// Key prefixes to version and log.
 	ManagedPrefixes []string
 }
 
@@ -40,13 +40,17 @@
 
 // Wrap returns a *Store that wraps the given store.Store.
 func Wrap(st store.Store, clock Clock, opts *Options) (*Store, error) {
+	logStart, err := getLogStartSeq(st)
+	if err != nil {
+		return nil, err
+	}
 	seq, err := getNextLogSeq(st)
 	if err != nil {
 		return nil, err
 	}
 	return &Store{
 		ist:     st,
-		watcher: newWatcher(),
+		watcher: newWatcher(logStart),
 		opts:    opts,
 		seq:     seq,
 		Clock:   clock,
@@ -134,9 +138,6 @@
 // Internal helpers
 
 func (st *Store) managesKey(key []byte) bool {
-	if st.opts.ManagedPrefixes == nil {
-		return true
-	}
 	ikey := string(key)
 	// TODO(sadovsky): Optimize, e.g. use binary search (here and below).
 	for _, p := range st.opts.ManagedPrefixes {
@@ -148,12 +149,9 @@
 }
 
 func (st *Store) managesRange(start, limit []byte) bool {
-	if st.opts.ManagedPrefixes == nil {
-		return true
-	}
-	istart, ilimit := string(start), string(limit)
+	istart, ilimit := string(start), limitForCompare(string(limit))
 	for _, p := range st.opts.ManagedPrefixes {
-		pstart, plimit := pubutil.PrefixRangeStart(p), pubutil.PrefixRangeLimit(p)
+		pstart, plimit := pubutil.PrefixRangeStart(p), limitForCompare(pubutil.PrefixRangeLimit(p))
 		if pstart <= istart && ilimit <= plimit {
 			return true
 		}
@@ -164,3 +162,17 @@
 	}
 	return false
 }
+
+func limitForCompare(limit string) string {
+	if limit == "" {
+		// Empty limit means no limit. Return a value always greater than a valid start.
+		return "\xff"
+	}
+	return limit
+}
+
+func (st *Store) getSeq() uint64 {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+	return st.seq
+}
diff --git a/services/syncbase/store/watchable/store_test.go b/services/syncbase/store/watchable/store_test.go
index 6af8c06..f528528 100644
--- a/services/syncbase/store/watchable/store_test.go
+++ b/services/syncbase/store/watchable/store_test.go
@@ -19,47 +19,47 @@
 
 func TestStream(t *testing.T) {
 	runTest(t, []string{}, test.RunStreamTest)
-	runTest(t, nil, test.RunStreamTest)
+	runTest(t, []string{""}, test.RunStreamTest)
 }
 
 func TestSnapshot(t *testing.T) {
 	runTest(t, []string{}, test.RunSnapshotTest)
-	runTest(t, nil, test.RunSnapshotTest)
+	runTest(t, []string{""}, test.RunSnapshotTest)
 }
 
 func TestStoreState(t *testing.T) {
 	runTest(t, []string{}, test.RunStoreStateTest)
-	runTest(t, nil, test.RunStoreStateTest)
+	runTest(t, []string{""}, test.RunStoreStateTest)
 }
 
 func TestClose(t *testing.T) {
 	runTest(t, []string{}, test.RunCloseTest)
-	runTest(t, nil, test.RunCloseTest)
+	runTest(t, []string{""}, test.RunCloseTest)
 }
 
 func TestReadWriteBasic(t *testing.T) {
 	runTest(t, []string{}, test.RunReadWriteBasicTest)
-	runTest(t, nil, test.RunReadWriteBasicTest)
+	runTest(t, []string{""}, test.RunReadWriteBasicTest)
 }
 
 func TestReadWriteRandom(t *testing.T) {
 	runTest(t, []string{}, test.RunReadWriteRandomTest)
-	runTest(t, nil, test.RunReadWriteRandomTest)
+	runTest(t, []string{""}, test.RunReadWriteRandomTest)
 }
 
 func TestConcurrentTransactions(t *testing.T) {
 	runTest(t, []string{}, test.RunConcurrentTransactionsTest)
-	runTest(t, nil, test.RunConcurrentTransactionsTest)
+	runTest(t, []string{""}, test.RunConcurrentTransactionsTest)
 }
 
 func TestTransactionState(t *testing.T) {
 	runTest(t, []string{}, test.RunTransactionStateTest)
-	runTest(t, nil, test.RunTransactionStateTest)
+	runTest(t, []string{""}, test.RunTransactionStateTest)
 }
 
 func TestTransactionsWithGet(t *testing.T) {
 	runTest(t, []string{}, test.RunTransactionsWithGetTest)
-	runTest(t, nil, test.RunTransactionsWithGetTest)
+	runTest(t, []string{""}, test.RunTransactionsWithGetTest)
 }
 
 func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
diff --git a/services/syncbase/store/watchable/transaction.go b/services/syncbase/store/watchable/transaction.go
index f1a7a15..c9ee74f 100644
--- a/services/syncbase/store/watchable/transaction.go
+++ b/services/syncbase/store/watchable/transaction.go
@@ -152,7 +152,9 @@
 		return err
 	}
 	tx.St.seq = seq
-	tx.St.watcher.broadcastUpdates()
+	if len(tx.ops) > 0 {
+		tx.St.watcher.broadcastUpdates()
+	}
 	return nil
 }
 
diff --git a/services/syncbase/store/watchable/transaction_test.go b/services/syncbase/store/watchable/transaction_test.go
index f28f996..e56ab0d 100644
--- a/services/syncbase/store/watchable/transaction_test.go
+++ b/services/syncbase/store/watchable/transaction_test.go
@@ -74,7 +74,7 @@
 	// Note: NewVClockForTests calls cl.SysClock.Now() once to write the initial
 	// VClockData to the store.
 	cl := vclock.NewVClockForTests(&mockSystemClock{Time: t1, Inc: inc})
-	wst1, err := Wrap(ist, cl, &Options{ManagedPrefixes: nil})
+	wst1, err := Wrap(ist, cl, &Options{ManagedPrefixes: []string{""}})
 	if err != nil {
 		t.Fatalf("Wrap failed: %v", err)
 	}
@@ -99,7 +99,7 @@
 	verifyCommitLog(t, ist, seqForCreate, 2, t1.Add(inc))
 
 	// Update data already present in store with a new watchable store
-	wst2, err := Wrap(ist, cl, &Options{ManagedPrefixes: nil})
+	wst2, err := Wrap(ist, cl, &Options{ManagedPrefixes: []string{""}})
 	if err != nil {
 		t.Fatalf("Wrap failed: %v", err)
 	}
@@ -142,7 +142,7 @@
 	// Note: NewVClockForTests calls cl.SysClock.Now() once to write the initial
 	// VClockData to the store.
 	cl := vclock.NewVClockForTests(&mockSystemClock{Time: t1, Inc: inc})
-	wst, err := Wrap(ist, cl, &Options{ManagedPrefixes: nil})
+	wst, err := Wrap(ist, cl, &Options{ManagedPrefixes: []string{""}})
 	if err != nil {
 		t.Fatalf("Wrap failed: %v", err)
 	}
diff --git a/services/syncbase/store/watchable/util.go b/services/syncbase/store/watchable/util.go
index a364058..3389421 100644
--- a/services/syncbase/store/watchable/util.go
+++ b/services/syncbase/store/watchable/util.go
@@ -42,11 +42,11 @@
 }
 
 func makeVersionKey(key []byte) []byte {
-	return []byte(join(common.VersionPrefix, string(key)))
+	return []byte(common.JoinKeyParts(common.VersionPrefix, string(key)))
 }
 
 func makeAtVersionKey(key, version []byte) []byte {
-	return []byte(join(string(key), string(version)))
+	return []byte(common.JoinKeyParts(string(key), string(version)))
 }
 
 func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
@@ -80,10 +80,6 @@
 	return tx.Delete(makeVersionKey(key))
 }
 
-func join(parts ...string) string {
-	return common.JoinKeyParts(parts...)
-}
-
 func convertError(err error) error {
 	return verror.Convert(verror.IDAction{}, nil, err)
 }
diff --git a/services/syncbase/store/watchable/util_test.go b/services/syncbase/store/watchable/util_test.go
index adf00dd..761ec10 100644
--- a/services/syncbase/store/watchable/util_test.go
+++ b/services/syncbase/store/watchable/util_test.go
@@ -7,18 +7,39 @@
 import (
 	"testing"
 
+	"v.io/x/ref/services/syncbase/store"
 	"v.io/x/ref/services/syncbase/vclock"
 )
 
-// TestGetNextLogSeq tests that the getNextLogSeq helper works on range 0..10.
+// TestGetNextLogSeq tests that the getNextLogSeq helper works on range 0..10
+// and continues to work after moving the log start.
 func TestGetNextLogSeq(t *testing.T) {
-	st, destroy := createStore()
+	ist, destroy := createStore()
 	defer destroy()
-	st, err := Wrap(st, vclock.NewVClockForTests(nil), &Options{})
+	st, err := Wrap(ist, vclock.NewVClockForTests(nil), &Options{ManagedPrefixes: []string{}})
 	if err != nil {
 		t.Fatal(err)
 	}
-	for i := uint64(0); i <= uint64(10); i++ {
+	// Check 0..10.
+	testGetNextLogSeqInterval(t, st, 0, 10)
+	// Update log start.
+	if _, err := st.watcher.updateLogStartSeq(st, 7); err != nil {
+		t.Fatalf("failed to update log start seq: %v", err)
+	}
+	// Delete some entries before the new start to simulate garbage collection.
+	for _, i := range []uint64{1, 3, 4, 5} {
+		if err := st.Delete([]byte(logEntryKey(i))); err != nil {
+			t.Fatalf("failed to delete log entry %d: %v", i, err)
+		}
+	}
+	// Check 11..20.
+	testGetNextLogSeqInterval(t, st, 11, 20)
+}
+
+// testGetNextLogSeqInterval tests that the getNextLogSeq helper works on range
+// begin..end, assuming that begin is the next seq to be written.
+func testGetNextLogSeqInterval(t *testing.T, st store.Store, begin, end uint64) {
+	for i := begin; i <= end; i++ {
 		seq, err := getNextLogSeq(st)
 		if err != nil {
 			t.Fatalf("failed to get log seq: %v", err)
@@ -29,3 +50,37 @@
 		st.Put([]byte(logEntryKey(i)), nil)
 	}
 }
+
+// TestPutLogStartSeq tests putting and getting the log start seq.
+func TestPutLogStartSeq(t *testing.T) {
+	ist, destroy := createStore()
+	defer destroy()
+	st, err := Wrap(ist, vclock.NewVClockForTests(nil), &Options{ManagedPrefixes: []string{}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Log start should default to 0.
+	if gotSeq, err := getLogStartSeq(st); err != nil {
+		t.Fatalf("getLogStartSeq() failed: %v", err)
+	} else if gotSeq != 0 {
+		t.Errorf("expected initial seq 0, got %d", gotSeq)
+	}
+	// Test log start persistence and encoding.
+	for _, seq := range []uint64{1, 42, 1337, 0x1f427, 0x1badb002, 0xdeadbeef} {
+		if err := putLogStartSeq(st, seq); err != nil {
+			t.Fatalf("putLogStartSeq(%d) failed: %v", seq, err)
+		}
+		// The log start should be the same when read from the wrapped store or from
+		// the watchable store.
+		if gotSeq, err := getLogStartSeq(st); err != nil {
+			t.Fatalf("getLogStartSeq(st) failed: %v", err)
+		} else if gotSeq != seq {
+			t.Errorf("expected seq %d, got %d", seq, gotSeq)
+		}
+		if gotSeq, err := getLogStartSeq(ist); err != nil {
+			t.Fatalf("getLogStartSeq(ist) failed: %v", err)
+		} else if gotSeq != seq {
+			t.Errorf("expected seq %d, got %d", seq, gotSeq)
+		}
+	}
+}
diff --git a/services/syncbase/store/watchable/watcher.go b/services/syncbase/store/watchable/watcher.go
index c4ca18a..e1e068a 100644
--- a/services/syncbase/store/watchable/watcher.go
+++ b/services/syncbase/store/watchable/watcher.go
@@ -26,18 +26,25 @@
 	// closed, updater is closed.
 	updater chan struct{}
 
-	// Protects the clients map.
+	// Protects the fields below.
 	mu sync.RWMutex
 	// Currently registered clients, notified by watcherLoop via their channels.
 	// When watcher is closed, all clients are stopped (and their channels closed)
 	// with ErrAborted and clients is set to nil.
 	clients map[*Client]struct{}
+	// Sequence number pointing to the log start. Kept in sync with the value
+	// persisted in the store under logStartSeqKey(). The log is a contiguous,
+	// possibly empty, sequence of log entries beginning from logStart; log
+	// entries before logStart may be partially garbage collected. logStart
+	// will never move past an active watcher's seq.
+	logStart uint64
 }
 
-func newWatcher() *watcher {
+func newWatcher(logStart uint64) *watcher {
 	ret := &watcher{
-		updater: make(chan struct{}, 1),
-		clients: make(map[*Client]struct{}),
+		updater:  make(chan struct{}, 1),
+		clients:  make(map[*Client]struct{}),
+		logStart: logStart,
 	}
 	go ret.watcherLoop()
 	return ret
@@ -105,6 +112,34 @@
 	}
 }
 
+// updateLogStartSeq - see UpdateLogStart.
+func (w *watcher) updateLogStartSeq(st *Store, syncSeq uint64) (uint64, error) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.clients == nil {
+		return 0, verror.New(verror.ErrAborted, nil, "watcher closed")
+	}
+	// New log start is the minimum of all watch client seqs - the persistent sync
+	// watcher and all ephemeral client watchers.
+	lsSeq := syncSeq
+	for c := range w.clients {
+		if seq := c.getPrevSeq(); lsSeq > seq {
+			lsSeq = seq
+		}
+	}
+	if lsSeq < w.logStart {
+		// New log start is earlier than the previous log start. This should never
+		// happen since it means at least one watch client is incorrectly reading
+		// log entries released to garbage collection.
+		return 0, verror.New(verror.ErrInternal, nil, "watcher or sync seq less than log start")
+	}
+	if err := putLogStartSeq(st, lsSeq); err != nil {
+		return 0, err
+	}
+	w.logStart = lsSeq
+	return lsSeq, nil
+}
+
 // watchUpdates - see WatchUpdates.
 func (w *watcher) watchUpdates(seq uint64) (_ *Client, cancel func()) {
 	w.mu.Lock()
@@ -113,6 +148,11 @@
 		// watcher is closed. Return stopped Client.
 		return newStoppedClient(verror.NewErrAborted(nil)), func() {}
 	}
+	if seq < w.logStart {
+		// Log start has moved past seq, so entries between seq and log start have
+		// potentially been garbage collected. Return stopped Client.
+		return newStoppedClient(verror.New(watch.ErrUnknownResumeMarker, nil, MakeResumeMarker(seq))), func() {}
+	}
 	// Register and return client.
 	c := newClient(seq)
 	w.clients[c] = struct{}{}
@@ -127,15 +167,40 @@
 	return c, cancel
 }
 
+// UpdateLogStart takes as input the resume marker of the sync watcher and
+// returns the new log start, computed as the earliest resume marker of all
+// active watchers including the sync watcher. The new log start is persisted
+// before being returned, making it safe to garbage collect earlier log entries.
+// syncMarker is assumed to monotonically increase, always remaining between the
+// log start and end (inclusive).
+func (st *Store) UpdateLogStart(syncMarker watch.ResumeMarker) (watch.ResumeMarker, error) {
+	syncSeq, err := parseResumeMarker(string(syncMarker))
+	if err != nil {
+		return nil, err
+	}
+	if logEnd := st.getSeq(); syncSeq > logEnd {
+		// Sync has moved past log end. This should never happen.
+		return nil, verror.New(verror.ErrInternal, nil, "sync seq greater than log end")
+	}
+	lsSeq, err := st.watcher.updateLogStartSeq(st, syncSeq)
+	return MakeResumeMarker(lsSeq), err
+}
+
 // WatchUpdates returns a Client which supports waiting for changes and
 // iterating over the watch log starting from resumeMarker, as well as a
-// cancel function which MUST be called to release watch resources.
+// cancel function which MUST be called to release watch resources. Returns
+// a stopped Client if the resume marker is invalid or pointing to an
+// already garbage collected segment of the log.
 func (st *Store) WatchUpdates(resumeMarker watch.ResumeMarker) (_ *Client, cancel func()) {
 	seq, err := parseResumeMarker(string(resumeMarker))
 	if err != nil {
 		// resumeMarker is invalid. Return stopped Client.
 		return newStoppedClient(err), func() {}
 	}
+	if logEnd := st.getSeq(); seq > logEnd {
+		// resumeMarker points past log end. Return stopped Client.
+		return newStoppedClient(verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)), func() {}
+	}
 	return st.watcher.watchUpdates(seq)
 }
 
@@ -209,7 +274,7 @@
 // is nil, the client is active. Otherwise:
 // * ErrCanceled - watch was canceled by the client.
 // * ErrAborted - watcher was closed (store was closed, possibly destroyed).
-// * ErrUnknownResumeMarker - watch was started with an invalid resume marker.
+// * ErrUnknownResumeMarker - watch was started with an invalid or too old resume marker.
 // * other errors - NextBatchFromLog encountered an error.
 func (c *Client) Err() error {
 	c.mu.Lock()
@@ -228,10 +293,16 @@
 	c.mu.Unlock()
 }
 
+func (c *Client) getPrevSeq() uint64 {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.prevSeq
+}
+
 // GetResumeMarker returns the ResumeMarker that points to the current end
 // of the event log.
-func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error) {
-	seq, err := getNextLogSeq(st)
+func GetResumeMarker(sntx store.SnapshotOrTransaction) (watch.ResumeMarker, error) {
+	seq, err := getNextLogSeq(sntx)
 	return watch.ResumeMarker(logEntryKey(seq)), err
 }
 
@@ -243,26 +314,28 @@
 func logEntryKey(seq uint64) string {
 	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
-	return join(common.LogPrefix, fmt.Sprintf("%016x", seq))
+	return common.JoinKeyParts(common.LogPrefix, fmt.Sprintf("%016x", seq))
 }
 
 // readBatchFromLog returns a batch of watch log records (a transaction) from
 // the given database and the next sequence number at the end of the batch.
+// Assumes that the log start is less than seq during its execution.
 func readBatchFromLog(st store.Store, seq uint64) ([]*LogEntry, uint64, error) {
 	_, scanLimit := common.ScanPrefixArgs(common.LogPrefix, "")
 	scanStart := MakeResumeMarker(seq)
 	endOfBatch := false
 
-	// Use the store directly to scan these read-only log entries, no need
-	// to create a snapshot since they are never overwritten.  Read and
-	// buffer a batch before processing it.
+	// Use the store directly to scan these read-only log entries, no need to
+	// create a snapshot since log entries are never overwritten and are not
+	// deleted before the log start moves past them. Read and buffer a batch
+	// before processing it.
 	var logs []*LogEntry
 	stream := st.Scan(scanStart, scanLimit)
+	defer stream.Cancel()
 	for stream.Advance() {
 		seq++
 		var logEnt LogEntry
 		if err := vom.Decode(stream.Value(nil), &logEnt); err != nil {
-			stream.Cancel()
 			return nil, seq, err
 		}
 
@@ -271,7 +344,6 @@
 		// Stop if this is the end of the batch.
 		if logEnt.Continued == false {
 			endOfBatch = true
-			stream.Cancel()
 			break
 		}
 	}
@@ -281,7 +353,7 @@
 			return nil, seq, err
 		}
 		if len(logs) > 0 {
-			vlog.Fatalf("end of batch not found after %d entries", len(logs))
+			return nil, seq, verror.New(verror.ErrInternal, nil, fmt.Sprintf("end of batch not found after %d entries", len(logs)))
 		}
 		return nil, seq, nil
 	}
@@ -301,6 +373,32 @@
 	return seq, nil
 }
 
+func logStartSeqKey() string {
+	return common.JoinKeyParts(common.LogMarkerPrefix, "st")
+}
+
+func getLogStartSeq(st store.StoreReader) (uint64, error) {
+	var seq uint64
+	if err := store.Get(nil, st, logStartSeqKey(), &seq); err != nil {
+		if verror.ErrorID(err) != verror.ErrNoExist.ID {
+			return 0, err
+		}
+		return 0, nil
+	}
+	return seq, nil
+}
+
+func putLogStartSeq(st *Store, seq uint64) error {
+	// The log start key must not be managed because getLogStartSeq is called both
+	// on the wrapped store and on the watchable store.
+	if st.managesKey([]byte(logStartSeqKey())) {
+		panic("log start key must not be managed")
+	}
+	// We put directly into the wrapped store to avoid using a watchable store
+	// transaction, which may cause a deadlock by calling broadcastUpdates().
+	return store.Put(nil, st.ist, logStartSeqKey(), seq)
+}
+
 // logEntryExists returns true iff the log contains an entry with the given
 // sequence number.
 func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
@@ -312,25 +410,27 @@
 }
 
 // getNextLogSeq returns the next sequence number to be used for a new commit.
-// NOTE: this function assumes that all sequence numbers in the log represent
-// some range [start, limit] without gaps.
+// NOTE: This function assumes that all sequence numbers in the log represent
+// some range [start, limit] without gaps. It also assumes that the log is not
+// changing (by appending entries or moving the log start) during its execution.
+// Therefore, it should only be called on a snapshot or a store with no active
+// potential writers (e.g. when opening the store). Furthermore, it assumes that
+// common.LogMarkerPrefix and common.LogPrefix are not managed prefixes.
+// TODO(ivanpi): Consider replacing this function with persisted log end,
+// similar to how log start is handled.
 func getNextLogSeq(st store.StoreReader) (uint64, error) {
-	// Determine initial value for seq.
+	// Read initial value for seq.
 	// TODO(sadovsky): Consider using a bigger seq.
-
-	// Find the beginning of the log.
-	it := st.Scan(common.ScanPrefixArgs(common.LogPrefix, ""))
-	if !it.Advance() {
-		return 0, nil
-	}
-	defer it.Cancel()
-	if it.Err() != nil {
-		return 0, it.Err()
-	}
-	seq, err := parseResumeMarker(string(it.Key(nil)))
+	seq, err := getLogStartSeq(st)
 	if err != nil {
 		return 0, err
 	}
+	// Handle empty log case.
+	if ok, err := logEntryExists(st, seq); err != nil {
+		return 0, err
+	} else if !ok {
+		return 0, nil
+	}
 	var step uint64 = 1
 	// Suppose the actual value we are looking for is S. First, we estimate the
 	// range for S. We find seq, step: seq < S <= seq + step.
diff --git a/services/syncbase/store/watchable/watcher_test.go b/services/syncbase/store/watchable/watcher_test.go
index f7005fb..38d2999 100644
--- a/services/syncbase/store/watchable/watcher_test.go
+++ b/services/syncbase/store/watchable/watcher_test.go
@@ -10,9 +10,11 @@
 	"testing"
 	"time"
 
+	"v.io/v23/services/watch"
 	"v.io/v23/verror"
 	"v.io/x/ref/services/syncbase/common"
 	"v.io/x/ref/services/syncbase/store"
+	"v.io/x/ref/services/syncbase/vclock"
 )
 
 // TestWatchLogBatch tests fetching a batch of log records.
@@ -96,9 +98,9 @@
 	}
 }
 
-// TestWatcher tests broadcasting updates to watch clients.
-func TestWatcher(t *testing.T) {
-	w := newWatcher()
+// TestBroadcastUpdates tests broadcasting updates to watch clients.
+func TestBroadcastUpdates(t *testing.T) {
+	w := newWatcher(0)
 
 	// Update broadcast should never block. It should be safe to call with no
 	// clients registered.
@@ -239,3 +241,191 @@
 	// logs an error message.
 	w.broadcastUpdates()
 }
+
+// TestUpdateLogStart tests that UpdateLogStart moves the log start up to the
+// earliest active watcher's seq, including syncSeq, and that NextBatchFromLog
+// correctly iterates over the log.
+func TestUpdateLogStart(t *testing.T) {
+	ist, destroy := createStore()
+	defer destroy()
+	st, err := Wrap(ist, vclock.NewVClockForTests(nil), &Options{ManagedPrefixes: []string{common.RowPrefix, common.CollectionPermsPrefix}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// UpdateLogStart(0) should work on an empty log.
+	updateLogStart(t, st, 0, 0)
+	putBatch(t, st, 0, 3)
+	putBatch(t, st, 3, 1)
+	// UpdateLogStart(3) with no active watchers should move the log start to 3.
+	updateLogStart(t, st, 3, 3)
+	putBatch(t, st, 4, 5)
+	// UpdateLogStart should be idempotent if called with the same value.
+	updateLogStart(t, st, 3, 3)
+	// Start watchers @3 and @9.
+	w1, w1cancel := st.WatchUpdates(MakeResumeMarker(3))
+	w2, _ := st.WatchUpdates(MakeResumeMarker(9))
+	// Watching from 0 should fail because log start has moved to 3.
+	if wFail, _ := st.WatchUpdates(MakeResumeMarker(0)); verror.ErrorID(wFail.Err()) != watch.ErrUnknownResumeMarker.ID {
+		t.Fatalf("WatchUpdates with old resume marker should have failed with ErrUnknownResumeMarker, got: %v", wFail.Err())
+	}
+	putBatch(t, st, 9, 1)
+	// UpdateLogStart(9): sync @9, w1 @3, w2 @9 => log start stays at 3.
+	updateLogStart(t, st, 9, 3)
+	// w1 still @3, next @4.
+	expectBatch(t, st, w1, 3, 1)
+	// UpdateLogStart(9): sync @9, w1 @3, w2 @9 => log start stays at 3.
+	updateLogStart(t, st, 9, 3)
+	// w1 @4, next @9.
+	expectBatch(t, st, w1, 4, 5)
+	// UpdateLogStart(9): sync @9, w1 @4, w2 @9 => log start moves to 4.
+	updateLogStart(t, st, 9, 4)
+	// UpdateLogStart requires the sync resmark to monotonically increase.
+	if _, err := st.UpdateLogStart(MakeResumeMarker(3)); err == nil {
+		t.Fatalf("UpdateLogStart should fail when sync resume marker is smaller than log start")
+	}
+	// Above UpdateLogStart with an invalid sync resmark should have had no effect.
+	updateLogStart(t, st, 9, 4)
+	putBatch(t, st, 10, 2)
+	// w1 @9, next @10.
+	expectBatch(t, st, w1, 9, 1)
+	// w1 @10, next @12.
+	expectBatch(t, st, w1, 10, 2)
+	// UpdateLogStart(9): sync @9, w1 @10, w2 @9 => log start stays at 9.
+	updateLogStart(t, st, 9, 9)
+	// UpdateLogStart(10): sync @10, w1 @10, w2 @9 => log start stays at 9.
+	updateLogStart(t, st, 10, 9)
+	// w2 still @9, next @10.
+	expectBatch(t, st, w2, 9, 1)
+	// w2 @10, next @12.
+	expectBatch(t, st, w2, 10, 2)
+	// w2 @12, nothing to read, next @12.
+	expectBatch(t, st, w2, 12, 0)
+	// UpdateLogStart(12): sync @12, w1 @10, w2 @12 => log start moves to 10.
+	updateLogStart(t, st, 12, 10)
+	putBatch(t, st, 12, 5)
+	// w2 still @12, next @17.
+	expectBatch(t, st, w2, 12, 5)
+	// w2 @17, nothing to read, next @17.
+	expectBatch(t, st, w2, 17, 0)
+	putBatch(t, st, 17, 1)
+	// UpdateLogStart(12): sync @12, w1 @10, w2 @17 => log start stays at 10.
+	updateLogStart(t, st, 12, 10)
+	// Stop w1.
+	w1cancel()
+	if _, _, err := w1.NextBatchFromLog(st); verror.ErrorID(err) != verror.ErrCanceled.ID {
+		t.Fatalf("NextBatchFromLog after watcher cancel should have failed with ErrCanceled, got: %v", err)
+	}
+	// UpdateLogStart(12): sync @12, w2 @17 => log start moves to 12.
+	updateLogStart(t, st, 12, 12)
+	// UpdateLogStart(18): sync @18, w2 @17 => log start moves to 17.
+	updateLogStart(t, st, 18, 17)
+	// Simulate store close and reopen by closing the watcher (which stops w2) and
+	// rewrapping ist.
+	st.watcher.close()
+	if _, _, err := w2.NextBatchFromLog(st); verror.ErrorID(err) != verror.ErrAborted.ID {
+		t.Fatalf("NextBatchFromLog after watcher closure should have failed with ErrAborted, got: %v", err)
+	}
+	if _, err := st.UpdateLogStart(MakeResumeMarker(18)); verror.ErrorID(err) != verror.ErrAborted.ID {
+		t.Fatalf("UpdateLogStart on closed watcher should have failed with ErrAborted, got: %v", err)
+	}
+	st, err = Wrap(ist, vclock.NewVClockForTests(nil), &Options{ManagedPrefixes: []string{common.RowPrefix, common.CollectionPermsPrefix}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Before starting watchers, test sync resume marker sanity check.
+	if _, err := st.UpdateLogStart(MakeResumeMarker(42)); err == nil {
+		t.Fatalf("UpdateLogStart should fail when sync resume marker is greater than seq")
+	}
+	putBatch(t, st, 18, 2)
+	putBatch(t, st, 20, 5)
+	putBatch(t, st, 25, 2)
+	// UpdateLogStart(20) with no active watchers should move the log start to 20.
+	updateLogStart(t, st, 20, 20)
+	// Watching from 18 should fail because log start has moved to 20.
+	if wFail, _ := st.WatchUpdates(MakeResumeMarker(18)); verror.ErrorID(wFail.Err()) != watch.ErrUnknownResumeMarker.ID {
+		t.Fatalf("WatchUpdates with old resume marker should have failed with ErrUnknownResumeMarker, got: %v", wFail.Err())
+	}
+	// Start watcher @20.
+	w3, w3cancel := st.WatchUpdates(MakeResumeMarker(20))
+	// Watching from 42 should fail because log end hasn't reached 42 yet.
+	if wFail, _ := st.WatchUpdates(MakeResumeMarker(42)); verror.ErrorID(wFail.Err()) != watch.ErrUnknownResumeMarker.ID {
+		t.Fatalf("WatchUpdates with resume marker greater than seq should have failed with ErrUnknownResumeMarker, got: %v", wFail.Err())
+	}
+	// w3 still @20, next @25.
+	expectBatch(t, st, w3, 20, 5)
+	// UpdateLogStart(27): sync @27, w3 @20 => log start stays at 20.
+	updateLogStart(t, st, 27, 20)
+	// w3 @25, next @27.
+	expectBatch(t, st, w3, 25, 2)
+	// UpdateLogStart(27): sync @27, w3 @25 => log start moves to 25.
+	updateLogStart(t, st, 27, 25)
+	// Stop w3.
+	w3cancel()
+	// UpdateLogStart(27) with no active watchers should move the log start to 27.
+	updateLogStart(t, st, 27, 27)
+}
+
+// putBatch puts a batch with size changes, asserting that it starts at wantSeq.
+func putBatch(t *testing.T, st *Store, wantSeq, size uint64) {
+	seq := st.getSeq()
+	if seq != wantSeq {
+		t.Errorf("putBatch %d: unexpected seq before: %d, want %d", wantSeq, seq, wantSeq)
+	}
+	if err := RunInTransaction(st, func(tx *Transaction) error {
+		for i := uint64(0); i < size; i++ {
+			if err := tx.Put([]byte(dummyRowKey(seq+i)), []byte("value")); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		t.Fatalf("putBatch %d: RunInTransaction failed: %v", wantSeq, err)
+	}
+	if got, want := st.getSeq(), seq+size; got != want {
+		t.Fatalf("putBatch %d: unexpected seq after: %d, want %d", wantSeq, got, want)
+	}
+}
+
+// expectBatch reads the next batch for the watcher using NextBatchFromLog,
+// asserting that it starts at wantSeq and has wantSize changes.
+func expectBatch(t *testing.T, st *Store, watcher *Client, wantSeq, wantSize uint64) {
+	gotLogs, gotRm, err := watcher.NextBatchFromLog(st)
+	if err != nil {
+		t.Fatalf("expectBatch %d: NextBatchFromLog failed: %v", wantSeq, err)
+	}
+	if got, want := uint64(len(gotLogs)), wantSize; got != want {
+		t.Errorf("expectBatch %d: expected %d logs, got %d", wantSeq, want, got)
+	}
+	if len(gotLogs) > 0 {
+		if endSeq, err := parseResumeMarker(string(gotRm)); err != nil {
+			t.Fatalf("expectBatch %d: got invalid resume marker %q: %v", wantSeq, gotRm, err)
+		} else if got, want := endSeq-uint64(len(gotLogs)), wantSeq; got != want {
+			t.Errorf("expectBatch %d: expected logs starting from seq %d, got %d", wantSeq, want, got)
+		}
+	}
+}
+
+// updateLogStart calls UpdateLogStart with syncSeq, asserting that the updated
+// log start, both returned and persisted, matches wantSeq.
+func updateLogStart(t *testing.T, st *Store, syncSeq, wantSeq uint64) {
+	gotResMark, err := st.UpdateLogStart(MakeResumeMarker(syncSeq))
+	if err != nil {
+		t.Fatalf("UpdateLogStart %d: failed: %v", syncSeq, err)
+	}
+	gotSeq, err := parseResumeMarker(string(gotResMark))
+	if err != nil {
+		t.Fatalf("UpdateLogStart %d: returned invalid resume marker %q: %v", syncSeq, gotResMark, err)
+	}
+	if gotSeq != wantSeq {
+		t.Errorf("UpdateLogStart %d: expected updated seq %d, got %d", syncSeq, wantSeq, gotSeq)
+	}
+	if readSeq, err := getLogStartSeq(st); err != nil {
+		t.Fatalf("UpdateLogStart %d: getLogStartSeq failed: %v", syncSeq, err)
+	} else if readSeq != wantSeq {
+		t.Errorf("UpdateLogStart %d: expected persisted seq %d, got %d", syncSeq, wantSeq, readSeq)
+	}
+}
+
+func dummyRowKey(seq uint64) string {
+	return common.JoinKeyParts(common.RowPrefix, "u,c", fmt.Sprintf("r%08d", seq))
+}