Merge "Make query_checker tests pass in go1.5."
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index ebeccf5..16182de 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -73,6 +73,10 @@
// SchemaManager implements the API for managing schema metadata attached
// to a Database.
SchemaManager
+
+ // ConflictManager implements the API for registering resolvers, receiving
+ // conflicts and sending resolutions.
+ ConflictManager
}
// Table represents a collection of Rows.
@@ -234,6 +238,30 @@
SetSchemaMetadata(metadata SchemaMetadata) error {access.Write}
}
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManager interface {
+ // StartConflictResolver registers a resolver for the database that is
+ // associated with this ConflictManager and creates a stream to receive
+ // conflicts and send resolutions.
+ // Batches of ConflictInfos will be sent over with the Continued field
+ // within the ConflictInfo representing the batch boundary. Client must
+ // respond with a batch of ResolutionInfos in the same fashion.
+ // A key is under conflict if two different values were written to it
+ // concurrently (in logical time), i.e. neither value is an ancestor of the
+ // other in the history graph.
+ // A key under conflict can be a part of a batch committed on local or
+ // remote or both syncbases. ConflictInfos for all keys in these two batches
+ // are grouped together. These keys may themselves be under conflict; the
+ // presented batch is a transitive closure of all batches containing keys
+ // under conflict.
+ // For example, for local batch {key1, key2} and remote batch {key1, key3},
+ // the batch sent for conflict resolution will be {key1, key2, key3}.
+ // If there was another concurrent batch {key2, key4}, then the batch sent
+ // for conflict resolution will be {key1, key2, key3, key4}.
+ StartConflictResolver() stream<ResolutionInfo, ConflictInfo> error {access.Write}
+}
+
// BlobManager is the interface for blob operations.
type BlobManager interface {
// API for resumable blob creation (append-only). After commit, a blob
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 2d3d4ba..a908cfa 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -1452,6 +1452,356 @@
},
}
+// ConflictManagerClientMethods is the client interface
+// containing ConflictManager methods.
+//
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManagerClientMethods interface {
+ // StartConflictResolver registers a resolver for the database that is
+ // associated with this ConflictManager and creates a stream to receive
+ // conflicts and send resolutions.
+ // Batches of ConflictInfos will be sent over with the Continued field
+ // within the ConflictInfo representing the batch boundary. Client must
+ // respond with a batch of ResolutionInfos in the same fashion.
+ // A key is under conflict if two different values were written to it
+ // concurrently (in logical time), i.e. neither value is an ancestor of the
+ // other in the history graph.
+ // A key under conflict can be a part of a batch committed on local or
+ // remote or both syncbases. ConflictInfos for all keys in these two batches
+ // are grouped together. These keys may themselves be under conflict; the
+ // presented batch is a transitive closure of all batches containing keys
+ // under conflict.
+ // For example, for local batch {key1, key2} and remote batch {key1, key3},
+ // the batch sent for conflict resolution will be {key1, key2, key3}.
+ // If there was another concurrent batch {key2, key4}, then the batch sent
+ // for conflict resolution will be {key1, key2, key3, key4}.
+ StartConflictResolver(*context.T, ...rpc.CallOpt) (ConflictManagerStartConflictResolverClientCall, error)
+}
+
+// ConflictManagerClientStub adds universal methods to ConflictManagerClientMethods.
+type ConflictManagerClientStub interface {
+ ConflictManagerClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// ConflictManagerClient returns a client stub for ConflictManager.
+func ConflictManagerClient(name string) ConflictManagerClientStub {
+ return implConflictManagerClientStub{name}
+}
+
+type implConflictManagerClientStub struct {
+ name string
+}
+
+func (c implConflictManagerClientStub) StartConflictResolver(ctx *context.T, opts ...rpc.CallOpt) (ocall ConflictManagerStartConflictResolverClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "StartConflictResolver", nil, opts...); err != nil {
+ return
+ }
+ ocall = &implConflictManagerStartConflictResolverClientCall{ClientCall: call}
+ return
+}
+
+// ConflictManagerStartConflictResolverClientStream is the client stream for ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverClientStream interface {
+ // RecvStream returns the receiver side of the ConflictManager.StartConflictResolver client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() ConflictInfo
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+ // SendStream returns the send side of the ConflictManager.StartConflictResolver client stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors
+ // encountered while sending, or if Send is called after Close or
+ // the stream has been canceled. Blocks if there is no buffer
+ // space; will unblock when buffer space is available or after
+ // the stream has been canceled.
+ Send(item ResolutionInfo) error
+ // Close indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items.
+ // This is an optional call - e.g. a client might call Close if it
+ // needs to continue receiving items from the server after it's
+ // done sending. Returns errors encountered while closing, or if
+ // Close is called after the stream has been canceled. Like Send,
+ // blocks if there is no buffer space available.
+ Close() error
+ }
+}
+
+// ConflictManagerStartConflictResolverClientCall represents the call returned from ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverClientCall interface {
+ ConflictManagerStartConflictResolverClientStream
+ // Finish performs the equivalent of SendStream().Close, then blocks until
+ // the server is done, and returns the positional return values for the call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implConflictManagerStartConflictResolverClientCall struct {
+ rpc.ClientCall
+ valRecv ConflictInfo
+ errRecv error
+}
+
+func (c *implConflictManagerStartConflictResolverClientCall) RecvStream() interface {
+ Advance() bool
+ Value() ConflictInfo
+ Err() error
+} {
+ return implConflictManagerStartConflictResolverClientCallRecv{c}
+}
+
+type implConflictManagerStartConflictResolverClientCallRecv struct {
+ c *implConflictManagerStartConflictResolverClientCall
+}
+
+func (c implConflictManagerStartConflictResolverClientCallRecv) Advance() bool {
+ c.c.valRecv = ConflictInfo{}
+ c.c.errRecv = c.c.Recv(&c.c.valRecv)
+ return c.c.errRecv == nil
+}
+func (c implConflictManagerStartConflictResolverClientCallRecv) Value() ConflictInfo {
+ return c.c.valRecv
+}
+func (c implConflictManagerStartConflictResolverClientCallRecv) Err() error {
+ if c.c.errRecv == io.EOF {
+ return nil
+ }
+ return c.c.errRecv
+}
+func (c *implConflictManagerStartConflictResolverClientCall) SendStream() interface {
+ Send(item ResolutionInfo) error
+ Close() error
+} {
+ return implConflictManagerStartConflictResolverClientCallSend{c}
+}
+
+type implConflictManagerStartConflictResolverClientCallSend struct {
+ c *implConflictManagerStartConflictResolverClientCall
+}
+
+func (c implConflictManagerStartConflictResolverClientCallSend) Send(item ResolutionInfo) error {
+ return c.c.Send(item)
+}
+func (c implConflictManagerStartConflictResolverClientCallSend) Close() error {
+ return c.c.CloseSend()
+}
+func (c *implConflictManagerStartConflictResolverClientCall) Finish() (err error) {
+ err = c.ClientCall.Finish()
+ return
+}
+
+// ConflictManagerServerMethods is the interface a server writer
+// implements for ConflictManager.
+//
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManagerServerMethods interface {
+ // StartConflictResolver registers a resolver for the database that is
+ // associated with this ConflictManager and creates a stream to receive
+ // conflicts and send resolutions.
+ // Batches of ConflictInfos will be sent over with the Continued field
+ // within the ConflictInfo representing the batch boundary. Client must
+ // respond with a batch of ResolutionInfos in the same fashion.
+ // A key is under conflict if two different values were written to it
+ // concurrently (in logical time), i.e. neither value is an ancestor of the
+ // other in the history graph.
+ // A key under conflict can be a part of a batch committed on local or
+ // remote or both syncbases. ConflictInfos for all keys in these two batches
+ // are grouped together. These keys may themselves be under conflict; the
+ // presented batch is a transitive closure of all batches containing keys
+ // under conflict.
+ // For example, for local batch {key1, key2} and remote batch {key1, key3},
+ // the batch sent for conflict resolution will be {key1, key2, key3}.
+ // If there was another concurrent batch {key2, key4}, then the batch sent
+ // for conflict resolution will be {key1, key2, key3, key4}.
+ StartConflictResolver(*context.T, ConflictManagerStartConflictResolverServerCall) error
+}
+
+// ConflictManagerServerStubMethods is the server interface containing
+// ConflictManager methods, as expected by rpc.Server.
+// The only difference between this interface and ConflictManagerServerMethods
+// is the streaming methods.
+type ConflictManagerServerStubMethods interface {
+ // StartConflictResolver registers a resolver for the database that is
+ // associated with this ConflictManager and creates a stream to receive
+ // conflicts and send resolutions.
+ // Batches of ConflictInfos will be sent over with the Continued field
+ // within the ConflictInfo representing the batch boundary. Client must
+ // respond with a batch of ResolutionInfos in the same fashion.
+ // A key is under conflict if two different values were written to it
+ // concurrently (in logical time), i.e. neither value is an ancestor of the
+ // other in the history graph.
+ // A key under conflict can be a part of a batch committed on local or
+ // remote or both syncbases. ConflictInfos for all keys in these two batches
+ // are grouped together. These keys may themselves be under conflict; the
+ // presented batch is a transitive closure of all batches containing keys
+ // under conflict.
+ // For example, for local batch {key1, key2} and remote batch {key1, key3},
+ // the batch sent for conflict resolution will be {key1, key2, key3}.
+ // If there was another concurrent batch {key2, key4}, then the batch sent
+ // for conflict resolution will be {key1, key2, key3, key4}.
+ StartConflictResolver(*context.T, *ConflictManagerStartConflictResolverServerCallStub) error
+}
+
+// ConflictManagerServerStub adds universal methods to ConflictManagerServerStubMethods.
+type ConflictManagerServerStub interface {
+ ConflictManagerServerStubMethods
+ // Describe the ConflictManager interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// ConflictManagerServer returns a server stub for ConflictManager.
+// It converts an implementation of ConflictManagerServerMethods into
+// an object that may be used by rpc.Server.
+func ConflictManagerServer(impl ConflictManagerServerMethods) ConflictManagerServerStub {
+ stub := implConflictManagerServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implConflictManagerServerStub struct {
+ impl ConflictManagerServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implConflictManagerServerStub) StartConflictResolver(ctx *context.T, call *ConflictManagerStartConflictResolverServerCallStub) error {
+ return s.impl.StartConflictResolver(ctx, call)
+}
+
+func (s implConflictManagerServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implConflictManagerServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{ConflictManagerDesc}
+}
+
+// ConflictManagerDesc describes the ConflictManager interface.
+var ConflictManagerDesc rpc.InterfaceDesc = descConflictManager
+
+// descConflictManager hides the desc to keep godoc clean.
+var descConflictManager = rpc.InterfaceDesc{
+ Name: "ConflictManager",
+ PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+ Doc: "// ConflictManager interface provides all the methods necessary to handle\n// conflict resolution for a given database.",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "StartConflictResolver",
+ Doc: "// StartConflictResolver registers a resolver for the database that is\n// associated with this ConflictManager and creates a stream to receive\n// conflicts and send resolutions.\n// Batches of ConflictInfos will be sent over with the Continued field\n// within the ConflictInfo representing the batch boundary. Client must\n// respond with a batch of ResolutionInfos in the same fashion.\n// A key is under conflict if two different values were written to it\n// concurrently (in logical time), i.e. neither value is an ancestor of the\n// other in the history graph.\n// A key under conflict can be a part of a batch committed on local or\n// remote or both syncbases. ConflictInfos for all keys in these two batches\n// are grouped together. These keys may themselves be under conflict; the\n// presented batch is a transitive closure of all batches containing keys\n// under conflict.\n// For example, for local batch {key1, key2} and remote batch {key1, key3},\n// the batch sent for conflict resolution will be {key1, key2, key3}.\n// If there was another concurrent batch {key2, key4}, then the batch sent\n// for conflict resolution will be {key1, key2, key3, key4}.",
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ },
+}
+
+// ConflictManagerStartConflictResolverServerStream is the server stream for ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverServerStream interface {
+ // RecvStream returns the receiver side of the ConflictManager.StartConflictResolver server stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() ResolutionInfo
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+ // SendStream returns the send side of the ConflictManager.StartConflictResolver server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item ConflictInfo) error
+ }
+}
+
+// ConflictManagerStartConflictResolverServerCall represents the context passed to ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverServerCall interface {
+ rpc.ServerCall
+ ConflictManagerStartConflictResolverServerStream
+}
+
+// ConflictManagerStartConflictResolverServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements ConflictManagerStartConflictResolverServerCall.
+type ConflictManagerStartConflictResolverServerCallStub struct {
+ rpc.StreamServerCall
+ valRecv ResolutionInfo
+ errRecv error
+}
+
+// Init initializes ConflictManagerStartConflictResolverServerCallStub from rpc.StreamServerCall.
+func (s *ConflictManagerStartConflictResolverServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the ConflictManager.StartConflictResolver server stream.
+func (s *ConflictManagerStartConflictResolverServerCallStub) RecvStream() interface {
+ Advance() bool
+ Value() ResolutionInfo
+ Err() error
+} {
+ return implConflictManagerStartConflictResolverServerCallRecv{s}
+}
+
+type implConflictManagerStartConflictResolverServerCallRecv struct {
+ s *ConflictManagerStartConflictResolverServerCallStub
+}
+
+func (s implConflictManagerStartConflictResolverServerCallRecv) Advance() bool {
+ s.s.valRecv = ResolutionInfo{}
+ s.s.errRecv = s.s.Recv(&s.s.valRecv)
+ return s.s.errRecv == nil
+}
+func (s implConflictManagerStartConflictResolverServerCallRecv) Value() ResolutionInfo {
+ return s.s.valRecv
+}
+func (s implConflictManagerStartConflictResolverServerCallRecv) Err() error {
+ if s.s.errRecv == io.EOF {
+ return nil
+ }
+ return s.s.errRecv
+}
+
+// SendStream returns the send side of the ConflictManager.StartConflictResolver server stream.
+func (s *ConflictManagerStartConflictResolverServerCallStub) SendStream() interface {
+ Send(item ConflictInfo) error
+} {
+ return implConflictManagerStartConflictResolverServerCallSend{s}
+}
+
+type implConflictManagerStartConflictResolverServerCallSend struct {
+ s *ConflictManagerStartConflictResolverServerCallStub
+}
+
+func (s implConflictManagerStartConflictResolverServerCallSend) Send(item ConflictInfo) error {
+ return s.s.Send(item)
+}
+
// DatabaseClientMethods is the client interface
// containing Database methods.
//
@@ -1539,6 +1889,9 @@
// SchemaManager implements the API for managing schema metadata attached
// to a Database.
SchemaManagerClientMethods
+ // ConflictManager interface provides all the methods necessary to handle
+ // conflict resolution for a given database.
+ ConflictManagerClientMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
@@ -1579,7 +1932,7 @@
// DatabaseClient returns a client stub for Database.
func DatabaseClient(name string) DatabaseClientStub {
- return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name)}
+ return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name), ConflictManagerClient(name)}
}
type implDatabaseClientStub struct {
@@ -1590,6 +1943,7 @@
SyncGroupManagerClientStub
BlobManagerClientStub
SchemaManagerClientStub
+ ConflictManagerClientStub
}
func (c implDatabaseClientStub) Create(ctx *context.T, i0 *SchemaMetadata, i1 access.Permissions, opts ...rpc.CallOpt) (err error) {
@@ -1786,6 +2140,9 @@
// SchemaManager implements the API for managing schema metadata attached
// to a Database.
SchemaManagerServerMethods
+ // ConflictManager interface provides all the methods necessary to handle
+ // conflict resolution for a given database.
+ ConflictManagerServerMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
@@ -1899,6 +2256,9 @@
// SchemaManager implements the API for managing schema metadata attached
// to a Database.
SchemaManagerServerStubMethods
+ // ConflictManager interface provides all the methods necessary to handle
+ // conflict resolution for a given database.
+ ConflictManagerServerStubMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
@@ -1949,6 +2309,7 @@
SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
BlobManagerServerStub: BlobManagerServer(impl),
SchemaManagerServerStub: SchemaManagerServer(impl),
+ ConflictManagerServerStub: ConflictManagerServer(impl),
}
// Initialize GlobState; always check the stub itself first, to handle the
// case where the user has the Glob method defined in their VDL source.
@@ -1967,6 +2328,7 @@
SyncGroupManagerServerStub
BlobManagerServerStub
SchemaManagerServerStub
+ ConflictManagerServerStub
gs *rpc.GlobState
}
@@ -2003,7 +2365,7 @@
}
func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
- return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, watch.GlobWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc}
+ return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, watch.GlobWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc, ConflictManagerDesc}
}
// DatabaseDesc describes the Database interface.
@@ -2020,6 +2382,7 @@
{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
{"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
{"SchemaManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SchemaManager implements the API for managing schema metadata attached\n// to a Database."},
+ {"ConflictManager", "v.io/syncbase/v23/services/syncbase/nosql", "// ConflictManager interface provides all the methods necessary to handle\n// conflict resolution for a given database."},
},
Methods: []rpc.MethodDesc{
{
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index eae8e9d..407e956 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -111,6 +111,158 @@
Defer
}
+// ConflictInfo contains information to fully specify a conflict
+// for a key, providing the (local, remote, ancestor) tuple.
+// A key under conflict can be a part of a batch in local, remote or both
+// updates. Since the batches can have more than one key, all ConflictInfos
+// for the keys within the batches are grouped together into a single conflict
+// batch and sent as a stream with the Continued field representing conflict
+// batch boundaries.
+type ConflictInfo struct {
+ // Data is a unit chunk of ConflictInfo which can be sent over the conflict
+ // stream.
+ Data ConflictData
+
+ // Continued indicates whether more ConflictInfos follow in the current batch.
+ Continued bool
+}
+
+// ConflictData represents a unit of conflict data sent over the stream. It
+// can either contain information about a Batch or about an operation done
+// on a row.
+type ConflictData union {
+ Batch BatchInfo
+ Row RowInfo
+}
+
+type BatchInfo struct {
+ // Id is an identifier for a batch contained in a conflict. It is
+ // unique only in the context of a given conflict. Its purpose is solely to
+ // group one or more RowInfo objects together to represent a batch that
+ // was committed by the client.
+ Id uint16
+
+ // Hint is the hint provided by the client when this batch was committed.
+ Hint string
+
+ // Source states where the batch comes from.
+ Source BatchSource
+}
+
+// BatchSource represents where the batch was committed.
+type BatchSource enum {
+ // Local represents that the batch was committed on local syncbase.
+ Local
+
+ // Remote represents that the batch was committed on remote syncbase.
+ Remote
+}
+
+// RowInfo contains a single operation performed on a row (in case of read or
+// write) or a range of rows (in case of scan) along with a mapping to each
+// of the batches that this operation belongs to.
+// For example, if Row1 was updated on local syncbase conflicting with a write
+// on remote syncbase as part of two separate batches, then it will be
+// represented by a single RowInfo with Write Operation containing the
+// respective local and remote values along with the batch id for both batches
+// stored in the BatchIds field.
+type RowInfo struct {
+ // Op is the specific operation represented by this RowInfo.
+ Op Operation
+ // BatchIds contains ids of all batches that this RowInfo is a part of.
+ BatchIds []uint16
+}
+
+// Operation represents a specific operation on a row or a set of rows that is
+// a part of the conflict.
+type Operation union {
+ // Read represents a read operation performed on a specific row. For a given
+ // row key there can be at most one Read operation within a conflict.
+ Read RowOp
+
+ // Write represents a write operation performed on a specific row. For a
+ // given row key there can be at most one Write operation within a
+ // conflict.
+ Write RowOp
+
+ // Scan represents a scan operation performed over a specific range of keys.
+ // For a given key range there can be at most one ScanOp within a conflict.
+ Scan ScanOp
+}
+
+// RowOp represents a read or write operation on a row corresponding to the
+// given key.
+type RowOp struct {
+ // The key under conflict.
+ Key string
+
+ // LocalValue contains the value read or written by local syncbase or nil.
+ LocalValue ?Value
+
+ // RemoteValue contains the value read or written by remote syncbase or nil.
+ RemoteValue ?Value
+
+ // AncestorValue contains the value for the key which is the lowest common
+ // ancestor of the two values represented by LocalValue and RemoteValue or
+ // nil if no ancestor exists or if the operation was read.
+ AncestorValue ?Value
+}
+
+// ScanOp provides details of a scan operation.
+type ScanOp struct {
+ // Start contains the starting key for a range scan.
+ Start string
+
+ // Limit contains the end key for a range scan.
+ Limit string
+}
+
+// Value contains the encoded bytes for a row's value stored in syncbase.
+type Value struct {
+ // Bytes holds the VOM-encoded bytes for a row's value.
+ Bytes []byte
+
+ // Write timestamp for this value in nanoseconds.
+ // TODO(jlodhia): change the timestamp to vdl.time here, in commit timestamp
+ // and clock data.
+ WriteTs int64
+}
+
+// ValueSelection represents the value that was selected as the final resolution
+// for a conflict.
+type ValueSelection enum {
+ // Local should be used if local value is picked as the final result.
+ Local
+
+ // Remote should be used if remote value is picked as the final result.
+ Remote
+
+ // Other should be used if a new value different from local and remote is
+ // used as the final result.
+ Other
+}
+
+// ResolutionInfo contains the application’s reply to a conflict for a key,
+// providing the resolution value. The resolution may be over a group of keys
+// in which case the application must send a stream of ResolutionInfos, setting
+// the Continued field to false on the last ResolutionInfo to mark the end of
+// the batch. ResolutionInfos sent as part of a batch will be
+// committed as a batch. If the commit fails, the Conflict will be re-sent.
+type ResolutionInfo struct {
+ // Key is the key under conflict.
+ Key string
+
+ // Selection represents the value that was selected as resolution.
+ Selection ValueSelection
+
+ // Result is the resolved value for the key. This field should be used only
+ // if the value of the Selection field is 'Other'.
+ Result ?Value
+
+ // Continued indicates whether more ResolutionInfos follow in the current batch.
+ Continued bool
+}
+
// SchemaMetadata maintains metadata related to the schema of a given database.
// There is one SchemaMetadata per database.
type SchemaMetadata struct {
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index adb4e66..6a8e0c1 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -166,6 +166,336 @@
}) {
}
+// ConflictInfo contains information to fully specify a conflict
+// for a key, providing the (local, remote, ancestor) tuple.
+// A key under conflict can be a part of a batch in local, remote or both
+// updates. Since the batches can have more than one key, all ConflictInfos
+// for the keys within the batches are grouped together into a single conflict
+// batch and sent as a stream with the Continued field representing conflict
+// batch boundaries.
+type ConflictInfo struct {
+ // Data is a unit chunk of ConflictInfo which can be sent over the conflict
+ // stream.
+ Data ConflictData
+ // Continued represents whether the batch of ConflictInfos has ended.
+ Continued bool
+}
+
+func (ConflictInfo) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ConflictInfo"`
+}) {
+}
+
+type (
+ // ConflictData represents any single field of the ConflictData union type.
+ //
+ // ConflictData represents a unit of conflict data sent over the stream. It
+ // can either contain information about a Batch or about an operation done
+ // on a row.
+ ConflictData interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the ConflictData union type.
+ __VDLReflect(__ConflictDataReflect)
+ }
+ // ConflictDataBatch represents field Batch of the ConflictData union type.
+ ConflictDataBatch struct{ Value BatchInfo }
+ // ConflictDataRow represents field Row of the ConflictData union type.
+ ConflictDataRow struct{ Value RowInfo }
+ // __ConflictDataReflect describes the ConflictData union type.
+ __ConflictDataReflect struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ConflictData"`
+ Type ConflictData
+ Union struct {
+ Batch ConflictDataBatch
+ Row ConflictDataRow
+ }
+ }
+)
+
+func (x ConflictDataBatch) Index() int { return 0 }
+func (x ConflictDataBatch) Interface() interface{} { return x.Value }
+func (x ConflictDataBatch) Name() string { return "Batch" }
+func (x ConflictDataBatch) __VDLReflect(__ConflictDataReflect) {}
+
+func (x ConflictDataRow) Index() int { return 1 }
+func (x ConflictDataRow) Interface() interface{} { return x.Value }
+func (x ConflictDataRow) Name() string { return "Row" }
+func (x ConflictDataRow) __VDLReflect(__ConflictDataReflect) {}
+
+type BatchInfo struct {
+ // Id is an identifier for a batch contained in a conflict. It is
+ // unique only in the context of a given conflict. Its purpose is solely to
+ // group one or more RowInfo objects together to represent a batch that
+ // was committed by the client.
+ Id uint16
+ // Hint is the hint provided by the client when this batch was committed.
+ Hint string
+ // Source states where the batch comes from.
+ Source BatchSource
+}
+
+func (BatchInfo) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BatchInfo"`
+}) {
+}
+
+// BatchSource represents where the batch was committed.
+type BatchSource int
+
+const (
+ BatchSourceLocal BatchSource = iota
+ BatchSourceRemote
+)
+
+// BatchSourceAll holds all labels for BatchSource.
+var BatchSourceAll = [...]BatchSource{BatchSourceLocal, BatchSourceRemote}
+
+// BatchSourceFromString creates a BatchSource from a string label.
+func BatchSourceFromString(label string) (x BatchSource, err error) {
+ err = x.Set(label)
+ return
+}
+
+// Set assigns label to x.
+func (x *BatchSource) Set(label string) error {
+ switch label {
+ case "Local", "local":
+ *x = BatchSourceLocal
+ return nil
+ case "Remote", "remote":
+ *x = BatchSourceRemote
+ return nil
+ }
+ *x = -1
+ return fmt.Errorf("unknown label %q in nosql.BatchSource", label)
+}
+
+// String returns the string label of x.
+func (x BatchSource) String() string {
+ switch x {
+ case BatchSourceLocal:
+ return "Local"
+ case BatchSourceRemote:
+ return "Remote"
+ }
+ return ""
+}
+
+func (BatchSource) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BatchSource"`
+ Enum struct{ Local, Remote string }
+}) {
+}
+
+// RowInfo contains a single operation performed on a row (in case of read or
+// write) or a range or rows (in case of scan) along with a mapping to each
+// of the batches that this operation belongs to.
+// For example, if Row1 was updated on local syncbase conflicting with a write
+// on remote syncbase as part of two separate batches, then it will be
+// represented by a single RowInfo with Write Operation containing the
+// respective local and remote values along with the batch id for both batches
+// stored in the BatchIds field.
+type RowInfo struct {
+ // Op is a specific operation represented by RowInfo
+ Op Operation
+ // BatchIds contains ids of all batches that this RowInfo is a part of.
+ BatchIds []uint16
+}
+
+func (RowInfo) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.RowInfo"`
+}) {
+}
+
+type (
+ // Operation represents any single field of the Operation union type.
+ //
+ // Operation represents a specific operation on a row or a set of rows that is
+ // a part of the conflict.
+ Operation interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the Operation union type.
+ __VDLReflect(__OperationReflect)
+ }
+ // OperationRead represents field Read of the Operation union type.
+ //
+ // Read represents a read operation performed on a specific row. For a given
+ // row key there can only be at max one Read operation within a conflict.
+ OperationRead struct{ Value RowOp }
+ // OperationWrite represents field Write of the Operation union type.
+ //
+ // Write represents a write operation performed on a specific row. For a
+ // given row key there can only be at max one Write operation within a
+ // conflict.
+ OperationWrite struct{ Value RowOp }
+ // OperationScan represents field Scan of the Operation union type.
+ //
+ // Scan represents a scan operation performed over a specific range of keys.
+ // For a given key range there can be at max one ScanOp within the Conflict.
+ OperationScan struct{ Value ScanOp }
+ // __OperationReflect describes the Operation union type.
+ __OperationReflect struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Operation"`
+ Type Operation
+ Union struct {
+ Read OperationRead
+ Write OperationWrite
+ Scan OperationScan
+ }
+ }
+)
+
+func (x OperationRead) Index() int { return 0 }
+func (x OperationRead) Interface() interface{} { return x.Value }
+func (x OperationRead) Name() string { return "Read" }
+func (x OperationRead) __VDLReflect(__OperationReflect) {}
+
+func (x OperationWrite) Index() int { return 1 }
+func (x OperationWrite) Interface() interface{} { return x.Value }
+func (x OperationWrite) Name() string { return "Write" }
+func (x OperationWrite) __VDLReflect(__OperationReflect) {}
+
+func (x OperationScan) Index() int { return 2 }
+func (x OperationScan) Interface() interface{} { return x.Value }
+func (x OperationScan) Name() string { return "Scan" }
+func (x OperationScan) __VDLReflect(__OperationReflect) {}
+
+// RowOp represents a read or write operation on a row corresponding to the
+// given key.
+type RowOp struct {
+ // The key under conflict.
+ Key string
+ // LocalValue contains the value read or written by local syncbase or nil.
+ LocalValue *Value
+ // RemoteValue contains the value read or written by remote syncbase or nil.
+ RemoteValue *Value
+ // AncestorValue contains the value for the key which is the lowest common
+ // ancestor of the two values represented by LocalValue and RemoteValue or
+ // nil if no ancestor exists or if the operation was read.
+ AncestorValue *Value
+}
+
+func (RowOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.RowOp"`
+}) {
+}
+
+// ScanOp provides details of a scan operation.
+type ScanOp struct {
+ // Start contains the starting key for a range scan.
+ Start string
+ // Limit contains the end key for a range scan.
+ Limit string
+}
+
+func (ScanOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ScanOp"`
+}) {
+}
+
+// Value contains the encoded bytes for a row's value stored in syncbase.
+type Value struct {
+ // VOM encoded bytes for a row's value
+ Bytes []byte
+ // Write timestamp for this value in nanoseconds.
+ // TODO(jlodhia): change the timestamp to vdl.time here, in commit timestamp
+ // and clock data.
+ WriteTs int64
+}
+
+func (Value) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Value"`
+}) {
+}
+
+// ValueSelection represents the value that was selected as the final resolution
+// for a conflict.
+type ValueSelection int
+
+const (
+ ValueSelectionLocal ValueSelection = iota
+ ValueSelectionRemote
+ ValueSelectionOther
+)
+
+// ValueSelectionAll holds all labels for ValueSelection.
+var ValueSelectionAll = [...]ValueSelection{ValueSelectionLocal, ValueSelectionRemote, ValueSelectionOther}
+
+// ValueSelectionFromString creates a ValueSelection from a string label.
+func ValueSelectionFromString(label string) (x ValueSelection, err error) {
+ err = x.Set(label)
+ return
+}
+
+// Set assigns label to x.
+func (x *ValueSelection) Set(label string) error {
+ switch label {
+ case "Local", "local":
+ *x = ValueSelectionLocal
+ return nil
+ case "Remote", "remote":
+ *x = ValueSelectionRemote
+ return nil
+ case "Other", "other":
+ *x = ValueSelectionOther
+ return nil
+ }
+ *x = -1
+ return fmt.Errorf("unknown label %q in nosql.ValueSelection", label)
+}
+
+// String returns the string label of x.
+func (x ValueSelection) String() string {
+ switch x {
+ case ValueSelectionLocal:
+ return "Local"
+ case ValueSelectionRemote:
+ return "Remote"
+ case ValueSelectionOther:
+ return "Other"
+ }
+ return ""
+}
+
+func (ValueSelection) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ValueSelection"`
+ Enum struct{ Local, Remote, Other string }
+}) {
+}
+
+// ResolutionInfo contains the application’s reply to a conflict for a key,
+// providing the resolution value. The resolution may be over a group of keys
+// in which case the application must send a stream of ResolutionInfos with
+// the Continued field for the last ResolutionInfo representing the end of the
+// batch with a value false. ResolutionInfos sent as part of a batch will be
+// committed as a batch. If the commit fails, the Conflict will be re-sent.
+type ResolutionInfo struct {
+ // Key is the key under conflict.
+ Key string
+ // Selection represents the value that was selected as resolution.
+ Selection ValueSelection
+ // Result is the resolved value for the key. This field should be used only
+ // if value of Selection field is 'Other'
+ Result *Value
+ // Continued represents whether the batch of ResolutionInfos has ended.
+ Continued bool
+}
+
+func (ResolutionInfo) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResolutionInfo"`
+}) {
+}
+
// SchemaMetadata maintains metadata related to the schema of a given database.
// There is one SchemaMetadata per database.
type SchemaMetadata struct {
@@ -319,6 +649,17 @@
vdl.Register((*SyncGroupSpec)(nil))
vdl.Register((*SyncGroupMemberInfo)(nil))
vdl.Register((*ResolverType)(nil))
+ vdl.Register((*ConflictInfo)(nil))
+ vdl.Register((*ConflictData)(nil))
+ vdl.Register((*BatchInfo)(nil))
+ vdl.Register((*BatchSource)(nil))
+ vdl.Register((*RowInfo)(nil))
+ vdl.Register((*Operation)(nil))
+ vdl.Register((*RowOp)(nil))
+ vdl.Register((*ScanOp)(nil))
+ vdl.Register((*Value)(nil))
+ vdl.Register((*ValueSelection)(nil))
+ vdl.Register((*ResolutionInfo)(nil))
vdl.Register((*SchemaMetadata)(nil))
vdl.Register((*CrPolicy)(nil))
vdl.Register((*CrRule)(nil))
diff --git a/v23/syncbase/nosql/cr_connection_test.go b/v23/syncbase/nosql/cr_connection_test.go
new file mode 100644
index 0000000..b99c15d
--- /dev/null
+++ b/v23/syncbase/nosql/cr_connection_test.go
@@ -0,0 +1,138 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/v23/syncbase/nosql/crtestutil"
+ "v.io/v23/context"
+)
+
+// resolverImpl1 is a ConflictResolver whose OnConflict returns a zero Resolution.
+type resolverImpl1 struct{}
+
+func (ri *resolverImpl1) OnConflict(ctx *context.T, conflict *Conflict) Resolution {
+ return Resolution{}
+}
+
+// TestCrConnectionClose tests if db.Close() correctly shuts down the goroutine
+// that runs conflict resolution code in an infinite loop.
+// Setup:
+// - A mutex lock is used to cause Advance() on the receive stream to block.
+// This lock is held by the test before starting off the CR thread.
+// - ctx.Cancel() is simulated by releasing the mutex lock. This is done right
+// after calling db.Close()
+// - The mutex is held again by the test to cause the CR thread to block on
+// Advance() in case the CR thread did not shut down. This will lead to
+// failure of the test.
+// Expected result:
+// - The CR thread should not get/remain blocked on Advance() after db.Close()
+func TestCrConnectionClose(t *testing.T) {
+ db := NewDatabase("parentName", "db1", getSchema(&resolverImpl1{}))
+ advance := func(st *crtestutil.State) bool {
+ fmt.Println("Advance() for ConflictStreamImpl called")
+ st.IsBlocked = true
+ st.Mu.Lock()
+ defer st.Mu.Unlock()
+ fmt.Println("Advance() for ConflictStreamImpl returning")
+ st.IsBlocked = false
+ return false
+ }
+
+ st := &crtestutil.State{}
+ crStream := &crtestutil.CrStreamImpl{
+ C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+ R: &crtestutil.ResolutionStreamImpl{St: st},
+ }
+ db.c = crtestutil.MockDbClient(db.c, crStream)
+ db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+ ctx, _ := context.RootContext()
+ st.Mu.Lock() // causes Advance() to block
+ db.EnforceSchema(ctx) // kicks off the CR thread
+ for i := 0; i < 100 && !st.IsBlocked; i++ { // wait till Advance() call is blocked
+ time.Sleep(time.Millisecond)
+ }
+ db.Close()
+ st.Mu.Unlock() // simulate ctx cancel()
+ for i := 0; i < 100 && st.IsBlocked; i++ {
+ time.Sleep(time.Millisecond)
+ }
+
+ st.Mu.Lock() // causes Advance() to block if called again
+ // Let the conflict resolution routine complete its cleanup. If the code does
+ // not work as intended, it will reconnect the stream and block on the mutex.
+ // NOTE(review): this 1ms sleep is shorter than the 10ms reconnectWaitTime; confirm.
+ time.Sleep(time.Millisecond)
+ if st.IsBlocked {
+ t.Error("Error: The conflict resolution routine did not die")
+ }
+}
+
+// TestCrConnectionReestablish tests if the CR thread reestablishes a broken
+// connection stream with syncbase.
+// Setup:
+// - Advance() is allowed to return immediately 2 times with return value
+// false signifying 2 consecutive breaks in the connection.
+// - A mutex lock is used to cause Advance() on the receive stream to block
+// on the 3rd attempt. This lock is held by the test before starting off the
+// CR thread. This simulates a stable connection between CR and syncbase.
+// Expected result:
+// - Advance() should get blocked after some time.
+// - st.AdvanceCount should be equal to 3
+func TestCrConnectionReestablish(t *testing.T) {
+ db := NewDatabase("parentName", "db1", getSchema(&resolverImpl1{}))
+ advance := func(st *crtestutil.State) bool {
+ fmt.Println("Advance() for ConflictStreamImpl called")
+ st.AdvanceCount++
+ if st.AdvanceCount > 2 {
+ // Connection stays stable from the 3rd attempt onwards
+ st.IsBlocked = true
+ st.Mu.Lock()
+ defer st.Mu.Unlock()
+ }
+ fmt.Println("Advance() for ConflictStreamImpl returning")
+ st.IsBlocked = false
+ return false
+ }
+
+ st := &crtestutil.State{}
+ crStream := &crtestutil.CrStreamImpl{
+ C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+ R: &crtestutil.ResolutionStreamImpl{St: st},
+ }
+ db.c = crtestutil.MockDbClient(db.c, crStream)
+ db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+ ctx, _ := context.RootContext()
+ st.Mu.Lock() // causes Advance() to block
+ db.EnforceSchema(ctx)
+ for i := 0; i < 100 && !st.IsBlocked; i++ {
+ time.Sleep(time.Millisecond) // wait till Advance() call is blocked
+ }
+
+ if !st.IsBlocked {
+ t.Error("Error: Advance() did not block")
+ }
+ if st.AdvanceCount != 3 {
+ t.Errorf("Error: Advance() expected to be called 3 times, instead called %d times", st.AdvanceCount)
+ }
+
+ // Shutdown the cr routine
+ db.Close()
+ st.Mu.Unlock()
+}
+
+// getSchema returns a Schema for the CR tests with empty metadata, no upgrader
+// and cr installed as the conflict resolver.
+func getSchema(cr ConflictResolver) *Schema {
+ schema := &Schema{Metadata: wire.SchemaMetadata{}, Upgrader: nil}
+ schema.Resolver = cr
+ return schema
+}
diff --git a/v23/syncbase/nosql/cr_test.go b/v23/syncbase/nosql/cr_test.go
new file mode 100644
index 0000000..f9618d9
--- /dev/null
+++ b/v23/syncbase/nosql/cr_test.go
@@ -0,0 +1,605 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ "bytes"
+ "strings"
+ "testing"
+ "time"
+
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/v23/syncbase/nosql/crtestutil"
+ "v.io/v23/context"
+ "v.io/v23/vom"
+)
+
+// TODO(jlodhia): extract this test (along with the crtest helpers) to
+// nosql_test, or alternatively, move the crtest helper to an internal dir.
+
+
+////////////////////////////////////////
+// Client Code
+
+// Client constants
+const (
+ TaskIdPrefix = "task:"
+ ListIdPrefix = "list:"
+
+ List1 = "list:1"
+
+ Task1 = "task:1:1"
+ Task2 = "task:1:2"
+
+ HintListDelete = "ListDelete"
+
+ HintTextEdit = "TaskTextEdit"
+ HintDone = "TaskDoneToggle"
+ HintTaskDelete = "TaskDelete"
+ HintTaskAdd = "TaskAdd"
+
+ TextNew = "EditedText"
+)
+
+// Client data structures
+type Task struct {
+ Text string
+ Done bool
+ DoneTs int64
+ ListId string
+}
+
+type List struct {
+ Name string
+ TaskCount int16
+ DoneCount int16
+}
+
+// ConflictResolverImpl is the client's conflict resolution implementation.
+type ConflictResolverImpl struct {
+}
+
+// OnConflict resolves the task rows of the conflict first and then the list
+// rows, which may rewrite task resolutions, and returns the combined result.
+func (ri *ConflictResolverImpl) OnConflict(ctx *context.T, conflict *Conflict) Resolution {
+ resolved := ri.handleTasks(ctx, findRowsByType(conflict, TaskIdPrefix))
+
+ ri.handleLists(ctx, findRowsByType(conflict, ListIdPrefix), resolved)
+ return Resolution{ResultSet: resolved}
+}
+
+// handleTasks does a merge of fields within each task under conflict.
+// It handles conflict on fields as follows:
+// Text: Last writer wins
+// Done: Depends on DoneTs
+// DoneTs: The side that is not equal to ancestor and is the lower of the two
+// wins. This means that if a user marks a task done on one device
+// and then later marks the same task done on another device that has
+// not synced yet, the done timestamp that is selected will be from the
+// former action.
+func (ri *ConflictResolverImpl) handleTasks(ctx *context.T, tasks []ConflictRow) map[string]ResolvedRow {
+ result := map[string]ResolvedRow{}
+ for _, t := range tasks {
+ if isRowAdded(t) {
+ result[t.Key] = handleRowAdd(t)
+ continue
+ }
+ if isRowDeleted(t) {
+ result[t.Key] = ResolvedRow{Key: t.Key, Result: nil}
+ continue
+ }
+
+ var localTask, remoteTask, ancestorTask, resolvedTask Task
+ t.LocalValue.Get(&localTask)
+ t.RemoteValue.Get(&remoteTask)
+ t.AncestorValue.Get(&ancestorTask)
+
+ // prime the resolved task with local version for simplicity.
+ resolvedTask = localTask
+
+ // check field text for conflict
+ if localTask.Text != remoteTask.Text {
+ if remoteTask.Text != ancestorTask.Text {
+ if (localTask.Text == ancestorTask.Text) || (t.RemoteValue.WriteTs.After(t.LocalValue.WriteTs)) {
+ resolvedTask.Text = remoteTask.Text
+ }
+ }
+ }
+
+ // check field done for conflict
+ if localTask.DoneTs != remoteTask.DoneTs {
+ if remoteTask.DoneTs != ancestorTask.DoneTs {
+ if (localTask.DoneTs == ancestorTask.DoneTs) || (remoteTask.DoneTs < localTask.DoneTs) {
+ resolvedTask.DoneTs = remoteTask.DoneTs
+ resolvedTask.Done = remoteTask.Done
+ }
+ }
+ }
+ resValue, _ := NewValue(ctx, &resolvedTask)
+ result[t.Key] = ResolvedRow{Key: t.Key, Result: resValue}
+ }
+ return result
+}
+
+// handleLists does a merge of fields within each list under conflict. If a list
+// is deleted, it looks at the resolution map, finds all tasks that belong to
+// the list and changes the resolution to delete for those tasks.
+// It handles conflict on fields as follows:
+// Name: Last writer wins
+// TaskCount: Add both counts and subtract the ancestor count.
+// DoneCount: Add both counts and subtract the ancestor count.
+// NOTE: The above counters are resolved in a naive way for simplicity of this
+// test. There are many other ways to represent the counters in a more correct
+// way.
+func (ri *ConflictResolverImpl) handleLists(ctx *context.T, lists []ConflictRow, resolution map[string]ResolvedRow) {
+ for _, l := range lists {
+ if isRowAdded(l) {
+ resolution[l.Key] = handleRowAdd(l)
+ continue
+ }
+ if isRowDeleted(l) {
+ resolution[l.Key] = ResolvedRow{Key: l.Key, Result: nil}
+ deleteTasksForList(resolution, l.Key)
+ continue
+ }
+
+ var localList, remoteList, ancestorList, resolvedList List
+ l.LocalValue.Get(&localList)
+ l.RemoteValue.Get(&remoteList)
+ l.AncestorValue.Get(&ancestorList)
+
+ // prime the resolved list with local version for simplicity.
+ resolvedList = localList
+
+ // check field Name for conflict
+ if localList.Name != remoteList.Name {
+ if remoteList.Name != ancestorList.Name {
+ if (localList.Name == ancestorList.Name) || (l.RemoteValue.WriteTs.After(l.LocalValue.WriteTs)) {
+ resolvedList.Name = remoteList.Name
+ }
+ }
+ }
+
+ // check field TaskCount for conflict
+ if localList.TaskCount != remoteList.TaskCount {
+ if remoteList.TaskCount != ancestorList.TaskCount {
+ if localList.TaskCount == ancestorList.TaskCount {
+ resolvedList.TaskCount = remoteList.TaskCount
+ } else {
+ resolvedList.TaskCount = localList.TaskCount + remoteList.TaskCount - ancestorList.TaskCount
+ }
+ }
+ }
+
+ // check field DoneCount for conflict
+ if localList.DoneCount != remoteList.DoneCount {
+ if remoteList.DoneCount != ancestorList.DoneCount {
+ if localList.DoneCount == ancestorList.DoneCount {
+ resolvedList.DoneCount = remoteList.DoneCount
+ } else {
+ resolvedList.DoneCount = localList.DoneCount + remoteList.DoneCount - ancestorList.DoneCount
+ }
+ }
+ }
+ resValue, _ := NewValue(ctx, &resolvedList)
+ resolution[l.Key] = ResolvedRow{Key: l.Key, Result: resValue}
+ }
+}
+
+// deleteTasksForList sets a nil Result on every resolution entry that is a task of listKey.
+func deleteTasksForList(resolution map[string]ResolvedRow, listKey string) {
+ prefix := strings.Replace(listKey, ListIdPrefix, TaskIdPrefix, 1) + ":"
+ for taskKey, row := range resolution {
+ if strings.HasPrefix(taskKey, prefix) {
+ row.Result = nil
+ resolution[taskKey] = row
+ }
+ }
+}
+
+func handleRowAdd(r ConflictRow) ResolvedRow {
+ var resolvedTask *Value
+ // first one to add wins
+ if r.RemoteValue == nil {
+ resolvedTask = r.LocalValue
+ } else if r.LocalValue == nil {
+ resolvedTask = r.RemoteValue
+ } else if r.LocalValue.WriteTs.Before(r.RemoteValue.WriteTs) {
+ resolvedTask = r.LocalValue
+ } else {
+ resolvedTask = r.RemoteValue
+ }
+ return ResolvedRow{Key: r.Key, Result: resolvedTask}
+}
+
+// isRowDeleted reports whether r has an ancestor value but lacks a local or remote value.
+func isRowDeleted(r ConflictRow) bool {
+ return r.AncestorValue != nil && (r.LocalValue == nil || r.RemoteValue == nil)
+}
+
+// isRowAdded reports whether r has no ancestor value but has a local or remote value.
+func isRowAdded(r ConflictRow) bool {
+ return r.AncestorValue == nil && (r.LocalValue != nil || r.RemoteValue != nil)
+}
+
+func findRowsByType(conflict *Conflict, typePrefix string) []ConflictRow {
+ rows := []ConflictRow{}
+ if conflict.WriteSet != nil {
+ for k, v := range conflict.WriteSet.ByKey {
+ if strings.HasPrefix(k, typePrefix) {
+ rows = append(rows, v)
+ }
+ }
+ }
+ return rows
+}
+
+////////////////////////////////////////
+// Tests
+
+func TestConflictResolutionSingleObject(t *testing.T) {
+ mockConflit, expResult := simpleConflictStream(t)
+ RunTest(t, mockConflit, expResult, false)
+}
+
+func TestConflictResolutionAddDelete(t *testing.T) {
+ mockConflit, expResult := addDeleteConflictStream(t)
+ RunTest(t, mockConflit, expResult, true)
+}
+
+func TestConflictResolutionIntersectingConflicts(t *testing.T) {
+ mockConflict, expResult := twoIntersectingBatchesConflictStream(t)
+ RunTest(t, mockConflict, expResult, true)
+}
+
+func TestConflictResolutionListDelete(t *testing.T) {
+ mockConflit, expResult := listDeleteConflictsWithTaskAdd(t)
+ RunTest(t, mockConflit, expResult, true)
+}
+
+// RunTest streams mockConflit to the resolver registered on a mocked database
+// and compares the resolutions recorded in the stream state with expResult.
+func RunTest(t *testing.T, mockConflit []wire.ConflictInfo, expResult map[string]wire.ResolutionInfo, singleBatch bool) {
+ db := NewDatabase("parentName", "db1", getSchema(&ConflictResolverImpl{}))
+ advance := func(st *crtestutil.State) bool {
+ if st.ValIndex >= len(mockConflit) {
+ st.IsBlocked = true
+ st.Mu.Lock()
+ defer st.Mu.Unlock()
+ st.IsBlocked = false
+ return false // stream drained: never index past mockConflit once unblocked
+ }
+ st.Val = mockConflit[st.ValIndex]
+ st.ValIndex++
+ return true
+ }
+
+ st := &crtestutil.State{}
+ crStream := &crtestutil.CrStreamImpl{
+ C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+ R: &crtestutil.ResolutionStreamImpl{St: st},
+ }
+ db.c = crtestutil.MockDbClient(db.c, crStream)
+ db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+ ctx, _ := context.RootContext()
+ st.Mu.Lock() // causes Advance() to block once all conflicts are delivered
+ db.EnforceSchema(ctx)
+ for i := 0; i < 100 && (len(st.Result) != len(expResult)); i++ {
+ time.Sleep(time.Millisecond) // wait till all expected resolutions arrive
+ }
+ if len(st.Result) != len(expResult) {
+ t.Errorf("\n Unexpected number of results. Expected: %d, actual: %d", len(expResult), len(st.Result))
+ }
+ for i, result := range st.Result {
+ compareResult(t, expResult[result.Key], result)
+ switch {
+ case singleBatch && result.Continued != (i < len(st.Result)-1):
+ t.Error("\nUnexpected value for continued in single batch")
+ case !singleBatch && result.Continued:
+ t.Error("\nUnexpected value for continued in multi batch")
+ }
+ }
+}
+
+////////////////////////////////////////
+// Test helpers to create test data
+
+// Consider a database with a list List1 with two tasks Task1, Task2. The
+// following helper methods create conflicts with these three objects in various
+// ways.
+
+// simpleConflictStream prepares a conflict stream with two conflicts, one
+// each for Task1 and Task2. No batch is involved and Task1 is independent of
+// Task2. Assume that List does not get updated for these task updates.
+// Local syncbase changes Text field for Task1 and marks Task2 done.
+// Remote syncbase marks Task1 done and Task2 done.
+// Expected result is a merge for Task1 and Task2.
+func simpleConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+ // Conflict for task1
+ row1 := makeRowInfo(Task1,
+ encode(&Task{"OriginalText", false, -1, List1}), 1, // ancestor
+ encode(&Task{TextNew, false, -1, List1}), 3, // local
+ encode(&Task{"OriginalText", true, 20, List1}), 2) // remote
+ c1 := makeConflictInfo(row1, false)
+
+ // Expected result
+ r1 := makeResolution(Task1, encode(&Task{TextNew, true, 20, List1}), wire.ValueSelectionOther)
+
+ // Conflict for task2
+ row2 := makeRowInfo(Task2,
+ encode(&Task{"Text1", false, -1, List1}), 1, // ancestor
+ encode(&Task{"Text1", true, 100, List1}), 3, // local
+ encode(&Task{"Text1", true, 20, List1}), 2) // remote
+ c2 := makeConflictInfo(row2, false)
+
+ // Expected result
+ r2 := makeResolution(Task2, encode(&Task{"Text1", true, 20, List1}),
+ wire.ValueSelectionOther) // for simplicity the test resolver does not optimize
+
+ respMap := map[string]wire.ResolutionInfo{}
+ respMap[r1.Key] = r1
+ respMap[r2.Key] = r2
+
+ return []wire.ConflictInfo{c1, c2}, respMap
+}
+
+// addDeleteConflictStream prepares a conflict stream with one conflict
+// containing Task1, Task2 and List1. List contains a task count and hence
+// addition and deletion of task leads to update to the list.
+// Local syncbase deletes Task1 and updates List1 in a batch.
+// Remote syncbase adds Task2 and updates List1 in a batch.
+// Expected result is Task1 gets deleted, Task2 gets added and List1 shows
+// the correct taskcount after the two operations.
+func addDeleteConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+ // Batch1 is local and contains task1 and list1
+ b1 := makeConflictBatch(1, HintTaskDelete, wire.BatchSourceLocal, true)
+
+ // Batch2 is remote and contains task2 and list1
+ b2 := makeConflictBatch(2, HintTaskAdd, wire.BatchSourceRemote, true)
+
+ // Deletion for task1 on local syncbase
+ row1 := makeRowInfo(Task1,
+ encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1, // ancestor
+ nil, -1, // local
+ encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1) // remote
+ row1.BatchIds = []uint16{1}
+ c1 := makeConflictInfo(row1, true)
+
+
+ // Expected result
+ r1 := makeResolution(Task1, nil, wire.ValueSelectionOther)
+
+ // Addition for task2 on remote syncbase
+ row2 := makeRowInfo(Task2,
+ nil, -1, // ancestor value
+ nil, -1, // local value
+ encode(&Task{"AddedTask", false, -1, List1}), 2) // remote value
+ row2.BatchIds = []uint16{2}
+ c2 := makeConflictInfo(row2, true)
+
+ // Expected result
+ r2 := makeResolution(Task2, encode(&Task{"AddedTask", false, -1, List1}), wire.ValueSelectionRemote)
+
+ // Update to List on both sides on field TaskCount
+ listName := "Groceries"
+ row3 := makeRowInfo(List1,
+ encode(&List{listName, 8, 0}), 25, // ancestor value
+ encode(&List{listName, 7, 0}), 33, // local value
+ encode(&List{listName, 9, 0}), 44) // remote value
+ row3.BatchIds = []uint16{1, 2}
+ c3 := makeConflictInfo(row3, false)
+
+ // Expected result
+ r3 := makeResolution(List1, encode(&List{listName, 8, 0}), wire.ValueSelectionOther)
+
+ respMap := map[string]wire.ResolutionInfo{}
+ respMap[r1.Key] = r1
+ respMap[r2.Key] = r2
+ respMap[r3.Key] = r3
+
+ return []wire.ConflictInfo{b1, b2, c1, c2, c3}, respMap
+}
+
+// twoIntersectingBatchesConflictStream prepares a conflict stream with one
+// conflict containing Task1, Task2 and List1. Here we assume that List keeps
+// a count of done tasks.
+// Local syncbase marks Task1 done and updates done count on List1. Later it
+// also marks Task2 done and updates done count on List1.
+// Remote syncbase updates field Text on Task1 followed by an update to field
+// Text on Task2 (not in a batch).
+// Expected result is a merge for Task1 and Task2 and Local version of List1
+// being accepted.
+func twoIntersectingBatchesConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+ // Batch1 is local and contains task1 and list1
+ b1 := makeConflictBatch(1, HintDone, wire.BatchSourceLocal, true)
+
+ // Batch2 is local and contains task2 and list1
+ b2 := makeConflictBatch(2, HintDone, wire.BatchSourceLocal, true)
+
+ // Batch3 is remote and contains task1
+ b3 := makeConflictBatch(3, HintTextEdit, wire.BatchSourceRemote, true)
+
+ // Batch4 is remote and contains task2
+ b4 := makeConflictBatch(4, HintTextEdit, wire.BatchSourceRemote, true)
+
+ // For task1, mark done on local and edit text on remote
+ row1 := makeRowInfo(Task1,
+ encode(&Task{"TaskOrig", false, -1, List1}), 1, // ancestor
+ encode(&Task{"TaskOrig", true, 204, List1}), 5, // local
+ encode(&Task{"TaskEdit", false, -1, List1}), 3) // remote
+ row1.BatchIds = []uint16{1, 3}
+ c1 := makeConflictInfo(row1, true)
+
+ // Expected result
+ r1 := makeResolution(Task1, encode(&Task{"TaskEdit", true, 204, List1}), wire.ValueSelectionOther)
+
+ // For task2, mark done on local and edit text on remote
+ row2 := makeRowInfo(Task2,
+ encode(&Task{"TaskOrig", false, -1, List1}), 2, // ancestor
+ encode(&Task{"TaskOrig", true, 204, List1}), 5, // local
+ encode(&Task{"TaskEdit", false, -1, List1}), 3) // remote
+ row2.BatchIds = []uint16{2, 4}
+ c2 := makeConflictInfo(row2, true)
+
+ // Expected result
+ r2 := makeResolution(Task2, encode(&Task{"TaskEdit", true, 204, List1}), wire.ValueSelectionOther)
+
+ // Update to List on Local for updating TaskDoneCount
+ listName := "Groceries"
+ row3 := makeRowInfo(List1,
+ encode(&List{listName, 8, 0}), 1, // ancestor value
+ encode(&List{listName, 8, 2}), 5, // local value
+ encode(&List{listName, 8, 0}), 3) // remote value
+ row3.BatchIds = []uint16{1, 2}
+ c3 := makeConflictInfo(row3, false)
+
+ // Expected result
+ r3 := makeResolution(List1, encode(&List{listName, 8, 2}), wire.ValueSelectionOther)
+
+ respMap := map[string]wire.ResolutionInfo{}
+ respMap[r1.Key] = r1
+ respMap[r2.Key] = r2
+ respMap[r3.Key] = r3
+
+ return []wire.ConflictInfo{b1, b2, b3, b4, c1, c2, c3}, respMap
+}
+
+// listDeleteConflictsWithTaskAdd builds a conflict stream holding a single
+// conflict that spans Task1, Task2 and List1, where Task2 does not exist yet.
+// The local syncbase deletes List1 and Task1 as a batch.
+// The remote syncbase adds Task2 along with an update to List1 as a batch.
+// The expected resolution deletes List1, Task1 and Task2. The returned map
+// holds the expected ResolutionInfo per row key.
+func listDeleteConflictsWithTaskAdd(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	stream := []wire.ConflictInfo{
+		// Batch 1 is local and contains task1 and list1.
+		makeConflictBatch(1, HintListDelete, wire.BatchSourceLocal, true),
+		// Batch 2 is remote and contains task2 and list1.
+		makeConflictBatch(2, HintTaskAdd, wire.BatchSourceRemote, true),
+	}
+
+	// List1: deleted locally (nil local value), updated remotely.
+	listName := "Groceries"
+	listRow := makeRowInfo(List1,
+		encode(&List{listName, 1, 0}), 1, // ancestor value
+		nil, 5, // local value
+		encode(&List{listName, 2, 0}), 3) // remote value
+	listRow.BatchIds = []uint16{1, 2}
+	listWant := makeResolution(List1, nil, wire.ValueSelectionOther)
+
+	// Task1: deleted locally together with list1, untouched remotely.
+	task1Row := makeRowInfo(Task1,
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1, // ancestor
+		nil, -1, // local
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1) // remote
+	task1Row.BatchIds = []uint16{1}
+	task1Want := makeResolution(Task1, nil, wire.ValueSelectionOther)
+
+	// Task2: added remotely, so it has neither an ancestor nor a local value.
+	task2Row := makeRowInfo(Task2,
+		nil, -1, // ancestor value
+		nil, -1, // local value
+		encode(&Task{"AddedTask", false, -1, List1}), 2) // remote value
+	task2Row.BatchIds = []uint16{2}
+	task2Want := makeResolution(Task2, nil, wire.ValueSelectionOther)
+
+	// Rows follow the batches; only the final row clears Continued, which
+	// closes the conflict.
+	stream = append(stream,
+		makeConflictInfo(listRow, true),
+		makeConflictInfo(task1Row, true),
+		makeConflictInfo(task2Row, false))
+
+	want := map[string]wire.ResolutionInfo{
+		listWant.Key:  listWant,
+		task1Want.Key: task1Want,
+		task2Want.Key: task2Want,
+	}
+	return stream, want
+}
+
+// makeResolution builds the wire.ResolutionInfo expected for key with the
+// given selection. A nil result leaves Result unset; otherwise the bytes are
+// wrapped in a wire.Value whose WriteTs is the placeholder -1.
+func makeResolution(key string, result []byte, selection wire.ValueSelection) wire.ResolutionInfo {
+	info := wire.ResolutionInfo{Key: key, Selection: selection}
+	if result == nil {
+		return info
+	}
+	info.Result = &wire.Value{Bytes: result, WriteTs: -1}
+	return info
+}
+
+// makeConflictBatch returns a ConflictInfo stream element that carries a
+// BatchInfo with the given id, hint and source. continued is copied into the
+// element's Continued flag.
+func makeConflictBatch(id uint16, hint string, source wire.BatchSource, continued bool) wire.ConflictInfo {
+	return wire.ConflictInfo{
+		Data: wire.ConflictDataBatch{
+			Value: wire.BatchInfo{Id: id, Hint: hint, Source: source},
+		},
+		Continued: continued,
+	}
+}
+
+// makeConflictInfo returns a ConflictInfo stream element that carries row.
+// continued is copied into the element's Continued flag.
+func makeConflictInfo(row wire.RowInfo, continued bool) wire.ConflictInfo {
+	info := wire.ConflictInfo{Continued: continued}
+	info.Data = wire.ConflictDataRow{Value: row}
+	return info
+}
+
+// makeRowInfo returns a RowInfo describing a write operation on key. Each of
+// ancestor, local and remote is paired with its write timestamp (ats, lts,
+// rts); a nil byte slice leaves the corresponding value pointer nil and its
+// timestamp unused.
+func makeRowInfo(key string, ancestor []byte, ats int64, local []byte, lts int64, remote []byte, rts int64) wire.RowInfo {
+	// valueOf wraps non-nil bytes in a wire.Value and maps nil bytes to nil.
+	valueOf := func(data []byte, ts int64) *wire.Value {
+		if data == nil {
+			return nil
+		}
+		return &wire.Value{Bytes: data, WriteTs: ts}
+	}
+
+	write := wire.RowOp{
+		Key:           key,
+		AncestorValue: valueOf(ancestor, ats),
+		LocalValue:    valueOf(local, lts),
+		RemoteValue:   valueOf(remote, rts),
+	}
+	return wire.RowInfo{Op: wire.OperationWrite{Value: write}}
+}
+
+// compareResult reports a test error for every way in which actual differs
+// from expected: the key, the selection, whether a result value is present,
+// and the result bytes. It never aborts the test, so all mismatches for a row
+// are reported together.
+func compareResult(t *testing.T, expected wire.ResolutionInfo, actual wire.ResolutionInfo) {
+	if actual.Key != expected.Key {
+		t.Error("Key does not match")
+	}
+	if actual.Selection != expected.Selection {
+		t.Errorf("Key: %s", expected.Key)
+		t.Errorf("Expected selection: %v, actual selection: %v", expected.Selection, actual.Selection)
+	}
+	switch {
+	case expected.Result == nil:
+		if actual.Result != nil {
+			t.Errorf("Key: %s", expected.Key)
+			t.Error("Result expected to be nil but found non nil")
+		}
+	case actual.Result == nil:
+		// Report and stop here: comparing bytes would dereference the nil
+		// actual.Result and panic instead of failing the test cleanly.
+		t.Errorf("Key: %s", expected.Key)
+		t.Error("Result found nil")
+	case !bytes.Equal(actual.Result.Bytes, expected.Result.Bytes):
+		t.Errorf("Key: %s", expected.Key)
+		t.Error("Result bytes do not match")
+	}
+}
+
+// encode VOM-encodes value for use in test fixtures. It panics if encoding
+// fails: silently returning nil would be read by makeRowInfo and
+// makeResolution as "no value" and would quietly change what the fixture
+// tests.
+func encode(value interface{}) []byte {
+	v, err := vom.Encode(value)
+	if err != nil {
+		panic(err)
+	}
+	return v
+}
diff --git a/v23/syncbase/nosql/crtestutil/mock_dbclient.go b/v23/syncbase/nosql/crtestutil/mock_dbclient.go
new file mode 100644
index 0000000..99e27ee
--- /dev/null
+++ b/v23/syncbase/nosql/crtestutil/mock_dbclient.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package crtestutil
+
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+)
+
+// MockWireDatabaseClient is a test double for wire.DatabaseClientMethods. It
+// embeds a wire.DatabaseClientMethods, so any method it does not override is
+// served by the embedded value.
+type MockWireDatabaseClient struct {
+ wire.DatabaseClientMethods
+ crs *CrStreamImpl // stream returned by StartConflictResolver
+}
+
+// MockDbClient returns a MockWireDatabaseClient that embeds c and hands out
+// crs from StartConflictResolver.
+func MockDbClient(c wire.DatabaseClientMethods, crs *CrStreamImpl) *MockWireDatabaseClient {
+	mock := new(MockWireDatabaseClient)
+	mock.DatabaseClientMethods = c
+	mock.crs = crs
+	return mock
+}
+
+// GetSchemaMetadata always fails with a NoExist error and returns the zero
+// SchemaMetadata.
+func (wclient *MockWireDatabaseClient) GetSchemaMetadata(ctx *context.T, opts ...rpc.CallOpt) (sm wire.SchemaMetadata, err error) {
+	return sm, verror.NewErrNoExist(ctx)
+}
+
+// SetSchemaMetadata discards the metadata and reports success.
+func (wclient *MockWireDatabaseClient) SetSchemaMetadata(ctx *context.T, i0 wire.SchemaMetadata, opts ...rpc.CallOpt) error {
+ return nil
+}
+
+// StartConflictResolver returns the CrStreamImpl supplied to MockDbClient
+// without making any RPC; it never fails.
+func (wclient *MockWireDatabaseClient) StartConflictResolver(ctx *context.T, opts ...rpc.CallOpt) (wire.ConflictManagerStartConflictResolverClientCall, error) {
+ return wclient.crs, nil
+}
diff --git a/v23/syncbase/nosql/crtestutil/mock_stream.go b/v23/syncbase/nosql/crtestutil/mock_stream.go
new file mode 100644
index 0000000..3bb8f4d
--- /dev/null
+++ b/v23/syncbase/nosql/crtestutil/mock_stream.go
@@ -0,0 +1,119 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package crtestutil
+
+import (
+ "sync"
+
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+)
+
+// Compile-time check that *CrStreamImpl satisfies the client-call interface
+// returned by StartConflictResolver.
+var _ wire.ConflictManagerStartConflictResolverClientCall = (*CrStreamImpl)(nil)
+
+// State is the mutable state shared by the mock conflict and resolution
+// streams of one test.
+// NOTE(review): nothing in this file locks Mu or touches IsBlocked,
+// AdvanceCount or ValIndex; presumably the AdvanceFn supplied by each test
+// owns them — confirm against the callers.
+type State struct {
+ // ConflictStream state
+ IsBlocked bool
+ Mu sync.Mutex
+ Val wire.ConflictInfo // value returned by ConflictStreamImpl.Value
+ AdvanceCount int
+ ValIndex int
+ // ResolutionStream state variables
+ Result []wire.ResolutionInfo // items passed to ResolutionStreamImpl.Send, in order
+}
+
+// CrStreamImpl is a mock of the StartConflictResolver client call. It pairs a
+// ConflictStream (the receive side) with a ResolutionStream (the send side).
+type CrStreamImpl struct {
+ C ConflictStream // backs RecvStream
+ R ResolutionStream // backs SendStream
+}
+
+// RecvStream returns the receive half of the mock call; every method of the
+// returned value delegates to s.C.
+func (s *CrStreamImpl) RecvStream() interface {
+ Advance() bool
+ Value() wire.ConflictInfo
+ Err() error
+} {
+ return recvStreamImpl{s.C}
+}
+
+// SendStream returns the send half of the mock call; Send delegates to s.R
+// and Close is a no-op.
+func (s *CrStreamImpl) SendStream() interface {
+ Send(item wire.ResolutionInfo) error
+ Close() error
+} {
+ return sendStreamImpl{s.R}
+}
+
+// Finish does nothing and always reports success.
+func (s *CrStreamImpl) Finish() error {
+ return nil
+}
+
+// recvStreamImpl adapts a ConflictStream to the anonymous interface returned
+// by CrStreamImpl.RecvStream. Every method is a direct delegation.
+type recvStreamImpl struct {
+ c ConflictStream
+}
+
+// Advance delegates to the wrapped ConflictStream.
+func (rs recvStreamImpl) Advance() bool {
+ return rs.c.Advance()
+}
+// Value delegates to the wrapped ConflictStream.
+func (rs recvStreamImpl) Value() wire.ConflictInfo {
+ return rs.c.Value()
+}
+// Err delegates to the wrapped ConflictStream.
+func (rs recvStreamImpl) Err() error {
+ return rs.c.Err()
+}
+
+// sendStreamImpl adapts a ResolutionStream to the anonymous interface
+// returned by CrStreamImpl.SendStream.
+type sendStreamImpl struct {
+	r ResolutionStream
+}
+
+// Send forwards item to the wrapped ResolutionStream.
+func (ss sendStreamImpl) Send(item wire.ResolutionInfo) error {
+	return ss.r.Send(item)
+}
+
+// Close is a no-op: the wrapped ResolutionStream has no Close method.
+func (ss sendStreamImpl) Close() error {
+	return nil
+}
+
+// ConflictStream is the receive side of the mock conflict resolution call. It
+// has the same method set as the value returned by CrStreamImpl.RecvStream.
+type ConflictStream interface {
+ Advance() bool
+ Value() wire.ConflictInfo
+ Err() error
+}
+
+// ResolutionStream is the send side of the mock conflict resolution call: it
+// receives each ResolutionInfo produced by the code under test.
+type ResolutionStream interface {
+ Send(item wire.ResolutionInfo) error
+}
+
+// ConflictStreamImpl is a ConflictStream whose behaviour is scripted by the
+// test: AdvanceFn decides whether another value is available, and St.Val is
+// what Value returns.
+type ConflictStreamImpl struct {
+ St *State
+ AdvanceFn func(*State) bool
+}
+
+// Advance returns whatever AdvanceFn returns for the shared State.
+func (cs *ConflictStreamImpl) Advance() bool {
+ return cs.AdvanceFn(cs.St)
+}
+// Value returns the current St.Val.
+func (cs *ConflictStreamImpl) Value() wire.ConflictInfo {
+ return cs.St.Val
+}
+// Err always returns a non-nil *TestError ("Stream broken"), so a consumer
+// that checks Err after Advance returns false always sees a broken stream.
+func (cs *ConflictStreamImpl) Err() error {
+ return &TestError{"Stream broken"}
+}
+
+// ResolutionStreamImpl is a ResolutionStream that records every item it is
+// sent in the shared State.
+type ResolutionStreamImpl struct {
+	St *State
+}
+
+// Send appends item to St.Result and always succeeds. Appending to a nil
+// slice allocates it, so no separate first-item case is needed.
+func (rs *ResolutionStreamImpl) Send(item wire.ResolutionInfo) error {
+	rs.St.Result = append(rs.St.Result, item)
+	return nil
+}
+
+// TestError is a minimal error implementation used by the mock streams.
+type TestError struct {
+ str string // message returned by Error
+}
+
+// Error returns the message the TestError was created with.
+func (e *TestError) Error() string {
+ return e.str
+}
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index d48f450..42b5270 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -5,6 +5,9 @@
package nosql
import (
+ "sync"
+ "time"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase/util"
"v.io/v23/context"
@@ -15,6 +18,12 @@
"v.io/x/lib/vlog"
)
+const (
+ // Wait time before we try to reconnect a broken conflict resolution stream.
+ // NOTE(review): despite the "InMillis" suffix this is a time.Duration of
+ // two seconds, not a count of milliseconds.
+ waitBeforeReconnectInMillis = 2 * time.Second
+ // reconnectionCount is the context key under which
+ // establishConflictResolution stores the current connection attempt number.
+ reconnectionCount = "rcc"
+)
+
func NewDatabase(parentFullName, relativeName string, schema *Schema) *database {
fullName := naming.Join(parentFullName, relativeName)
return &database{
@@ -23,6 +32,9 @@
fullName: fullName,
name: relativeName,
schema: schema,
+ crState: conflictResolutionState{
+ reconnectWaitTime: waitBeforeReconnectInMillis,
+ },
}
}
@@ -32,6 +44,31 @@
fullName string
name string
schema *Schema
+ crState conflictResolutionState
+}
+
+// conflictResolutionState maintains data about the connection of
+// conflict resolution stream with syncbase. It provides a way to disconnect
+// an existing open stream.
+type conflictResolutionState struct {
+ mu sync.Mutex // guards access to all fields in this struct
+ crContext *context.T // cancellable context the resolution goroutine runs under; set by EnforceSchema
+ cancelFn context.CancelFunc // cancels crContext; nil until EnforceSchema starts a resolver
+ isClosed bool // set by disconnect; stops the reconnect loop
+ reconnectWaitTime time.Duration // pause between reconnection attempts
+}
+
+// disconnect marks the state as closed and cancels the conflict resolution
+// context, if one was ever created. It is safe to call when no resolver was
+// started: cancelFn stays nil when EnforceSchema was never called or the
+// schema has no Resolver, and calling a nil func would panic.
+func (crs *conflictResolutionState) disconnect() {
+	crs.mu.Lock()
+	defer crs.mu.Unlock()
+	crs.isClosed = true
+	if crs.cancelFn != nil {
+		crs.cancelFn()
+	}
+}
+
+// isDisconnected reports whether disconnect has been called. It reads
+// isClosed under mu.
+func (crs *conflictResolutionState) isDisconnected() bool {
+ crs.mu.Lock()
+ defer crs.mu.Unlock()
+ return crs.isClosed
}
var _ Database = (*database)(nil)
@@ -168,17 +205,44 @@
return createBlob(ctx, d.fullName)
}
-// UpgradeIfOutdated implements Database.UpgradeIfOutdated.
-func (d *database) UpgradeIfOutdated(ctx *context.T) (bool, error) {
+// EnforceSchema implements Database.EnforceSchema.
+// It validates the schema supplied with this handle, upgrades the stored
+// schema via upgradeIfOutdated, and, when the schema carries a Resolver,
+// starts a background goroutine that keeps a conflict resolution stream open
+// until the context is cancelled via Close.
+func (d *database) EnforceSchema(ctx *context.T) error {
var schema *Schema = d.schema
if schema == nil {
- return false, verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
+ return verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
}
if schema.Metadata.Version < 0 {
- return false, verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
+ return verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
}
+ if needsResolver(d.schema.Metadata) && d.schema.Resolver == nil {
+ return verror.New(verror.ErrBadState, ctx, "ResolverTypeAppResolves cannot be used in CrRule without providing a ConflictResolver in Schema.")
+ }
+
+ if _, err := d.upgradeIfOutdated(ctx); err != nil {
+ return err
+ }
+
+ if d.schema.Resolver == nil {
+ return nil
+ }
+
+ childCtx, cancelFn := context.WithCancel(ctx)
+ // crState.mu guards every field of conflictResolutionState, so publish the
+ // context and its cancel function under the lock; Close may run
+ // concurrently and reads cancelFn under the same lock.
+ d.crState.mu.Lock()
+ d.crState.crContext = childCtx
+ d.crState.cancelFn = cancelFn
+ d.crState.mu.Unlock()
+
+ go d.establishConflictResolution(childCtx)
+ return nil
+}
+
+// Close implements Database.Close.
+// It marks the conflict resolution state as closed and cancels the context
+// used by the conflict resolution stream, which makes the reconnect loop in
+// establishConflictResolution exit.
+func (d *database) Close() {
+ d.crState.disconnect()
+}
+
+func (d *database) upgradeIfOutdated(ctx *context.T) (bool, error) {
+ var schema *Schema = d.schema
schemaMgr := d.getSchemaManager()
currMeta, err := schemaMgr.getSchemaMetadata(ctx)
if err != nil {
@@ -222,8 +286,182 @@
return true, nil
}
+// establishConflictResolution keeps a conflict resolution stream open for the
+// lifetime of the database handle. Each iteration opens a stream via
+// listenForConflicts, which blocks until the stream breaks, and then waits
+// reconnectWaitTime before reconnecting. The attempt number is attached to
+// the per-attempt context under the reconnectionCount key.
+//
+// The loop ends when the handle has been closed or when ctx is cancelled.
+// The wait is interruptible so that Close does not have to sit out a full
+// reconnect delay followed by one more doomed connection attempt, and so that
+// cancellation of the parent context does not leave the goroutine retrying
+// forever on a dead context.
+func (d *database) establishConflictResolution(ctx *context.T) {
+	for count := 1; ; count++ {
+		vlog.Infof("Starting a new conflict resolution connection. Re-Connection count: %d", count)
+		childCtx := context.WithValue(ctx, reconnectionCount, count)
+		// listenForConflicts is a blocking method which returns only when the
+		// conflict stream is broken.
+		if err := d.listenForConflicts(childCtx); err != nil {
+			vlog.Errorf("Conflict resolution connection ended with error: %v", err)
+		}
+
+		// Check if database is closed and if we need to shutdown conflict
+		// resolution.
+		if d.crState.isDisconnected() {
+			vlog.Infof("Shutting down conflict resolution connection.")
+			return
+		}
+
+		// The connection might have broken because syncbase service went down.
+		// Wait for a few seconds to allow syncbase to come back up, but give up
+		// immediately if the context is cancelled in the meantime.
+		select {
+		case <-ctx.Done():
+			vlog.Infof("Shutting down conflict resolution connection.")
+			return
+		case <-time.After(d.crState.reconnectWaitTime):
+		}
+	}
+}
+
+func (d *database) listenForConflicts(ctx *context.T) error {
+ resolver, err := d.c.StartConflictResolver(ctx)
+ if err != nil {
+ return err
+ }
+ conflictStream := resolver.RecvStream()
+ resolutionStream := resolver.SendStream()
+ var c *Conflict = &Conflict{}
+ for conflictStream.Advance() {
+ row := conflictStream.Value()
+ addRowToConflict(c, &row)
+ if !row.Continued {
+ resolution := d.schema.Resolver.OnConflict(ctx, c)
+ if err := sendResolution(resolutionStream, resolution); err != nil {
+ return err
+ }
+ c = &Conflict{} // create a new conflict object for the next batch
+ }
+ }
+ if err := conflictStream.Err(); err != nil {
+ return err
+ }
+ return resolver.Finish()
+}
+
+// sendResolution sends every row of resolution.ResultSet over stream as a
+// ResolutionInfo. Each row except the last one sent is flagged as continued.
+// It stops at, and returns, the first Send error.
+//
+// TODO(jlodhia): Should we check if the Resolution received addresses all
+// conflicts in write set?
+func sendResolution(stream interface {
+	Send(item wire.ResolutionInfo) error
+}, resolution Resolution) error {
+	remaining := len(resolution.ResultSet)
+	for _, row := range resolution.ResultSet {
+		remaining--
+		info := toResolutionInfo(row, remaining != 0)
+		if err := stream.Send(info); err != nil {
+			vlog.Error("Error while sending resolution")
+			return err
+		}
+	}
+	return nil
+}
+
+// addRowToConflict folds one stream element into the Conflict being
+// assembled. A batch element is stored in c.Batches by batch id. A row
+// element is filed by operation type: writes and reads go into WriteSet and
+// ReadSet (indexed by key and by each batch id), scans go into ScanSet
+// (indexed by each batch id). Sets and maps are created on first use.
+func addRowToConflict(c *Conflict, ci *wire.ConflictInfo) {
+	switch data := ci.Data.(type) {
+	case wire.ConflictDataBatch:
+		if c.Batches == nil {
+			c.Batches = map[uint16]wire.BatchInfo{}
+		}
+		c.Batches[data.Value.Id] = data.Value
+	case wire.ConflictDataRow:
+		batchIds := data.Value.BatchIds
+		// record files rowOp in *set under its key and under every batch id,
+		// allocating the set on first use.
+		record := func(set **ConflictRowSet, rowOp wire.RowOp) {
+			if *set == nil {
+				*set = &ConflictRowSet{
+					ByKey:   map[string]ConflictRow{},
+					ByBatch: map[uint16][]ConflictRow{},
+				}
+			}
+			row := toConflictRow(rowOp, batchIds)
+			(*set).ByKey[row.Key] = row
+			for _, id := range batchIds {
+				(*set).ByBatch[id] = append((*set).ByBatch[id], row)
+			}
+		}
+		switch op := data.Value.Op.(type) {
+		case wire.OperationWrite:
+			record(&c.WriteSet, op.Value)
+		case wire.OperationRead:
+			record(&c.ReadSet, op.Value)
+		case wire.OperationScan:
+			if c.ScanSet == nil {
+				c.ScanSet = &ConflictScanSet{ByBatch: map[uint16][]wire.ScanOp{}}
+			}
+			for _, id := range batchIds {
+				c.ScanSet.ByBatch[id] = append(c.ScanSet.ByBatch[id], op.Value)
+			}
+		}
+	}
+}
+
+func toConflictRow(op wire.RowOp, batchIds []uint16) ConflictRow {
+ var local, remote, ancestor *Value
+ if op.LocalValue != nil {
+ local = &Value{
+ val: op.LocalValue.Bytes,
+ WriteTs: toTime(op.LocalValue.WriteTs),
+ selection: wire.ValueSelectionLocal,
+ }
+ }
+ if op.RemoteValue != nil {
+ remote = &Value{
+ val: op.RemoteValue.Bytes,
+ WriteTs: toTime(op.RemoteValue.WriteTs),
+ selection: wire.ValueSelectionRemote,
+ }
+ }
+ if op.AncestorValue != nil {
+ ancestor = &Value{
+ val: op.AncestorValue.Bytes,
+ WriteTs: toTime(op.AncestorValue.WriteTs),
+ selection: wire.ValueSelectionOther,
+ }
+ }
+ return ConflictRow{
+ Key: op.Key,
+ LocalValue: local,
+ RemoteValue: remote,
+ AncestorValue: ancestor,
+ BatchIds: batchIds,
+ }
+}
+
+// toTime converts a timestamp expressed as nanoseconds since the Unix epoch
+// into a time.Time. time.Unix accepts a nanosecond argument outside
+// [0, 1e9) and normalizes it, so no manual split into seconds is needed.
+//
+// TODO(jlodhia): remove this method once time is stored as time.Time instead
+// of int64
+func toTime(unixNanos int64) time.Time {
+	return time.Unix(0, unixNanos)
+}
+
+// toResolutionInfo converts a client-side ResolvedRow into the wire
+// ResolutionInfo sent to syncbase. continued is copied into the Continued
+// field: callers pass true while more rows of the same resolution follow and
+// false for the final row.
+//
+// A nil r.Result is sent as selection Other with no value. Otherwise the
+// selection recorded on the Value is used and its bytes are sent along.
+func toResolutionInfo(r ResolvedRow, continued bool) wire.ResolutionInfo {
+	sel := wire.ValueSelectionOther
+	resVal := (*wire.Value)(nil)
+	if r.Result != nil {
+		sel = r.Result.selection
+		resVal = &wire.Value{
+			Bytes:   r.Result.val,
+			WriteTs: r.Result.WriteTs.UnixNano(), // this timestamp is ignored by syncbase
+		}
+	}
+	return wire.ResolutionInfo{
+		Key:       r.Key,
+		Selection: sel,
+		Result:    resVal,
+		Continued: continued,
+	}
+}
+
+// needsResolver reports whether any rule in the schema's conflict resolution
+// policy uses ResolverTypeAppResolves, i.e. whether the app must supply a
+// ConflictResolver.
+func needsResolver(metadata wire.SchemaMetadata) bool {
+	appResolves := false
+	for _, rule := range metadata.Policy.Rules {
+		if rule.Resolver == wire.ResolverTypeAppResolves {
+			appResolves = true
+			break
+		}
+	}
+	return appResolves
+}
+
+// getSchemaManager returns a schema manager that issues its RPCs through
+// d.c, the client stub already held by this database handle.
func (d *database) getSchemaManager() schemaManagerImpl {
- return newSchemaManager(d.fullName)
+ return newSchemaManager(d.c)
}
func (d *database) schemaVersion() int32 {
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 622f82d..d9ae93f 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -6,12 +6,15 @@
package nosql
import (
+ "time"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase/util"
"v.io/v23/context"
"v.io/v23/security/access"
"v.io/v23/services/watch"
"v.io/v23/vdl"
+ "v.io/v23/verror"
"v.io/v23/vom"
)
@@ -45,6 +48,10 @@
// time of the RPC, and will not reflect subsequent writes to keys not yet
// reached by the stream.
Exec(ctx *context.T, query string) ([]string, ResultStream, error)
+
+ // Close cleans up any state associated with this client handle including
+ // shutting down any open conflict resolution stream.
+ Close()
}
// Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -131,14 +138,15 @@
// Blob returns a handle to the blob with the given BlobRef.
Blob(br wire.BlobRef) Blob
- // This method compares the current schema version of the database with the
- // schema version provided while creating this database handle. If the
- // current database schema version is lower, then the SchemaUpdater is
+ // EnforceSchema compares the current schema version of the database
+ // with the schema version provided while creating this database handle. If
+ // the current database schema version is lower, then the SchemaUpdater is
// called. If SchemaUpdater is successful this method stores the new schema
// metadata in database.
- // Note: schema can be nil, in which case this method skips schema check
- // and the caller is responsible for maintaining schema sanity.
- UpgradeIfOutdated(ctx *context.T) (bool, error)
+ // This method also registers a conflict resolver with syncbase to receive
+ // conflicts. Note: schema can be nil, in which case this method skips
+ // schema check and the caller is responsible for maintaining schema sanity.
+ EnforceSchema(ctx *context.T) error
}
// BatchDatabase is a handle to a set of reads and writes to the database that
@@ -550,4 +558,114 @@
type Schema struct {
Metadata wire.SchemaMetadata
Upgrader SchemaUpgrader
+ Resolver ConflictResolver // called for each conflict; required when a policy rule uses ResolverTypeAppResolves
+}
+
+// ConflictResolver interface allows the App to define resolution of conflicts
+// that it requested to handle.
+type ConflictResolver interface {
+ // OnConflict is called once per fully received conflict and returns the
+ // Resolution to send back to syncbase.
+ OnConflict(ctx *context.T, conflict *Conflict) Resolution
+}
+
+// Conflict contains information to fully specify a conflict. Since syncbase
+// supports batches there can be one or more rows within the batch that has a
+// conflict. Each of these rows will be sent together as part of a single
+// conflict. Each row contains an Id of the batch to which it belongs,
+// enabling the client to group together rows that are part of a batch. Note
+// that a single row can be part of more than one batch.
+//
+// WriteSet contains rows that were written.
+// ReadSet contains rows that were read within a batch corresponding to a row
+// within the write set.
+// ScanSet contains scans performed within a batch corresponding to a row
+// within the write set.
+// Batches is a map of unique ids to BatchInfo objects. The id is unique only in
+// the context of a given conflict and is otherwise meaningless.
+//
+// Each field is allocated only when the first matching element is received,
+// so any of them may be nil.
+type Conflict struct {
+ ReadSet *ConflictRowSet
+ WriteSet *ConflictRowSet
+ ScanSet *ConflictScanSet
+ Batches map[uint16]wire.BatchInfo
+}
+
+// ConflictRowSet contains a set of rows under conflict. It provides two different
+// ways to access the same set.
+// ByKey is a map of ConflictRows keyed by the row key.
+// ByBatch is a map of []ConflictRows keyed by batch id. This map lets the client
+// access all ConflictRows within this set that belong to a given batch; a row
+// that is part of several batches appears under each of their ids.
+type ConflictRowSet struct {
+ ByKey map[string]ConflictRow
+ ByBatch map[uint16][]ConflictRow
+}
+
+// ConflictScanSet contains a set of scans under conflict.
+// ByBatch is a map of slices of ScanOps keyed by batch id; a scan that is part
+// of several batches appears under each of their ids.
+type ConflictScanSet struct {
+ ByBatch map[uint16][]wire.ScanOp
+}
+
+// ConflictRow represents a row under conflict.
+// Key is the key for the row.
+// LocalValue is the value present in the local db.
+// RemoteValue is the value received via sync.
+// AncestorValue is the value for the key which is the lowest common
+// ancestor of the two values represented by LocalValue and RemoteValue.
+// AncestorValue is nil if the ConflictRow is a part of the read set.
+// BatchIds is a list of ids of all the batches that this row belongs to.
+// Any of the three values is nil when syncbase sent no value for it.
+type ConflictRow struct {
+ Key string
+ LocalValue *Value
+ RemoteValue *Value
+ AncestorValue *Value
+ BatchIds []uint16
+}
+
+// Resolution contains the application's reply to a conflict. It must contain a
+// resolved value for each conflict row within the WriteSet of the given
+// conflict.
+// ResultSet is a map of row key to ResolvedRow. Rows are sent to syncbase in
+// map iteration order, which is unspecified.
+type Resolution struct {
+ ResultSet map[string]ResolvedRow
+ // TODO(jlodhia): Hint []string
+}
+
+// ResolvedRow represents a result of resolution of a row under conflict.
+// Key is the key for the row.
+// Result is the resolved value for the key. It is either one of the Values
+// received in the ConflictRow (local, remote or ancestor) or a new Value
+// created with NewValue; the Value itself records which of these it is.
+// A nil Result is sent to syncbase as selection 'Other' with no value
+// (presumably a deletion of the row — confirm against syncbase).
+type ResolvedRow struct {
+ Key string
+ Result *Value
+}
+
+// Value contains a specific version of data for the row under conflict along
+// with the write timestamp of that version.
+// WriteTs is the write timestamp for this value.
+// The encoded bytes and the selection (local, remote or other) are kept
+// unexported; use Get to decode the data and NewValue to create a new Value.
+type Value struct {
+ val []byte // VOM-encoded data
+ WriteTs time.Time
+ selection wire.ValueSelection // origin of this value, sent back as the resolution's Selection
+}
+
+// Get takes a reference to an instance of a type that is expected to be
+// represented by Value, and VOM-decodes the stored bytes into it. It returns
+// the error from vom.Decode, if any.
+func (v *Value) Get(value interface{}) error {
+ return vom.Decode(v.val, value)
+}
+
+// NewValue creates a new Value to be added to Resolution. data is
+// VOM-encoded and the Value is tagged with selection Other. It returns
+// ErrBadArg when data is nil, or the encoding error if VOM encoding fails.
+func NewValue(ctx *context.T, data interface{}) (*Value, error) {
+	if data == nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, "data cannot be nil")
+	}
+	encoded, err := vom.Encode(data)
+	if err != nil {
+		return nil, err
+	}
+	v := &Value{val: encoded, selection: wire.ValueSelectionOther}
+	v.WriteTs = time.Now() // ignored by syncbase
+	return v, nil
}
diff --git a/v23/syncbase/nosql/schema.go b/v23/syncbase/nosql/schema.go
index 9f52a1d..8dd480d 100644
--- a/v23/syncbase/nosql/schema.go
+++ b/v23/syncbase/nosql/schema.go
@@ -21,14 +21,12 @@
// Implementation of SchemaManager (Not part of public client API)
type schemaManagerImpl struct {
- dbName string
- c wire.DatabaseClientMethods
+ c wire.DatabaseClientMethods // client stub supplied by the caller of newSchemaManager
}
+// newSchemaManager returns a schemaManagerImpl that uses the given client
+// stub for its RPCs instead of creating its own from a database name.
-func newSchemaManager(dbName string) schemaManagerImpl {
+func newSchemaManager(client wire.DatabaseClientMethods) schemaManagerImpl {
return schemaManagerImpl{
- dbName: dbName,
- c: wire.DatabaseClient(dbName),
+ c: client,
}
}
diff --git a/v23/syncbase/nosql/schema_test.go b/v23/syncbase/nosql/schema_test.go
index 6d7ed0d..78ebd95 100644
--- a/v23/syncbase/nosql/schema_test.go
+++ b/v23/syncbase/nosql/schema_test.go
@@ -19,10 +19,10 @@
// This test has the following steps:
// 1) Call NoSQLDatabase() for a non existent db.
// 2) Create the database, and verify if Schema got stored properly.
-// 3) Call UpgradeIfOutdated() to make sure that the method is no-op and is
+// 3) Call EnforceSchema() to make sure that the method is no-op and is
// able to read the schema from db.
// 4) Call NoSQLDatabase() on the same db to create a new handle with an
-// upgraded schema, call UpgradeIfOutdated() and check if SchemaUpgrader
+// upgraded schema, call EnforceSchema() and check if SchemaUpgrader
// is called and if the new schema is stored appropriately.
func TestSchemaCheck(t *testing.T) {
ctx, sName, cleanup := tu.SetupOrDie(nil)
@@ -35,9 +35,9 @@
// Verify that calling Upgrade on a non existing database does not throw
// errors.
- _, err := db1.UpgradeIfOutdated(ctx)
+ err := db1.EnforceSchema(ctx)
if err != nil {
- t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err)
+ t.Fatalf("db1.EnforceSchema() failed: %v", err)
}
if mockUpgrader.CallCount > 0 {
t.Fatal("Call to upgrader was not expected.")
@@ -53,12 +53,8 @@
}
// Make redundant call to Upgrade to verify that it is a no-op
- result, err1 := db1.UpgradeIfOutdated(ctx)
- if result {
- t.Fatalf("db1.UpgradeIfOutdated() should not return true")
- }
- if err1 != nil {
- t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err1)
+ if err := db1.EnforceSchema(ctx); err != nil {
+ t.Fatalf("db1.EnforceSchema() failed: %v", err)
}
if mockUpgrader.CallCount > 0 {
t.Fatal("Call to upgrader was not expected.")
@@ -73,13 +69,8 @@
}
schema.Metadata.Policy = policy
otherdb1 := a.NoSQLDatabase("db1", schema)
- otherresult, othererr := otherdb1.UpgradeIfOutdated(ctx)
-
- if !otherresult {
- t.Fatalf("otherdb1.UpgradeIfOutdated() expected to return true")
- }
- if othererr != nil {
- t.Fatalf("otherdb1.UpgradeIfOutdated() failed: %v", othererr)
+ if err := otherdb1.EnforceSchema(ctx); err != nil {
+ t.Fatalf("otherdb1.EnforceSchema() failed: %v", err)
}
if mockUpgrader.CallCount != 1 {
t.Fatalf("Unexpected number of calls to upgrader. Expected: %d, Actual: %d.", 1, mockUpgrader.CallCount)
@@ -226,7 +217,7 @@
// Upgrade schema version for underlying db using a different handle
schema2 := tu.DefaultSchema(1)
dbHandle2 := a.NoSQLDatabase("db1", schema2)
- dbHandle2.UpgradeIfOutdated(ctx)
+ dbHandle2.EnforceSchema(ctx)
// Commit batch1, abort batch2, attempt writing a row using batch3.
// Each of these operations should fail.
diff --git a/v23/syncbase/util/util.go b/v23/syncbase/util/util.go
index 6eeaaaa..dda4c6d 100644
--- a/v23/syncbase/util/util.go
+++ b/v23/syncbase/util/util.go
@@ -64,6 +64,12 @@
return string(x)
}
+// IsPrefix returns true if start and limit strings together represent a prefix
+// range. If true, start represents the prefix.
+// The pair is a prefix range exactly when limit equals
+// PrefixRangeLimit(start).
+func IsPrefix(start string, limit string) bool {
+ return PrefixRangeLimit(start) == limit
+}
+
// AccessController provides access control for various syncbase objects.
type AccessController interface {
// SetPermissions replaces the current Permissions for an object.
diff --git a/v23/syncbase/util/util_test.go b/v23/syncbase/util/util_test.go
index 321691a..2d6384a 100644
--- a/v23/syncbase/util/util_test.go
+++ b/v23/syncbase/util/util_test.go
@@ -61,3 +61,42 @@
}
}
}
+
+func TestIsPrefix(t *testing.T) {
+ tests := []struct {
+ isPrefix bool
+ start string
+ limit string
+ }{
+ {true, "", ""},
+ {true, "a", "b"},
+ {true, "aa", "ab"},
+ {true, "\xfe", "\xff"},
+ {true, "a\xfe", "a\xff"},
+ {true, "aa\xfe", "aa\xff"},
+ {true, "a\xff", "b"},
+ {true, "aa\xff", "ab"},
+ {true, "a\xff\xff", "b"},
+ {true, "aa\xff\xff", "ab"},
+ {true, "\xff", ""},
+ {true, "\xff\xff", ""},
+
+ {false, "", "\x00"},
+ {false, "a", "aa"},
+ {false, "aa", "aa"},
+ {false, "\xfe", "\x00"},
+ {false, "a\xfe", "b\xfe"},
+ {false, "aa\xfe", "aa\x00"},
+ {false, "a\xff", "b\x00"},
+ {false, "aa\xff", "ab\x00"},
+ {false, "a\xff\xff", "a\xff\xff\xff"},
+ {false, "aa\xff\xff", "a"},
+ {false, "\xff", "\x00"},
+ }
+ for _, test := range tests {
+ result := util.IsPrefix(test.start, test.limit)
+ if result != test.isPrefix {
+ t.Errorf("%q, %q: got %v, want %v", test.start, test.limit, result, test.isPrefix)
+ }
+ }
+}
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 2818a8c..a9f503f 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -49,6 +49,11 @@
mu sync.Mutex // protects the fields below
sns map[uint64]store.Snapshot
txs map[uint64]store.Transaction
+
+ // Active ConflictResolver connection from the app to this database.
+ // NOTE: For now, we assume there's only one open conflict resolution stream
+ // per database (typically, from the app that owns the database).
+ resolver wire.ConflictManagerStartConflictResolverServerCall
}
// databaseReq is a per-request object that handles Database RPCs.
diff --git a/x/ref/services/syncbase/server/nosql/database_crm.go b/x/ref/services/syncbase/server/nosql/database_crm.go
new file mode 100644
index 0000000..e1135a9
--- /dev/null
+++ b/x/ref/services/syncbase/server/nosql/database_crm.go
@@ -0,0 +1,20 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/v23/context"
+)
+
+////////////////////////////////////////
+// ConflictManager RPC methods
+
+// StartConflictResolver implements the ConflictManager RPC. It records the
+// caller's stream as the database's active conflict resolver, replacing any
+// previously stored one.
+//
+// NOTE(review): this returns immediately after storing the call. Returning
+// from a streaming RPC handler normally ends the stream, so the stored call
+// may be unusable by the time sync reads it — confirm whether the handler
+// should block until the client goes away.
+func (d *databaseReq) StartConflictResolver(ctx *context.T, call wire.ConflictManagerStartConflictResolverServerCall) error {
+	// Store the conflict resolver connection in the per-app, per-database
+	// singleton so that sync can access it. The resolver field is one of the
+	// fields protected by database.mu, so take the lock for the write.
+	d.database.mu.Lock()
+	d.database.resolver = call
+	d.database.mu.Unlock()
+	return nil
+}