Implement the client watch.

The client subscribes to store update notifications via a Go
channel. The Go channel is used only to signal a single bit of
information: "there is a watch log update". Whenever the client
sees this notification, it scans through the watch log for new
updates (as the sync watcher does), filters out unnecessary or
inaccessible log records, and sends the result to the client.

Change-Id: I143c1228d3b0808f1910c9b6dc17b19758dcf1de
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index d23c9c4..5444f1e 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -288,10 +288,12 @@
 	// blob upload.
 }
 
-// Watch allows a client to watch for updates in the database. For each watched
-// request, the client will receive a reliable stream of watch events without
-// re-ordering. See watch.GlobWatcher for a detailed explanation of the
-// behavior.
+// DatabaseWatcher allows a client to watch for updates in the database.
+// For each watched request, the client will receive a reliable stream of watch
+// events without re-ordering. See watch.GlobWatcher for a detailed explanation
+// of the behavior.
+// TODO(rogulenko): Currently the only supported watch patterns are
+// 'table/row*'. Consider changing that.
 //
 // The watching is done by starting a streaming RPC. The argument to the RPC
 // contains the ResumeMarker that points to a particular place in the database
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 7318ff2..639877f 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -75,10 +75,12 @@
 // DatabaseWatcherClientMethods is the client interface
 // containing DatabaseWatcher methods.
 //
-// Watch allows a client to watch for updates in the database. For each watched
-// request, the client will receive a reliable stream of watch events without
-// re-ordering. See watch.GlobWatcher for a detailed explanation of the
-// behavior.
+// DatabaseWatcher allows a client to watch for updates in the database.
+// For each watched request, the client will receive a reliable stream of watch
+// events without re-ordering. See watch.GlobWatcher for a detailed explanation
+// of the behavior.
+// TODO(rogulenko): Currently the only supported watch patterns are
+// 'table/row*'. Consider changing that.
 //
 // The watching is done by starting a streaming RPC. The argument to the RPC
 // contains the ResumeMarker that points to a particular place in the database
@@ -129,10 +131,12 @@
 // DatabaseWatcherServerMethods is the interface a server writer
 // implements for DatabaseWatcher.
 //
-// Watch allows a client to watch for updates in the database. For each watched
-// request, the client will receive a reliable stream of watch events without
-// re-ordering. See watch.GlobWatcher for a detailed explanation of the
-// behavior.
+// DatabaseWatcher allows a client to watch for updates in the database.
+// For each watched request, the client will receive a reliable stream of watch
+// events without re-ordering. See watch.GlobWatcher for a detailed explanation
+// of the behavior.
+// TODO(rogulenko): Currently the only supported watch patterns are
+// 'table/row*'. Consider changing that.
 //
 // The watching is done by starting a streaming RPC. The argument to the RPC
 // contains the ResumeMarker that points to a particular place in the database
@@ -221,7 +225,7 @@
 var descDatabaseWatcher = rpc.InterfaceDesc{
 	Name:    "DatabaseWatcher",
 	PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
-	Doc:     "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+	Doc:     "// DatabaseWatcher allows a client to watch for updates in the database.\n// For each watched request, the client will receive a reliable stream of watch\n// events without re-ordering. See watch.GlobWatcher for a detailed explanation\n// of the behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// 'table/row*'. Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
 	Embeds: []rpc.EmbedDesc{
 		{"GlobWatcher", "v.io/v23/services/watch", "// GlobWatcher allows a client to receive updates for changes to objects\n// that match a pattern.  See the package comments for details."},
 	},
@@ -1495,10 +1499,12 @@
 	//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}
 	//  }
 	permissions.ObjectClientMethods
-	// Watch allows a client to watch for updates in the database. For each watched
-	// request, the client will receive a reliable stream of watch events without
-	// re-ordering. See watch.GlobWatcher for a detailed explanation of the
-	// behavior.
+	// DatabaseWatcher allows a client to watch for updates in the database.
+	// For each watched request, the client will receive a reliable stream of watch
+	// events without re-ordering. See watch.GlobWatcher for a detailed explanation
+	// of the behavior.
+	// TODO(rogulenko): Currently the only supported watch patterns are
+	// 'table/row*'. Consider changing that.
 	//
 	// The watching is done by starting a streaming RPC. The argument to the RPC
 	// contains the ResumeMarker that points to a particular place in the database
@@ -1740,10 +1746,12 @@
 	//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}
 	//  }
 	permissions.ObjectServerMethods
-	// Watch allows a client to watch for updates in the database. For each watched
-	// request, the client will receive a reliable stream of watch events without
-	// re-ordering. See watch.GlobWatcher for a detailed explanation of the
-	// behavior.
+	// DatabaseWatcher allows a client to watch for updates in the database.
+	// For each watched request, the client will receive a reliable stream of watch
+	// events without re-ordering. See watch.GlobWatcher for a detailed explanation
+	// of the behavior.
+	// TODO(rogulenko): Currently the only supported watch patterns are
+	// 'table/row*'. Consider changing that.
 	//
 	// The watching is done by starting a streaming RPC. The argument to the RPC
 	// contains the ResumeMarker that points to a particular place in the database
@@ -1851,10 +1859,12 @@
 	//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}
 	//  }
 	permissions.ObjectServerStubMethods
-	// Watch allows a client to watch for updates in the database. For each watched
-	// request, the client will receive a reliable stream of watch events without
-	// re-ordering. See watch.GlobWatcher for a detailed explanation of the
-	// behavior.
+	// DatabaseWatcher allows a client to watch for updates in the database.
+	// For each watched request, the client will receive a reliable stream of watch
+	// events without re-ordering. See watch.GlobWatcher for a detailed explanation
+	// of the behavior.
+	// TODO(rogulenko): Currently the only supported watch patterns are
+	// 'table/row*'. Consider changing that.
 	//
 	// The watching is done by starting a streaming RPC. The argument to the RPC
 	// contains the ResumeMarker that points to a particular place in the database
@@ -1997,7 +2007,7 @@
 	Doc:     "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n// Param schemaVersion is the version number that the client expects the database\n// to be at. To disable schema version checking, pass -1.\n//\n// TODO(sadovsky): Add Watch method.",
 	Embeds: []rpc.EmbedDesc{
 		{"Object", "v.io/v23/services/permissions", "// Object provides access control for Vanadium objects.\n//\n// Vanadium services implementing dynamic access control would typically embed\n// this interface and tag additional methods defined by the service with one of\n// Admin, Read, Write, Resolve etc. For example, the VDL definition of the\n// object would be:\n//\n//   package mypackage\n//\n//   import \"v.io/v23/security/access\"\n//   import \"v.io/v23/services/permissions\"\n//\n//   type MyObject interface {\n//     permissions.Object\n//     MyRead() (string, error) {access.Read}\n//     MyWrite(string) error    {access.Write}\n//   }\n//\n// If the set of pre-defined tags is insufficient, services may define their\n// own tag type and annotate all methods with this new type.\n//\n// Instead of embedding this Object interface, define SetPermissions and\n// GetPermissions in their own interface. Authorization policies will typically\n// respect annotations of a single type. For example, the VDL definition of an\n// object would be:\n//\n//  package mypackage\n//\n//  import \"v.io/v23/security/access\"\n//\n//  type MyTag string\n//\n//  const (\n//    Blue = MyTag(\"Blue\")\n//    Red  = MyTag(\"Red\")\n//  )\n//\n//  type MyObject interface {\n//    MyMethod() (string, error) {Blue}\n//\n//    // Allow clients to change access via the access.Object interface:\n//    SetPermissions(perms access.Permissions, version string) error         {Red}\n//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}\n//  }"},
-		{"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
+		{"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// DatabaseWatcher allows a client to watch for updates in the database.\n// For each watched request, the client will receive a reliable stream of watch\n// events without re-ordering. See watch.GlobWatcher for a detailed explanation\n// of the behavior.\n// TODO(rogulenko): Currently the only supported watch patterns are\n// 'table/row*'. Consider changing that.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
 		{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
 		{"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
 		{"SchemaManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SchemaManager implements the API for managing schema metadata attached\n// to a Database."},
diff --git a/v23/syncbase/nosql/client_test.go b/v23/syncbase/nosql/client_test.go
index 2171ed2..673b0fd 100644
--- a/v23/syncbase/nosql/client_test.go
+++ b/v23/syncbase/nosql/client_test.go
@@ -7,14 +7,19 @@
 import (
 	"reflect"
 	"testing"
+	"time"
 
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase"
 	"v.io/syncbase/v23/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/nosql/syncql"
 	tu "v.io/syncbase/v23/syncbase/testutil"
+	"v.io/v23/context"
 	"v.io/v23/naming"
+	"v.io/v23/services/watch"
 	"v.io/v23/vdl"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
@@ -577,3 +582,215 @@
 		t.Fatalf("Unexpected stream error: %v", err)
 	}
 }
+
+// TestWatchBasic tests the basic client watch functionality: no perms,
+// no batches.
+func TestWatchBasic(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+	tb := tu.CreateTable(t, ctx, d, "tb")
+	var resumeMarkers []watch.ResumeMarker
+
+	// Generate the data and resume markers.
+	// Initial state.
+	resumeMarker, err := d.GetResumeMarker(ctx)
+	if err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	resumeMarkers = append(resumeMarkers, resumeMarker)
+	// Put "abc".
+	r := tb.Row("abc")
+	if err := r.Put(ctx, "value"); err != nil {
+		t.Fatalf("r.Put() failed: %v", err)
+	}
+	if resumeMarker, err = d.GetResumeMarker(ctx); err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	resumeMarkers = append(resumeMarkers, resumeMarker)
+	// Delete "abc".
+	if err := r.Delete(ctx); err != nil {
+		t.Fatalf("r.Delete() failed: %v", err)
+	}
+	if resumeMarker, err = d.GetResumeMarker(ctx); err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	resumeMarkers = append(resumeMarkers, resumeMarker)
+	// Put "a".
+	r = tb.Row("a")
+	if err := r.Put(ctx, "value"); err != nil {
+		t.Fatalf("r.Put() failed: %v", err)
+	}
+	if resumeMarker, err = d.GetResumeMarker(ctx); err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	resumeMarkers = append(resumeMarkers, resumeMarker)
+
+	vomValue, _ := vom.Encode("value")
+	allChanges := []nosql.WatchChange{
+		nosql.WatchChange{
+			Table:        "tb",
+			Row:          "abc",
+			ChangeType:   nosql.PutChange,
+			ValueBytes:   vomValue,
+			ResumeMarker: resumeMarkers[1],
+		},
+		nosql.WatchChange{
+			Table:        "tb",
+			Row:          "abc",
+			ChangeType:   nosql.DeleteChange,
+			ResumeMarker: resumeMarkers[2],
+		},
+		nosql.WatchChange{
+			Table:        "tb",
+			Row:          "a",
+			ChangeType:   nosql.PutChange,
+			ValueBytes:   vomValue,
+			ResumeMarker: resumeMarkers[3],
+		},
+	}
+	ctxWithTimeout, _ := context.WithTimeout(ctx, 10*time.Second)
+	wstream, _ := d.Watch(ctxWithTimeout, "tb", "a", resumeMarkers[0])
+	tu.CheckWatch(t, wstream, allChanges)
+	wstream, _ = d.Watch(ctxWithTimeout, "tb", "a", resumeMarkers[1])
+	tu.CheckWatch(t, wstream, allChanges[1:])
+	wstream, _ = d.Watch(ctxWithTimeout, "tb", "a", resumeMarkers[2])
+	tu.CheckWatch(t, wstream, allChanges[2:])
+
+	wstream, _ = d.Watch(ctxWithTimeout, "tb", "abc", resumeMarkers[0])
+	tu.CheckWatch(t, wstream, allChanges[:2])
+	wstream, _ = d.Watch(ctxWithTimeout, "tb", "abc", resumeMarkers[1])
+	tu.CheckWatch(t, wstream, allChanges[1:2])
+}
+
+// TestWatchWithBatchAndPerms tests that the client watch correctly handles
+// batches and prefix perms.
+func TestWatchWithBatchAndPerms(t *testing.T) {
+	ctx, clientACtx, sName, rootp, cleanup := tu.SetupOrDieCustom("clientA", "server", nil)
+	defer cleanup()
+	clientBCtx := tu.NewCtx(ctx, rootp, "clientB")
+	a := tu.CreateApp(t, clientACtx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, clientACtx, a, "d")
+	tb := tu.CreateTable(t, clientACtx, d, "tb")
+
+	// Set initial permissions.
+	aAndB := tu.DefaultPerms("root/clientA", "root/clientB")
+	aOnly := tu.DefaultPerms("root/clientA")
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix(""), aAndB); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	if err := tb.SetPermissions(clientACtx, nosql.Prefix("a"), aOnly); err != nil {
+		t.Fatalf("tb.SetPermissions() failed: %v", err)
+	}
+	// Get the initial resume marker.
+	resumeMarker, err := d.GetResumeMarker(clientACtx)
+	if err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	initMarker := resumeMarker
+	// Do two puts in a batch.
+	if err := nosql.RunInBatch(clientACtx, d, wire.BatchOptions{}, func(b nosql.BatchDatabase) error {
+		tb := b.Table("tb")
+		if err := tb.Put(clientACtx, "a", "value"); err != nil {
+			return err
+		}
+		return tb.Put(clientACtx, "b", "value")
+	}); err != nil {
+		t.Fatalf("RunInBatch failed: %v", err)
+	}
+
+	if resumeMarker, err = d.GetResumeMarker(clientACtx); err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	vomValue, _ := vom.Encode("value")
+	allChanges := []nosql.WatchChange{
+		nosql.WatchChange{
+			Table:        "tb",
+			Row:          "a",
+			ChangeType:   nosql.PutChange,
+			ValueBytes:   vomValue,
+			ResumeMarker: nil,
+			Continued:    true,
+		},
+		nosql.WatchChange{
+			Table:        "tb",
+			Row:          "b",
+			ChangeType:   nosql.PutChange,
+			ValueBytes:   vomValue,
+			ResumeMarker: resumeMarker,
+		},
+	}
+
+	ctxAWithTimeout, _ := context.WithTimeout(clientACtx, 10*time.Second)
+	ctxBWithTimeout, _ := context.WithTimeout(clientBCtx, 10*time.Second)
+	// ClientA should see both changes as one batch.
+	wstream, _ := d.Watch(ctxAWithTimeout, "tb", "", initMarker)
+	tu.CheckWatch(t, wstream, allChanges)
+	// ClientB should see only one change.
+	wstream, _ = d.Watch(ctxBWithTimeout, "tb", "", initMarker)
+	tu.CheckWatch(t, wstream, allChanges[1:])
+}
+
+// TestBlockingWatch tests that the server side of the client watch correctly
+// blocks until new updates to the database arrive.
+func TestBlockingWatch(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+	tb := tu.CreateTable(t, ctx, d, "tb")
+
+	resumeMarker, err := d.GetResumeMarker(ctx)
+	if err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	ctxWithTimeout, _ := context.WithTimeout(ctx, 10*time.Second)
+	wstream, _ := d.Watch(ctxWithTimeout, "tb", "a", resumeMarker)
+	vomValue, _ := vom.Encode("value")
+	for i := 0; i < 10; i++ {
+		// Put "abc".
+		r := tb.Row("abc")
+		if err := r.Put(ctx, "value"); err != nil {
+			t.Fatalf("r.Put() failed: %v", err)
+		}
+		if resumeMarker, err = d.GetResumeMarker(ctx); err != nil {
+			t.Fatalf("d.GetResumeMarker() failed: %v", err)
+		}
+		if !wstream.Advance() {
+			t.Fatalf("wstream.Advance() reached the end: %v", wstream.Err())
+		}
+		want := nosql.WatchChange{
+			Table:        "tb",
+			Row:          "abc",
+			ChangeType:   nosql.PutChange,
+			ValueBytes:   vomValue,
+			ResumeMarker: resumeMarker,
+		}
+		if got := wstream.Change(); !reflect.DeepEqual(got, want) {
+			t.Fatalf("unexpected watch change: got %v, want %v", got, want)
+		}
+	}
+}
+
+// TestBlockedWatchCancel tests that the watch call blocked on the server side
+// can be successfully canceled from the client.
+func TestBlockedWatchCancel(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+
+	resumeMarker, err := d.GetResumeMarker(ctx)
+	if err != nil {
+		t.Fatalf("d.GetResumeMarker() failed: %v", err)
+	}
+	ctxWithTimeout, _ := context.WithTimeout(ctx, 100*time.Millisecond)
+	wstream, _ := d.Watch(ctxWithTimeout, "tb", "a", resumeMarker)
+	if wstream.Advance() {
+		t.Fatalf("wstream advanced")
+	}
+	if got, want := verror.ErrorID(wstream.Err()), verror.ErrTimeout.ID; got != want {
+		t.Fatalf("unexpected wstream error ID: got %v, want %v", got, want)
+	}
+}
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index 5e8159d..d48f450 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -10,6 +10,7 @@
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/security/access"
+	"v.io/v23/services/watch"
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
 )
@@ -129,6 +130,24 @@
 	return d.c.GetPermissions(ctx)
 }
 
+// Watch implements the Database interface.
+func (d *database) Watch(ctx *context.T, table, prefix string, resumeMarker watch.ResumeMarker) (WatchStream, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	call, err := d.c.WatchGlob(ctx, watch.GlobRequest{
+		Pattern:      naming.Join(table, prefix+"*"),
+		ResumeMarker: resumeMarker,
+	})
+	if err != nil {
+		return nil, err
+	}
+	return newWatchStream(cancel, call), nil
+}
+
+// GetResumeMarker implements the Database interface.
+func (d *database) GetResumeMarker(ctx *context.T) (watch.ResumeMarker, error) {
+	return d.c.GetResumeMarker(ctx)
+}
+
 // SyncGroup implements Database.SyncGroup.
 func (d *database) SyncGroup(sgName string) SyncGroup {
 	return newSyncGroup(d.fullName, sgName)
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index e26b76d..622f82d 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -10,7 +10,9 @@
 	"v.io/syncbase/v23/syncbase/util"
 	"v.io/v23/context"
 	"v.io/v23/security/access"
+	"v.io/v23/services/watch"
 	"v.io/v23/vdl"
+	"v.io/v23/vom"
 )
 
 // TODO(sadovsky): Document the access control policy for every method where
@@ -98,6 +100,24 @@
 	// interface.
 	util.AccessController
 
+	// Watch allows a client to watch for updates to the database. For each watch
+	// request, the client will receive a reliable stream of watch events without
+	// re-ordering. See watch.GlobWatcher for a detailed explanation of the
+	// behavior.
+	//
+	// This method is designed to be used in the following way:
+	// 1) begin a read-only batch
+	// 2) read all information your app needs
+	// 3) read the ResumeMarker
+	// 4) abort the batch
+	// 5) start watching for changes to the data using the ResumeMarker
+	// In this configuration the client doesn't miss any changes.
+	Watch(ctx *context.T, table, prefix string, resumeMarker watch.ResumeMarker) (WatchStream, error)
+
+	// GetResumeMarker returns the ResumeMarker that points to the current end
+	// of the event log.
+	GetResumeMarker(ctx *context.T) (watch.ResumeMarker, error)
+
 	// SyncGroup returns a handle to the SyncGroup with the given name.
 	SyncGroup(sgName string) SyncGroup
 
@@ -290,6 +310,69 @@
 	Result() []*vdl.Value
 }
 
+// WatchStream is an interface for receiving database updates.
+type WatchStream interface {
+	Stream
+
+	// Change returns the element that was staged by Advance.
+	// Change may panic if Advance returned false or was not called at all.
+	// Change does not block.
+	Change() WatchChange
+}
+
+// ChangeType describes the type of the row change: Put or Delete.
+// TODO(rogulenko): Add types for changes to syncgroups in this database,
+// as well as ACLs. Consider adding the Shell type.
+type ChangeType uint32
+
+const (
+	PutChange ChangeType = iota
+	DeleteChange
+)
+
+// WatchChange is the new value for a watched entity.
+type WatchChange struct {
+	// Table is the name of the table that contains the changed row.
+	Table string
+
+	// Row is the key of the changed row.
+	Row string
+
+	// ChangeType describes the type of the change. If the ChangeType equals to
+	// PutChange, then the row exists in the table and the Value contains the new
+	// value for this row. If the state equals to DeleteChange, then the row was
+	// removed from the table.
+	ChangeType ChangeType
+
+	// ValueBytes is the new VOM-encoded value for the row if the ChangeType is
+	// Put or nil otherwise.
+	ValueBytes []byte
+
+	// ResumeMarker provides a compact representation of all the messages
+	// that have been received by the caller for the given Watch call.
+	// This marker can be provided in the Request message to allow the caller
+	// to resume the stream watching at a specific point without fetching the
+	// initial state.
+	ResumeMarker watch.ResumeMarker
+
+	// FromSync indicates whether the change came from sync. If FromSync is
+	// false, then the change originated from the local device.
+	FromSync bool
+
+	// If true, this WatchChange is followed by more WatchChanges that are in the
+	// same batch as this WatchChange.
+	Continued bool
+}
+
+// Value decodes the new value of the watched element. Panics if the change type
+// is DeleteChange.
+func (c *WatchChange) Value(value interface{}) error {
+	if c.ChangeType == DeleteChange {
+		panic("invalid change type")
+	}
+	return vom.Decode(c.ValueBytes, value)
+}
+
 // SyncGroup is the interface for a SyncGroup in the store.
 type SyncGroup interface {
 	// Create creates a new SyncGroup with the given spec.
diff --git a/v23/syncbase/nosql/watch_stream.go b/v23/syncbase/nosql/watch_stream.go
new file mode 100644
index 0000000..ae58e80
--- /dev/null
+++ b/v23/syncbase/nosql/watch_stream.go
@@ -0,0 +1,124 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	"fmt"
+	"reflect"
+	"sync"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase/util"
+	"v.io/v23/context"
+	"v.io/v23/services/watch"
+	"v.io/v23/vdl"
+	"v.io/v23/verror"
+)
+
+type watchStream struct {
+	mu sync.Mutex
+	// cancel cancels the RPC stream.
+	cancel context.CancelFunc
+	// call is the RPC stream object.
+	call watch.GlobWatcherWatchGlobClientCall
+	// curr is the currently staged element, or nil if nothing is staged.
+	curr *WatchChange
+	// err is the first error encountered during streaming. It may also be
+	// populated by a call to Cancel.
+	err error
+	// finished records whether we have called call.Finish().
+	finished bool
+}
+
+var _ WatchStream = (*watchStream)(nil)
+
+func newWatchStream(cancel context.CancelFunc, call watch.GlobWatcherWatchGlobClientCall) *watchStream {
+	return &watchStream{
+		cancel: cancel,
+		call:   call,
+	}
+}
+
+// Advance implements Stream interface.
+func (s *watchStream) Advance() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err != nil || s.finished {
+		return false
+	}
+	if !s.call.RecvStream().Advance() {
+		if s.err = s.call.RecvStream().Err(); s.err == nil {
+			s.err = s.call.Finish()
+			s.cancel()
+			s.finished = true
+		}
+		return false
+	}
+	watchChange := s.call.RecvStream().Value()
+
+	// Parse the table and the row.
+	table, row, err := util.ParseTableRowPair(nil, watchChange.Name)
+	if err != nil {
+		panic(err)
+	}
+	// Parse the store change.
+	var storeChange wire.StoreChange
+	rtarget, err := vdl.ReflectTarget(reflect.ValueOf(&storeChange))
+	if err != nil {
+		panic(err)
+	}
+	if err := vdl.FromValue(rtarget, watchChange.Value); err != nil {
+		panic(err)
+	}
+	// Parse the state.
+	var changeType ChangeType
+	switch watchChange.State {
+	case watch.Exists:
+		changeType = PutChange
+	case watch.DoesNotExist:
+		changeType = DeleteChange
+	default:
+		panic(fmt.Sprintf("unsupported watch change state: %v", watchChange.State))
+	}
+	s.curr = &WatchChange{
+		Table:        table,
+		Row:          row,
+		ChangeType:   changeType,
+		ValueBytes:   storeChange.Value,
+		ResumeMarker: watchChange.ResumeMarker,
+		FromSync:     storeChange.FromSync,
+		Continued:    watchChange.Continued,
+	}
+	return true
+}
+
+// Change implements WatchStream interface.
+func (s *watchStream) Change() WatchChange {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.curr == nil {
+		panic("nothing staged")
+	}
+	return *s.curr
+}
+
+// Err implements Stream interface.
+func (s *watchStream) Err() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err == nil {
+		return nil
+	}
+	return verror.Convert(verror.IDAction{}, nil, s.err)
+}
+
+// Cancel implements Stream interface.
+// TODO(rogulenko): Make Cancel non-blocking.
+func (s *watchStream) Cancel() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.cancel()
+	s.err = verror.New(verror.ErrCanceled, nil)
+}
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index ce05711..53e16b0 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -181,6 +181,19 @@
 	}
 }
 
+// CheckWatch checks that the sequence of elements from the watch stream starts
+// with the given slice of watch changes.
+func CheckWatch(t *testing.T, wstream nosql.WatchStream, changes []nosql.WatchChange) {
+	for _, want := range changes {
+		if !wstream.Advance() {
+			Fatalf(t, "wstream.Advance() reached the end: %v", wstream.Err())
+		}
+		if got := wstream.Change(); !reflect.DeepEqual(got, want) {
+			Fatalf(t, "unexpected watch change: got %v, want %v", got, want)
+		}
+	}
+}
+
 type MockSchemaUpgrader struct {
 	CallCount int
 }
diff --git a/v23/syncbase/util/util.go b/v23/syncbase/util/util.go
index 9de2da6..6eeaaaa 100644
--- a/v23/syncbase/util/util.go
+++ b/v23/syncbase/util/util.go
@@ -6,9 +6,12 @@
 
 import (
 	"regexp"
+	"strings"
 
+	wire "v.io/syncbase/v23/services/syncbase"
 	"v.io/v23/context"
 	"v.io/v23/security/access"
+	"v.io/v23/verror"
 )
 
 // TODO(sadovsky): Expand the allowed charset. We should probably switch to a
@@ -21,6 +24,23 @@
 	return nameRegexp.MatchString(s)
 }
 
+// ParseTableRowPair splits the given pattern of the form 'table/row' into
+// the table part and the row part. The row part might be empty.
+func ParseTableRowPair(ctx *context.T, pattern string) (string, string, error) {
+	parts := strings.Split(pattern, "/")
+	if len(parts) != 2 {
+		return "", "", verror.New(verror.ErrBadArg, ctx, pattern)
+	}
+	table, prefix := parts[0], parts[1]
+	if !ValidName(table) {
+		return "", "", verror.New(wire.ErrInvalidName, ctx, table)
+	}
+	if prefix != "" && !ValidName(prefix) {
+		return "", "", verror.New(wire.ErrInvalidName, ctx, prefix)
+	}
+	return table, prefix, nil
+}
+
 // PrefixRangeStart returns the start of the row range for the given prefix.
 func PrefixRangeStart(p string) string {
 	return p
diff --git a/x/ref/services/syncbase/server/nosql/database_watch.go b/x/ref/services/syncbase/server/nosql/database_watch.go
index 951444d..35228e6 100644
--- a/x/ref/services/syncbase/server/nosql/database_watch.go
+++ b/x/ref/services/syncbase/server/nosql/database_watch.go
@@ -5,30 +5,213 @@
 package nosql
 
 import (
-	nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
+	"bytes"
+	"strings"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	pubutil "v.io/syncbase/v23/syncbase/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/services/watch"
+	"v.io/v23/vdl"
 	"v.io/v23/verror"
 )
 
-// WatchGlob implements the nosqlwire.DatabaseWatcher interface.
+// GetResumeMarker implements the wire.DatabaseWatcher interface.
+func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
+	if !d.exists {
+		return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+	}
+	if d.batchId != nil {
+		return watchable.GetResumeMarker(d.batchReader())
+	} else {
+		return watchable.GetResumeMarker(d.st)
+	}
+}
+
+// WatchGlob implements the wire.DatabaseWatcher interface.
 func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
