syncbase API: client watch, addressing v.io/c/13615 comments
Discussed offline, decided to use the general watch API.
Change-Id: I6ac70ab11f3703188ab15b73e9ef7867a9605d0c
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index b56275b..b17ea31 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -8,6 +8,7 @@
import (
"v.io/v23/security/access"
"v.io/v23/services/permissions"
+ "v.io/v23/services/watch"
)
// Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -282,20 +283,16 @@
// Watch allows a client to watch for updates in the database. For each watched
// request, the client will receive a reliable stream of watch events without
-// re-ordering.
+// re-ordering. See watch.GlobWatcher for a detailed explanation of the
+// behavior.
//
// The watching is done by starting a streaming RPC. The argument to the RPC
// contains the ResumeMarker that points to a particular place in the database
-// event log and a set of (table, row prefix) pairs. Updates with rows not
-// covered by the set are excluded from the result stream. The result stream
-// consists of a never-ending sequence of Change messages (until the call fails
-// or is canceled). Each Change message contains an optional continued bit.
-// A sub-sequence of Change messages with continued=true followed by a Change
-// message with continued=false forms a batch. If the client has no access to
-// a row specified in a change, that change is excluded from the result stream.
-//
-// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
-// the general behavior.
+// event log. The result stream consists of a never-ending sequence of Change
+// messages (until the call fails or is canceled). Each Change contains the
+// Name field in the form "<tableName>/<rowKey>" and the Value field of the
+// StoreChange type. If the client has no access to a row specified in a change,
+// that change is excluded from the result stream.
//
// The DatabaseWatcher is designed to be used in the following way:
// 1) begin a read-only batch
@@ -305,15 +302,11 @@
// 5) start watching changes to the data using the ResumeMarker
// In this configuration the client doesn't miss any changes.
type DatabaseWatcher interface {
- // Watch returns a stream of changes for a given watch request. If this
- // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
- // If the provided ResumeMarker is invalid or outdated, Watch() will fail
- // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
- Watch(req WatchRequest) stream<_, Change> error {access.Read}
+ watch.GlobWatcher
// GetResumeMarker returns the ResumeMarker that points to the current end
- // of the current event log. GetResumeMarker() can be called on a batch.
- GetResumeMarker() (ResumeMarker | error) {access.Read}
+ // of the event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker() (watch.ResumeMarker | error) {access.Read}
}
error (
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 447bd7d..f01ee42 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -21,6 +21,7 @@
// VDL user imports
"v.io/v23/security/access"
"v.io/v23/services/permissions"
+ "v.io/v23/services/watch"
)
var (
@@ -55,20 +56,16 @@
//
// Watch allows a client to watch for updates in the database. For each watched
// request, the client will receive a reliable stream of watch events without
-// re-ordering.
+// re-ordering. See watch.GlobWatcher for a detailed explanation of the
+// behavior.
//
// The watching is done by starting a streaming RPC. The argument to the RPC
// contains the ResumeMarker that points to a particular place in the database
-// event log and a set of (table, row prefix) pairs. Updates with rows not
-// covered by the set are excluded from the result stream. The result stream
-// consists of a never-ending sequence of Change messages (until the call fails
-// or is canceled). Each Change message contains an optional continued bit.
-// A sub-sequence of Change messages with continued=true followed by a Change
-// message with continued=false forms a batch. If the client has no access to
-// a row specified in a change, that change is excluded from the result stream.
-//
-// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
-// the general behavior.
+// event log. The result stream consists of a never-ending sequence of Change
+// messages (until the call fails or is canceled). Each Change contains the
+// Name field in the form "<tableName>/<rowKey>" and the Value field of the
+// StoreChange type. If the client has no access to a row specified in a change,
+// that change is excluded from the result stream.
//
// The DatabaseWatcher is designed to be used in the following way:
// 1) begin a read-only batch
@@ -78,14 +75,12 @@
// 5) start watching changes to the data using the ResumeMarker
// In this configuration the client doesn't miss any changes.
type DatabaseWatcherClientMethods interface {
- // Watch returns a stream of changes for a given watch request. If this
- // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
- // If the provided ResumeMarker is invalid or outdated, Watch() will fail
- // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
- Watch(ctx *context.T, req WatchRequest, opts ...rpc.CallOpt) (DatabaseWatcherWatchClientCall, error)
+ // GlobWatcher allows a client to receive updates for changes to objects
+ // that match a pattern. See the package comments for details.
+ watch.GlobWatcherClientMethods
// GetResumeMarker returns the ResumeMarker that points to the current end
- // of the current event log. GetResumeMarker() can be called on a batch.
- GetResumeMarker(*context.T, ...rpc.CallOpt) (ResumeMarker, error)
+ // of the event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker(*context.T, ...rpc.CallOpt) (watch.ResumeMarker, error)
}
// DatabaseWatcherClientStub adds universal methods to DatabaseWatcherClientMethods.
@@ -96,115 +91,35 @@
// DatabaseWatcherClient returns a client stub for DatabaseWatcher.
func DatabaseWatcherClient(name string) DatabaseWatcherClientStub {
- return implDatabaseWatcherClientStub{name}
+ return implDatabaseWatcherClientStub{name, watch.GlobWatcherClient(name)}
}
type implDatabaseWatcherClientStub struct {
name string
+
+ watch.GlobWatcherClientStub
}
-func (c implDatabaseWatcherClientStub) Watch(ctx *context.T, i0 WatchRequest, opts ...rpc.CallOpt) (ocall DatabaseWatcherWatchClientCall, err error) {
- var call rpc.ClientCall
- if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Watch", []interface{}{i0}, opts...); err != nil {
- return
- }
- ocall = &implDatabaseWatcherWatchClientCall{ClientCall: call}
- return
-}
-
-func (c implDatabaseWatcherClientStub) GetResumeMarker(ctx *context.T, opts ...rpc.CallOpt) (o0 ResumeMarker, err error) {
+func (c implDatabaseWatcherClientStub) GetResumeMarker(ctx *context.T, opts ...rpc.CallOpt) (o0 watch.ResumeMarker, err error) {
err = v23.GetClient(ctx).Call(ctx, c.name, "GetResumeMarker", nil, []interface{}{&o0}, opts...)
return
}
-// DatabaseWatcherWatchClientStream is the client stream for DatabaseWatcher.Watch.
-type DatabaseWatcherWatchClientStream interface {
- // RecvStream returns the receiver side of the DatabaseWatcher.Watch client stream.
- RecvStream() interface {
- // Advance stages an item so that it may be retrieved via Value. Returns
- // true iff there is an item to retrieve. Advance must be called before
- // Value is called. May block if an item is not available.
- Advance() bool
- // Value returns the item that was staged by Advance. May panic if Advance
- // returned false or was not called. Never blocks.
- Value() Change
- // Err returns any error encountered by Advance. Never blocks.
- Err() error
- }
-}
-
-// DatabaseWatcherWatchClientCall represents the call returned from DatabaseWatcher.Watch.
-type DatabaseWatcherWatchClientCall interface {
- DatabaseWatcherWatchClientStream
- // Finish blocks until the server is done, and returns the positional return
- // values for call.
- //
- // Finish returns immediately if the call has been canceled; depending on the
- // timing the output could either be an error signaling cancelation, or the
- // valid positional return values from the server.
- //
- // Calling Finish is mandatory for releasing stream resources, unless the call
- // has been canceled or any of the other methods return an error. Finish should
- // be called at most once.
- Finish() error
-}
-
-type implDatabaseWatcherWatchClientCall struct {
- rpc.ClientCall
- valRecv Change
- errRecv error
-}
-
-func (c *implDatabaseWatcherWatchClientCall) RecvStream() interface {
- Advance() bool
- Value() Change
- Err() error
-} {
- return implDatabaseWatcherWatchClientCallRecv{c}
-}
-
-type implDatabaseWatcherWatchClientCallRecv struct {
- c *implDatabaseWatcherWatchClientCall
-}
-
-func (c implDatabaseWatcherWatchClientCallRecv) Advance() bool {
- c.c.valRecv = Change{}
- c.c.errRecv = c.c.Recv(&c.c.valRecv)
- return c.c.errRecv == nil
-}
-func (c implDatabaseWatcherWatchClientCallRecv) Value() Change {
- return c.c.valRecv
-}
-func (c implDatabaseWatcherWatchClientCallRecv) Err() error {
- if c.c.errRecv == io.EOF {
- return nil
- }
- return c.c.errRecv
-}
-func (c *implDatabaseWatcherWatchClientCall) Finish() (err error) {
- err = c.ClientCall.Finish()
- return
-}
-
// DatabaseWatcherServerMethods is the interface a server writer
// implements for DatabaseWatcher.
//
// Watch allows a client to watch for updates in the database. For each watched
// request, the client will receive a reliable stream of watch events without
-// re-ordering.
+// re-ordering. See watch.GlobWatcher for a detailed explanation of the
+// behavior.
//
// The watching is done by starting a streaming RPC. The argument to the RPC
// contains the ResumeMarker that points to a particular place in the database
-// event log and a set of (table, row prefix) pairs. Updates with rows not
-// covered by the set are excluded from the result stream. The result stream
-// consists of a never-ending sequence of Change messages (until the call fails
-// or is canceled). Each Change message contains an optional continued bit.
-// A sub-sequence of Change messages with continued=true followed by a Change
-// message with continued=false forms a batch. If the client has no access to
-// a row specified in a change, that change is excluded from the result stream.
-//
-// See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
-// the general behavior.
+// event log. The result stream consists of a never-ending sequence of Change
+// messages (until the call fails or is canceled). Each Change contains the
+// Name field in the form "<tableName>/<rowKey>" and the Value field of the
+// StoreChange type. If the client has no access to a row specified in a change,
+// that change is excluded from the result stream.
//
// The DatabaseWatcher is designed to be used in the following way:
// 1) begin a read-only batch
@@ -214,14 +129,12 @@
// 5) start watching changes to the data using the ResumeMarker
// In this configuration the client doesn't miss any changes.
type DatabaseWatcherServerMethods interface {
- // Watch returns a stream of changes for a given watch request. If this
- // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
- // If the provided ResumeMarker is invalid or outdated, Watch() will fail
- // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
- Watch(ctx *context.T, call DatabaseWatcherWatchServerCall, req WatchRequest) error
+ // GlobWatcher allows a client to receive updates for changes to objects
+ // that match a pattern. See the package comments for details.
+ watch.GlobWatcherServerMethods
// GetResumeMarker returns the ResumeMarker that points to the current end
- // of the current event log. GetResumeMarker() can be called on a batch.
- GetResumeMarker(*context.T, rpc.ServerCall) (ResumeMarker, error)
+ // of the event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker(*context.T, rpc.ServerCall) (watch.ResumeMarker, error)
}
// DatabaseWatcherServerStubMethods is the server interface containing
@@ -229,14 +142,12 @@
// The only difference between this interface and DatabaseWatcherServerMethods
// is the streaming methods.
type DatabaseWatcherServerStubMethods interface {
- // Watch returns a stream of changes for a given watch request. If this
- // Database is bound to a batch, Watch() will fail with ErrBoundToBatch.
- // If the provided ResumeMarker is invalid or outdated, Watch() will fail
- // with "v.io/v23/services/watch".ErrUnknownResumeMarker.
- Watch(ctx *context.T, call *DatabaseWatcherWatchServerCallStub, req WatchRequest) error
+ // GlobWatcher allows a client to receive updates for changes to objects
+ // that match a pattern. See the package comments for details.
+ watch.GlobWatcherServerStubMethods
// GetResumeMarker returns the ResumeMarker that points to the current end
- // of the current event log. GetResumeMarker() can be called on a batch.
- GetResumeMarker(*context.T, rpc.ServerCall) (ResumeMarker, error)
+ // of the event log. GetResumeMarker() can be called on a batch.
+ GetResumeMarker(*context.T, rpc.ServerCall) (watch.ResumeMarker, error)
}
// DatabaseWatcherServerStub adds universal methods to DatabaseWatcherServerStubMethods.
@@ -252,6 +163,7 @@
func DatabaseWatcherServer(impl DatabaseWatcherServerMethods) DatabaseWatcherServerStub {
stub := implDatabaseWatcherServerStub{
impl: impl,
+ GlobWatcherServerStub: watch.GlobWatcherServer(impl),
}
// Initialize GlobState; always check the stub itself first, to handle the
// case where the user has the Glob method defined in their VDL source.
@@ -265,14 +177,11 @@
type implDatabaseWatcherServerStub struct {
impl DatabaseWatcherServerMethods
- gs *rpc.GlobState
+ watch.GlobWatcherServerStub
+ gs *rpc.GlobState
}
-func (s implDatabaseWatcherServerStub) Watch(ctx *context.T, call *DatabaseWatcherWatchServerCallStub, i0 WatchRequest) error {
- return s.impl.Watch(ctx, call, i0)
-}
-
-func (s implDatabaseWatcherServerStub) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (ResumeMarker, error) {
+func (s implDatabaseWatcherServerStub) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
return s.impl.GetResumeMarker(ctx, call)
}
@@ -281,7 +190,7 @@
}
func (s implDatabaseWatcherServerStub) Describe__() []rpc.InterfaceDesc {
- return []rpc.InterfaceDesc{DatabaseWatcherDesc}
+ return []rpc.InterfaceDesc{DatabaseWatcherDesc, watch.GlobWatcherDesc}
}
// DatabaseWatcherDesc describes the DatabaseWatcher interface.
@@ -291,70 +200,22 @@
var descDatabaseWatcher = rpc.InterfaceDesc{
Name: "DatabaseWatcher",
PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
- Doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+ Doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+ Embeds: []rpc.EmbedDesc{
+ {"GlobWatcher", "v.io/v23/services/watch", "// GlobWatcher allows a client to receive updates for changes to objects\n// that match a pattern. See the package comments for details."},
+ },
Methods: []rpc.MethodDesc{
{
- Name: "Watch",
- Doc: "// Watch returns a stream of changes for a given watch request. If this\n// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.\n// If the provided ResumeMarker is invalid or outdated, Watch() will fail\n// with \"v.io/v23/services/watch\".ErrUnknownResumeMarker.",
- InArgs: []rpc.ArgDesc{
- {"req", ``}, // WatchRequest
- },
- Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
- },
- {
Name: "GetResumeMarker",
- Doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the current event log. GetResumeMarker() can be called on a batch.",
+ Doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the event log. GetResumeMarker() can be called on a batch.",
OutArgs: []rpc.ArgDesc{
- {"", ``}, // ResumeMarker
+ {"", ``}, // watch.ResumeMarker
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
},
}
-// DatabaseWatcherWatchServerStream is the server stream for DatabaseWatcher.Watch.
-type DatabaseWatcherWatchServerStream interface {
- // SendStream returns the send side of the DatabaseWatcher.Watch server stream.
- SendStream() interface {
- // Send places the item onto the output stream. Returns errors encountered
- // while sending. Blocks if there is no buffer space; will unblock when
- // buffer space is available.
- Send(item Change) error
- }
-}
-
-// DatabaseWatcherWatchServerCall represents the context passed to DatabaseWatcher.Watch.
-type DatabaseWatcherWatchServerCall interface {
- rpc.ServerCall
- DatabaseWatcherWatchServerStream
-}
-
-// DatabaseWatcherWatchServerCallStub is a wrapper that converts rpc.StreamServerCall into
-// a typesafe stub that implements DatabaseWatcherWatchServerCall.
-type DatabaseWatcherWatchServerCallStub struct {
- rpc.StreamServerCall
-}
-
-// Init initializes DatabaseWatcherWatchServerCallStub from rpc.StreamServerCall.
-func (s *DatabaseWatcherWatchServerCallStub) Init(call rpc.StreamServerCall) {
- s.StreamServerCall = call
-}
-
-// SendStream returns the send side of the DatabaseWatcher.Watch server stream.
-func (s *DatabaseWatcherWatchServerCallStub) SendStream() interface {
- Send(item Change) error
-} {
- return implDatabaseWatcherWatchServerCallSend{s}
-}
-
-type implDatabaseWatcherWatchServerCallSend struct {
- s *DatabaseWatcherWatchServerCallStub
-}
-
-func (s implDatabaseWatcherWatchServerCallSend) Send(item Change) error {
- return s.s.Send(item)
-}
-
// SyncGroupManagerClientMethods is the client interface
// containing SyncGroupManager methods.
//
@@ -1613,20 +1474,16 @@
permissions.ObjectClientMethods
// Watch allows a client to watch for updates in the database. For each watched
// request, the client will receive a reliable stream of watch events without
- // re-ordering.
+ // re-ordering. See watch.GlobWatcher for a detailed explanation of the
+ // behavior.
//
// The watching is done by starting a streaming RPC. The argument to the RPC
// contains the ResumeMarker that points to a particular place in the database
- // event log and a set of (table, row prefix) pairs. Updates with rows not
- // covered by the set are excluded from the result stream. The result stream
- // consists of a never-ending sequence of Change messages (until the call fails
- // or is canceled). Each Change message contains an optional continued bit.
- // A sub-sequence of Change messages with continued=true followed by a Change
- // message with continued=false forms a batch. If the client has no access to
- // a row specified in a change, that change is excluded from the result stream.
- //
- // See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
- // the general behavior.
+ // event log. The result stream consists of a never-ending sequence of Change
+ // messages (until the call fails or is canceled). Each Change contains the
+ // Name field in the form "<tableName>/<rowKey>" and the Value field of the
+ // StoreChange type. If the client has no access to a row specified in a change,
+ // that change is excluded from the result stream.
//
// The DatabaseWatcher is designed to be used in the following way:
// 1) begin a read-only batch
@@ -1860,20 +1717,16 @@
permissions.ObjectServerMethods
// Watch allows a client to watch for updates in the database. For each watched
// request, the client will receive a reliable stream of watch events without
- // re-ordering.
+ // re-ordering. See watch.GlobWatcher for a detailed explanation of the
+ // behavior.
//
// The watching is done by starting a streaming RPC. The argument to the RPC
// contains the ResumeMarker that points to a particular place in the database
- // event log and a set of (table, row prefix) pairs. Updates with rows not
- // covered by the set are excluded from the result stream. The result stream
- // consists of a never-ending sequence of Change messages (until the call fails
- // or is canceled). Each Change message contains an optional continued bit.
- // A sub-sequence of Change messages with continued=true followed by a Change
- // message with continued=false forms a batch. If the client has no access to
- // a row specified in a change, that change is excluded from the result stream.
- //
- // See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
- // the general behavior.
+ // event log. The result stream consists of a never-ending sequence of Change
+ // messages (until the call fails or is canceled). Each Change contains the
+ // Name field in the form "<tableName>/<rowKey>" and the Value field of the
+ // StoreChange type. If the client has no access to a row specified in a change,
+ // that change is excluded from the result stream.
//
// The DatabaseWatcher is designed to be used in the following way:
// 1) begin a read-only batch
@@ -1975,20 +1828,16 @@
permissions.ObjectServerStubMethods
// Watch allows a client to watch for updates in the database. For each watched
// request, the client will receive a reliable stream of watch events without
- // re-ordering.
+ // re-ordering. See watch.GlobWatcher for a detailed explanation of the
+ // behavior.
//
// The watching is done by starting a streaming RPC. The argument to the RPC
// contains the ResumeMarker that points to a particular place in the database
- // event log and a set of (table, row prefix) pairs. Updates with rows not
- // covered by the set are excluded from the result stream. The result stream
- // consists of a never-ending sequence of Change messages (until the call fails
- // or is canceled). Each Change message contains an optional continued bit.
- // A sub-sequence of Change messages with continued=true followed by a Change
- // message with continued=false forms a batch. If the client has no access to
- // a row specified in a change, that change is excluded from the result stream.
- //
- // See "v.io/v23/services/watch".GlobWatcher for more detailed explanation of
- // the general behavior.
+ // event log. The result stream consists of a never-ending sequence of Change
+ // messages (until the call fails or is canceled). Each Change contains the
+ // Name field in the form "<tableName>/<rowKey>" and the Value field of the
+ // StoreChange type. If the client has no access to a row specified in a change,
+ // that change is excluded from the result stream.
//
// The DatabaseWatcher is designed to be used in the following way:
// 1) begin a read-only batch
@@ -2110,7 +1959,7 @@
}
func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
- return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc}
+ return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, watch.GlobWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc}
}
// DatabaseDesc describes the Database interface.
@@ -2123,7 +1972,7 @@
Doc: "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n//\n// TODO(sadovsky): Add Watch method.",
Embeds: []rpc.EmbedDesc{
{"Object", "v.io/v23/services/permissions", "// Object provides access control for Vanadium objects.\n//\n// Vanadium services implementing dynamic access control would typically embed\n// this interface and tag additional methods defined by the service with one of\n// Admin, Read, Write, Resolve etc. For example, the VDL definition of the\n// object would be:\n//\n// package mypackage\n//\n// import \"v.io/v23/security/access\"\n// import \"v.io/v23/services/permissions\"\n//\n// type MyObject interface {\n// permissions.Object\n// MyRead() (string, error) {access.Read}\n// MyWrite(string) error {access.Write}\n// }\n//\n// If the set of pre-defined tags is insufficient, services may define their\n// own tag type and annotate all methods with this new type.\n//\n// Instead of embedding this Object interface, define SetPermissions and\n// GetPermissions in their own interface. Authorization policies will typically\n// respect annotations of a single type. For example, the VDL definition of an\n// object would be:\n//\n// package mypackage\n//\n// import \"v.io/v23/security/access\"\n//\n// type MyTag string\n//\n// const (\n// Blue = MyTag(\"Blue\")\n// Red = MyTag(\"Red\")\n// )\n//\n// type MyObject interface {\n// MyMethod() (string, error) {Blue}\n//\n// // Allow clients to change access via the access.Object interface:\n// SetPermissions(perms access.Permissions, version string) error {Red}\n// GetPermissions() (perms access.Permissions, version string, err error) {Blue}\n// }"},
- {"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
+ {"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
{"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
{"SchemaManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SchemaManager implements the API for managing schema metadata attached\n// to a Database."},
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index e6c0415..23d2d37 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -171,64 +171,14 @@
Total uint64 // Blob size.
}
-// ResumeMarker is a pointer in the database event log.
-type ResumeMarker string
-
-// TablePrefixRange describes a prefix range in a table.
-type TablePrefixRange struct {
- Table string
- Prefix string
-}
-
-// WatchRequest specifies which rows should be watched and a starting point
-// in the database event log from which to receive updates.
-type WatchRequest struct {
- // Ranges specifies the subset of the database for which the client wants
- // updates.
- Ranges []TablePrefixRange
-
- // ResumeMarker is the starting point in the database event log from which
- // to receive updates.
- ResumeMarker ResumeMarker
-}
-
-// ChangeType describes the type of the row change: Put or Delete.
-// TODO(rogulenko): Consider adding the Shell type.
-type ChangeType enum {
- Put
- Delete
-}
-
-// Change is the new value for a watched entity.
-type Change struct {
- // Table is the name of the table that contains the changed row.
- Table string
-
- // Row is the key of the changed row.
- Row string
-
- // ChangeType describes the type of the change. If the ChangeType equals to
- // Put, then the row exists in the table and the Value contains the new
- // value for this row. If the state equals to Delete, then the row was
- // removed from the table.
- ChangeType ChangeType
-
- // Value is the new value for the row if the state equals to Put,
+// StoreChange is the new value for a watched entity.
+// TODO(rogulenko): Consider adding the Shell state.
+type StoreChange struct {
+ // Value is the new value for the row if the Change state equals to Exists,
// otherwise the Value is nil.
Value []byte
- // ResumeMarker provides a compact representation of all the messages
- // that have been received by the caller for the given Watch call.
- // This marker can be provided in the Request message to allow the caller
- // to resume the stream watching at a specific point without fetching the
- // initial state.
- ResumeMarker ResumeMarker
-
// FromSync indicates whether the change came from sync. If FromSync is
// false, then the change originated from the local device.
FromSync bool
-
- // If true, this Change is followed by more Changes that are in the
- // same batch as this Change.
- Continued bool
}
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index 0f05cca..facbd29 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -296,120 +296,19 @@
}) {
}
-// ResumeMarker is a pointer in the database event log.
-type ResumeMarker string
-
-func (ResumeMarker) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResumeMarker"`
-}) {
-}
-
-// TablePrefixRange describes a prefix range in a table.
-type TablePrefixRange struct {
- Table string
- Prefix string
-}
-
-func (TablePrefixRange) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.TablePrefixRange"`
-}) {
-}
-
-// WatchRequest specifies which rows should be watched and a starting point
-// in the database event log from which to receive updates.
-type WatchRequest struct {
- // Ranges specifies the subset of the database for which the client wants
- // updates.
- Ranges []TablePrefixRange
- // ResumeMarker is the starting point in the database event log from which
- // to receive updates.
- ResumeMarker ResumeMarker
-}
-
-func (WatchRequest) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.WatchRequest"`
-}) {
-}
-
-// ChangeType describes the type of the row change: Put or Delete.
-// TODO(rogulenko): Consider adding the Shell type.
-type ChangeType int
-
-const (
- ChangeTypePut ChangeType = iota
- ChangeTypeDelete
-)
-
-// ChangeTypeAll holds all labels for ChangeType.
-var ChangeTypeAll = [...]ChangeType{ChangeTypePut, ChangeTypeDelete}
-
-// ChangeTypeFromString creates a ChangeType from a string label.
-func ChangeTypeFromString(label string) (x ChangeType, err error) {
- err = x.Set(label)
- return
-}
-
-// Set assigns label to x.
-func (x *ChangeType) Set(label string) error {
- switch label {
- case "Put", "put":
- *x = ChangeTypePut
- return nil
- case "Delete", "delete":
- *x = ChangeTypeDelete
- return nil
- }
- *x = -1
- return fmt.Errorf("unknown label %q in nosql.ChangeType", label)
-}
-
-// String returns the string label of x.
-func (x ChangeType) String() string {
- switch x {
- case ChangeTypePut:
- return "Put"
- case ChangeTypeDelete:
- return "Delete"
- }
- return ""
-}
-
-func (ChangeType) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ChangeType"`
- Enum struct{ Put, Delete string }
-}) {
-}
-
-// Change is the new value for a watched entity.
-type Change struct {
- // Table is the name of the table that contains the changed row.
- Table string
- // Row is the key of the changed row.
- Row string
- // ChangeType describes the type of the change. If the ChangeType equals to
- // Put, then the row exists in the table and the Value contains the new
- // value for this row. If the state equals to Delete, then the row was
- // removed from the table.
- ChangeType ChangeType
- // Value is the new value for the row if the state equals to Put,
+// StoreChange is the new value for a watched entity.
+// TODO(rogulenko): Consider adding the Shell state.
+type StoreChange struct {
+ // Value is the new value for the row if the Change state equals to Exists,
// otherwise the Value is nil.
Value []byte
- // ResumeMarker provides a compact representation of all the messages
- // that have been received by the caller for the given Watch call.
- // This marker can be provided in the Request message to allow the caller
- // to resume the stream watching at a specific point without fetching the
- // initial state.
- ResumeMarker ResumeMarker
// FromSync indicates whether the change came from sync. If FromSync is
// false, then the change originated from the local device.
FromSync bool
- // If true, this Change is followed by more Changes that are in the
- // same batch as this Change.
- Continued bool
}
-func (Change) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Change"`
+func (StoreChange) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.StoreChange"`
}) {
}
@@ -426,11 +325,7 @@
vdl.Register((*BlobRef)(nil))
vdl.Register((*FetchState)(nil))
vdl.Register((*FetchStatus)(nil))
- vdl.Register((*ResumeMarker)(nil))
- vdl.Register((*TablePrefixRange)(nil))
- vdl.Register((*WatchRequest)(nil))
- vdl.Register((*ChangeType)(nil))
- vdl.Register((*Change)(nil))
+ vdl.Register((*StoreChange)(nil))
}
const NullBlobRef = BlobRef("")
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index bfe419a..386a0ad 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -22,6 +22,7 @@
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
+ "v.io/v23/services/watch"
"v.io/v23/vdl"
"v.io/v23/verror"
"v.io/v23/vom"
@@ -287,7 +288,7 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (d *databaseReq) Watch(ctx *context.T, call wire.DatabaseWatcherWatchServerCall, req wire.WatchRequest) error {
+func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
// TODO(rogulenko): Implement.
if !d.exists {
return verror.New(verror.ErrNoExist, ctx, d.name)
@@ -298,12 +299,12 @@
return verror.NewErrNotImplemented(ctx)
}
-func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (wire.ResumeMarker, error) {
+func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
// TODO(rogulenko): Implement.
if !d.exists {
- return "", verror.New(verror.ErrNoExist, ctx, d.name)
+ return nil, verror.New(verror.ErrNoExist, ctx, d.name)
}
- return "", verror.NewErrNotImplemented(ctx)
+ return nil, verror.NewErrNotImplemented(ctx)
}
func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {