watch: refactoring required for the client watch

This change precedes the client watch implementation and contains
refactoring with some minor fixes. The intention is to reuse more
code.

The main parts are:
- v23/syncbase/nosql: rename Stream to ScanStream and extract
  common Stream interface from ScanStream and ResultStream
  (also WatchStream is coming)
- move Watch methods from server/nosql/database.go to
  server/nosql/database_watch.go
- move watch log helpers from watchable/util.go to
  watchable/watcher.go
- move the WatchLogBatch() func from vsync/ to watchable/
  together with the test

Change-Id: Iaab18a1ec6dd218e85284bd994ed71e766feab26
diff --git a/v23/syncbase/nosql/invalid_types.go b/v23/syncbase/nosql/invalid_types.go
index 022ad65..0398a3f 100644
--- a/v23/syncbase/nosql/invalid_types.go
+++ b/v23/syncbase/nosql/invalid_types.go
@@ -4,38 +4,38 @@
 
 package nosql
 
-// InvalidStream is a nosql.Stream for which all methods return errors.
-type InvalidStream struct {
+// InvalidScanStream is a nosql.ScanStream for which all methods return errors.
+type InvalidScanStream struct {
 	Error error // returned by all methods
 }
 
 var (
-	_ Stream = (*InvalidStream)(nil)
+	_ ScanStream = (*InvalidScanStream)(nil)
 )
 
 ////////////////////////////////////////////////////////////
-// InvalidStream
+// InvalidScanStream
 
-// Advance implements Stream.Advance.
-func (s *InvalidStream) Advance() bool {
+// Advance implements the Stream interface.
+func (s *InvalidScanStream) Advance() bool {
 	return false
 }
 
-// Key implements Stream.Key.
-func (s *InvalidStream) Key() string {
-	panic(s.Error)
-}
-
-// Value implements Stream.Value.
-func (s *InvalidStream) Value(value interface{}) error {
-	panic(s.Error)
-}
-
-// Err implements Stream.Err.
-func (s *InvalidStream) Err() error {
+// Err implements the Stream interface.
+func (s *InvalidScanStream) Err() error {
 	return s.Error
 }
 
-// Cancel implements Stream.Cancel.
-func (s *InvalidStream) Cancel() {
+// Cancel implements the Stream interface.
+func (s *InvalidScanStream) Cancel() {
+}
+
+// Key implements the ScanStream interface.
+func (s *InvalidScanStream) Key() string {
+	panic(s.Error)
+}
+
+// Value implements the ScanStream interface.
+func (s *InvalidScanStream) Value(value interface{}) error {
+	panic(s.Error)
 }
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 560704a..99f11eb 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -188,7 +188,7 @@
 	// time of the RPC (or at the time of BeginBatch, if in a batch), and will not
 	// reflect subsequent writes to keys not yet reached by the stream.
 	// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
-	Scan(ctx *context.T, r RowRange) Stream
+	Scan(ctx *context.T, r RowRange) ScanStream
 
 	// GetPermissions returns an array of (prefix, perms) pairs. The array is
 	// sorted from longest prefix to shortest, so element zero is the one that
@@ -236,15 +236,31 @@
 	Delete(ctx *context.T) error
 }
 
-// Stream is an interface for iterating through a collection of key-value pairs.
+// Stream is an interface for iterating through a collection of elements.
 type Stream interface {
-	// Advance stages an element so the client can retrieve it with Key or Value.
-	// Advance returns true iff there is an element to retrieve. The client must
-	// call Advance before calling Key or Value. The client must call Cancel if it
-	// does not iterate through all elements (i.e. until Advance returns false).
+	// Advance stages an element so the client can retrieve it. Advance returns
+	// true iff there is an element to retrieve. The client must call Advance
+	// before retrieving the element. The client must call Cancel if it does not
+	// iterate through all elements (i.e. until Advance returns false).
 	// Advance may block if an element is not immediately available.
 	Advance() bool
 
+	// Err returns a non-nil error iff the stream encountered any errors. Err does
+	// not block.
+	Err() error
+
+	// Cancel notifies the stream provider that it can stop producing elements.
+	// The client must call Cancel if it does not iterate through all elements
+	// (i.e. until Advance returns false). Cancel is idempotent and can be called
+	// concurrently with a goroutine that is iterating via Advance.
+	// Cancel causes Advance to subsequently return false. Cancel does not block.
+	Cancel()
+}
+
+// ScanStream is an interface for iterating through a collection of key-value pairs.
+type ScanStream interface {
+	Stream
+
 	// Key returns the key of the element that was staged by Advance.
 	// Key may panic if Advance returned false or was not called at all.
 	// Key does not block.
@@ -255,44 +271,17 @@
 	// Value may panic if Advance returned false or was not called at all.
 	// Value does not block.
 	Value(value interface{}) error
-
-	// Err returns a non-nil error iff the stream encountered any errors. Err does
-	// not block.
-	Err() error
-
-	// Cancel notifies the stream provider that it can stop producing elements.
-	// The client must call Cancel if it does not iterate through all elements
-	// (i.e. until Advance returns false). Cancel is idempotent and can be called
-	// concurrently with a goroutine that is iterating via Advance/Key/Value.
-	// Cancel causes Advance to subsequently return false. Cancel does not block.
-	Cancel()
 }
 
 // ResultStream is an interface for iterating through rows resulting from an
 // Exec.
 type ResultStream interface {
-	// Advance stages an result so the client can retrieve it with Result.
-	// Advance returns true iff there is a result to retrieve. The client must
-	// call Advance before calling Result. The client must call Cancel if it
-	// does not iterate through all results (i.e. until Advance returns false).
-	// Advance may block if a result is not immediately available.
-	Advance() bool
+	Stream
 
 	// Result returns the result that was staged by Advance.
 	// Result may panic if Advance returned false or was not called at all.
 	// Result does not block.
 	Result() []*vdl.Value
-
-	// Err returns a non-nil error iff the stream encountered any errors. Err does
-	// not block.
-	Err() error
-
-	// Cancel notifies the stream provider that it can stop producing results.
-	// The client must call Cancel if it does not iterate through all results
-	// (i.e. until Advance returns false). Cancel is idempotent and can be called
-	// concurrently with a goroutine that is iterating via Advance/Result.
-	// Cancel causes Advance to subsequently return false. Cancel does not block.
-	Cancel()
 }
 
 // SyncGroup is the interface for a SyncGroup in the store.
diff --git a/v23/syncbase/nosql/stream.go b/v23/syncbase/nosql/scan_stream.go
similarity index 73%
rename from v23/syncbase/nosql/stream.go
rename to v23/syncbase/nosql/scan_stream.go
index 9cd2870..08a6831 100644
--- a/v23/syncbase/nosql/stream.go
+++ b/v23/syncbase/nosql/scan_stream.go
@@ -13,7 +13,7 @@
 	"v.io/v23/vom"
 )
 
-type stream struct {
+type scanStream struct {
 	mu sync.Mutex
 	// cancel cancels the RPC stream.
 	cancel context.CancelFunc
@@ -28,17 +28,17 @@
 	finished bool
 }
 
-var _ Stream = (*stream)(nil)
+var _ ScanStream = (*scanStream)(nil)
 
-func newStream(cancel context.CancelFunc, call wire.TableScanClientCall) *stream {
-	return &stream{
+func newScanStream(cancel context.CancelFunc, call wire.TableScanClientCall) *scanStream {
+	return &scanStream{
 		cancel: cancel,
 		call:   call,
 	}
 }
 
-// Advance implements Stream.Advance.
-func (s *stream) Advance() bool {
+// Advance implements the Stream interface.
+func (s *scanStream) Advance() bool {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err != nil || s.finished {
@@ -59,8 +59,8 @@
 	return true
 }
 
-// Key implements Stream.Key.
-func (s *stream) Key() string {
+// Key implements the ScanStream interface.
+func (s *scanStream) Key() string {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.curr == nil {
@@ -69,8 +69,8 @@
 	return s.curr.Key
 }
 
-// Value implements Stream.Value.
-func (s *stream) Value(value interface{}) error {
+// Value implements the ScanStream interface.
+func (s *scanStream) Value(value interface{}) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.curr == nil {
@@ -79,8 +79,8 @@
 	return vom.Decode(s.curr.Value, value)
 }
 
-// Err implements Stream.Err.
-func (s *stream) Err() error {
+// Err implements the Stream interface.
+func (s *scanStream) Err() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.err == nil {
@@ -89,9 +89,9 @@
 	return verror.Convert(verror.IDAction{}, nil, s.err)
 }
 
-// Cancel implements Stream.Cancel.
+// Cancel implements the Stream interface.
 // TODO(sadovsky): Make Cancel non-blocking.
-func (s *stream) Cancel() {
+func (s *scanStream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.cancel()
diff --git a/v23/syncbase/nosql/table.go b/v23/syncbase/nosql/table.go
index 05de44f..543c2f2 100644
--- a/v23/syncbase/nosql/table.go
+++ b/v23/syncbase/nosql/table.go
@@ -68,13 +68,13 @@
 }
 
 // Scan implements Table.Scan.
-func (t *table) Scan(ctx *context.T, r RowRange) Stream {
+func (t *table) Scan(ctx *context.T, r RowRange) ScanStream {
 	ctx, cancel := context.WithCancel(ctx)
 	call, err := t.c.Scan(ctx, t.dbSchemaVersion, []byte(r.Start()), []byte(r.Limit()))
 	if err != nil {
-		return &InvalidStream{Error: err}
+		return &InvalidScanStream{Error: err}
 	}
-	return newStream(cancel, call)
+	return newScanStream(cancel, call)
 }
 
 // GetPermissions implements Table.GetPermissions.
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 7b9a470..1fe3066 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -25,7 +25,6 @@
 	"v.io/v23/glob"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
-	"v.io/v23/services/watch"
 	"v.io/v23/vdl"
 	"v.io/v23/verror"
 	"v.io/v23/vom"
@@ -273,7 +272,10 @@
 		sender.Send(resultHeaders)
 		for rs.Advance() {
 			result := rs.Result()
-			sender.Send(result)
+			if err := sender.Send(result); err != nil {
+				rs.Cancel()
+				return err
+			}
 		}
 		return rs.Err()
 	}
@@ -321,25 +323,6 @@
 	return data.Perms, util.FormatVersion(data.Version), nil
 }
 
-func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
-	// TODO(rogulenko): Implement.
-	if !d.exists {
-		return verror.New(verror.ErrNoExist, ctx, d.name)
-	}
-	if d.batchId != nil {
-		return wire.NewErrBoundToBatch(ctx)
-	}
-	return verror.NewErrNotImplemented(ctx)
-}
-
-func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
-	// TODO(rogulenko): Implement.
-	if !d.exists {
-		return nil, verror.New(verror.ErrNoExist, ctx, d.name)
-	}
-	return nil, verror.NewErrNotImplemented(ctx)
-}
-
 func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
 	if !d.exists {
 		return verror.New(verror.ErrNoExist, ctx, d.name)
diff --git a/x/ref/services/syncbase/server/nosql/database_watch.go b/x/ref/services/syncbase/server/nosql/database_watch.go
new file mode 100644
index 0000000..951444d
--- /dev/null
+++ b/x/ref/services/syncbase/server/nosql/database_watch.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/services/watch"
+	"v.io/v23/verror"
+)
+
+// WatchGlob implements the nosqlwire.DatabaseWatcher interface.
+func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
+	// TODO(rogulenko): Implement.
+	if !d.exists {
+		return verror.New(verror.ErrNoExist, ctx, d.name)
+	}
+	if d.batchId != nil {
+		return nosqlwire.NewErrBoundToBatch(ctx)
+	}
+	return verror.NewErrNotImplemented(ctx)
+}
+
+// GetResumeMarker implements the nosqlwire.DatabaseWatcher interface.
+func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
+	// TODO(rogulenko): Implement.
+	if !d.exists {
+		return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+	}
+	return nil, verror.NewErrNotImplemented(ctx)
+}
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index a3ed4b7..8cf744e 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -157,7 +157,10 @@
 				it.Cancel()
 				return err
 			}
-			sender.Send(wire.KeyValue{Key: externalKey, Value: value})
+			if err := sender.Send(wire.KeyValue{Key: externalKey, Value: value}); err != nil {
+				it.Cancel()
+				return err
+			}
 		}
 		if err := it.Err(); err != nil {
 			return verror.New(verror.ErrInternal, ctx, err)
diff --git a/x/ref/services/syncbase/server/watchable/test_util.go b/x/ref/services/syncbase/server/watchable/test_util.go
index 7db1755..77aab33 100644
--- a/x/ref/services/syncbase/server/watchable/test_util.go
+++ b/x/ref/services/syncbase/server/watchable/test_util.go
@@ -91,7 +91,7 @@
 }
 
 func newLogEntryReader(st store.Store, seq uint64) *logEntryReader {
-	stream := st.Scan([]byte(getLogEntryKey(seq)), []byte(getLogEntryKey(math.MaxUint64)))
+	stream := st.Scan([]byte(logEntryKey(seq)), []byte(logEntryKey(math.MaxUint64)))
 	return &logEntryReader{stream: stream}
 }
 
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index fb82f72..f557ca7 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -142,7 +142,7 @@
 	timestamp := tx.st.clock.Now().UnixNano()
 	seq := tx.st.seq
 	for i, op := range tx.ops {
-		key := getLogEntryKey(seq)
+		key := logEntryKey(seq)
 		value := &LogEntry{
 			Op:              op,
 			CommitTimestamp: timestamp,
@@ -248,6 +248,8 @@
 // StoreReader interface is required since this is a Get operation.
 func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
 	switch w := st.(type) {
+	case *snapshot:
+		return getAtVersion(w.isn, key, valbuf, version)
 	case *transaction:
 		w.mu.Lock()
 		defer w.mu.Unlock()
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 64f8eeb..8eb606e 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -15,7 +15,6 @@
 import (
 	"fmt"
 	"math/rand"
-	"strconv"
 	"sync"
 	"time"
 
@@ -81,71 +80,6 @@
 	return tx.Delete(makeVersionKey(key))
 }
 
-func getLogEntryKey(seq uint64) string {
-	// Note: MaxUint64 is 0xffffffffffffffff.
-	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
-	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
-}
-
-// logEntryExists returns true iff the log contains an entry with the given
-// sequence number.
-func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
-	_, err := st.Get([]byte(getLogEntryKey(seq)), nil)
-	if err != nil && verror.ErrorID(err) != store.ErrUnknownKey.ID {
-		return false, err
-	}
-	return err == nil, nil
-}
-
-// getNextLogSeq returns the next sequence number to be used for a new commit.
-// NOTE: this function assumes that all sequence numbers in the log represent
-// some range [start, limit] without gaps.
-func getNextLogSeq(st store.StoreReader) (uint64, error) {
-	// Determine initial value for seq.
-	// TODO(sadovsky): Consider using a bigger seq.
-
-	// Find the beginning of the log.
-	it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
-	if !it.Advance() {
-		return 0, nil
-	}
-	if it.Err() != nil {
-		return 0, it.Err()
-	}
-	key := string(it.Key(nil))
-	parts := split(key)
-	if len(parts) != 2 {
-		panic("wrong number of parts: " + key)
-	}
-	seq, err := strconv.ParseUint(parts[1], 10, 64)
-	if err != nil {
-		panic("failed to parse seq: " + key)
-	}
-	var step uint64 = 1
-	// Suppose the actual value we are looking for is S. First, we estimate the
-	// range for S. We find seq, step: seq < S <= seq + step.
-	for {
-		if ok, err := logEntryExists(st, seq+step); err != nil {
-			return 0, err
-		} else if !ok {
-			break
-		}
-		seq += step
-		step *= 2
-	}
-	// Next we keep the seq < S <= seq + step invariant, reducing step to 1.
-	for step > 1 {
-		step /= 2
-		if ok, err := logEntryExists(st, seq+step); err != nil {
-			return 0, err
-		} else if ok {
-			seq += step
-		}
-	}
-	// Now seq < S <= seq + 1, thus S = seq + 1.
-	return seq + 1, nil
-}
-
 func join(parts ...string) string {
 	return util.JoinKeyParts(parts...)
 }
diff --git a/x/ref/services/syncbase/server/watchable/util_test.go b/x/ref/services/syncbase/server/watchable/util_test.go
index f6875f6..99c440a 100644
--- a/x/ref/services/syncbase/server/watchable/util_test.go
+++ b/x/ref/services/syncbase/server/watchable/util_test.go
@@ -24,6 +24,6 @@
 		if got, want := seq, i; got != want {
 			t.Fatalf("unexpected log seq: got %v, want %v", got, want)
 		}
-		st.Put([]byte(getLogEntryKey(i)), nil)
+		st.Put([]byte(logEntryKey(i)), nil)
 	}
 }
diff --git a/x/ref/services/syncbase/server/watchable/watcher.go b/x/ref/services/syncbase/server/watchable/watcher.go
new file mode 100644
index 0000000..1209a1e
--- /dev/null
+++ b/x/ref/services/syncbase/server/watchable/watcher.go
@@ -0,0 +1,138 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"fmt"
+	"strconv"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/services/watch"
+	"v.io/v23/verror"
+	"v.io/v23/vom"
+	"v.io/x/lib/vlog"
+)
+
+// MakeResumeMarker converts a sequence number to the resume marker.
+func MakeResumeMarker(seq uint64) watch.ResumeMarker {
+	return watch.ResumeMarker(logEntryKey(seq))
+}
+
+func logEntryKey(seq uint64) string {
+	// Note: MaxUint64 is 0xffffffffffffffff.
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
+}
+
+// WatchLogBatch returns a batch of watch log records (a transaction) from
+// the given database and the new resume marker at the end of the batch.
+func WatchLogBatch(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
+	seq, err := parseResumeMarker(string(resumeMarker))
+	if err != nil {
+		return nil, resumeMarker, err
+	}
+	_, scanLimit := util.ScanPrefixArgs(util.LogPrefix, "")
+	scanStart := resumeMarker
+	endOfBatch := false
+
+	// Use the store directly to scan these read-only log entries, no need
+	// to create a snapshot since they are never overwritten.  Read and
+	// buffer a batch before processing it.
+	var logs []*LogEntry
+	stream := st.Scan(scanStart, scanLimit)
+	for stream.Advance() {
+		seq++
+		var logEnt LogEntry
+		if err := vom.Decode(stream.Value(nil), &logEnt); err != nil {
+			return nil, resumeMarker, err
+		}
+
+		logs = append(logs, &logEnt)
+
+		// Stop if this is the end of the batch.
+		if logEnt.Continued == false {
+			endOfBatch = true
+			break
+		}
+	}
+
+	if err = stream.Err(); err != nil {
+		return nil, resumeMarker, err
+	}
+	if !endOfBatch {
+		if len(logs) > 0 {
+			vlog.Fatalf("end of batch not found after %d entries", len(logs))
+		}
+		return nil, resumeMarker, nil
+	}
+	return logs, watch.ResumeMarker(logEntryKey(seq)), nil
+}
+
+func parseResumeMarker(resumeMarker string) (uint64, error) {
+	parts := split(resumeMarker)
+	if len(parts) != 2 {
+		return 0, verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)
+	}
+	seq, err := strconv.ParseUint(parts[1], 16, 64)
+	if err != nil {
+		return 0, verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)
+	}
+	return seq, nil
+}
+
+// logEntryExists returns true iff the log contains an entry with the given
+// sequence number.
+func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
+	_, err := st.Get([]byte(logEntryKey(seq)), nil)
+	if err != nil && verror.ErrorID(err) != store.ErrUnknownKey.ID {
+		return false, err
+	}
+	return err == nil, nil
+}
+
+// getNextLogSeq returns the next sequence number to be used for a new commit.
+// NOTE: this function assumes that all sequence numbers in the log represent
+// some range [start, limit] without gaps.
+func getNextLogSeq(st store.StoreReader) (uint64, error) {
+	// Determine initial value for seq.
+	// TODO(sadovsky): Consider using a bigger seq.
+
+	// Find the beginning of the log.
+	it := st.Scan(util.ScanPrefixArgs(util.LogPrefix, ""))
+	if !it.Advance() {
+		return 0, nil
+	}
+	if it.Err() != nil {
+		return 0, it.Err()
+	}
+	seq, err := parseResumeMarker(string(it.Key(nil)))
+	if err != nil {
+		return 0, err
+	}
+	var step uint64 = 1
+	// Suppose the actual value we are looking for is S. First, we estimate the
+	// range for S. We find seq, step: seq < S <= seq + step.
+	for {
+		if ok, err := logEntryExists(st, seq+step); err != nil {
+			return 0, err
+		} else if !ok {
+			break
+		}
+		seq += step
+		step *= 2
+	}
+	// Next we keep the seq < S <= seq + step invariant, reducing step to 1.
+	for step > 1 {
+		step /= 2
+		if ok, err := logEntryExists(st, seq+step); err != nil {
+			return 0, err
+		} else if ok {
+			seq += step
+		}
+	}
+	// Now seq < S <= seq + 1, thus S = seq + 1.
+	return seq + 1, nil
+}
diff --git a/x/ref/services/syncbase/server/watchable/watcher_test.go b/x/ref/services/syncbase/server/watchable/watcher_test.go
new file mode 100644
index 0000000..246bc65
--- /dev/null
+++ b/x/ref/services/syncbase/server/watchable/watcher_test.go
@@ -0,0 +1,93 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// TestWatchLogBatch tests fetching a batch of log records.
+func TestWatchLogBatch(t *testing.T) {
+	runTest(t, []string{util.RowPrefix, util.PermsPrefix}, runWatchLogBatchTest)
+}
+
+// runWatchLogBatchTest tests fetching a batch of log records.
+func runWatchLogBatchTest(t *testing.T, st store.Store) {
+	// Create a set of batches to fill the log queue.
+	numTx, numPut := 3, 4
+
+	makeKeyVal := func(batchNum, recNum int) ([]byte, []byte) {
+		key := util.JoinKeyParts(util.RowPrefix, fmt.Sprintf("foo-%d-%d", batchNum, recNum))
+		val := fmt.Sprintf("val-%d-%d", batchNum, recNum)
+		return []byte(key), []byte(val)
+	}
+
+	for i := 0; i < numTx; i++ {
+		tx := st.NewTransaction()
+		for j := 0; j < numPut; j++ {
+			key, val := makeKeyVal(i, j)
+			if err := tx.Put(key, val); err != nil {
+				t.Errorf("cannot put %s (%s): %v", key, val, err)
+			}
+		}
+		tx.Commit()
+	}
+
+	// Fetch the batches and a few more empty fetches and verify them.
+	resmark := MakeResumeMarker(0)
+	var seq uint64
+
+	for i := 0; i < (numTx + 3); i++ {
+		logs, newResmark, err := WatchLogBatch(st, resmark)
+		if err != nil {
+			t.Fatalf("can't get watch log batch: %v", err)
+		}
+		if i < numTx {
+			if len(logs) != numPut {
+				t.Errorf("log fetch (i=%d) wrong log seq: %d instead of %d",
+					i, len(logs), numPut)
+			}
+
+			seq += uint64(len(logs))
+			expResmark := MakeResumeMarker(seq)
+			if !bytes.Equal(newResmark, expResmark) {
+				t.Errorf("log fetch (i=%d) wrong resmark: %s instead of %s",
+					i, newResmark, expResmark)
+			}
+
+			for j, log := range logs {
+				op := log.Op.(OpPut)
+				expKey, expVal := makeKeyVal(i, j)
+				key := op.Value.Key
+				if !bytes.Equal(key, expKey) {
+					t.Errorf("log fetch (i=%d, j=%d) bad key: %s instead of %s",
+						i, j, key, expKey)
+				}
+				tx := st.NewTransaction()
+				var val []byte
+				val, err := GetAtVersion(nil, tx, key, val, op.Value.Version)
+				if err != nil {
+					t.Errorf("log fetch (i=%d, j=%d) cannot GetAtVersion(): %v", i, j, err)
+				}
+				if !bytes.Equal(val, expVal) {
+					t.Errorf("log fetch (i=%d, j=%d) bad value: %s instead of %s",
+						i, j, val, expVal)
+				}
+				tx.Abort()
+			}
+		} else {
+			if logs != nil || !bytes.Equal(newResmark, resmark) {
+				t.Errorf("NOP log fetch (i=%d) had changes: %d logs, resmark %s",
+					i, len(logs), newResmark)
+			}
+		}
+		resmark = newResmark
+	}
+}
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index e454574..b06b785 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -162,11 +162,6 @@
 	}
 }
 
