syncbase API: client watch (VDL only)
Watch allows a client to watch for updates in the database.
This change contains only the VDL API and implementation stubs.
The Watcher is not embedded directly from the "v.io/v23/services/watch"
package because:
- the store has a more convenient way to get the initial state: grab a
snapshot, do gets and scans and get the ResumeMarker (as a part of the
same snapshot) as a point to start
- we want to have a bit different information in a change, for example,
whether the change came from Sync.
- it is more convenient to use tuples instead of strings for table+row names.
Change-Id: Ib657dc7da2bbf857274d994ffd2d0853b07c587c
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 70424b4..6d221d1 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -6,8 +6,8 @@
package nosql
import (
- "v.io/v23/security/access"
- "v.io/v23/services/permissions"
+ "v.io/v23/security/access"
+ "v.io/v23/services/permissions"
)
// Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -55,6 +55,9 @@
// SetPermissions and GetPermissions are included from the Object interface.
permissions.Object
+ // DatabaseWatcher implements the API to watch for updates in the database.
+ DatabaseWatcher
+
// SyncGroupManager implements the API for managing SyncGroups attached to a
// Database.
SyncGroupManager
@@ -200,6 +203,42 @@
// aggressively", "sync once per day".
}
+// Watch allows a client to watch for updates in the database. For each watched
+// request, the client will receive a reliable stream of watch events without
+// re-ordering.
+//
+// The watching is done by starting a streaming RPC. The argument to the RPC
+// contains the ResumeMarker that points to a particular place in the database
+// event log and a set of (table, row prefix) pairs. Updates with rows not
+// covered by the set are excluded from the result stream. The result stream
+// consists of a never-ending sequence of Change messages (until the call fails
+// or is canceled). Each Change message contains an optional continued bit.
+// A sub-sequence of Change messages with continued=true followed by a Change
+// message with continued=false forms a batch. If the client has no access to
+// a row specified in a change, that change is excluded from the result stream.
+//
+// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+// the general behavior.
+//
+// The DatabaseWatcher is designed to be used in the following way:
+// 1) begin a read-only batch
+// 2) read all information your app needs
+// 3) read the ResumeMarker
+// 4) abort the batch
+// 5) start watching changes to the data using the ResumeMarker
+// In this configuration the client doesn't miss any changes.
+type DatabaseWatcher interface {
+ // Watch returns a stream of changes for a given watch request. If this
+ // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+ // If the provided ResumeMarker is invalid or outdated, Watch() will fail
+ // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+ Watch(req WatchRequest) stream<_, Change> error {access.Read}
+
+ // GetResumeMarker returns the ResumeMarker that points to the current end
+ // of the current event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker() (ResumeMarker | error) {access.Read}
+}
+
error (
BoundToBatch() {"en": "bound to batch"}
NotBoundToBatch() {"en": "not bound to batch"}
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index d093c61..31c877b 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -50,6 +50,311 @@
return verror.New(ErrReadOnlyBatch, ctx)
}
+// DatabaseWatcherClientMethods is the client interface
+// containing DatabaseWatcher methods.
+//
+// Watch allows a client to watch for updates in the database. For each watched
+// request, the client will receive a reliable stream of watch events without
+// re-ordering.
+//
+// The watching is done by starting a streaming RPC. The argument to the RPC
+// contains the ResumeMarker that points to a particular place in the database
+// event log and a set of (table, row prefix) pairs. Updates with rows not
+// covered by the set are excluded from the result stream. The result stream
+// consists of a never-ending sequence of Change messages (until the call fails
+// or is canceled). Each Change message contains an optional continued bit.
+// A sub-sequence of Change messages with continued=true followed by a Change
+// message with continued=false forms a batch. If the client has no access to
+// a row specified in a change, that change is excluded from the result stream.
+//
+// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+// the general behavior.
+//
+// The DatabaseWatcher is designed to be used in the following way:
+// 1) begin a read-only batch
+// 2) read all information your app needs
+// 3) read the ResumeMarker
+// 4) abort the batch
+// 5) start watching changes to the data using the ResumeMarker
+// In this configuration the client doesn't miss any changes.
+type DatabaseWatcherClientMethods interface {
+ // Watch returns a stream of changes for a given watch request. If this
+ // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+ // If the provided ResumeMarker is invalid or outdated, Watch() will fail
+ // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+ Watch(ctx *context.T, req WatchRequest, opts ...rpc.CallOpt) (DatabaseWatcherWatchClientCall, error)
+ // GetResumeMarker returns the ResumeMarker that points to the current end
+ // of the current event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker(*context.T, ...rpc.CallOpt) (ResumeMarker, error)
+}
+
+// DatabaseWatcherClientStub adds universal methods to DatabaseWatcherClientMethods.
+type DatabaseWatcherClientStub interface {
+ DatabaseWatcherClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// DatabaseWatcherClient returns a client stub for DatabaseWatcher.
+func DatabaseWatcherClient(name string) DatabaseWatcherClientStub {
+ return implDatabaseWatcherClientStub{name}
+}
+
+type implDatabaseWatcherClientStub struct {
+ name string
+}
+
+func (c implDatabaseWatcherClientStub) Watch(ctx *context.T, i0 WatchRequest, opts ...rpc.CallOpt) (ocall DatabaseWatcherWatchClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Watch", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ ocall = &implDatabaseWatcherWatchClientCall{ClientCall: call}
+ return
+}
+
+func (c implDatabaseWatcherClientStub) GetResumeMarker(ctx *context.T, opts ...rpc.CallOpt) (o0 ResumeMarker, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "GetResumeMarker", nil, []interface{}{&o0}, opts...)
+ return
+}
+
+// DatabaseWatcherWatchClientStream is the client stream for DatabaseWatcher.Watch.
+type DatabaseWatcherWatchClientStream interface {
+ // RecvStream returns the receiver side of the DatabaseWatcher.Watch client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() Change
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+}
+
+// DatabaseWatcherWatchClientCall represents the call returned from DatabaseWatcher.Watch.
+type DatabaseWatcherWatchClientCall interface {
+ DatabaseWatcherWatchClientStream
+ // Finish blocks until the server is done, and returns the positional return
+ // values for call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implDatabaseWatcherWatchClientCall struct {
+ rpc.ClientCall
+ valRecv Change
+ errRecv error
+}
+
+func (c *implDatabaseWatcherWatchClientCall) RecvStream() interface {
+ Advance() bool
+ Value() Change
+ Err() error
+} {
+ return implDatabaseWatcherWatchClientCallRecv{c}
+}
+
+type implDatabaseWatcherWatchClientCallRecv struct {
+ c *implDatabaseWatcherWatchClientCall
+}
+
+func (c implDatabaseWatcherWatchClientCallRecv) Advance() bool {
+ c.c.valRecv = Change{}
+ c.c.errRecv = c.c.Recv(&c.c.valRecv)
+ return c.c.errRecv == nil
+}
+func (c implDatabaseWatcherWatchClientCallRecv) Value() Change {
+ return c.c.valRecv
+}
+func (c implDatabaseWatcherWatchClientCallRecv) Err() error {
+ if c.c.errRecv == io.EOF {
+ return nil
+ }
+ return c.c.errRecv
+}
+func (c *implDatabaseWatcherWatchClientCall) Finish() (err error) {
+ err = c.ClientCall.Finish()
+ return
+}
+
+// DatabaseWatcherServerMethods is the interface a server writer
+// implements for DatabaseWatcher.
+//
+// Watch allows a client to watch for updates in the database. For each watched
+// request, the client will receive a reliable stream of watch events without
+// re-ordering.
+//
+// The watching is done by starting a streaming RPC. The argument to the RPC
+// contains the ResumeMarker that points to a particular place in the database
+// event log and a set of (table, row prefix) pairs. Updates with rows not
+// covered by the set are excluded from the result stream. The result stream
+// consists of a never-ending sequence of Change messages (until the call fails
+// or is canceled). Each Change message contains an optional continued bit.
+// A sub-sequence of Change messages with continued=true followed by a Change
+// message with continued=false forms a batch. If the client has no access to
+// a row specified in a change, that change is excluded from the result stream.
+//
+// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+// the general behavior.
+//
+// The DatabaseWatcher is designed to be used in the following way:
+// 1) begin a read-only batch
+// 2) read all information your app needs
+// 3) read the ResumeMarker
+// 4) abort the batch
+// 5) start watching changes to the data using the ResumeMarker
+// In this configuration the client doesn't miss any changes.
+type DatabaseWatcherServerMethods interface {
+ // Watch returns a stream of changes for a given watch request. If this
+ // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+ // If the provided ResumeMarker is invalid or outdated, Watch() will fail
+ // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+ Watch(ctx *context.T, call DatabaseWatcherWatchServerCall, req WatchRequest) error
+ // GetResumeMarker returns the ResumeMarker that points to the current end
+ // of the current event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker(*context.T, rpc.ServerCall) (ResumeMarker, error)
+}
+
+// DatabaseWatcherServerStubMethods is the server interface containing
+// DatabaseWatcher methods, as expected by rpc.Server.
+// The only difference between this interface and DatabaseWatcherServerMethods
+// is the streaming methods.
+type DatabaseWatcherServerStubMethods interface {
+ // Watch returns a stream of changes for a given watch request. If this
+ // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
+ // If the provided ResumeMarker is invalid or outdated, Watch() will fail
+ // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
+ Watch(ctx *context.T, call *DatabaseWatcherWatchServerCallStub, req WatchRequest) error
+ // GetResumeMarker returns the ResumeMarker that points to the current end
+ // of the current event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker(*context.T, rpc.ServerCall) (ResumeMarker, error)
+}
+
+// DatabaseWatcherServerStub adds universal methods to DatabaseWatcherServerStubMethods.
+type DatabaseWatcherServerStub interface {
+ DatabaseWatcherServerStubMethods
+ // Describe the DatabaseWatcher interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// DatabaseWatcherServer returns a server stub for DatabaseWatcher.
+// It converts an implementation of DatabaseWatcherServerMethods into
+// an object that may be used by rpc.Server.
+func DatabaseWatcherServer(impl DatabaseWatcherServerMethods) DatabaseWatcherServerStub {
+ stub := implDatabaseWatcherServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implDatabaseWatcherServerStub struct {
+ impl DatabaseWatcherServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implDatabaseWatcherServerStub) Watch(ctx *context.T, call *DatabaseWatcherWatchServerCallStub, i0 WatchRequest) error {
+ return s.impl.Watch(ctx, call, i0)
+}
+
+func (s implDatabaseWatcherServerStub) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (ResumeMarker, error) {
+ return s.impl.GetResumeMarker(ctx, call)
+}
+
+func (s implDatabaseWatcherServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implDatabaseWatcherServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{DatabaseWatcherDesc}
+}
+
+// DatabaseWatcherDesc describes the DatabaseWatcher interface.
+var DatabaseWatcherDesc rpc.InterfaceDesc = descDatabaseWatcher
+
+// descDatabaseWatcher hides the desc to keep godoc clean.
+var descDatabaseWatcher = rpc.InterfaceDesc{
+ Name: "DatabaseWatcher",
+ PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+ Doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "Watch",
+ Doc: "// Watch returns a stream of changes for a given watch request. If this\n// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.\n// If the provided ResumeMarker is invalid or outdated, Watch() will fail\n// with \"v.io/v23/services/watch\".ErrUnknownResumeMarker.",
+ InArgs: []rpc.ArgDesc{
+ {"req", ``}, // WatchRequest
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "GetResumeMarker",
+ Doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the current event log. GetResumeMarker() can be called on a batch.",
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // ResumeMarker
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ },
+}
+
+// DatabaseWatcherWatchServerStream is the server stream for DatabaseWatcher.Watch.
+type DatabaseWatcherWatchServerStream interface {
+ // SendStream returns the send side of the DatabaseWatcher.Watch server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item Change) error
+ }
+}
+
+// DatabaseWatcherWatchServerCall represents the context passed to DatabaseWatcher.Watch.
+type DatabaseWatcherWatchServerCall interface {
+ rpc.ServerCall
+ DatabaseWatcherWatchServerStream
+}
+
+// DatabaseWatcherWatchServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements DatabaseWatcherWatchServerCall.
+type DatabaseWatcherWatchServerCallStub struct {
+ rpc.StreamServerCall
+}
+
+// Init initializes DatabaseWatcherWatchServerCallStub from rpc.StreamServerCall.
+func (s *DatabaseWatcherWatchServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// SendStream returns the send side of the DatabaseWatcher.Watch server stream.
+func (s *DatabaseWatcherWatchServerCallStub) SendStream() interface {
+ Send(item Change) error
+} {
+ return implDatabaseWatcherWatchServerCallSend{s}
+}
+
+type implDatabaseWatcherWatchServerCallSend struct {
+ s *DatabaseWatcherWatchServerCallStub
+}
+
+func (s implDatabaseWatcherWatchServerCallSend) Send(item Change) error {
+ return s.s.Send(item)
+}
+
// SyncGroupManagerClientMethods is the client interface
// containing SyncGroupManager methods.
//
@@ -459,6 +764,31 @@
// GetPermissions() (perms access.Permissions, version string, err error) {Blue}
// }
permissions.ObjectClientMethods
+ // Watch allows a client to watch for updates in the database. For each watched
+ // request, the client will receive a reliable stream of watch events without
+ // re-ordering.
+ //
+ // The watching is done by starting a streaming RPC. The argument to the RPC
+ // contains the ResumeMarker that points to a particular place in the database
+ // event log and a set of (table, row prefix) pairs. Updates with rows not
+ // covered by the set are excluded from the result stream. The result stream
+ // consists of a never-ending sequence of Change messages (until the call fails
+ // or is canceled). Each Change message contains an optional continued bit.
+ // A sub-sequence of Change messages with continued=true followed by a Change
+ // message with continued=false forms a batch. If the client has no access to
+ // a row specified in a change, that change is excluded from the result stream.
+ //
+ // See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+ // the general behavior.
+ //
+ // The DatabaseWatcher is designed to be used in the following way:
+ // 1) begin a read-only batch
+ // 2) read all information your app needs
+ // 3) read the ResumeMarker
+ // 4) abort the batch
+ // 5) start watching changes to the data using the ResumeMarker
+ // In this configuration the client doesn't miss any changes.
+ DatabaseWatcherClientMethods
// SyncGroupManager is the interface for SyncGroup operations.
// TODO(hpucha): Add blessings to create/join and add a refresh method.
SyncGroupManagerClientMethods
@@ -501,13 +831,14 @@
// DatabaseClient returns a client stub for Database.
func DatabaseClient(name string) DatabaseClientStub {
- return implDatabaseClientStub{name, permissions.ObjectClient(name), SyncGroupManagerClient(name)}
+ return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name)}
}
type implDatabaseClientStub struct {
name string
permissions.ObjectClientStub
+ DatabaseWatcherClientStub
SyncGroupManagerClientStub
}
@@ -672,6 +1003,31 @@
// GetPermissions() (perms access.Permissions, version string, err error) {Blue}
// }
permissions.ObjectServerMethods
+ // Watch allows a client to watch for updates in the database. For each watched
+ // request, the client will receive a reliable stream of watch events without
+ // re-ordering.
+ //
+ // The watching is done by starting a streaming RPC. The argument to the RPC
+ // contains the ResumeMarker that points to a particular place in the database
+ // event log and a set of (table, row prefix) pairs. Updates with rows not
+ // covered by the set are excluded from the result stream. The result stream
+ // consists of a never-ending sequence of Change messages (until the call fails
+ // or is canceled). Each Change message contains an optional continued bit.
+ // A sub-sequence of Change messages with continued=true followed by a Change
+ // message with continued=false forms a batch. If the client has no access to
+ // a row specified in a change, that change is excluded from the result stream.
+ //
+ // See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+ // the general behavior.
+ //
+ // The DatabaseWatcher is designed to be used in the following way:
+ // 1) begin a read-only batch
+ // 2) read all information your app needs
+ // 3) read the ResumeMarker
+ // 4) abort the batch
+ // 5) start watching changes to the data using the ResumeMarker
+ // In this configuration the client doesn't miss any changes.
+ DatabaseWatcherServerMethods
// SyncGroupManager is the interface for SyncGroup operations.
// TODO(hpucha): Add blessings to create/join and add a refresh method.
SyncGroupManagerServerMethods
@@ -756,6 +1112,31 @@
// GetPermissions() (perms access.Permissions, version string, err error) {Blue}
// }
permissions.ObjectServerStubMethods
+ // Watch allows a client to watch for updates in the database. For each watched
+ // request, the client will receive a reliable stream of watch events without
+ // re-ordering.
+ //
+ // The watching is done by starting a streaming RPC. The argument to the RPC
+ // contains the ResumeMarker that points to a particular place in the database
+ // event log and a set of (table, row prefix) pairs. Updates with rows not
+ // covered by the set are excluded from the result stream. The result stream
+ // consists of a never-ending sequence of Change messages (until the call fails
+ // or is canceled). Each Change message contains an optional continued bit.
+ // A sub-sequence of Change messages with continued=true followed by a Change
+ // message with continued=false forms a batch. If the client has no access to
+ // a row specified in a change, that change is excluded from the result stream.
+ //
+ // See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
+ // the general behavior.
+ //
+ // The DatabaseWatcher is designed to be used in the following way:
+ // 1) begin a read-only batch
+ // 2) read all information your app needs
+ // 3) read the ResumeMarker
+ // 4) abort the batch
+ // 5) start watching changes to the data using the ResumeMarker
+ // In this configuration the client doesn't miss any changes.
+ DatabaseWatcherServerStubMethods
// SyncGroupManager is the interface for SyncGroup operations.
// TODO(hpucha): Add blessings to create/join and add a refresh method.
SyncGroupManagerServerStubMethods
@@ -804,6 +1185,7 @@
stub := implDatabaseServerStub{
impl: impl,
ObjectServerStub: permissions.ObjectServer(impl),
+ DatabaseWatcherServerStub: DatabaseWatcherServer(impl),
SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
}
// Initialize GlobState; always check the stub itself first, to handle the
@@ -819,6 +1201,7 @@
type implDatabaseServerStub struct {
impl DatabaseServerMethods
permissions.ObjectServerStub
+ DatabaseWatcherServerStub
SyncGroupManagerServerStub
gs *rpc.GlobState
}
@@ -856,7 +1239,7 @@
}
func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
- return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, SyncGroupManagerDesc}
+ return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc}
}
// DatabaseDesc describes the Database interface.
@@ -869,6 +1252,7 @@
Doc: "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n//\n// TODO(sadovsky): Add Watch method.",
Embeds: []rpc.EmbedDesc{
{"Object", "v.io/v23/services/permissions", "// Object provides access control for Vanadium objects.\n//\n// Vanadium services implementing dynamic access control would typically embed\n// this interface and tag additional methods defined by the service with one of\n// Admin, Read, Write, Resolve etc. For example, the VDL definition of the\n// object would be:\n//\n// package mypackage\n//\n// import \"v.io/v23/security/access\"\n// import \"v.io/v23/services/permissions\"\n//\n// type MyObject interface {\n// permissions.Object\n// MyRead() (string, error) {access.Read}\n// MyWrite(string) error {access.Write}\n// }\n//\n// If the set of pre-defined tags is insufficient, services may define their\n// own tag type and annotate all methods with this new type.\n//\n// Instead of embedding this Object interface, define SetPermissions and\n// GetPermissions in their own interface. Authorization policies will typically\n// respect annotations of a single type. For example, the VDL definition of an\n// object would be:\n//\n// package mypackage\n//\n// import \"v.io/v23/security/access\"\n//\n// type MyTag string\n//\n// const (\n// Blue = MyTag(\"Blue\")\n// Red = MyTag(\"Red\")\n// )\n//\n// type MyObject interface {\n// MyMethod() (string, error) {Blue}\n//\n// // Allow clients to change access via the access.Object interface:\n// SetPermissions(perms access.Permissions, version string) error {Red}\n// GetPermissions() (perms access.Permissions, version string, err error) {Blue}\n// }"},
+ {"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
},
Methods: []rpc.MethodDesc{
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index bc3a32f..ffa95ab 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -73,3 +73,65 @@
type SyncGroupMemberInfo struct {
SyncPriority byte
}
+
+// ResumeMarker is a pointer in the database event log.
+type ResumeMarker string
+
+// TablePrefixRange describes a prefix range in a table.
+type TablePrefixRange struct {
+ Table string
+ Prefix string
+}
+
+// WatchRequest specifies which rows should be watched and a starting point
+// in the database event log from which to receive updates.
+type WatchRequest struct {
+ // Ranges specifies the subset of the database for which the client wants
+ // updates.
+ Ranges []TablePrefixRange
+
+ // ResumeMarker is the starting point in the database event log from which
+ // to receive updates.
+ ResumeMarker ResumeMarker
+}
+
+// ChangeType describes the type of the row change: Put or Delete.
+// TODO(rogulenko): Consider adding the Shell type.
+type ChangeType enum {
+ Put
+ Delete
+}
+
+// Change is the new value for a watched entity.
+type Change struct {
+ // Table is the name of the table that contains the changed row.
+ Table string
+
+ // Row is the key of the changed row.
+ Row string
+
+ // ChangeType describes the type of the change. If the ChangeType equals to
+ // Put, then the row exists in the table and the Value contains the new
+ // value for this row. If the state equals to Delete, then the row was
+ // removed from the table.
+ ChangeType ChangeType
+
+ // Value is the new value for the row if the state equals to Put,
+ // otherwise the Value is nil.
+ Value []byte
+
+ // ResumeMarker provides a compact representation of all the messages
+ // that have been received by the caller for the given Watch call.
+ // This marker can be provided in the Request message to allow the caller
+ // to resume the stream watching at a specific point without fetching the
+ // initial state.
+ ResumeMarker ResumeMarker
+
+ // FromSync indicates whether the change came from sync. If FromSync is
+ // false, then the change originated from the local device.
+ FromSync bool
+
+ // If true, this Change is followed by more Changes that are in the
+ // same batch as this Change.
+ Continued bool
+}
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index 08040f4..784ef37 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -9,6 +9,7 @@
import (
// VDL system imports
+ "fmt"
"v.io/v23/vdl"
// VDL user imports
@@ -101,10 +102,132 @@
}) {
}
+// ResumeMarker is a pointer in the database event log.
+type ResumeMarker string
+
+func (ResumeMarker) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResumeMarker"`
+}) {
+}
+
+// TablePrefixRange describes a prefix range in a table.
+type TablePrefixRange struct {
+ Table string
+ Prefix string
+}
+
+func (TablePrefixRange) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.TablePrefixRange"`
+}) {
+}
+
+// WatchRequest specifies which rows should be watched and a starting point
+// in the database event log from which to receive updates.
+type WatchRequest struct {
+ // Ranges specifies the subset of the database for which the client wants
+ // updates.
+ Ranges []TablePrefixRange
+ // ResumeMarker is the starting point in the database event log from which
+ // to receive updates.
+ ResumeMarker ResumeMarker
+}
+
+func (WatchRequest) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.WatchRequest"`
+}) {
+}
+
+// ChangeType describes the type of the row change: Put or Delete.
+// TODO(rogulenko): Consider adding the Shell type.
+type ChangeType int
+
+const (
+ ChangeTypePut ChangeType = iota
+ ChangeTypeDelete
+)
+
+// ChangeTypeAll holds all labels for ChangeType.
+var ChangeTypeAll = [...]ChangeType{ChangeTypePut, ChangeTypeDelete}
+
+// ChangeTypeFromString creates a ChangeType from a string label.
+func ChangeTypeFromString(label string) (x ChangeType, err error) {
+ err = x.Set(label)
+ return
+}
+
+// Set assigns label to x.
+func (x *ChangeType) Set(label string) error {
+ switch label {
+ case "Put", "put":
+ *x = ChangeTypePut
+ return nil
+ case "Delete", "delete":
+ *x = ChangeTypeDelete
+ return nil
+ }
+ *x = -1
+ return fmt.Errorf("unknown label %q in nosql.ChangeType", label)
+}
+
+// String returns the string label of x.
+func (x ChangeType) String() string {
+ switch x {
+ case ChangeTypePut:
+ return "Put"
+ case ChangeTypeDelete:
+ return "Delete"
+ }
+ return ""
+}
+
+func (ChangeType) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ChangeType"`
+ Enum struct{ Put, Delete string }
+}) {
+}
+
+// Change is the new value for a watched entity.
+type Change struct {
+ // Table is the name of the table that contains the changed row.
+ Table string
+ // Row is the key of the changed row.
+ Row string
+ // ChangeType describes the type of the change. If the ChangeType equals to
+ // Put, then the row exists in the table and the Value contains the new
+ // value for this row. If the state equals to Delete, then the row was
+ // removed from the table.
+ ChangeType ChangeType
+ // Value is the new value for the row if the state equals to Put,
+ // otherwise the Value is nil.
+ Value []byte
+ // ResumeMarker provides a compact representation of all the messages
+ // that have been received by the caller for the given Watch call.
+ // This marker can be provided in the Request message to allow the caller
+ // to resume the stream watching at a specific point without fetching the
+ // initial state.
+ ResumeMarker ResumeMarker
+ // FromSync indicates whether the change came from sync. If FromSync is
+ // false, then the change originated from the local device.
+ FromSync bool
+ // If true, this Change is followed by more Changes that are in the
+ // same batch as this Change.
+ Continued bool
+}
+
+func (Change) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Change"`
+}) {
+}
+
func init() {
vdl.Register((*BatchOptions)(nil))
vdl.Register((*PrefixPermissions)(nil))
vdl.Register((*KeyValue)(nil))
vdl.Register((*SyncGroupSpec)(nil))
vdl.Register((*SyncGroupMemberInfo)(nil))
+ vdl.Register((*ResumeMarker)(nil))
+ vdl.Register((*TablePrefixRange)(nil))
+ vdl.Register((*WatchRequest)(nil))
+ vdl.Register((*ChangeType)(nil))
+ vdl.Register((*Change)(nil))
}
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index cb97fb9..2da519e 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -286,6 +286,25 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
+func (d *databaseReq) Watch(ctx *context.T, call wire.DatabaseWatcherWatchServerCall, req wire.WatchRequest) error {
+ // TODO(rogulenko): Implement.
+ if !d.exists {
+ return verror.New(verror.ErrNoExist, ctx, d.name)
+ }
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
+ return verror.NewErrNotImplemented(ctx)
+}
+
+func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (wire.ResumeMarker, error) {
+ // TODO(rogulenko): Implement.
+ if !d.exists {
+ return "", verror.New(verror.ErrNoExist, ctx, d.name)
+ }
+ return "", verror.NewErrNotImplemented(ctx)
+}
+
func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
if !d.exists {
return nil, verror.New(verror.ErrNoExist, ctx, d.name)