-	// TODO(rogulenko): Implement.
+	// TODO(rogulenko): Check permissions here and in other methods.
 	if !d.exists {
 		return verror.New(verror.ErrNoExist, ctx, d.name)
 	}
 	if d.batchId != nil {
-		return nosqlwire.NewErrBoundToBatch(ctx)
+		return wire.NewErrBoundToBatch(ctx)
 	}
-	return verror.NewErrNotImplemented(ctx)
+	// Parse the pattern.
+	if !strings.HasSuffix(req.Pattern, "*") {
+		return verror.New(verror.ErrBadArg, ctx, req.Pattern)
+	}
+	table, prefix, err := pubutil.ParseTableRowPair(ctx, strings.TrimSuffix(req.Pattern, "*"))
+	if err != nil {
+		return err
+	}
+	// Get the resume marker and fetch the initial state if necessary.
+	resumeMarker := req.ResumeMarker
+	if bytes.Equal(resumeMarker, []byte("now")) || len(resumeMarker) == 0 {
+		var err error
+		if resumeMarker, err = watchable.GetResumeMarker(d.st); err != nil {
+			return err
+		}
+		if len(req.ResumeMarker) == 0 {
+			// TODO(rogulenko): Fetch the initial state.
+			return verror.NewErrNotImplemented(ctx)
+		}
+	}
+	t := tableReq{
+		name: table,
+		d:    d,
+	}
+	return t.watchUpdates(ctx, call, prefix, resumeMarker)
 }
 
