Merge "syncbase API: client watch (VDL only)"
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 70424b4..6d221d1 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -6,8 +6,8 @@
 package nosql
 
 import (
-  "v.io/v23/security/access"
-  "v.io/v23/services/permissions"
+	"v.io/v23/security/access"
+	"v.io/v23/services/permissions"
 )
 
 // Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -55,6 +55,9 @@
 	// SetPermissions and GetPermissions are included from the Object interface.
 	permissions.Object
 
+	// DatabaseWatcher implements the API to watch for updates in the database.
+	DatabaseWatcher
+
 	// SyncGroupManager implements the API for managing SyncGroups attached to a
 	// Database.
 	SyncGroupManager
@@ -200,6 +203,42 @@
 	// aggressively", "sync once per day".
 }
 
+// Watch allows a client to watch for updates in the database. For each watched
+// request, the client will receive a reliable stream of watch events without
+// re-ordering.
+//
+// The watching is done by starting a streaming RPC. The argument to the RPC
+// contains the ResumeMarker that points to a particular place in the database
+// event log and a set of (table, row prefix) pairs. Updates with rows not
+// covered by the set are excluded from the result stream. The result stream
+// consists of a never-ending sequence of Change messages (until the call fails
+// or is canceled). Each Change message contains an optional continued bit.
+// A sub-sequence of Change messages with continued=true followed by a Change
+// message with continued=false forms a batch. If the client has no access to
+// a row specified in a change, that change is excluded from the result stream.
+//
+// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+// the general behavior.
+//
+// The DatabaseWatcher is designed to be used in the following way:
+// 1) begin a read-only batch
+// 2) read all information your app needs
+// 3) read the ResumeMarker
+// 4) abort the batch
+// 5) start watching changes to the data using the ResumeMarker
+// In this configuration the client doesn't miss any changes.
+type DatabaseWatcher interface {
+	// Watch returns a stream of changes for a given watch request. If this
+	// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+	// If the provided ResumeMarker is invalid or outdated, Watch() will fail
+	// with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+	Watch(req WatchRequest) stream<_, Change> error {access.Read}
+
+	// GetResumeMarker returns the ResumeMarker that points to the current end
+	// of the current event log. GetResumeMarker() can be called on a batch.
+	GetResumeMarker() (ResumeMarker | error) {access.Read}
+}
+
 error (
 	BoundToBatch() {"en": "bound to batch"}
 	NotBoundToBatch() {"en": "not bound to batch"}
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index d093c61..31c877b 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -50,6 +50,311 @@
 	return verror.New(ErrReadOnlyBatch, ctx)
 }
 