-// makeResMark returns the resume marker for a given log entry position.
-func makeResMark(pos int) string {
-	return util.JoinKeyParts(util.LogPrefix, fmt.Sprintf("%016x", pos))
-}
-
 // makeRowKey returns the database row key for a given application key.
 func makeRowKey(key string) string {
 	return util.JoinKeyParts(util.RowPrefix, key)
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 6d80317..04c3bed 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -22,8 +22,8 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
+	"v.io/v23/services/watch"
 	"v.io/v23/verror"
-	"v.io/v23/vom"
 	"v.io/x/lib/vlog"
 )
 
@@ -116,14 +116,18 @@
 			vlog.Errorf("sync: processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
 			return false
 		}
-		resMark = ""
+		resMark = watchable.MakeResumeMarker(0)
 	}
 
 	// Initialize Database sync state if needed.
 	s.initDbSyncStateInMem(ctx, appName, dbName)
 
 	// Get a batch of watch log entries, if any, after this resume marker.
-	if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
+	logs, nextResmark, err := watchable.WatchLogBatch(st, resMark)
+	if err != nil {
+		vlog.Fatalf("sync: processDatabase: %s, %s: cannot get watch log batch: %v", appName, dbName, verror.DebugString(err))
+	}
+	if logs != nil {
 		s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
 		return true
 	}