-// GetResumeMarker implements the nosqlwire.DatabaseWatcher interface.
-func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
-	// TODO(rogulenko): Implement.
-	if !d.exists {
-		return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+// watchUpdates waits for database updates and sends them to the client.
+// This function performs two steps in a loop:
+// - scan through the watch log until the end, sending all updates to the client
+// - wait for one of two signals: new updates available or the call is canceled.
+// The 'new updates' signal is sent by a worker goroutine that translates a
+// condition variable signal to a Go channel. The worker goroutine waits on the
+// condition variable for changes. Whenever the state changes, the worker sends
+// a signal through the Go channel.
+func (t *tableReq) watchUpdates(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, prefix string, resumeMarker watch.ResumeMarker) error {
+	// The Go channel to send notifications from the worker to the main
+	// goroutine.
+	hasUpdates := make(chan struct{})
+	// The Go channel to signal the worker to stop. The worker might block
+	// on the condition variable, but we don't want the main goroutine
+	// to wait for the worker to stop, so we create a buffered channel.
+	cancelWorker := make(chan struct{}, 1)
+	defer close(cancelWorker)
+	go func() {
+		waitForChange := watchable.WatchUpdates(t.d.st)
+		var state, newState uint64 = 1, 1
+		for {
+			// Wait until the state changes or the main function returns.
+			for newState == state {
+				select {
+				case <-cancelWorker:
+					return
+				default:
+				}
+				newState = waitForChange(state)
+			}
+			// Update the current state to the new value and send a signal to
+			// the main goroutine.
+			state = newState
+			if state == 0 {
+				close(hasUpdates)
+				return
+			}
+			// cancelWorker is closed as soon as the main function returns.
+			select {
+			case hasUpdates <- struct{}{}:
+			case <-cancelWorker:
+				return
+			}
+		}
+	}()
+
+	sender := call.SendStream()
+	for {
+		// Drain the log queue.
+		for {
+			logs, nextResumeMarker, err := watchable.ReadBatchFromLog(t.d.st, resumeMarker)
+			if err != nil {
+				return err
+			}
+			if logs == nil {
+				// No new log records available now.
+				break
+			}
+			resumeMarker = nextResumeMarker
+			changes, err := t.processLogBatch(ctx, call, prefix, logs)
+			if err != nil {
+				return err
+			}
+			if changes == nil {
+				// All batch changes are filtered out.
+				continue
+			}
+			changes[len(changes)-1].ResumeMarker = resumeMarker
+			for _, change := range changes {
+				if err := sender.Send(change); err != nil {
+					return err
+				}
+			}
+		}
+		// Wait for new updates or cancel.
+		select {
+		case _, ok := <-hasUpdates:
+			if !ok {
+				return verror.NewErrAborted(ctx)
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
 	}
-	return nil, verror.NewErrNotImplemented(ctx)
+}
+
+// processLogBatch converts []*watchable.LogEntry to []watch.Change, filtering
+// out unnecessary or inaccessible log records.
+func (t *tableReq) processLogBatch(ctx *context.T, call rpc.ServerCall, prefix string, logs []*watchable.LogEntry) ([]watch.Change, error) {
+	sn := t.d.st.NewSnapshot()
+	defer sn.Abort()
+	var changes []watch.Change
+	for _, logEntry := range logs {
+		var opKey string
+		switch op := logEntry.Op.(type) {
+		case watchable.OpPut:
+			opKey = string(op.Value.Key)
+		case watchable.OpDelete:
+			opKey = string(op.Value.Key)
+		default:
+			continue
+		}
+		parts := util.SplitKeyParts(opKey)
+		// TODO(rogulenko): Currently we process only rows, i.e. keys of the form
+		// $row:xxx:yyy. Consider processing other keys.
+		if len(parts) != 3 || parts[0] != util.RowPrefix {
+			continue
+		}
+		table, row := parts[1], parts[2]
+		// Filter out unnecessary rows and rows that we can't access.
+		if table != t.name || !strings.HasPrefix(row, prefix) {
+			continue
+		}
+		if err := t.checkAccess(ctx, call, sn, row); err != nil {
+			if verror.ErrorID(err) != verror.ErrNoAccess.ID {
+				return nil, err
+			}
+			continue
+		}
+		change := watch.Change{
+			Name:      naming.Join(table, row),
+			Continued: true,
+		}
+		switch op := logEntry.Op.(type) {
+		case watchable.OpPut:
+			rowValue, err := watchable.GetAtVersion(ctx, sn, op.Value.Key, nil, op.Value.Version)
+			if err != nil {
+				return nil, err
+			}
+			change.State = watch.Exists
+			change.Value = vdl.ValueOf(wire.StoreChange{
+				Value:    rowValue,
+				FromSync: logEntry.FromSync,
+			})
+		case watchable.OpDelete:
+			change.State = watch.DoesNotExist
+			change.Value = vdl.ValueOf(wire.StoreChange{
+				FromSync: logEntry.FromSync,
+			})
+		}
+		changes = append(changes, change)
+	}
+	if len(changes) > 0 {
+		changes[len(changes)-1].Continued = false
+	}
+	return changes, nil
 }
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 010643d..72eebf2 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -16,12 +16,6 @@
 // where <key> is the client-specified key.
 package watchable
 
-// TODO(sadovsky): Write unit tests. (As of May 2015 we're still iterating on
-// the design for how to expose a "watch" API from the storage engine, and we
-// don't want to write lots of tests prematurely.)
-// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
-// TODO(sadovsky): Allow clients to subscribe via Go channel.
-
 import (
 	"fmt"
 	"strings"
@@ -51,25 +45,28 @@
 		return nil, err
 	}
 	return &wstore{
-		ist:   st,
-		opts:  opts,
-		seq:   seq,
-		clock: clock.NewVClock(),
+		ist:     st,
+		watcher: newWatcher(),
+		opts:    opts,
+		seq:     seq,
+		clock:   clock.NewVClock(),
 	}, nil
 }
 
 type wstore struct {
-	ist   store.Store
-	opts  *Options
-	mu    sync.Mutex    // held during transaction commits; protects seq
-	seq   uint64        // the next sequence number to be used for a new commit
-	clock *clock.VClock // used to provide write timestamps
+	ist     store.Store
+	watcher *watcher
+	opts    *Options
+	mu      sync.Mutex    // held during transaction commits; protects seq
+	seq     uint64        // the next sequence number to be used for a new commit
+	clock   *clock.VClock // used to provide write timestamps
 }
 
 var _ Store = (*wstore)(nil)
 
 // Close implements the store.Store interface.
 func (st *wstore) Close() error {
+	st.watcher.close()
 	return st.ist.Close()
 }
 
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 304c4d6..0419e7c 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -158,6 +158,7 @@
 		return err
 	}
 	tx.st.seq = seq
+	tx.st.watcher.broadcastUpdates()
 	return nil
 }
 