+// DatabaseWatcherClientMethods is the client interface
+// containing DatabaseWatcher methods.
+//
+// Watch allows a client to watch for updates in the database. For each watched
+// request, the client will receive a reliable stream of watch events without
+// re-ordering.
+//
+// The watching is done by starting a streaming RPC. The argument to the RPC
+// contains the ResumeMarker that points to a particular place in the database
+// event log and a set of (table, row prefix) pairs. Updates with rows not
+// covered by the set are excluded from the result stream. The result stream
+// consists of a never-ending sequence of Change messages (until the call fails
+// or is canceled). Each Change message contains an optional continued bit.
+// A sub-sequence of Change messages with continued=true followed by a Change
+// message with continued=false forms a batch. If the client has no access to
+// a row specified in a change, that change is excluded from the result stream.
+//
+// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+// the general behavior.
+//
+// The DatabaseWatcher is designed to be used in the following way:
+// 1) begin a read-only batch
+// 2) read all information your app needs
+// 3) read the ResumeMarker
+// 4) abort the batch
+// 5) start watching changes to the data using the ResumeMarker
+// In this configuration the client doesn't miss any changes.
+type DatabaseWatcherClientMethods interface {
+	// Watch returns a stream of changes for a given watch request. If this
+	// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+	// If the provided ResumeMarker is invalid or outdated, Watch() will fail
+	// with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+	Watch(ctx *context.T, req WatchRequest, opts ...rpc.CallOpt) (DatabaseWatcherWatchClientCall, error)
+	// GetResumeMarker returns the ResumeMarker that points to the current end
+	// of the current event log. GetResumeMarker() can be called on a batch.
+	GetResumeMarker(*context.T, ...rpc.CallOpt) (ResumeMarker, error)
+}
+
+// DatabaseWatcherClientStub adds universal methods to DatabaseWatcherClientMethods.
+type DatabaseWatcherClientStub interface {
+	DatabaseWatcherClientMethods
+	rpc.UniversalServiceMethods
+}
+
+// DatabaseWatcherClient returns a client stub for DatabaseWatcher.
+func DatabaseWatcherClient(name string) DatabaseWatcherClientStub {
+	return implDatabaseWatcherClientStub{name}
+}
+
+type implDatabaseWatcherClientStub struct {
+	name string
+}
+
+func (c implDatabaseWatcherClientStub) Watch(ctx *context.T, i0 WatchRequest, opts ...rpc.CallOpt) (ocall DatabaseWatcherWatchClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Watch", []interface{}{i0}, opts...); err != nil {
+		return
+	}
+	ocall = &implDatabaseWatcherWatchClientCall{ClientCall: call}
+	return
+}
+
+func (c implDatabaseWatcherClientStub) GetResumeMarker(ctx *context.T, opts ...rpc.CallOpt) (o0 ResumeMarker, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "GetResumeMarker", nil, []interface{}{&o0}, opts...)
+	return
+}
+
+// DatabaseWatcherWatchClientStream is the client stream for DatabaseWatcher.Watch.
+type DatabaseWatcherWatchClientStream interface {
+	// RecvStream returns the receiver side of the DatabaseWatcher.Watch client stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() Change
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+}
+
+// DatabaseWatcherWatchClientCall represents the call returned from DatabaseWatcher.Watch.
+type DatabaseWatcherWatchClientCall interface {
+	DatabaseWatcherWatchClientStream
+	// Finish blocks until the server is done, and returns the positional return
+	// values for call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implDatabaseWatcherWatchClientCall struct {
+	rpc.ClientCall
+	valRecv Change
+	errRecv error
+}
+
+func (c *implDatabaseWatcherWatchClientCall) RecvStream() interface {
+	Advance() bool
+	Value() Change
+	Err() error
+} {
+	return implDatabaseWatcherWatchClientCallRecv{c}
+}
+
+type implDatabaseWatcherWatchClientCallRecv struct {
+	c *implDatabaseWatcherWatchClientCall
+}
+
+func (c implDatabaseWatcherWatchClientCallRecv) Advance() bool {
+	c.c.valRecv = Change{}
+	c.c.errRecv = c.c.Recv(&c.c.valRecv)
+	return c.c.errRecv == nil
+}
+func (c implDatabaseWatcherWatchClientCallRecv) Value() Change {
+	return c.c.valRecv
+}
+func (c implDatabaseWatcherWatchClientCallRecv) Err() error {
+	if c.c.errRecv == io.EOF {
+		return nil
+	}
+	return c.c.errRecv
+}
+func (c *implDatabaseWatcherWatchClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
+// DatabaseWatcherServerMethods is the interface a server writer
+// implements for DatabaseWatcher.
+//
+// Watch allows a client to watch for updates in the database. For each watched
+// request, the client will receive a reliable stream of watch events without
+// re-ordering.
+//
+// The watching is done by starting a streaming RPC. The argument to the RPC
+// contains the ResumeMarker that points to a particular place in the database
+// event log and a set of (table, row prefix) pairs. Updates with rows not
+// covered by the set are excluded from the result stream. The result stream
+// consists of a never-ending sequence of Change messages (until the call fails
+// or is canceled). Each Change message contains an optional continued bit.
+// A sub-sequence of Change messages with continued=true followed by a Change
+// message with continued=false forms a batch. If the client has no access to
+// a row specified in a change, that change is excluded from the result stream.
+//
+// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+// the general behavior.
+//
+// The DatabaseWatcher is designed to be used in the following way:
+// 1) begin a read-only batch
+// 2) read all information your app needs
+// 3) read the ResumeMarker
+// 4) abort the batch
+// 5) start watching changes to the data using the ResumeMarker
+// In this configuration the client doesn't miss any changes.
+type DatabaseWatcherServerMethods interface {
+	// Watch returns a stream of changes for a given watch request. If this
+	// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+	// If the provided ResumeMarker is invalid or outdated, Watch() will fail
+	// with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+	Watch(ctx *context.T, call DatabaseWatcherWatchServerCall, req WatchRequest) error
+	// GetResumeMarker returns the ResumeMarker that points to the current end
+	// of the current event log. GetResumeMarker() can be called on a batch.
+	GetResumeMarker(*context.T, rpc.ServerCall) (ResumeMarker, error)
+}
+
+// DatabaseWatcherServerStubMethods is the server interface containing
+// DatabaseWatcher methods, as expected by rpc.Server.
+// The only difference between this interface and DatabaseWatcherServerMethods
+// is the streaming methods.
+type DatabaseWatcherServerStubMethods interface {
+	// Watch returns a stream of changes for a given watch request. If this
+	// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+	// If the provided ResumeMarker is invalid or outdated, Watch() will fail
+	// with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+	Watch(ctx *context.T, call *DatabaseWatcherWatchServerCallStub, req WatchRequest) error
+	// GetResumeMarker returns the ResumeMarker that points to the current end
+	// of the current event log. GetResumeMarker() can be called on a batch.
+	GetResumeMarker(*context.T, rpc.ServerCall) (ResumeMarker, error)
+}
+
+// DatabaseWatcherServerStub adds universal methods to DatabaseWatcherServerStubMethods.
+type DatabaseWatcherServerStub interface {
+	DatabaseWatcherServerStubMethods
+	// Describe the DatabaseWatcher interfaces.
+	Describe__() []rpc.InterfaceDesc
+}
+
+// DatabaseWatcherServer returns a server stub for DatabaseWatcher.
+// It converts an implementation of DatabaseWatcherServerMethods into
+// an object that may be used by rpc.Server.
+func DatabaseWatcherServer(impl DatabaseWatcherServerMethods) DatabaseWatcherServerStub {
+	stub := implDatabaseWatcherServerStub{
+		impl: impl,
+	}
+	// Initialize GlobState; always check the stub itself first, to handle the
+	// case where the user has the Glob method defined in their VDL source.
+	if gs := rpc.NewGlobState(stub); gs != nil {
+		stub.gs = gs
+	} else if gs := rpc.NewGlobState(impl); gs != nil {
+		stub.gs = gs
+	}
+	return stub
+}
+
+type implDatabaseWatcherServerStub struct {
+	impl DatabaseWatcherServerMethods
+	gs   *rpc.GlobState
+}
+
+func (s implDatabaseWatcherServerStub) Watch(ctx *context.T, call *DatabaseWatcherWatchServerCallStub, i0 WatchRequest) error {
+	return s.impl.Watch(ctx, call, i0)
+}
+
+func (s implDatabaseWatcherServerStub) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (ResumeMarker, error) {
+	return s.impl.GetResumeMarker(ctx, call)
+}
+
+func (s implDatabaseWatcherServerStub) Globber() *rpc.GlobState {
+	return s.gs
+}
+
+func (s implDatabaseWatcherServerStub) Describe__() []rpc.InterfaceDesc {
+	return []rpc.InterfaceDesc{DatabaseWatcherDesc}
+}
+
+// DatabaseWatcherDesc describes the DatabaseWatcher interface.
+var DatabaseWatcherDesc rpc.InterfaceDesc = descDatabaseWatcher
+
+// descDatabaseWatcher hides the desc to keep godoc clean.
+var descDatabaseWatcher = rpc.InterfaceDesc{
+	Name:    "DatabaseWatcher",
+	PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+	Doc:     "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+	Methods: []rpc.MethodDesc{
+		{
+			Name: "Watch",
+			Doc:  "// Watch returns a stream of changes for a given watch request. If this\n// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.\n// If the provided ResumeMarker is invalid or outdated, Watch() will fail\n// with \"v.io/v23/services/watch\".ErrUnknownResumeMarker.",
+			InArgs: []rpc.ArgDesc{
+				{"req", ``}, // WatchRequest
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+		},
+		{
+			Name: "GetResumeMarker",
+			Doc:  "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the current event log. GetResumeMarker() can be called on a batch.",
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // ResumeMarker
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+		},
+	},
+}
+
+// DatabaseWatcherWatchServerStream is the server stream for DatabaseWatcher.Watch.
+type DatabaseWatcherWatchServerStream interface {
+	// SendStream returns the send side of the DatabaseWatcher.Watch server stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors encountered
+		// while sending.  Blocks if there is no buffer space; will unblock when
+		// buffer space is available.
+		Send(item Change) error
+	}
+}
+
+// DatabaseWatcherWatchServerCall represents the context passed to DatabaseWatcher.Watch.
+type DatabaseWatcherWatchServerCall interface {
+	rpc.ServerCall
+	DatabaseWatcherWatchServerStream
+}
+
+// DatabaseWatcherWatchServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements DatabaseWatcherWatchServerCall.
+type DatabaseWatcherWatchServerCallStub struct {
+	rpc.StreamServerCall
+}
+
+// Init initializes DatabaseWatcherWatchServerCallStub from rpc.StreamServerCall.
+func (s *DatabaseWatcherWatchServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// SendStream returns the send side of the DatabaseWatcher.Watch server stream.
+func (s *DatabaseWatcherWatchServerCallStub) SendStream() interface {
+	Send(item Change) error
+} {
+	return implDatabaseWatcherWatchServerCallSend{s}
+}
+
+type implDatabaseWatcherWatchServerCallSend struct {
+	s *DatabaseWatcherWatchServerCallStub
+}
+
+func (s implDatabaseWatcherWatchServerCallSend) Send(item Change) error {
+	return s.s.Send(item)
+}
+
 // SyncGroupManagerClientMethods is the client interface
 // containing SyncGroupManager methods.
 //
@@ -459,6 +764,31 @@
 	//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}
 	//  }
 	permissions.ObjectClientMethods
+	// Watch allows a client to watch for updates in the database. For each watched
+	// request, the client will receive a reliable stream of watch events without
+	// re-ordering.
+	//
+	// The watching is done by starting a streaming RPC. The argument to the RPC
+	// contains the ResumeMarker that points to a particular place in the database
+	// event log and a set of (table, row prefix) pairs. Updates with rows not
+	// covered by the set are excluded from the result stream. The result stream
+	// consists of a never-ending sequence of Change messages (until the call fails
+	// or is canceled). Each Change message contains an optional continued bit.
+	// A sub-sequence of Change messages with continued=true followed by a Change
+	// message with continued=false forms a batch. If the client has no access to
+	// a row specified in a change, that change is excluded from the result stream.
+	//
+	// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+	// the general behavior.
+	//
+	// The DatabaseWatcher is designed to be used in the following way:
+	// 1) begin a read-only batch
+	// 2) read all information your app needs
+	// 3) read the ResumeMarker
+	// 4) abort the batch
+	// 5) start watching changes to the data using the ResumeMarker
+	// In this configuration the client doesn't miss any changes.
+	DatabaseWatcherClientMethods
 	// SyncGroupManager is the interface for SyncGroup operations.
 	// TODO(hpucha): Add blessings to create/join and add a refresh method.
 	SyncGroupManagerClientMethods
@@ -501,13 +831,14 @@
 
 // DatabaseClient returns a client stub for Database.
 func DatabaseClient(name string) DatabaseClientStub {
-	return implDatabaseClientStub{name, permissions.ObjectClient(name), SyncGroupManagerClient(name)}
+	return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name)}
 }
 
 type implDatabaseClientStub struct {
 	name string
 
 	permissions.ObjectClientStub
+	DatabaseWatcherClientStub
 	SyncGroupManagerClientStub
 }
 