@@ -134,7 +138,7 @@
 // watchable SyncGroup prefixes, uses the prefixes to filter the batch to the
 // subset of syncable records, and transactionally applies these updates to the
 // sync metadata (DAG & log records) and updates the watch resume marker.
-func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark string) {
+func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark watch.ResumeMarker) {
 	if len(logs) == 0 {
 		return
 	}
@@ -289,68 +293,6 @@
 	}
 }
 
-// dbLogScanArgs determines the arguments needed to start a new scan from a
-// given resume marker (last log entry read).  An empty resume marker is used
-// to begin the scan from the start of the log.
-func dbLogScanArgs(resMark string) ([]byte, []byte) {
-	start, limit := util.ScanPrefixArgs(util.LogPrefix, "")
-	if resMark != "" {
-		// To start just after the current resume marker, augment it by
-		// appending an extra byte at the end.  Use byte value zero to
-		// use the lowest value possible.  This works because resume
-		// markers have a fixed length and are sorted lexicographically.
-		// By creationg a fake longer resume marker that falls between
-		// real resume markers, the next scan will start right after
-		// where the previous one stopped without missing data.
-		start = append([]byte(resMark), 0)
-	}
-	return start, limit
-}
-
-// getWatchLogBatch returns a batch of watch log records (a transaction) from
-// the given database and the new resume marker at the end of the batch.
-func getWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, resMark string) ([]*watchable.LogEntry, string) {
-	scanStart, scanLimit := dbLogScanArgs(resMark)
-	endOfBatch := false
-	var newResmark string
-
-	// Use the store directly to scan these read-only log entries, no need
-	// to create a snapshot since they are never overwritten.  Read and
-	// buffer a batch before processing it.
-	var logs []*watchable.LogEntry
-	stream := st.Scan(scanStart, scanLimit)
-	for stream.Advance() {
-		logKey := string(stream.Key(nil))
-		var logEnt watchable.LogEntry
-		if vom.Decode(stream.Value(nil), &logEnt) != nil {
-			vlog.Fatalf("sync: getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
-				appName, dbName, logKey, stream.Value(nil))
-		}
-
-		logs = append(logs, &logEnt)
-
-		// Stop if this is the end of the batch.
-		if logEnt.Continued == false {
-			newResmark = logKey
-			endOfBatch = true
-			break
-		}
-	}
-
-	if err := stream.Err(); err != nil {
-		vlog.Errorf("sync: getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
-		return nil, resMark
-	}
-	if !endOfBatch {
-		if len(logs) > 0 {
-			vlog.Fatalf("sync: getWatchLogBatch: %s, %s: end of batch not found after %d entries",
-				appName, dbName, len(logs))
-		}
-		return nil, resMark
-	}
-	return logs, newResmark
-}
-
 // convertLogRecord converts a store log entry to a sync log record.  It fills
 // the previous object version (parent) by fetching its current DAG head if it
 // has one.  For a delete, it generates a new object version because the store
@@ -479,16 +421,16 @@
 }
 
 // setResMark stores the watcher resume marker for a database.