diff --git a/x/ref/services/syncbase/server/watchable/watcher.go b/x/ref/services/syncbase/server/watchable/watcher.go
index 1209a1e..fc0481a 100644
--- a/x/ref/services/syncbase/server/watchable/watcher.go
+++ b/x/ref/services/syncbase/server/watchable/watcher.go
@@ -7,6 +7,7 @@
 import (
 	"fmt"
 	"strconv"
+	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -16,6 +17,79 @@
 	"v.io/x/lib/vlog"
 )
 
+// watcher maintains a state and a condition variable. The watcher sends
+// a broadcast signal every time the state changes. The state is increased
+// by 1 every time the store has new data. Initially the state equals 1.
+// If the state becomes 0, then the watcher is closed and the state will not
+// be changed later.
+// TODO(rogulenko): Broadcast a signal from time to time to unblock waiting
+// clients.
+type watcher struct {
+	mu    *sync.RWMutex
+	cond  *sync.Cond
+	state uint64
+}
+
+func newWatcher() *watcher {
+	mu := &sync.RWMutex{}
+	return &watcher{
+		mu:    mu,
+		cond:  sync.NewCond(mu.RLocker()),
+		state: 1,
+	}
+}
+
+// close closes the watcher.
+func (w *watcher) close() {
+	w.mu.Lock()
+	w.state = 0
+	w.cond.Broadcast()
+	w.mu.Unlock()
+}
+
+// broadcastUpdates broadcasts the update notification to watch clients.
+func (w *watcher) broadcastUpdates() {
+	w.mu.Lock()
+	if w.state != 0 {
+		w.state++
+		w.cond.Broadcast()
+	} else {
+		vlog.Error("broadcastUpdates() called on a closed watcher")
+	}
+	w.mu.Unlock()
+}
+
+// WatchUpdates returns a function that can be used to watch for changes of
+// the database. The store maintains a state (initially 1) that is increased
+// by 1 every time the store has new data. The waitForChange function takes
+// the last returned state and blocks until the state changes, returning the new
+// state. State equal to 0 means the store is closed and no updates will come
+// later. If waitForChange function takes a state different from the current
+// state of the store or the store is closed, the waitForChange function returns
+// immediately. It might happen that the waitForChange function returns
+// a non-zero state equal to the state passed as the argument. This behavior
+// helps to unblock clients if the store doesn't have updates for a long period
+// of time.
+func WatchUpdates(st store.Store) (waitForChange func(state uint64) uint64) {
+	// TODO(rogulenko): Remove dynamic type assertion here and in other places.
+	watcher := st.(*wstore).watcher
+	return func(state uint64) uint64 {
+		watcher.cond.L.Lock()
+		defer watcher.cond.L.Unlock()
+		if watcher.state != 0 && watcher.state == state {
+			watcher.cond.Wait()
+		}
+		return watcher.state
+	}
+}
+
+// GetResumeMarker returns the ResumeMarker that points to the current end
+// of the event log.
+func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error) {
+	seq, err := getNextLogSeq(st)
+	return watch.ResumeMarker(logEntryKey(seq)), err
+}
+
 // MakeResumeMarker converts a sequence number to the resume marker.
 func MakeResumeMarker(seq uint64) watch.ResumeMarker {
 	return watch.ResumeMarker(logEntryKey(seq))
@@ -27,9 +101,9 @@
 	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
 }
 