@@ -672,6 +1003,31 @@
 	//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}
 	//  }
 	permissions.ObjectServerMethods
+	// Watch allows a client to watch for updates in the database. For each watched
+	// request, the client will receive a reliable stream of watch events without
+	// re-ordering.
+	//
+	// The watching is done by starting a streaming RPC. The argument to the RPC
+	// contains the ResumeMarker that points to a particular place in the database
+	// event log and a set of (table, row prefix) pairs. Updates with rows not
+	// covered by the set are excluded from the result stream. The result stream
+	// consists of a never-ending sequence of Change messages (until the call fails
+	// or is canceled). Each Change message contains an optional continued bit.
+	// A sub-sequence of Change messages with continued=true followed by a Change
+	// message with continued=false forms a batch. If the client has no access to
+	// a row specified in a change, that change is excluded from the result stream.
+	//
+	// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+	// the general behavior.
+	//
+	// The DatabaseWatcher is designed to be used in the following way:
+	// 1) begin a read-only batch
+	// 2) read all information your app needs
+	// 3) read the ResumeMarker
+	// 4) abort the batch
+	// 5) start watching changes to the data using the ResumeMarker
+	// In this configuration the client doesn't miss any changes.
+	DatabaseWatcherServerMethods
 	// SyncGroupManager is the interface for SyncGroup operations.
 	// TODO(hpucha): Add blessings to create/join and add a refresh method.
 	SyncGroupManagerServerMethods