-func setResMark(ctx *context.T, tx store.Transaction, resMark string) error {
+func setResMark(ctx *context.T, tx store.Transaction, resMark watch.ResumeMarker) error {
 	return util.Put(ctx, tx, resMarkKey(), resMark)
 }
 
 // getResMark retrieves the watcher resume marker for a database.
-func getResMark(ctx *context.T, st store.StoreReader) (string, error) {
-	var resMark string
+func getResMark(ctx *context.T, st store.StoreReader) (watch.ResumeMarker, error) {
+	var resMark watch.ResumeMarker
 	key := resMarkKey()
 	if err := util.Get(ctx, st, key, &resMark); err != nil {
-		return NoVersion, err
+		return nil, err
 	}
 	return resMark, nil
 }
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index 66aae8f..c730e77 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -8,7 +8,6 @@
 
 import (
 	"bytes"
-	"fmt"
 	"reflect"
 	"testing"
 	"time"
@@ -26,11 +25,11 @@
 	st := svc.St()
 
 	resmark, err := getResMark(nil, st)
-	if err == nil || resmark != "" {
+	if err == nil || resmark != nil {
 		t.Errorf("found non-existent resume marker: %s, %v", resmark, err)
 	}
 
-	wantResmark := "1234567890"
+	wantResmark := watchable.MakeResumeMarker(1234567890)
 	tx := st.NewTransaction()
 	if err := setResMark(nil, tx, wantResmark); err != nil {
 		t.Errorf("cannot set resume marker: %v", err)
@@ -41,7 +40,7 @@
 	if err != nil {
 		t.Errorf("cannot get new resume marker: %v", err)
 	}
-	if resmark != wantResmark {
+	if !bytes.Equal(resmark, wantResmark) {
 		t.Errorf("invalid new resume: got %s instead of %s", resmark, wantResmark)
 	}
 }
@@ -163,7 +162,7 @@
 	fooxyzKey := makeRowKey("fooxyz")
 
 	// Empty logs does not fail.
-	s.processWatchLogBatch(nil, app, db, st, nil, "")
+	s.processWatchLogBatch(nil, app, db, st, nil, nil)
 
 	// Non-syncable logs.
 	batch := []*watchable.LogEntry{
@@ -171,10 +170,10 @@
 		newLog(barKey, "555", false),
 	}
 
-	resmark := "abcd"
+	resmark := watchable.MakeResumeMarker(1234)
 	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
 
-	if res, err := getResMark(nil, st); err != nil && res != resmark {
+	if res, err := getResMark(nil, st); err != nil && !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
 	if ok, err := hasNode(nil, st, fooKey, "123"); err != nil || ok {
@@ -192,10 +191,10 @@
 		newLog(barKey, "222", false),
 	}
 
-	resmark = "cdef"
+	resmark = watchable.MakeResumeMarker(3456)
 	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
 
-	if res, err := getResMark(nil, st); err != nil && res != resmark {
+	if res, err := getResMark(nil, st); err != nil || !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
 	if head, err := getHead(nil, st, fooKey); err != nil && head != "333" {
@@ -226,10 +225,10 @@
 		newLog(barKey, "7", false),
 	}
 
-	resmark = "ghij"
+	resmark = watchable.MakeResumeMarker(7890)
 	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
 
-	if res, err := getResMark(nil, st); err != nil && res != resmark {
+	if res, err := getResMark(nil, st); err != nil || !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
 	if head, err := getHead(nil, st, fooKey); err != nil && head != "1" {
@@ -269,10 +268,10 @@
 		newLog(barKey, "007", false),
 	}
 
-	resmark = "tuvw"
+	resmark = watchable.MakeResumeMarker(20212223)
 	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
 
-	if res, err := getResMark(nil, st); err != nil && res != resmark {
+	if res, err := getResMark(nil, st); err != nil || !bytes.Equal(res, resmark) {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
 	// No changes to "foo".
@@ -316,79 +315,3 @@
 		t.Errorf("wrong count of batches: got %d instead of 2", count)
 	}
 }
-
-// TestGetWatchLogBatch tests fetching a batch of log records.
-func TestGetWatchLogBatch(t *testing.T) {
-	svc := createService(t)
-	defer destroyService(t, svc)
-	st := svc.St()
-
-	// Create a set of batches to fill the log queue.
-	numTx, numPut := 3, 4
-
-	makeKeyVal := func(batchNum, recNum int) ([]byte, []byte) {
-		key := util.JoinKeyParts(util.RowPrefix, fmt.Sprintf("foo-%d-%d", batchNum, recNum))
-		val := fmt.Sprintf("val-%d-%d", batchNum, recNum)
-		return []byte(key), []byte(val)
-	}
-
-	for i := 0; i < numTx; i++ {
-		tx := st.NewTransaction()
-		for j := 0; j < numPut; j++ {
-			key, val := makeKeyVal(i, j)
-			if err := tx.Put(key, val); err != nil {
-				t.Errorf("cannot put %s (%s): %v", key, val, err)
-			}
-		}
-		tx.Commit()
-	}
-
-	// Fetch the batches and a few more empty fetches and verify them.
-	app, db := "mockapp", "mockdb"
-	resmark := ""
-	count := 0
-
-	for i := 0; i < (numTx + 3); i++ {
-		logs, newResmark := getWatchLogBatch(nil, app, db, st, resmark)
-		if i < numTx {
-			if len(logs) != numPut {
-				t.Errorf("log fetch (i=%d) wrong log count: %d instead of %d",
-					i, len(logs), numPut)
-			}
-
-			count += len(logs)
-			expResmark := makeResMark(count - 1)
-			if newResmark != expResmark {
-				t.Errorf("log fetch (i=%d) wrong resmark: %s instead of %s",
-					i, newResmark, expResmark)
-			}
-
-			for j, log := range logs {
-				op := log.Op.(watchable.OpPut)
-				expKey, expVal := makeKeyVal(i, j)
-				key := op.Value.Key
-				if !bytes.Equal(key, expKey) {
-					t.Errorf("log fetch (i=%d, j=%d) bad key: %s instead of %s",
-						i, j, key, expKey)
-				}
-				tx := st.NewTransaction()
-				var val []byte
-				val, err := watchable.GetAtVersion(nil, tx, key, val, op.Value.Version)
-				if err != nil {
-					t.Errorf("log fetch (i=%d, j=%d) cannot GetAtVersion(): %v", i, j, err)
-				}
-				if !bytes.Equal(val, expVal) {
-					t.Errorf("log fetch (i=%d, j=%d) bad value: %s instead of %s",
-						i, j, val, expVal)
-				}
-				tx.Abort()
-			}
-		} else {
-			if logs != nil || newResmark != resmark {
-				t.Errorf("NOP log fetch (i=%d) had changes: %d logs, resmask %s",
-					i, len(logs), newResmark)
-			}
-		}
-		resmark = newResmark
-	}
-}