-// WatchLogBatch returns a batch of watch log records (a transaction) from
+// ReadBatchFromLog returns a batch of watch log records (a transaction) from
 // the given database and the new resume marker at the end of the batch.
-func WatchLogBatch(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
+func ReadBatchFromLog(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
 	seq, err := parseResumeMarker(string(resumeMarker))
 	if err != nil {
 		return nil, resumeMarker, err
diff --git a/x/ref/services/syncbase/server/watchable/watcher_test.go b/x/ref/services/syncbase/server/watchable/watcher_test.go
index 246bc65..c978123 100644
--- a/x/ref/services/syncbase/server/watchable/watcher_test.go
+++ b/x/ref/services/syncbase/server/watchable/watcher_test.go
@@ -45,7 +45,7 @@
 	var seq uint64
 
 	for i := 0; i < (numTx + 3); i++ {
-		logs, newResmark, err := WatchLogBatch(st, resmark)
+		logs, newResmark, err := ReadBatchFromLog(st, resmark)
 		if err != nil {
 			t.Fatalf("can't get watch log batch: %v", err)
 		}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 04c3bed..ae69dc6 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -123,7 +123,7 @@
 	s.initDbSyncStateInMem(ctx, appName, dbName)
 
 	// Get a batch of watch log entries, if any, after this resume marker.
-	logs, nextResmark, err := watchable.WatchLogBatch(st, resMark)
+	logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)
 	if err != nil {
 		vlog.Fatalf("sync: processDatabase: %s, %s: cannot get watch log batch: %v", appName, dbName, verror.DebugString(err))
 	}