@@ -756,6 +1112,31 @@
 	//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}
 	//  }
 	permissions.ObjectServerStubMethods
+	// Watch allows a client to watch for updates in the database. For each watched
+	// request, the client will receive a reliable stream of watch events without
+	// re-ordering.
+	//
+	// The watching is done by starting a streaming RPC. The argument to the RPC
+	// contains the ResumeMarker that points to a particular place in the database
+	// event log and a set of (table, row prefix) pairs. Updates with rows not
+	// covered by the set are excluded from the result stream. The result stream
+	// consists of a never-ending sequence of Change messages (until the call fails
+	// or is canceled). Each Change message contains an optional continued bit.
+	// A sub-sequence of Change messages with continued=true followed by a Change
+	// message with continued=false forms a batch. If the client has no access to
+	// a row specified in a change, that change is excluded from the result stream.
+	//
+	// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+	// the general behavior.
+	//
+	// The DatabaseWatcher is designed to be used in the following way:
+	// 1) begin a read-only batch
+	// 2) read all information your app needs
+	// 3) read the ResumeMarker
+	// 4) abort the batch
+	// 5) start watching changes to the data using the ResumeMarker
+	// In this configuration the client doesn't miss any changes.
+	DatabaseWatcherServerStubMethods
 	// SyncGroupManager is the interface for SyncGroup operations.
 	// TODO(hpucha): Add blessings to create/join and add a refresh method.
 	SyncGroupManagerServerStubMethods
@@ -804,6 +1185,7 @@
 	stub := implDatabaseServerStub{
 		impl:                       impl,
 		ObjectServerStub:           permissions.ObjectServer(impl),
+		DatabaseWatcherServerStub:  DatabaseWatcherServer(impl),
 		SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
 	}
 	// Initialize GlobState; always check the stub itself first, to handle the
@@ -819,6 +1201,7 @@
 type implDatabaseServerStub struct {
 	impl DatabaseServerMethods
 	permissions.ObjectServerStub
+	DatabaseWatcherServerStub
 	SyncGroupManagerServerStub
 	gs *rpc.GlobState
 }
@@ -856,7 +1239,7 @@
 }
 
 func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
-	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, SyncGroupManagerDesc}
+	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc}
 }
 
 // DatabaseDesc describes the Database interface.
@@ -869,6 +1252,7 @@
 	Doc:     "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n//\n// TODO(sadovsky): Add Watch method.",
 	Embeds: []rpc.EmbedDesc{
 		{"Object", "v.io/v23/services/permissions", "// Object provides access control for Vanadium objects.\n//\n// Vanadium services implementing dynamic access control would typically embed\n// this interface and tag additional methods defined by the service with one of\n// Admin, Read, Write, Resolve etc. For example, the VDL definition of the\n// object would be:\n//\n//   package mypackage\n//\n//   import \"v.io/v23/security/access\"\n//   import \"v.io/v23/services/permissions\"\n//\n//   type MyObject interface {\n//     permissions.Object\n//     MyRead() (string, error) {access.Read}\n//     MyWrite(string) error    {access.Write}\n//   }\n//\n// If the set of pre-defined tags is insufficient, services may define their\n// own tag type and annotate all methods with this new type.\n//\n// Instead of embedding this Object interface, define SetPermissions and\n// GetPermissions in their own interface. Authorization policies will typically\n// respect annotations of a single type. For example, the VDL definition of an\n// object would be:\n//\n//  package mypackage\n//\n//  import \"v.io/v23/security/access\"\n//\n//  type MyTag string\n//\n//  const (\n//    Blue = MyTag(\"Blue\")\n//    Red  = MyTag(\"Red\")\n//  )\n//\n//  type MyObject interface {\n//    MyMethod() (string, error) {Blue}\n//\n//    // Allow clients to change access via the access.Object interface:\n//    SetPermissions(perms access.Permissions, version string) error         {Red}\n//    GetPermissions() (perms access.Permissions, version string, err error) {Blue}\n//  }"},
+		{"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
 		{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
 	},
 	Methods: []rpc.MethodDesc{
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index bc3a32f..ffa95ab 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -73,3 +73,65 @@
 type SyncGroupMemberInfo struct {
 	SyncPriority byte
 }
+
+// ResumeMarker is a pointer in the database event log.
+type ResumeMarker string
+
+// TablePrefixRange describes a prefix range in a table.
+type TablePrefixRange struct {
+	Table  string
+	Prefix string
+}
+
+// WatchRequest specifies which rows should be watched and a starting point
+// in the database event log from which to receive updates.
+type WatchRequest struct {
+	// Ranges specifies the subset of the database for which the client wants
+	// updates.
+	Ranges []TablePrefixRange
+
+	// ResumeMarker is the starting point in the database event log from which
+	// to receive updates.
+	ResumeMarker ResumeMarker
+}
+
+// ChangeType describes the type of the row change: Put or Delete.
+// TODO(rogulenko): Consider adding the Shell type.
+type ChangeType enum {
+	Put
+	Delete
+}
+
+// Change is the new value for a watched entity.
+type Change struct {
+	// Table is the name of the table that contains the changed row.
+	Table string
+
+	// Row is the key of the changed row.
+	Row string
+
+	// ChangeType describes the type of the change. If the ChangeType equals to
+	// Put, then the row exists in the table and the Value contains the new
+	// value for this row. If the state equals to Delete, then the row was
+	// removed from the table.
+	ChangeType ChangeType
+
+	// Value is the new value for the row if the state equals to Put,
+	// otherwise the Value is nil.
+	Value []byte
+
+	// ResumeMarker provides a compact representation of all the messages
+	// that have been received by the caller for the given Watch call.
+	// This marker can be provided in the Request message to allow the caller
+	// to resume the stream watching at a specific point without fetching the
+	// initial state.
+	ResumeMarker ResumeMarker
+
+	// FromSync indicates whether the change came from sync. If FromSync is
+	// false, then the change originated from the local device.
+	FromSync bool
+
+	// If true, this Change is followed by more Changes that are in the
+	// same batch as this Change.
+	Continued bool
+}
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index 08040f4..784ef37 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -9,6 +9,7 @@
 
 import (
 	// VDL system imports
+	"fmt"
 	"v.io/v23/vdl"
 
 	// VDL user imports
@@ -101,10 +102,132 @@
 }) {
 }
 
+// ResumeMarker is a pointer in the database event log.
+type ResumeMarker string
+
+func (ResumeMarker) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResumeMarker"`
+}) {
+}
+
+// TablePrefixRange describes a prefix range in a table.
+type TablePrefixRange struct {
+	Table  string
+	Prefix string
+}
+
+func (TablePrefixRange) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.TablePrefixRange"`
+}) {
+}
+
+// WatchRequest specifies which rows should be watched and a starting point
+// in the database event log from which to receive updates.
+type WatchRequest struct {
+	// Ranges specifies the subset of the database for which the client wants
+	// updates.
+	Ranges []TablePrefixRange
+	// ResumeMarker is the starting point in the database event log from which
+	// to receive updates.
+	ResumeMarker ResumeMarker
+}
+
+func (WatchRequest) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.WatchRequest"`
+}) {
+}
+
+// ChangeType describes the type of the row change: Put or Delete.
+// TODO(rogulenko): Consider adding the Shell type.
+type ChangeType int
+
+const (
+	ChangeTypePut ChangeType = iota
+	ChangeTypeDelete
+)
+
+// ChangeTypeAll holds all labels for ChangeType.
+var ChangeTypeAll = [...]ChangeType{ChangeTypePut, ChangeTypeDelete}
+
+// ChangeTypeFromString creates a ChangeType from a string label.
+func ChangeTypeFromString(label string) (x ChangeType, err error) {
+	err = x.Set(label)
+	return
+}
+
+// Set assigns label to x.
+func (x *ChangeType) Set(label string) error {
+	switch label {
+	case "Put", "put":
+		*x = ChangeTypePut
+		return nil
+	case "Delete", "delete":
+		*x = ChangeTypeDelete
+		return nil
+	}
+	*x = -1
+	return fmt.Errorf("unknown label %q in nosql.ChangeType", label)
+}
+
+// String returns the string label of x.
+func (x ChangeType) String() string {
+	switch x {
+	case ChangeTypePut:
+		return "Put"
+	case ChangeTypeDelete:
+		return "Delete"
+	}
+	return ""
+}
+
+func (ChangeType) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ChangeType"`
+	Enum struct{ Put, Delete string }
+}) {
+}
+
+// Change is the new value for a watched entity.
+type Change struct {
+	// Table is the name of the table that contains the changed row.
+	Table string
+	// Row is the key of the changed row.
+	Row string
+	// ChangeType describes the type of the change. If the ChangeType equals to
+	// Put, then the row exists in the table and the Value contains the new
+	// value for this row. If the state equals to Delete, then the row was
+	// removed from the table.
+	ChangeType ChangeType
+	// Value is the new value for the row if the state equals to Put,
+	// otherwise the Value is nil.
+	Value []byte
+	// ResumeMarker provides a compact representation of all the messages
+	// that have been received by the caller for the given Watch call.
+	// This marker can be provided in the Request message to allow the caller
+	// to resume the stream watching at a specific point without fetching the
+	// initial state.
+	ResumeMarker ResumeMarker
+	// FromSync indicates whether the change came from sync. If FromSync is
+	// false, then the change originated from the local device.
+	FromSync bool
+	// If true, this Change is followed by more Changes that are in the
+	// same batch as this Change.
+	Continued bool
+}
+
+func (Change) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Change"`
+}) {
+}
+
 func init() {
 	vdl.Register((*BatchOptions)(nil))
 	vdl.Register((*PrefixPermissions)(nil))
 	vdl.Register((*KeyValue)(nil))
 	vdl.Register((*SyncGroupSpec)(nil))
 	vdl.Register((*SyncGroupMemberInfo)(nil))
+	vdl.Register((*ResumeMarker)(nil))
+	vdl.Register((*TablePrefixRange)(nil))
+	vdl.Register((*WatchRequest)(nil))
+	vdl.Register((*ChangeType)(nil))
+	vdl.Register((*Change)(nil))
 }
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index cb97fb9..2da519e 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -286,6 +286,25 @@
 	return data.Perms, util.FormatVersion(data.Version), nil
 }
 
+func (d *databaseReq) Watch(ctx *context.T, call wire.DatabaseWatcherWatchServerCall, req wire.WatchRequest) error {
+	// TODO(rogulenko): Implement.
+	if !d.exists {
+		return verror.New(verror.ErrNoExist, ctx, d.name)
+	}
+	if d.batchId != nil {
+		return wire.NewErrBoundToBatch(ctx)
+	}
+	return verror.NewErrNotImplemented(ctx)
+}
+
+func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (wire.ResumeMarker, error) {
+	// TODO(rogulenko): Implement.
+	if !d.exists {
+		return "", verror.New(verror.ErrNoExist, ctx, d.name)
+	}
+	return "", verror.NewErrNotImplemented(ctx)
+}
+
 func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
 	if !d.exists {
 		return nil, verror.New(verror.ErrNoExist, ctx, d.name)