Add Conflict resolution API
1) Client side API for receiving Conflicts
2) Model objects for Conflict and Resolution data
3) RPC API to register conflict resolver with syncbase service
4) Example conflict resolver along with tests to verify it

Change-Id: I671bcadd30b12c0388664ff7c8d315079eae7ddb
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index ebeccf5..16182de 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -73,6 +73,10 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManager
+
+	// ConflictManager implements the API for registering resolvers, receiving
+	// conflicts and sending resolutions.
+	ConflictManager
 }
 
 // Table represents a collection of Rows.
@@ -234,6 +238,30 @@
 	SetSchemaMetadata(metadata SchemaMetadata) error {access.Write}
 }
 
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManager interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver() stream<ResolutionInfo, ConflictInfo> error {access.Write}
+}
+
 // BlobManager is the interface for blob operations.
 type BlobManager interface {
 	// API for resumable blob creation (append-only). After commit, a blob
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 2d3d4ba..a908cfa 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -1452,6 +1452,356 @@
 	},
 }
 
+// ConflictManagerClientMethods is the client interface
+// containing ConflictManager methods.
+//
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManagerClientMethods interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver(*context.T, ...rpc.CallOpt) (ConflictManagerStartConflictResolverClientCall, error)
+}
+
+// ConflictManagerClientStub adds universal methods to ConflictManagerClientMethods.
+type ConflictManagerClientStub interface {
+	ConflictManagerClientMethods
+	rpc.UniversalServiceMethods
+}
+
+// ConflictManagerClient returns a client stub for ConflictManager.
+func ConflictManagerClient(name string) ConflictManagerClientStub {
+	return implConflictManagerClientStub{name}
+}
+
+type implConflictManagerClientStub struct {
+	name string
+}
+
+func (c implConflictManagerClientStub) StartConflictResolver(ctx *context.T, opts ...rpc.CallOpt) (ocall ConflictManagerStartConflictResolverClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "StartConflictResolver", nil, opts...); err != nil {
+		return
+	}
+	ocall = &implConflictManagerStartConflictResolverClientCall{ClientCall: call}
+	return
+}
+
+// ConflictManagerStartConflictResolverClientStream is the client stream for ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverClientStream interface {
+	// RecvStream returns the receiver side of the ConflictManager.StartConflictResolver client stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() ConflictInfo
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the ConflictManager.StartConflictResolver client stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors
+		// encountered while sending, or if Send is called after Close or
+		// the stream has been canceled.  Blocks if there is no buffer
+		// space; will unblock when buffer space is available or after
+		// the stream has been canceled.
+		Send(item ResolutionInfo) error
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.
+		// This is an optional call - e.g. a client might call Close if it
+		// needs to continue receiving items from the server after it's
+		// done sending.  Returns errors encountered while closing, or if
+		// Close is called after the stream has been canceled.  Like Send,
+		// blocks if there is no buffer space available.
+		Close() error
+	}
+}
+
+// ConflictManagerStartConflictResolverClientCall represents the call returned from ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverClientCall interface {
+	ConflictManagerStartConflictResolverClientStream
+	// Finish performs the equivalent of SendStream().Close, then blocks until
+	// the server is done, and returns the positional return values for the call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implConflictManagerStartConflictResolverClientCall struct {
+	rpc.ClientCall
+	valRecv ConflictInfo
+	errRecv error
+}
+
+func (c *implConflictManagerStartConflictResolverClientCall) RecvStream() interface {
+	Advance() bool
+	Value() ConflictInfo
+	Err() error
+} {
+	return implConflictManagerStartConflictResolverClientCallRecv{c}
+}
+
+type implConflictManagerStartConflictResolverClientCallRecv struct {
+	c *implConflictManagerStartConflictResolverClientCall
+}
+
+func (c implConflictManagerStartConflictResolverClientCallRecv) Advance() bool {
+	c.c.valRecv = ConflictInfo{}
+	c.c.errRecv = c.c.Recv(&c.c.valRecv)
+	return c.c.errRecv == nil
+}
+func (c implConflictManagerStartConflictResolverClientCallRecv) Value() ConflictInfo {
+	return c.c.valRecv
+}
+func (c implConflictManagerStartConflictResolverClientCallRecv) Err() error {
+	if c.c.errRecv == io.EOF {
+		return nil
+	}
+	return c.c.errRecv
+}
+func (c *implConflictManagerStartConflictResolverClientCall) SendStream() interface {
+	Send(item ResolutionInfo) error
+	Close() error
+} {
+	return implConflictManagerStartConflictResolverClientCallSend{c}
+}
+
+type implConflictManagerStartConflictResolverClientCallSend struct {
+	c *implConflictManagerStartConflictResolverClientCall
+}
+
+func (c implConflictManagerStartConflictResolverClientCallSend) Send(item ResolutionInfo) error {
+	return c.c.Send(item)
+}
+func (c implConflictManagerStartConflictResolverClientCallSend) Close() error {
+	return c.c.CloseSend()
+}
+func (c *implConflictManagerStartConflictResolverClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
+// ConflictManagerServerMethods is the interface a server writer
+// implements for ConflictManager.
+//
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManagerServerMethods interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver(*context.T, ConflictManagerStartConflictResolverServerCall) error
+}
+
+// ConflictManagerServerStubMethods is the server interface containing
+// ConflictManager methods, as expected by rpc.Server.
+// The only difference between this interface and ConflictManagerServerMethods
+// is the streaming methods.
+type ConflictManagerServerStubMethods interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver(*context.T, *ConflictManagerStartConflictResolverServerCallStub) error
+}
+
+// ConflictManagerServerStub adds universal methods to ConflictManagerServerStubMethods.
+type ConflictManagerServerStub interface {
+	ConflictManagerServerStubMethods
+	// Describe the ConflictManager interfaces.
+	Describe__() []rpc.InterfaceDesc
+}
+
+// ConflictManagerServer returns a server stub for ConflictManager.
+// It converts an implementation of ConflictManagerServerMethods into
+// an object that may be used by rpc.Server.
+func ConflictManagerServer(impl ConflictManagerServerMethods) ConflictManagerServerStub {
+	stub := implConflictManagerServerStub{
+		impl: impl,
+	}
+	// Initialize GlobState; always check the stub itself first, to handle the
+	// case where the user has the Glob method defined in their VDL source.
+	if gs := rpc.NewGlobState(stub); gs != nil {
+		stub.gs = gs
+	} else if gs := rpc.NewGlobState(impl); gs != nil {
+		stub.gs = gs
+	}
+	return stub
+}
+
+type implConflictManagerServerStub struct {
+	impl ConflictManagerServerMethods
+	gs   *rpc.GlobState
+}
+
+func (s implConflictManagerServerStub) StartConflictResolver(ctx *context.T, call *ConflictManagerStartConflictResolverServerCallStub) error {
+	return s.impl.StartConflictResolver(ctx, call)
+}
+
+func (s implConflictManagerServerStub) Globber() *rpc.GlobState {
+	return s.gs
+}
+
+func (s implConflictManagerServerStub) Describe__() []rpc.InterfaceDesc {
+	return []rpc.InterfaceDesc{ConflictManagerDesc}
+}
+
+// ConflictManagerDesc describes the ConflictManager interface.
+var ConflictManagerDesc rpc.InterfaceDesc = descConflictManager
+
+// descConflictManager hides the desc to keep godoc clean.
+var descConflictManager = rpc.InterfaceDesc{
+	Name:    "ConflictManager",
+	PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+	Doc:     "// ConflictManager interface provides all the methods necessary to handle\n// conflict resolution for a given database.",
+	Methods: []rpc.MethodDesc{
+		{
+			Name: "StartConflictResolver",
+			Doc:  "// StartConflictResolver registers a resolver for the database that is\n// associated with this ConflictManager and creates a stream to receive\n// conflicts and send resolutions.\n// Batches of ConflictInfos will be sent over with the Continued field\n// within the ConflictInfo representing the batch boundary. Client must\n// respond with a batch of ResolutionInfos in the same fashion.\n// A key is under conflict if two different values were written to it\n// concurrently (in logical time), i.e. neither value is an ancestor of the\n// other in the history graph.\n// A key under conflict can be a part of a batch committed on local or\n// remote or both syncbases. ConflictInfos for all keys in these two batches\n// are grouped together. These keys may themselves be under conflict; the\n// presented batch is a transitive closure of all batches containing keys\n// under conflict.\n// For example, for local batch {key1, key2} and remote batch {key1, key3},\n// the batch sent for conflict resolution will be {key1, key2, key3}.\n// If there was another concurrent batch {key2, key4}, then the batch sent\n// for conflict resolution will be {key1, key2, key3, key4}.",
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+		},
+	},
+}
+
+// ConflictManagerStartConflictResolverServerStream is the server stream for ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverServerStream interface {
+	// RecvStream returns the receiver side of the ConflictManager.StartConflictResolver server stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() ResolutionInfo
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the ConflictManager.StartConflictResolver server stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors encountered
+		// while sending.  Blocks if there is no buffer space; will unblock when
+		// buffer space is available.
+		Send(item ConflictInfo) error
+	}
+}
+
+// ConflictManagerStartConflictResolverServerCall represents the context passed to ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverServerCall interface {
+	rpc.ServerCall
+	ConflictManagerStartConflictResolverServerStream
+}
+
+// ConflictManagerStartConflictResolverServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements ConflictManagerStartConflictResolverServerCall.
+type ConflictManagerStartConflictResolverServerCallStub struct {
+	rpc.StreamServerCall
+	valRecv ResolutionInfo
+	errRecv error
+}
+
+// Init initializes ConflictManagerStartConflictResolverServerCallStub from rpc.StreamServerCall.
+func (s *ConflictManagerStartConflictResolverServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the ConflictManager.StartConflictResolver server stream.
+func (s *ConflictManagerStartConflictResolverServerCallStub) RecvStream() interface {
+	Advance() bool
+	Value() ResolutionInfo
+	Err() error
+} {
+	return implConflictManagerStartConflictResolverServerCallRecv{s}
+}
+
+type implConflictManagerStartConflictResolverServerCallRecv struct {
+	s *ConflictManagerStartConflictResolverServerCallStub
+}
+
+func (s implConflictManagerStartConflictResolverServerCallRecv) Advance() bool {
+	s.s.valRecv = ResolutionInfo{}
+	s.s.errRecv = s.s.Recv(&s.s.valRecv)
+	return s.s.errRecv == nil
+}
+func (s implConflictManagerStartConflictResolverServerCallRecv) Value() ResolutionInfo {
+	return s.s.valRecv
+}
+func (s implConflictManagerStartConflictResolverServerCallRecv) Err() error {
+	if s.s.errRecv == io.EOF {
+		return nil
+	}
+	return s.s.errRecv
+}
+
+// SendStream returns the send side of the ConflictManager.StartConflictResolver server stream.
+func (s *ConflictManagerStartConflictResolverServerCallStub) SendStream() interface {
+	Send(item ConflictInfo) error
+} {
+	return implConflictManagerStartConflictResolverServerCallSend{s}
+}
+
+type implConflictManagerStartConflictResolverServerCallSend struct {
+	s *ConflictManagerStartConflictResolverServerCallStub
+}
+
+func (s implConflictManagerStartConflictResolverServerCallSend) Send(item ConflictInfo) error {
+	return s.s.Send(item)
+}
+
 // DatabaseClientMethods is the client interface
 // containing Database methods.
 //
@@ -1539,6 +1889,9 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManagerClientMethods
+	// ConflictManager interface provides all the methods necessary to handle
+	// conflict resolution for a given database.
+	ConflictManagerClientMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
@@ -1579,7 +1932,7 @@
 
 // DatabaseClient returns a client stub for Database.
 func DatabaseClient(name string) DatabaseClientStub {
-	return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name)}
+	return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name), ConflictManagerClient(name)}
 }
 
 type implDatabaseClientStub struct {
@@ -1590,6 +1943,7 @@
 	SyncGroupManagerClientStub
 	BlobManagerClientStub
 	SchemaManagerClientStub
+	ConflictManagerClientStub
 }
 
 func (c implDatabaseClientStub) Create(ctx *context.T, i0 *SchemaMetadata, i1 access.Permissions, opts ...rpc.CallOpt) (err error) {
@@ -1786,6 +2140,9 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManagerServerMethods
+	// ConflictManager interface provides all the methods necessary to handle
+	// conflict resolution for a given database.
+	ConflictManagerServerMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
@@ -1899,6 +2256,9 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManagerServerStubMethods
+	// ConflictManager interface provides all the methods necessary to handle
+	// conflict resolution for a given database.
+	ConflictManagerServerStubMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
@@ -1949,6 +2309,7 @@
 		SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
 		BlobManagerServerStub:      BlobManagerServer(impl),
 		SchemaManagerServerStub:    SchemaManagerServer(impl),
+		ConflictManagerServerStub:  ConflictManagerServer(impl),
 	}
 	// Initialize GlobState; always check the stub itself first, to handle the
 	// case where the user has the Glob method defined in their VDL source.
@@ -1967,6 +2328,7 @@
 	SyncGroupManagerServerStub
 	BlobManagerServerStub
 	SchemaManagerServerStub
+	ConflictManagerServerStub
 	gs *rpc.GlobState
 }
 
@@ -2003,7 +2365,7 @@
 }
 
 func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
-	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, watch.GlobWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc}
+	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, watch.GlobWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc, ConflictManagerDesc}
 }
 
 // DatabaseDesc describes the Database interface.
@@ -2020,6 +2382,7 @@
 		{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
 		{"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
 		{"SchemaManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SchemaManager implements the API for managing schema metadata attached\n// to a Database."},
+		{"ConflictManager", "v.io/syncbase/v23/services/syncbase/nosql", "// ConflictManager interface provides all the methods necessary to handle\n// conflict resolution for a given database."},
 	},
 	Methods: []rpc.MethodDesc{
 		{
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index eae8e9d..407e956 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -111,6 +111,158 @@
 	Defer
 }
 
+// ConflictInfo contains information to fully specify a conflict
+// for a key, providing the (local, remote, ancestor) tuple.
+// A key under conflict can be a part of a batch in local, remote or both
+// updates. Since the batches can have more than one key, all ConflictInfos
+// for the keys within the batches are grouped together into a single conflict
+// batch and sent as a stream with the Continued field representing conflict
+// batch boundaries.
+type ConflictInfo struct {
+	// Data is a unit chunk of ConflictInfo which can be sent over the conflict
+	// stream.
+	Data ConflictData
+
+	// Continued represents whether the batch of ConflictInfos has ended.
+	Continued bool
+}
+
+// ConflictData represents a unit of conflict data sent over the stream. It
+// can either contain information about a Batch or about an operation done
+// on a row.
+type ConflictData union {
+	Batch BatchInfo
+	Row RowInfo
+}
+
+type BatchInfo struct {
+	// Id is an identifier for a batch contained in a conflict. It is
+	// unique only in the context of a given conflict. Its purpose is solely to
+	// group one or more RowInfo objects together to represent a batch that
+	// was committed by the client.
+	Id uint16
+
+	// Hint is the hint provided by the client when this batch was committed.
+	Hint string
+
+	// Source states where the batch comes from.
+	Source BatchSource
+}
+
+// BatchSource represents where the batch was committed.
+type BatchSource enum {
+	// Local represents that the batch was committed on local syncbase.
+	Local
+
+	// Remote represents that the batch was committed on remote syncbase.
+	Remote
+}
+
+// RowInfo contains a single operation performed on a row (in case of read or
+// write) or a range or rows (in case of scan) along with a mapping to each
+// of the batches that this operation belongs to.
+// For example, if Row1 was updated on local syncbase conflicting with a write
+// on remote syncbase as part of two separate batches, then it will be
+// represented by a single RowInfo with Write Operation containing the
+// respective local and remote values along with the batch id for both batches
+// stored in the BatchIds field.
+type RowInfo struct {
+	// Op is a specific operation represented by RowInfo
+	Op Operation
+	// BatchIds contains ids of all batches that this RowInfo is a part of.
+	BatchIds []uint16
+}
+
+// Operation represents a specific operation on a row or a set of rows that is
+// a part of the conflict.
+type Operation union {
+	// Read represents a read operation performed on a specific row. For a given
+	// row key there can only be at max one Read operation within a conflict.
+	Read  RowOp
+
+	// Write represents a write operation performed on a specific row. For a
+	// given row key there can only be at max one Write operation within a
+	// conflict.
+	Write RowOp
+
+	// Scan represents a scan operation performed over a specific range of keys.
+	// For a given key range there can be at max one ScanOp within the Conflict.
+	Scan  ScanOp
+}
+
+// RowOp represents a read or write operation on a row corresponding to the
+// given key.
+type RowOp struct {
+	// The key under conflict.
+	Key string
+
+	// LocalValue contains the value read or written by local syncbase or nil.
+	LocalValue ?Value
+
+	// RemoteValue contains the value read or written by remote syncbase or nil.
+	RemoteValue ?Value
+
+	// AncestorValue contains the value for the key which is the lowest common
+	// ancestor of the two values represented by LocalValue and RemoteValue or
+	// nil if no ancestor exists or if the operation was read.
+	AncestorValue ?Value
+}
+
+// ScanOp provides details of a scan operation.
+type ScanOp struct {
+	// Start contains the starting key for a range scan.
+	Start string
+
+	// Limit contains the end key for a range scan.
+	Limit string
+}
+
+// Value contains the encoded bytes for a row's value stored in syncbase.
+type Value struct {
+	// VOM encoded bytes for a row's value
+	Bytes []byte
+
+	// Write timestamp for this value in nanoseconds.
+	// TODO(jlodhia): change the timestamp to vdl.time here, in commit timestamp
+	// and clock data.
+	WriteTs int64
+}
+
+// ValueSelection represents the value that was selected as the final resolution
+// for a conflict.
+type ValueSelection enum {
+	// Local should be used if local value is picked as the final result.
+	Local
+
+	// Remote should be used if remote value is picked as the final result.
+	Remote
+
+	// Other should be used if a new value different from local and remote is
+	// used as the final result.
+	Other
+}
+
+// ResolutionInfo contains the application’s reply to a conflict for a key,
+// providing the resolution value. The resolution may be over a group of keys
+// in which case the application must send a stream of ResolutionInfos with
+// the Continued field for the last ResolutionInfo representing the end of the
+// batch with a value false. ResolutionInfos sent as part of a batch will be
+// committed as a batch. If the commit fails, the Conflict will be re-sent.
+type ResolutionInfo struct {
+	// Key is the key under conflict.
+	Key	string
+
+	// Selection represents the value that was selected as resolution.
+	Selection ValueSelection
+
+	// Result is the resolved value for the key. This field should be used only
+	// if value of Selection field is 'Other'
+	Result ?Value
+
+	// Continued represents whether the batch of ResolutionInfos has ended.
+	Continued bool
+}
+
 // SchemaMetadata maintains metadata related to the schema of a given database.
 // There is one SchemaMetadata per database.
 type SchemaMetadata struct {
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index adb4e66..6a8e0c1 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -166,6 +166,336 @@
 }) {
 }
 
+// ConflictInfo contains information to fully specify a conflict
+// for a key, providing the (local, remote, ancestor) tuple.
+// A key under conflict can be a part of a batch in local, remote or both
+// updates. Since the batches can have more than one key, all ConflictInfos
+// for the keys within the batches are grouped together into a single conflict
+// batch and sent as a stream with the Continued field representing conflict
+// batch boundaries.
+type ConflictInfo struct {
+	// Data is a unit chunk of ConflictInfo which can be sent over the conflict
+	// stream.
+	Data ConflictData
+	// Continued represents whether the batch of ConflictInfos has ended.
+	Continued bool
+}
+
+func (ConflictInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ConflictInfo"`
+}) {
+}
+
+type (
+	// ConflictData represents any single field of the ConflictData union type.
+	//
+	// ConflictData represents a unit of conflict data sent over the stream. It
+	// can either contain information about a Batch or about an operation done
+	// on a row.
+	ConflictData interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the ConflictData union type.
+		__VDLReflect(__ConflictDataReflect)
+	}
+	// ConflictDataBatch represents field Batch of the ConflictData union type.
+	ConflictDataBatch struct{ Value BatchInfo }
+	// ConflictDataRow represents field Row of the ConflictData union type.
+	ConflictDataRow struct{ Value RowInfo }
+	// __ConflictDataReflect describes the ConflictData union type.
+	__ConflictDataReflect struct {
+		Name  string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ConflictData"`
+		Type  ConflictData
+		Union struct {
+			Batch ConflictDataBatch
+			Row   ConflictDataRow
+		}
+	}
+)
+
+func (x ConflictDataBatch) Index() int                         { return 0 }
+func (x ConflictDataBatch) Interface() interface{}             { return x.Value }
+func (x ConflictDataBatch) Name() string                       { return "Batch" }
+func (x ConflictDataBatch) __VDLReflect(__ConflictDataReflect) {}
+
+func (x ConflictDataRow) Index() int                         { return 1 }
+func (x ConflictDataRow) Interface() interface{}             { return x.Value }
+func (x ConflictDataRow) Name() string                       { return "Row" }
+func (x ConflictDataRow) __VDLReflect(__ConflictDataReflect) {}
+
+type BatchInfo struct {
+	// Id is an identifier for a batch contained in a conflict. It is
+	// unique only in the context of a given conflict. Its purpose is solely to
+	// group one or more RowInfo objects together to represent a batch that
+	// was committed by the client.
+	Id uint16
+	// Hint is the hint provided by the client when this batch was committed.
+	Hint string
+	// Source states where the batch comes from.
+	Source BatchSource
+}
+
+func (BatchInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BatchInfo"`
+}) {
+}
+
+// BatchSource represents where the batch was committed.
+type BatchSource int
+
+const (
+	BatchSourceLocal BatchSource = iota
+	BatchSourceRemote
+)
+
+// BatchSourceAll holds all labels for BatchSource.
+var BatchSourceAll = [...]BatchSource{BatchSourceLocal, BatchSourceRemote}
+
+// BatchSourceFromString creates a BatchSource from a string label.
+func BatchSourceFromString(label string) (x BatchSource, err error) {
+	err = x.Set(label)
+	return
+}
+
+// Set assigns label to x.
+func (x *BatchSource) Set(label string) error {
+	switch label {
+	case "Local", "local":
+		*x = BatchSourceLocal
+		return nil
+	case "Remote", "remote":
+		*x = BatchSourceRemote
+		return nil
+	}
+	*x = -1
+	return fmt.Errorf("unknown label %q in nosql.BatchSource", label)
+}
+
+// String returns the string label of x.
+func (x BatchSource) String() string {
+	switch x {
+	case BatchSourceLocal:
+		return "Local"
+	case BatchSourceRemote:
+		return "Remote"
+	}
+	return ""
+}
+
+func (BatchSource) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BatchSource"`
+	Enum struct{ Local, Remote string }
+}) {
+}
+
+// RowInfo contains a single operation performed on a row (in case of read or
+// write) or a range or rows (in case of scan) along with a mapping to each
+// of the batches that this operation belongs to.
+// For example, if Row1 was updated on local syncbase conflicting with a write
+// on remote syncbase as part of two separate batches, then it will be
+// represented by a single RowInfo with Write Operation containing the
+// respective local and remote values along with the batch id for both batches
+// stored in the BatchIds field.
+type RowInfo struct {
+	// Op is a specific operation represented by RowInfo
+	Op Operation
+	// BatchIds contains ids of all batches that this RowInfo is a part of.
+	BatchIds []uint16
+}
+
+func (RowInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.RowInfo"`
+}) {
+}
+
+type (
+	// Operation represents any single field of the Operation union type.
+	//
+	// Operation represents a specific operation on a row or a set of rows that is
+	// a part of the conflict.
+	Operation interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the Operation union type.
+		__VDLReflect(__OperationReflect)
+	}
+	// OperationRead represents field Read of the Operation union type.
+	//
+	// Read represents a read operation performed on a specific row. For a given
+	// row key there can only be at max one Read operation within a conflict.
+	OperationRead struct{ Value RowOp }
+	// OperationWrite represents field Write of the Operation union type.
+	//
+	// Write represents a write operation performed on a specific row. For a
+	// given row key there can only be at max one Write operation within a
+	// conflict.
+	OperationWrite struct{ Value RowOp }
+	// OperationScan represents field Scan of the Operation union type.
+	//
+	// Scan represents a scan operation performed over a specific range of keys.
+	// For a given key range there can be at max one ScanOp within the Conflict.
+	OperationScan struct{ Value ScanOp }
+	// __OperationReflect describes the Operation union type.
+	__OperationReflect struct {
+		Name  string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Operation"`
+		Type  Operation
+		Union struct {
+			Read  OperationRead
+			Write OperationWrite
+			Scan  OperationScan
+		}
+	}
+)
+
+func (x OperationRead) Index() int                      { return 0 }
+func (x OperationRead) Interface() interface{}          { return x.Value }
+func (x OperationRead) Name() string                    { return "Read" }
+func (x OperationRead) __VDLReflect(__OperationReflect) {}
+
+func (x OperationWrite) Index() int                      { return 1 }
+func (x OperationWrite) Interface() interface{}          { return x.Value }
+func (x OperationWrite) Name() string                    { return "Write" }
+func (x OperationWrite) __VDLReflect(__OperationReflect) {}
+
+func (x OperationScan) Index() int                      { return 2 }
+func (x OperationScan) Interface() interface{}          { return x.Value }
+func (x OperationScan) Name() string                    { return "Scan" }
+func (x OperationScan) __VDLReflect(__OperationReflect) {}
+
+// RowOp represents a read or write operation on a row corresponding to the
+// given key.
+type RowOp struct {
+	// The key under conflict.
+	Key string
+	// LocalValue contains the value read or written by local syncbase or nil.
+	LocalValue *Value
+	// RemoteValue contains the value read or written by remote syncbase or nil.
+	RemoteValue *Value
+	// AncestorValue contains the value for the key which is the lowest common
+	// ancestor of the two values represented by LocalValue and RemoteValue or
+	// nil if no ancestor exists or if the operation was read.
+	AncestorValue *Value
+}
+
+func (RowOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.RowOp"`
+}) {
+}
+
+// ScanOp provides details of a scan operation.
+type ScanOp struct {
+	// Start contains the starting key for a range scan.
+	Start string
+	// Limit contains the end key for a range scan.
+	Limit string
+}
+
+func (ScanOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ScanOp"`
+}) {
+}
+
+// Value contains the encoded bytes for a row's value stored in syncbase.
+type Value struct {
+	// VOM encoded bytes for a row's value
+	Bytes []byte
+	// Write timestamp for this value in nanoseconds.
+	// TODO(jlodhia): change the timestamp to vdl.time here, in commit timestamp
+	// and clock data.
+	WriteTs int64
+}
+
+func (Value) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Value"`
+}) {
+}
+
+// ValueSelection represents the value that was selected as the final resolution
+// for a conflict.
+type ValueSelection int
+
+const (
+	ValueSelectionLocal ValueSelection = iota
+	ValueSelectionRemote
+	ValueSelectionOther
+)
+
+// ValueSelectionAll holds all labels for ValueSelection.
+var ValueSelectionAll = [...]ValueSelection{ValueSelectionLocal, ValueSelectionRemote, ValueSelectionOther}
+
+// ValueSelectionFromString creates a ValueSelection from a string label.
+func ValueSelectionFromString(label string) (x ValueSelection, err error) {
+	err = x.Set(label)
+	return
+}
+
+// Set assigns label to x.
+func (x *ValueSelection) Set(label string) error {
+	switch label {
+	case "Local", "local":
+		*x = ValueSelectionLocal
+		return nil
+	case "Remote", "remote":
+		*x = ValueSelectionRemote
+		return nil
+	case "Other", "other":
+		*x = ValueSelectionOther
+		return nil
+	}
+	*x = -1
+	return fmt.Errorf("unknown label %q in nosql.ValueSelection", label)
+}
+
+// String returns the string label of x.
+func (x ValueSelection) String() string {
+	switch x {
+	case ValueSelectionLocal:
+		return "Local"
+	case ValueSelectionRemote:
+		return "Remote"
+	case ValueSelectionOther:
+		return "Other"
+	}
+	return ""
+}
+
+func (ValueSelection) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ValueSelection"`
+	Enum struct{ Local, Remote, Other string }
+}) {
+}
+
+// ResolutionInfo contains the application’s reply to a conflict for a key,
+// providing the resolution value. The resolution may be over a group of keys
+// in which case the application must send a stream of ResolutionInfos with
+// the Continued field for the last ResolutionInfo representing the end of the
+// batch with a value false. ResolutionInfos sent as part of a batch will be
+// committed as a batch. If the commit fails, the Conflict will be re-sent.
+type ResolutionInfo struct {
+	// Key is the key under conflict.
+	Key string
+	// Selection represents the value that was selected as resolution.
+	Selection ValueSelection
+	// Result is the resolved value for the key. This field should be used only
+	// if value of Selection field is 'Other'
+	Result *Value
+	// Continued represents whether the batch of ResolutionInfos has ended.
+	Continued bool
+}
+
+func (ResolutionInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResolutionInfo"`
+}) {
+}
+
 // SchemaMetadata maintains metadata related to the schema of a given database.
 // There is one SchemaMetadata per database.
 type SchemaMetadata struct {
@@ -319,6 +649,17 @@
 	vdl.Register((*SyncGroupSpec)(nil))
 	vdl.Register((*SyncGroupMemberInfo)(nil))
 	vdl.Register((*ResolverType)(nil))
+	vdl.Register((*ConflictInfo)(nil))
+	vdl.Register((*ConflictData)(nil))
+	vdl.Register((*BatchInfo)(nil))
+	vdl.Register((*BatchSource)(nil))
+	vdl.Register((*RowInfo)(nil))
+	vdl.Register((*Operation)(nil))
+	vdl.Register((*RowOp)(nil))
+	vdl.Register((*ScanOp)(nil))
+	vdl.Register((*Value)(nil))
+	vdl.Register((*ValueSelection)(nil))
+	vdl.Register((*ResolutionInfo)(nil))
 	vdl.Register((*SchemaMetadata)(nil))
 	vdl.Register((*CrPolicy)(nil))
 	vdl.Register((*CrRule)(nil))
diff --git a/v23/syncbase/nosql/cr_connection_test.go b/v23/syncbase/nosql/cr_connection_test.go
new file mode 100644
index 0000000..b99c15d
--- /dev/null
+++ b/v23/syncbase/nosql/cr_connection_test.go
@@ -0,0 +1,138 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase/nosql/crtestutil"
+	"v.io/v23/context"
+)
+
+type resolverImpl1 struct {
+}
+
+func (ri *resolverImpl1) OnConflict(ctx *context.T, conflict *Conflict) (r Resolution) {
+	return
+}
+
+// TestCrConnectionClose tests if db.Close() correctly shuts down the goroutine
+// that runs conflict resolution code in an infinite loop.
+// Setup:
+//   - A mutex lock is used to cause Advance() on the receive stream to block.
+//     This lock is held by the test before starting off the CR thread.
+//   - ctx.Cancel() is simulated by releasing the mutex lock. This is done right
+//     after calling db.Close()
+//   - The mutex is held again by the test to cause the CR thread to block on
+//     Advance() in case the CR thead did not shutdown. This will lead to
+//     failure of the test.
+// Expected result:
+//   - The CR thread should not get/remain blocked on Advance() after db.Close()
+func TestCrConnectionClose(t *testing.T) {
+	db := NewDatabase("parentName", "db1", getSchema(&resolverImpl1{}))
+	advance := func(st *crtestutil.State) bool {
+		fmt.Println("Advance() for ConflictStreamImpl called")
+		st.IsBlocked = true
+		st.Mu.Lock()
+		defer st.Mu.Unlock()
+		fmt.Println("Advance() for ConflictStreamImpl returning")
+		st.IsBlocked = false
+		return false
+	}
+
+	st := &crtestutil.State{}
+	crStream := &crtestutil.CrStreamImpl{
+		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+		R: &crtestutil.ResolutionStreamImpl{St: st},
+	}
+	db.c = crtestutil.MockDbClient(db.c, crStream)
+	db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+	ctx, _ := context.RootContext()
+	st.Mu.Lock()                                // causes Advance() to block
+	db.EnforceSchema(ctx)                       // kicks off the CR thread
+	for i := 0; i < 100 && !st.IsBlocked; i++ { // wait till Advance() call is blocked
+		time.Sleep(time.Millisecond)
+	}
+	db.Close()
+	st.Mu.Unlock() // simulate ctx cancel()
+	for i := 0; i < 100 && st.IsBlocked; i++ {
+		time.Sleep(time.Millisecond)
+	}
+
+	st.Mu.Lock() // causes Advance() to block if called again
+	// let conflict resolution routine complete its cleanup.
+	// If the code does not work as intended then it will end up reconnecting
+	// the stream and blocking on the mutex
+	time.Sleep(time.Millisecond)
+	if st.IsBlocked {
+		t.Error("Error: The conflict resolution routine did not die")
+	}
+}
+
+// TestCrConnectionReestablish tests if the CR thread reestablishes a broken
+// connection stream with syncbase.
+// Setup:
+//   - Advance() is allowed to return immediately 3 times with return value
+//     false signifying 3 consecutive breaks in the connection.
+//   - A mutex lock is used to cause Advance() on the recieve stream to block
+//     on the 4th attempt. This lock is held by the test before starting off the
+//     CR thread. This simulates a stable connection between CR and syncbase.
+// Expected result:
+//   - Advance() should get blocked after some time.
+//   - st.AdvanceCount should be equal to 3
+func TestCrConnectionReestablish(t *testing.T) {
+	db := NewDatabase("parentName", "db1", getSchema(&resolverImpl1{}))
+	advance := func(st *crtestutil.State) bool {
+		fmt.Println("Advance() for ConflictStreamImpl called")
+		st.AdvanceCount++
+		if st.AdvanceCount > 2 {
+			// Connection stays stable after 3 attempts
+			st.IsBlocked = true
+			st.Mu.Lock()
+			defer st.Mu.Unlock()
+		}
+		fmt.Println("Advance() for ConflictStreamImpl returning")
+		st.IsBlocked = false
+		return false
+	}
+
+	st := &crtestutil.State{}
+	crStream := &crtestutil.CrStreamImpl{
+		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+		R: &crtestutil.ResolutionStreamImpl{St: st},
+	}
+	db.c = crtestutil.MockDbClient(db.c, crStream)
+	db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+	ctx, _ := context.RootContext()
+	st.Mu.Lock() // causes Advance() to block
+	db.EnforceSchema(ctx)
+	for i := 0; i < 100 && !st.IsBlocked; i++ {
+		time.Sleep(time.Millisecond) // wait till Advance() call is blocked
+	}
+
+	if !st.IsBlocked {
+		t.Error("Error: Advance() did not block")
+	}
+	if st.AdvanceCount != 3 {
+		t.Errorf("Error: Advance() expected to be called 3 times, instead called %d times", st.AdvanceCount)
+	}
+
+	// Shutdown the cr routine
+	db.Close()
+	st.Mu.Unlock()
+}
+
+func getSchema(cr ConflictResolver) *Schema {
+	return &Schema{
+		Metadata: wire.SchemaMetadata{},
+		Upgrader: nil,
+		Resolver: cr,
+	}
+}
diff --git a/v23/syncbase/nosql/cr_test.go b/v23/syncbase/nosql/cr_test.go
new file mode 100644
index 0000000..f9618d9
--- /dev/null
+++ b/v23/syncbase/nosql/cr_test.go
@@ -0,0 +1,605 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	"bytes"
+	"strings"
+	"testing"
+	"time"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase/nosql/crtestutil"
+	"v.io/v23/context"
+	"v.io/v23/vom"
+)
+
+// TODO(jlodhia): extract this test (along with the crtest helpers) to
+// nosql_test, or alternatively, move the crtest helper to an internal dir.
+
+
+////////////////////////////////////////
+// Client Code
+
+// Client constants
+const (
+	TaskIdPrefix = "task:"
+	ListIdPrefix = "list:"
+
+	List1 = "list:1"
+
+	Task1 = "task:1:1"
+	Task2 = "task:1:2"
+
+	HintListDelete = "ListDelete"
+
+	HintTextEdit   = "TaskTextEdit"
+	HintDone       = "TaskDoneToggle"
+	HintTaskDelete = "TaskDelete"
+	HintTaskAdd    = "TaskAdd"
+
+	TextNew = "EditedText"
+)
+
+// Client data structures
+type Task struct {
+	Text   string
+	Done   bool
+	DoneTs int64
+	ListId string
+}
+
+type List struct {
+	Name      string
+	TaskCount int16
+	DoneCount int16
+}
+
+// Client conflict reolution impl
+type ConflictResolverImpl struct {
+}
+
+func (ri *ConflictResolverImpl) OnConflict(ctx *context.T, conflict *Conflict) Resolution {
+	taskRows := findRowsByType(conflict, TaskIdPrefix)
+	result := ri.handleTasks(ctx, taskRows)
+
+	listRows := findRowsByType(conflict, ListIdPrefix)
+	ri.handleLists(ctx, listRows, result)
+	return Resolution{ResultSet: result}
+}
+
+// handleTasks does a merge of fields within each task under conflict.
+// It handles conflict on fields as follows:
+// Text: Last writer wins
+// Done: Depends on DoneTs
+// DoneTs: The side that is not equal to ancestor and is the lower of the two
+//         wins. This means that if a user marks a task done on one device
+//         and then later marks the same task done on another device that has
+//         not synced yet, the done timestamp that is selected will be from the
+//         former action.
+func (ri *ConflictResolverImpl) handleTasks(ctx *context.T, tasks []ConflictRow) map[string]ResolvedRow {
+	result := map[string]ResolvedRow{}
+	for _, t := range tasks {
+		if isRowAdded(t) {
+			result[t.Key] = handleRowAdd(t)
+			continue
+		}
+		if isRowDeleted(t) {
+			result[t.Key] = ResolvedRow{Key: t.Key, Result: nil}
+			continue
+		}
+
+		var localTask, remoteTask, ancestorTask, resolvedTask Task
+		t.LocalValue.Get(&localTask)
+		t.RemoteValue.Get(&remoteTask)
+		t.AncestorValue.Get(&ancestorTask)
+
+		// prime the reolved task with local version for simplicity.
+		resolvedTask = localTask
+
+		// check field text for conflict
+		if localTask.Text != remoteTask.Text {
+			if remoteTask.Text != ancestorTask.Text {
+				if (localTask.Text == ancestorTask.Text) || (t.RemoteValue.WriteTs.After(t.LocalValue.WriteTs)) {
+					resolvedTask.Text = remoteTask.Text
+				}
+			}
+		}
+
+		// check field done for conflict
+		if localTask.DoneTs != remoteTask.DoneTs {
+			if remoteTask.DoneTs != ancestorTask.DoneTs {
+				if (localTask.DoneTs == ancestorTask.DoneTs) || (remoteTask.DoneTs < localTask.DoneTs) {
+					resolvedTask.DoneTs = remoteTask.DoneTs
+					resolvedTask.Done = remoteTask.Done
+				}
+			}
+		}
+		resValue, _ := NewValue(ctx, &resolvedTask)
+		result[t.Key] = ResolvedRow{Key: t.Key, Result: resValue}
+	}
+	return result
+}
+
+// handleLists does a merge of fields within each list under conflict. If a list
+// is deleted, it looks at the resolution map, finds all tasks that belong to
+// the list and changes the resolution to delete for those tasks.
+// It handles conflict on fields as follows:
+// Name: Last writer wins
+// TaskCount: Add both counts and subtract the ancestor count.
+// DoneCount: Add both counts and subtract the ancestor count.
+// NOTE: The above counters are resolved in a naive way for simplicity of this
+// test. There are many other ways to represent the counters in a more correct
+// way.
+func (ri *ConflictResolverImpl) handleLists(ctx *context.T, lists []ConflictRow, resolution map[string]ResolvedRow) {
+	for _, l := range lists {
+		if isRowAdded(l) {
+			resolution[l.Key] = handleRowAdd(l)
+			continue
+		}
+		if isRowDeleted(l) {
+			resolution[l.Key] = ResolvedRow{Key: l.Key, Result: nil}
+			deleteTasksForList(resolution, l.Key)
+			continue
+		}
+
+		var localList, remoteList, ancestorList, resolvedList List
+		l.LocalValue.Get(&localList)
+		l.RemoteValue.Get(&remoteList)
+		l.AncestorValue.Get(&ancestorList)
+
+		// prime the reolved task with local version for simplicity.
+		resolvedList = localList
+
+		// check field Name for conflict
+		if localList.Name != remoteList.Name {
+			if remoteList.Name != ancestorList.Name {
+				if (localList.Name == ancestorList.Name) || (l.RemoteValue.WriteTs.After(l.LocalValue.WriteTs)) {
+					resolvedList.Name = remoteList.Name
+				}
+			}
+		}
+
+		// check field TaskCount for conflict
+		if localList.TaskCount != remoteList.TaskCount {
+			if remoteList.TaskCount != ancestorList.TaskCount {
+				if localList.TaskCount == ancestorList.TaskCount {
+					resolvedList.TaskCount = remoteList.TaskCount
+				} else {
+					resolvedList.TaskCount = localList.TaskCount + remoteList.TaskCount - ancestorList.TaskCount
+				}
+			}
+		}
+
+		// check field DoneCount for conflict
+		if localList.DoneCount != remoteList.DoneCount {
+			if remoteList.DoneCount != ancestorList.DoneCount {
+				if localList.DoneCount == ancestorList.DoneCount {
+					resolvedList.DoneCount = remoteList.DoneCount
+				} else {
+					resolvedList.DoneCount = localList.DoneCount + remoteList.DoneCount - ancestorList.DoneCount
+				}
+			}
+		}
+		resValue, _ := NewValue(ctx, &resolvedList)
+		resolution[l.Key] = ResolvedRow{Key: l.Key, Result: resValue}
+	}
+}
+
+func deleteTasksForList(resolution map[string]ResolvedRow, listKey string) {
+	// find key prefix for tasks belonging to list
+	taskKeyPrefix := strings.Replace(listKey, ListIdPrefix, TaskIdPrefix, 1) + ":"
+	for k, v := range resolution {
+		if strings.HasPrefix(k, taskKeyPrefix) {
+			v.Result = nil
+			resolution[k] = v
+		}
+	}
+}
+
+func handleRowAdd(r ConflictRow) ResolvedRow {
+	var resolvedTask *Value
+	// first one to add wins
+	if r.RemoteValue == nil {
+		resolvedTask = r.LocalValue
+	} else if r.LocalValue == nil {
+		resolvedTask = r.RemoteValue
+	} else if r.LocalValue.WriteTs.Before(r.RemoteValue.WriteTs) {
+		resolvedTask = r.LocalValue
+	} else {
+		resolvedTask = r.RemoteValue
+	}
+	return ResolvedRow{Key: r.Key, Result: resolvedTask}
+}
+
+func isRowDeleted(r ConflictRow) bool {
+	return r.AncestorValue != nil &&
+		(r.LocalValue == nil || r.RemoteValue == nil)
+}
+
+func isRowAdded(r ConflictRow) bool {
+	return r.AncestorValue == nil &&
+		(r.LocalValue != nil || r.RemoteValue != nil)
+}
+
+func findRowsByType(conflict *Conflict, typePrefix string) []ConflictRow {
+	rows := []ConflictRow{}
+	if conflict.WriteSet != nil {
+		for k, v := range conflict.WriteSet.ByKey {
+			if strings.HasPrefix(k, typePrefix) {
+				rows = append(rows, v)
+			}
+		}
+	}
+	return rows
+}
+
+////////////////////////////////////////
+// Tests
+
+func TestConflictResolutionSingleObject(t *testing.T) {
+	mockConflit, expResult := simpleConflictStream(t)
+	RunTest(t, mockConflit, expResult, false)
+}
+
+func TestConflictResolutionAddDelete(t *testing.T) {
+	mockConflit, expResult := addDeleteConflictStream(t)
+	RunTest(t, mockConflit, expResult, true)
+}
+
+func TestConflictResolutionIntersectingConflicts(t *testing.T) {
+	mockConflict, expResult := twoIntersectingBatchesConflictStream(t)
+	RunTest(t, mockConflict, expResult, true)
+}
+
+func TestConflictResolutionListDelete(t *testing.T) {
+	mockConflit, expResult := listDeleteConflictsWithTaskAdd(t)
+	RunTest(t, mockConflit, expResult, true)
+}
+
+func RunTest(t *testing.T, mockConflit []wire.ConflictInfo, expResult map[string]wire.ResolutionInfo, singleBatch bool) {
+	db := NewDatabase("parentName", "db1", getSchema(&ConflictResolverImpl{}))
+	advance := func(st *crtestutil.State) bool {
+		if st.ValIndex >= len(mockConflit) {
+			st.IsBlocked = true
+			st.Mu.Lock()
+			defer st.Mu.Unlock()
+		}
+		st.IsBlocked = false
+		st.Val = mockConflit[st.ValIndex]
+		st.ValIndex++
+		return true
+	}
+
+	st := &crtestutil.State{}
+	crStream := &crtestutil.CrStreamImpl{
+		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+		R: &crtestutil.ResolutionStreamImpl{St: st},
+	}
+	db.c = crtestutil.MockDbClient(db.c, crStream)
+	db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+	ctx, _ := context.RootContext()
+	st.Mu.Lock() // causes Advance() to block
+	db.EnforceSchema(ctx)
+	for i := 0; i < 100 && (len(st.Result) != len(expResult)); i++ {
+		time.Sleep(time.Millisecond) // wait till Advance() call is blocked
+	}
+	if len(st.Result) != len(expResult) {
+		t.Errorf("\n Unexpected number of results. Expected: %d, actual: %d", len(expResult), len(st.Result))
+	}
+	for i, result := range st.Result {
+		compareResult(t, expResult[result.Key], result)
+		if singleBatch {
+			if result.Continued != (i < len(st.Result)-1) {
+				t.Error("\nUnexpected value for continued in single batch")
+			}
+		} else {
+			if result.Continued != false {
+				t.Error("\nUnexpected value for continued in multi batch")
+			}
+		}
+	}
+}
+
+////////////////////////////////////////
+// Test helpers to create test data
+
+// Consider a database with a list List1 with two tasks Task1, Task2. The
+// following helper methods create conflicts with these three objects in various
+// ways.
+
+// simpleConflictStream prepares a conflict stream with two conflicts, one
+// each for Task1 and Task2. No batch is involved and Task1 is independent of
+// Task2. Assume that List does not get updated for these task updates.
+// Local syncbase changes Text field for Task1 and marks Task2 done.
+// Remote syncbase marks Task1 done and Task2 done.
+// Expected result is a merge for Task1 and Task2.
+func simpleConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Conflict for task1
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"OriginalText", false, -1, List1}), 1, // ancestor
+		encode(&Task{TextNew, false, -1, List1}), 3, // local
+		encode(&Task{"OriginalText", true, 20, List1}), 2) // remote
+	c1 := makeConflictInfo(row1, false)
+
+	// Expected result
+	r1 := makeResolution(Task1, encode(&Task{TextNew, true, 20, List1}), wire.ValueSelectionOther)
+
+	// Conflict for task2
+	row2 := makeRowInfo(Task2,
+		encode(&Task{"Text1", false, -1, List1}), 1, // ancestor
+		encode(&Task{"Text1", true, 100, List1}), 3, // local
+		encode(&Task{"Text1", true, 20, List1}), 2) // remote
+	c2 := makeConflictInfo(row2, false)
+
+	// Expected result
+	r2 := makeResolution(Task2, encode(&Task{"Text1", true, 20, List1}),
+		wire.ValueSelectionOther) // for simplicity the test resolver does not optimize
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+
+	return []wire.ConflictInfo{c1, c2}, respMap
+}
+
+// addDeleteConflictStream prepares a conflict stream with one conflict
+// containing Task1, Task2 and List1. List contains a task count and hence
+// addition and deletion of task leads to update to the list.
+// Local syncbase deletes Task1 and updates List1 in a batch.
+// Remote syncbase adds Task2 and updates List1 in a batch.
+// Expected result is Task1 gets deleted, Task2 gets added and List1 shows
+// the correct taskcount after the two operations.
+func addDeleteConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Batch1 is local and contains task1 and list1
+	b1 := makeConflictBatch(1, HintTaskDelete, wire.BatchSourceLocal, true)
+
+	// Batch2 is remote and contains task2 and list1
+	b2 := makeConflictBatch(2, HintTaskAdd, wire.BatchSourceRemote, true)
+
+	// Deletion for task1 on local syncbase
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1, // ancestor
+		nil, -1, // local
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1) // remote
+	row1.BatchIds = []uint16{1}
+	c1 := makeConflictInfo(row1, true)
+
+
+	// Expected result
+	r1 := makeResolution(Task1, nil, wire.ValueSelectionOther)
+
+	// Addition for task2 on remote syncbase
+	row2 := makeRowInfo(Task2,
+		nil, -1, // ancestor value
+		nil, -1, // local value
+		encode(&Task{"AddedTask", false, -1, List1}), 2) // remote value
+	row2.BatchIds = []uint16{2}
+	c2 := makeConflictInfo(row2, true)
+
+	// Expected result
+	r2 := makeResolution(Task2, encode(&Task{"AddedTask", false, -1, List1}), wire.ValueSelectionRemote)
+
+	// Update to List on both sided on filed TaskCount
+	listName := "Groceries"
+	row3 := makeRowInfo(List1,
+		encode(&List{listName, 8, 0}), 25, // ancestor value
+		encode(&List{listName, 7, 0}), 33, // local value
+		encode(&List{listName, 9, 0}), 44) // remote value
+	row3.BatchIds = []uint16{1, 2}
+	c3 := makeConflictInfo(row3, false)
+
+	// Expected result
+	r3 := makeResolution(List1, encode(&List{listName, 8, 0}), wire.ValueSelectionOther)
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+	respMap[r3.Key] = r3
+
+	return []wire.ConflictInfo{b1, b2, c1, c2, c3}, respMap
+}
+
+// twoIntersectingBatchesConflictStream prepares a conflict stream with one
+// conflict containing Task1, Task2 and List1. Here we assume that List keeps
+// a count of done tasks.
+// Local syncbase marks Task1 done and updates done count on List1. Later it
+// it also marks Task2 done and updates done count on List1.
+// Remote syncbase updates field Text on Task1 followed by an update to field
+// Text on Task2 (not in a batch).
+// Expected result is a merge for Task1 and Task2 and Local version of List1
+// being accepted.
+func twoIntersectingBatchesConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Batch1 is local and contains task1 and list1
+	b1 := makeConflictBatch(1, HintDone, wire.BatchSourceLocal, true)
+
+	// Batch2 is local and contains task2 and list1
+	b2 := makeConflictBatch(2, HintDone, wire.BatchSourceLocal, true)
+
+	// Batch3 is remote and contains task1
+	b3 := makeConflictBatch(3, HintTextEdit, wire.BatchSourceRemote, true)
+
+	// Batch4 is remote and contains task2
+	b4 := makeConflictBatch(4, HintTextEdit, wire.BatchSourceRemote, true)
+
+	// For task1, mark done on local and edit text on remote
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"TaskOrig", false, -1, List1}), 1, // ancestor
+		encode(&Task{"TaskOrig", true, 204, List1}), 5, // local
+		encode(&Task{"TaskEdit", false, -1, List1}), 3) // remote
+	row1.BatchIds = []uint16{1, 3}
+	c1 := makeConflictInfo(row1, true)
+
+	// Expected result
+	r1 := makeResolution(Task1, encode(&Task{"TaskEdit", true, 204, List1}), wire.ValueSelectionOther)
+
+	// For task2, mark done on local and edit text on remote
+	row2 := makeRowInfo(Task2,
+		encode(&Task{"TaskOrig", false, -1, List1}), 2, // ancestor
+		encode(&Task{"TaskOrig", true, 204, List1}), 5, // local
+		encode(&Task{"TaskEdit", false, -1, List1}), 3) // remote
+	row2.BatchIds = []uint16{2, 4}
+	c2 := makeConflictInfo(row2, true)
+
+	// Expected result
+	r2 := makeResolution(Task2, encode(&Task{"TaskEdit", true, 204, List1}), wire.ValueSelectionOther)
+
+	// Update to List on Local for updating TaskDoneCount
+	listName := "Groceries"
+	row3 := makeRowInfo(List1,
+		encode(&List{listName, 8, 0}), 1, // ancestor value
+		encode(&List{listName, 8, 2}), 5, // local value
+		encode(&List{listName, 8, 0}), 3) // remote value
+	row3.BatchIds = []uint16{1, 2}
+	c3 := makeConflictInfo(row3, false)
+
+	// Expected result
+	r3 := makeResolution(List1, encode(&List{listName, 8, 2}), wire.ValueSelectionOther)
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+	respMap[r3.Key] = r3
+
+	return []wire.ConflictInfo{b1, b2, b3, b4, c1, c2, c3}, respMap
+}
+
+// listDeleteConflictsWithTaskAdd prepares a conflict stream with one
+// conflict containing Task1, Task2 and List1. Assume Task2 does not exist yet.
+// Local syncbase deletes List1 and Task1 as a batch.
+// Remote syncbase adds Task2 along with an update to List1 as a batch.
+// Expected result is deletion of List1, Task1 and Task2.
+func listDeleteConflictsWithTaskAdd(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Batch1 is local and contains task1 and list1
+	b1 := makeConflictBatch(1, HintListDelete, wire.BatchSourceLocal, true)
+
+	// Batch2 is remote and contains task2 and list1
+	b2 := makeConflictBatch(2, HintTaskAdd, wire.BatchSourceRemote, true)
+
+	// Delition for list1 on local syncbase
+	listName := "Groceries"
+	row0 := makeRowInfo(List1,
+		encode(&List{listName, 1, 0}), 1, // ancestor value
+		nil, 5, // local value
+		encode(&List{listName, 2, 0}), 3) // remote value
+	row0.BatchIds = []uint16{1, 2}
+	c0 := makeConflictInfo(row0, true)
+
+	// Expected result
+	r0 := makeResolution(List1, nil, wire.ValueSelectionOther)
+
+	// Deletion for task1 on local syncbase along with list1
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1, // ancestor
+		nil, -1, // local
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1) // remote
+	row1.BatchIds = []uint16{1}
+	c1 := makeConflictInfo(row1, true)
+
+	// Expected result
+	r1 := makeResolution(Task1, nil, wire.ValueSelectionOther)
+
+	// Addition for task2 on remote syncbase
+	row2 := makeRowInfo(Task2,
+		nil, -1, // ancestor value
+		nil, -1, // local value
+		encode(&Task{"AddedTask", false, -1, List1}), 2) // remote value
+	row2.BatchIds = []uint16{2}
+	c2 := makeConflictInfo(row2, false)
+
+	// Expected result
+	r2 := makeResolution(Task2, nil, wire.ValueSelectionOther)
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r0.Key] = r0
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+
+	return []wire.ConflictInfo{b1, b2, c0, c1, c2}, respMap
+}
+
+func makeResolution(key string, result []byte, selection wire.ValueSelection) wire.ResolutionInfo {
+	r := wire.ResolutionInfo{}
+	r.Key = key
+	if result != nil {
+		r.Result = &wire.Value{
+			Bytes:   result,
+			WriteTs: -1,
+		}
+	}
+	r.Selection = selection
+	return r
+}
+
+func makeConflictBatch(id uint16, hint string, source wire.BatchSource, continued bool) wire.ConflictInfo {
+	batch := wire.BatchInfo{Id: id, Hint: hint, Source: source}
+	return wire.ConflictInfo{Data: wire.ConflictDataBatch{Value: batch}, Continued: continued}
+}
+
+func makeConflictInfo(row wire.RowInfo, continued bool) wire.ConflictInfo {
+	return wire.ConflictInfo{
+		Data: wire.ConflictDataRow{Value: row},
+		Continued: continued,
+	}
+}
+
+func makeRowInfo(key string, ancestor []byte, ats int64, local []byte, lts int64, remote []byte, rts int64) wire.RowInfo {
+	op := wire.RowOp{}
+	op.Key = key
+
+	if ancestor != nil {
+		op.AncestorValue = &wire.Value{
+			Bytes:   ancestor,
+			WriteTs: ats,
+		}
+	}
+
+	if local != nil {
+		op.LocalValue = &wire.Value{
+			Bytes:   local,
+			WriteTs: lts,
+		}
+	}
+
+	if remote != nil {
+		op.RemoteValue = &wire.Value{
+			Bytes:   remote,
+			WriteTs: rts,
+		}
+	}
+	return wire.RowInfo{
+		Op: wire.OperationWrite{Value: op},
+	}
+}
+
+func compareResult(t *testing.T, expected wire.ResolutionInfo, actual wire.ResolutionInfo) {
+	if actual.Key != expected.Key {
+		t.Error("Key does not match")
+	}
+	if actual.Selection != expected.Selection {
+		t.Errorf("Key: %s", expected.Key)
+		t.Errorf("Expected selection: %v, actual selection: %v", expected.Selection, actual.Selection)
+	}
+	if (expected.Result == nil) && (actual.Result != nil) {
+		t.Errorf("Key: %s", expected.Key)
+		t.Error("Result expected to be nil but found non nil")
+	}
+	if expected.Result != nil {
+		if actual.Result == nil {
+			t.Errorf("Key: %s", expected.Key)
+			t.Error("Result found nil")
+		}
+		if bytes.Compare(actual.Result.Bytes, expected.Result.Bytes) != 0 {
+			t.Errorf("Key: %s", expected.Key)
+			t.Error("Result bytes do not match")
+		}
+	}
+}
+
+func encode(value interface{}) []byte {
+	v, _ := vom.Encode(value)
+	return v
+}
diff --git a/v23/syncbase/nosql/crtestutil/mock_dbclient.go b/v23/syncbase/nosql/crtestutil/mock_dbclient.go
new file mode 100644
index 0000000..99e27ee
--- /dev/null
+++ b/v23/syncbase/nosql/crtestutil/mock_dbclient.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package crtestutil
+
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+)
+
+type MockWireDatabaseClient struct {
+	wire.DatabaseClientMethods
+	crs *CrStreamImpl
+}
+
+func MockDbClient(c wire.DatabaseClientMethods, crs *CrStreamImpl) *MockWireDatabaseClient {
+	return &MockWireDatabaseClient{c, crs}
+}
+
+func (wclient *MockWireDatabaseClient) GetSchemaMetadata(ctx *context.T, opts ...rpc.CallOpt) (sm wire.SchemaMetadata, err error) {
+	err = verror.NewErrNoExist(ctx)
+	return
+}
+
+func (wclient *MockWireDatabaseClient) SetSchemaMetadata(ctx *context.T, i0 wire.SchemaMetadata, opts ...rpc.CallOpt) error {
+	return nil
+}
+
+func (wclient *MockWireDatabaseClient) StartConflictResolver(ctx *context.T, opts ...rpc.CallOpt) (wire.ConflictManagerStartConflictResolverClientCall, error) {
+	return wclient.crs, nil
+}
diff --git a/v23/syncbase/nosql/crtestutil/mock_stream.go b/v23/syncbase/nosql/crtestutil/mock_stream.go
new file mode 100644
index 0000000..3bb8f4d
--- /dev/null
+++ b/v23/syncbase/nosql/crtestutil/mock_stream.go
@@ -0,0 +1,119 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package crtestutil
+
+import (
+	"sync"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+)
+
+var _ wire.ConflictManagerStartConflictResolverClientCall = (*CrStreamImpl)(nil)
+
+type State struct {
+	// ConflictStream state
+	IsBlocked    bool
+	Mu           sync.Mutex
+	Val          wire.ConflictInfo
+	AdvanceCount int
+	ValIndex     int
+	// ResolutionStream state variables
+	Result []wire.ResolutionInfo
+}
+
+type CrStreamImpl struct {
+	C ConflictStream
+	R ResolutionStream
+}
+
+func (s *CrStreamImpl) RecvStream() interface {
+	Advance() bool
+	Value() wire.ConflictInfo
+	Err() error
+} {
+	return recvStreamImpl{s.C}
+}
+
+func (s *CrStreamImpl) SendStream() interface {
+	Send(item wire.ResolutionInfo) error
+	Close() error
+} {
+	return sendStreamImpl{s.R}
+}
+
+func (s *CrStreamImpl) Finish() error {
+	return nil
+}
+
+type recvStreamImpl struct {
+	c ConflictStream
+}
+
+func (rs recvStreamImpl) Advance() bool {
+	return rs.c.Advance()
+}
+func (rs recvStreamImpl) Value() wire.ConflictInfo {
+	return rs.c.Value()
+}
+func (rs recvStreamImpl) Err() error {
+	return rs.c.Err()
+}
+
+type sendStreamImpl struct {
+	r ResolutionStream
+}
+
+func (ss sendStreamImpl) Send(item wire.ResolutionInfo) error {
+	return ss.r.Send(item)
+}
+func (c sendStreamImpl) Close() error {
+	return nil
+}
+
+type ConflictStream interface {
+	Advance() bool
+	Value() wire.ConflictInfo
+	Err() error
+}
+
+type ResolutionStream interface {
+	Send(item wire.ResolutionInfo) error
+}
+
+type ConflictStreamImpl struct {
+	St        *State
+	AdvanceFn func(*State) bool
+}
+
+func (cs *ConflictStreamImpl) Advance() bool {
+	return cs.AdvanceFn(cs.St)
+}
+func (cs *ConflictStreamImpl) Value() wire.ConflictInfo {
+	return cs.St.Val
+}
+func (cs *ConflictStreamImpl) Err() error {
+	return &TestError{"Stream broken"}
+}
+
+type ResolutionStreamImpl struct {
+	St *State
+}
+
+func (rs *ResolutionStreamImpl) Send(item wire.ResolutionInfo) error {
+	if rs.St.Result == nil {
+		rs.St.Result = []wire.ResolutionInfo{item}
+		return nil
+	}
+	rs.St.Result = append(rs.St.Result, item)
+	return nil
+}
+
+type TestError struct {
+	str string
+}
+
+func (e *TestError) Error() string {
+	return e.str
+}
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index d48f450..42b5270 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -5,6 +5,9 @@
 package nosql
 
 import (
+	"sync"
+	"time"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/util"
 	"v.io/v23/context"
@@ -15,6 +18,12 @@
 	"v.io/x/lib/vlog"
 )
 
+const (
+	// Wait time before we try to reconnect a broken conflict resolution stream.
+	waitBeforeReconnectInMillis = 2 * time.Second
+	reconnectionCount = "rcc"
+)
+
 func NewDatabase(parentFullName, relativeName string, schema *Schema) *database {
 	fullName := naming.Join(parentFullName, relativeName)
 	return &database{
@@ -23,6 +32,9 @@
 		fullName:       fullName,
 		name:           relativeName,
 		schema:         schema,
+		crState: conflictResolutionState{
+			reconnectWaitTime: waitBeforeReconnectInMillis,
+		},
 	}
 }
 
@@ -32,6 +44,31 @@
 	fullName       string
 	name           string
 	schema         *Schema
+	crState        conflictResolutionState
+}
+
+// conflictResolutionState maintains data about the connection of
+// conflict resolution stream with syncbase. It provides a way to disconnect
+// an existing open stream.
+type conflictResolutionState struct {
+	mu                sync.Mutex // guards access to all fields in this struct
+	crContext         *context.T
+	cancelFn          context.CancelFunc
+	isClosed          bool
+	reconnectWaitTime time.Duration
+}
+
+func (crs *conflictResolutionState) disconnect() {
+	crs.mu.Lock()
+	defer crs.mu.Unlock()
+	crs.isClosed = true
+	crs.cancelFn()
+}
+
+func (crs *conflictResolutionState) isDisconnected() bool {
+	crs.mu.Lock()
+	defer crs.mu.Unlock()
+	return crs.isClosed
 }
 
 var _ Database = (*database)(nil)
@@ -168,17 +205,44 @@
 	return createBlob(ctx, d.fullName)
 }
 
-// UpgradeIfOutdated implements Database.UpgradeIfOutdated.
-func (d *database) UpgradeIfOutdated(ctx *context.T) (bool, error) {
+// EnforceSchema implements Database.EnforceSchema.
+func (d *database) EnforceSchema(ctx *context.T) error {
 	var schema *Schema = d.schema
 	if schema == nil {
-		return false, verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
+		return verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
 	}
 
 	if schema.Metadata.Version < 0 {
-		return false, verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
+		return verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
 	}
 
+	if needsResolver(d.schema.Metadata) && d.schema.Resolver == nil {
+		return verror.New(verror.ErrBadState, ctx, "ResolverTypeAppResolves cannot be used in CrRule without providing a ConflictResolver in Schema.")
+	}
+
+	if _, err := d.upgradeIfOutdated(ctx); err != nil {
+		return err
+	}
+
+	if d.schema.Resolver == nil {
+		return nil
+	}
+
+	childCtx, cancelFn := context.WithCancel(ctx)
+	d.crState.crContext = childCtx
+	d.crState.cancelFn = cancelFn
+
+	go d.establishConflictResolution(childCtx)
+	return nil
+}
+
+// Close implements Database.Close.
+func (d *database) Close() {
+	d.crState.disconnect()
+}
+
+func (d *database) upgradeIfOutdated(ctx *context.T) (bool, error) {
+	var schema *Schema = d.schema
 	schemaMgr := d.getSchemaManager()
 	currMeta, err := schemaMgr.getSchemaMetadata(ctx)
 	if err != nil {
@@ -222,8 +286,182 @@
 	return true, nil
 }
 
+func (d *database) establishConflictResolution(ctx *context.T) {
+	count := 0
+	for {
+		count++
+		vlog.Infof("Starting a new conflict resolution connection. Re-Connection count: %d", count)
+		childCtx := context.WithValue(ctx, reconnectionCount, count)
+		// listenForConflicts is a blocking method which returns only when the
+		// conflict stream is broken.
+		if err := d.listenForConflicts(childCtx); err != nil {
+			vlog.Errorf("Conflict resolution connection ended with error: %v", err)
+		}
+
+		// Check if database is closed and if we need to shutdown conflict
+		// resolution.
+		if d.crState.isDisconnected() {
+			vlog.Infof("Shutting down conflict resolution connection.")
+			break
+		}
+
+		// The connection might have broken because syncbase service went down.
+		// Sleep for a few seconds to allow syncbase to come back up.
+		time.Sleep(d.crState.reconnectWaitTime)
+	}
+}
+
+func (d *database) listenForConflicts(ctx *context.T) error {
+	resolver, err := d.c.StartConflictResolver(ctx)
+	if err != nil {
+		return err
+	}
+	conflictStream := resolver.RecvStream()
+	resolutionStream := resolver.SendStream()
+	var c *Conflict = &Conflict{}
+	for conflictStream.Advance() {
+		row := conflictStream.Value()
+		addRowToConflict(c, &row)
+		if !row.Continued {
+			resolution := d.schema.Resolver.OnConflict(ctx, c)
+			if err := sendResolution(resolutionStream, resolution); err != nil {
+				return err
+			}
+			c = &Conflict{}  // create a new conflict object for the next batch
+		}
+	}
+	if err := conflictStream.Err(); err != nil {
+		return err
+	}
+	return resolver.Finish()
+}
+
+// TODO(jlodhia): Should we check if the Resolution received addresses all
+// conflicts in write set?
+func sendResolution(stream interface {
+	Send(item wire.ResolutionInfo) error
+}, resolution Resolution) error {
+	size := len(resolution.ResultSet)
+	count := 0
+	for _, v := range resolution.ResultSet {
+		count++
+		ri := toResolutionInfo(v, count != size)
+		if err := stream.Send(ri); err != nil {
+			vlog.Error("Error while sending resolution")
+			return err
+		}
+	}
+	return nil
+}
+
+func addRowToConflict(c *Conflict, ci *wire.ConflictInfo) {
+	switch v := ci.Data.(type) {
+	case wire.ConflictDataBatch:
+		if c.Batches == nil {
+			c.Batches = map[uint16]wire.BatchInfo{}
+		}
+		c.Batches[v.Value.Id] = v.Value
+	case wire.ConflictDataRow:
+		rowInfo := v.Value
+		switch op := rowInfo.Op.(type) {
+		case wire.OperationWrite:
+			if c.WriteSet == nil {
+				c.WriteSet = &ConflictRowSet{map[string]ConflictRow{}, map[uint16][]ConflictRow{}}
+			}
+			cr := toConflictRow(op.Value, rowInfo.BatchIds)
+			c.WriteSet.ByKey[cr.Key] = cr
+			for _, bid := range rowInfo.BatchIds {
+				c.WriteSet.ByBatch[bid] = append(c.WriteSet.ByBatch[bid], cr)
+			}
+		case wire.OperationRead:
+			if c.ReadSet == nil {
+				c.ReadSet = &ConflictRowSet{map[string]ConflictRow{}, map[uint16][]ConflictRow{}}
+			}
+			cr := toConflictRow(op.Value, rowInfo.BatchIds)
+			c.ReadSet.ByKey[cr.Key] = cr
+			for _, bid := range rowInfo.BatchIds {
+				c.ReadSet.ByBatch[bid] = append(c.ReadSet.ByBatch[bid], cr)
+			}
+		case wire.OperationScan:
+			if c.ScanSet == nil {
+				c.ScanSet = &ConflictScanSet{map[uint16][]wire.ScanOp{}}
+			}
+			for _, bid := range rowInfo.BatchIds {
+				c.ScanSet.ByBatch[bid] = append(c.ScanSet.ByBatch[bid], op.Value)
+			}
+		}
+	}
+}
+
+func toConflictRow(op wire.RowOp, batchIds []uint16) ConflictRow {
+	var local, remote, ancestor *Value
+	if op.LocalValue != nil {
+		local = &Value{
+			val:       op.LocalValue.Bytes,
+			WriteTs:   toTime(op.LocalValue.WriteTs),
+			selection: wire.ValueSelectionLocal,
+		}
+	}
+	if op.RemoteValue != nil {
+		remote = &Value{
+			val:       op.RemoteValue.Bytes,
+			WriteTs:   toTime(op.RemoteValue.WriteTs),
+			selection: wire.ValueSelectionRemote,
+		}
+	}
+	if op.AncestorValue != nil {
+		ancestor = &Value{
+			val:       op.AncestorValue.Bytes,
+			WriteTs:   toTime(op.AncestorValue.WriteTs),
+			selection: wire.ValueSelectionOther,
+		}
+	}
+	return ConflictRow{
+		Key:           op.Key,
+		LocalValue:    local,
+		RemoteValue:   remote,
+		AncestorValue: ancestor,
+		BatchIds:      batchIds,
+	}
+}
+
+// TODO(jlodhia): remove this method once time is stored as time.Time instead
+// of int64
+func toTime(unixNanos int64) time.Time {
+	return time.Unix(
+		unixNanos / 1e9,  // seconds
+		unixNanos % 1e9)  // nanoseconds
+}
+
+func toResolutionInfo(r ResolvedRow, lastRow bool) wire.ResolutionInfo {
+	sel := wire.ValueSelectionOther
+	resVal := (*wire.Value)(nil)
+	if r.Result != nil {
+		sel = r.Result.selection
+		resVal = &wire.Value{
+			Bytes:   r.Result.val,
+			WriteTs: r.Result.WriteTs.UnixNano(),  // this timestamp is ignored by syncbase
+		}
+	}
+	return wire.ResolutionInfo{
+		Key:       r.Key,
+		Selection: sel,
+		Result:    resVal,
+		Continued: lastRow,
+	}
+}
+
+func needsResolver(metadata wire.SchemaMetadata) bool {
+	for _, rule := range metadata.Policy.Rules {
+		if rule.Resolver == wire.ResolverTypeAppResolves {
+			return true
+		}
+	}
+	return false
+}
+
 func (d *database) getSchemaManager() schemaManagerImpl {
-	return newSchemaManager(d.fullName)
+	return newSchemaManager(d.c)
 }
 
 func (d *database) schemaVersion() int32 {
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 622f82d..d9ae93f 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -6,12 +6,15 @@
 package nosql
 
 import (
+	"time"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/util"
 	"v.io/v23/context"
 	"v.io/v23/security/access"
 	"v.io/v23/services/watch"
 	"v.io/v23/vdl"
+	"v.io/v23/verror"
 	"v.io/v23/vom"
 )
 
@@ -45,6 +48,10 @@
 	// time of the RPC, and will not reflect subsequent writes to keys not yet
 	// reached by the stream.
 	Exec(ctx *context.T, query string) ([]string, ResultStream, error)
+
+	// Close cleans up any state associated with this client handle including
+	// shutting down any open conflict resolution stream.
+	Close()
 }
 
 // Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -131,14 +138,15 @@
 	// Blob returns a handle to the blob with the given BlobRef.
 	Blob(br wire.BlobRef) Blob
 
-	// This method compares the current schema version of the database with the
-	// schema version provided while creating this database handle. If the
-	// current database schema version is lower, then the SchemaUpdater is
+	// EnforceSchema compares the current schema version of the database
+	// with the schema version provided while creating this database handle. If
+	// the current database schema version is lower, then the SchemaUpdater is
 	// called. If SchemaUpdater is successful this method stores the new schema
 	// metadata in database.
-	// Note: schema can be nil, in which case this method skips schema check
-	// and the caller is responsible for maintaining schema sanity.
-	UpgradeIfOutdated(ctx *context.T) (bool, error)
+	// This method also registers a conflict resolver with syncbase to receive
+	// conflicts. Note: schema can be nil, in which case this method skips
+	// schema check and the caller is responsible for maintaining schema sanity.
+	EnforceSchema(ctx *context.T) error
 }
 
 // BatchDatabase is a handle to a set of reads and writes to the database that
@@ -550,4 +558,114 @@
 type Schema struct {
 	Metadata wire.SchemaMetadata
 	Upgrader SchemaUpgrader
+	Resolver ConflictResolver
+}
+
+// ConflictResolver interface allows the App to define resolution of conflicts
+// that it requested to handle.
+type ConflictResolver interface {
+	OnConflict(ctx *context.T, conflict *Conflict) Resolution
+}
+
+// Conflict contains information to fully specify a conflict. Since syncbase
+// supports batches there can be one or more rows within the batch that has a
+// conflict. Each of these rows will be sent together as part of a single
+// conflict. Each row contains an Id of the batch to which it belongs,
+// enabling the client to group together rows that are part of a batch. Note
+// that a single row can be part of more than one batch.
+//
+// WriteSet contains rows that were written.
+// ReadSet contains rows that were read within a batch corresponding to a row
+// within the write set.
+// ScanSet contains scans performed within a batch corresponding to a row
+// within the write set.
+// Batches is a map of unique ids to BatchInfo objects. The id is unique only in
+// the context of a given conflict and is otherwise meaningless.
+type Conflict struct {
+	ReadSet       *ConflictRowSet
+	WriteSet      *ConflictRowSet
+	ScanSet       *ConflictScanSet
+	Batches       map[uint16]wire.BatchInfo
+}
+
+// ConflictRowSet contains a set of rows under conflict. It provides two different
+// ways to access the same set.
+// ByKey is a map of ConflictRows keyed by the row key.
+// ByBatch is a map of []ConflictRows keyed by batch id. This map lets the client
+// access all ConflictRows within this set that contain a given hint.
+type ConflictRowSet struct {
+	ByKey  map[string]ConflictRow
+	ByBatch map[uint16][]ConflictRow
+}
+
+// ConflictScanSet contains a set of scans under conflict.
+// ByBatch is a map of array of ScanOps keyed by batch id.
+type ConflictScanSet struct {
+	ByBatch map[uint16][]wire.ScanOp
+}
+
+// ConflictRow represents a row under conflict.
+// Key is the key for the row.
+// LocalValue is the value present in the local db.
+// RemoteValue is the value received via sync.
+// AncestorValue is the value for the key which is the lowest common
+// ancestor of the two values represented by LocalValue and RemoteValue.
+// AncestorValue is nil if the ConflictRow is a part of the read set.
+// BatchIds is a list of ids of all the batches that this row belongs to.
+type ConflictRow struct {
+	Key           string
+	LocalValue    *Value
+	RemoteValue   *Value
+	AncestorValue *Value
+	BatchIds      []uint16
+}
+
+// Resolution contains the application’s reply to a conflict. It must contain a
+// resolved value for each conflict row within the WriteSet of the given
+// conflict.
+// ResultSet is a map of row key to ResolvedRow.
+type Resolution struct {
+	ResultSet map[string]ResolvedRow
+	// TODO(jlodhia): Hint []string
+}
+
+// ResolvedRow represents a result of resolution of a row under conflict.
+// Key is the key for the row.
+// Selection represents the value that was selected for resolution.
+// Value is the resolved value for the key. This field should be used only
+// if value of Selection field is 'Other'.
+type ResolvedRow struct {
+	Key    string
+	Result *Value
+}
+
+// Value contains a specific version of data for the row under conflict along
+// with the write timestamp and hints associated with the version.
+// WriteTs is the write timestamp for this value.
+type Value struct {
+	val       []byte
+	WriteTs   time.Time
+	selection wire.ValueSelection
+}
+
+// Get takes a reference to an instance of a type that is expected to be
+// represented by Value.
+func (v *Value) Get(value interface{}) error {
+	return vom.Decode(v.val, value)
+}
+
+// NewValue creates a new Value to be added to Resolution.
+func NewValue(ctx *context.T, data interface{}) (*Value, error) {
+	if data == nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, "data cannot be nil")
+	}
+	bytes, err := vom.Encode(data)
+	if err != nil {
+		return nil, err
+	}
+	return &Value{
+		val:       bytes,
+		WriteTs:   time.Now(),  // ignored by syncbase
+		selection: wire.ValueSelectionOther,
+	}, nil
 }
diff --git a/v23/syncbase/nosql/schema.go b/v23/syncbase/nosql/schema.go
index 9f52a1d..8dd480d 100644
--- a/v23/syncbase/nosql/schema.go
+++ b/v23/syncbase/nosql/schema.go
@@ -21,14 +21,12 @@
 // Implementation of SchemaManager (Not part of public client API)
 
 type schemaManagerImpl struct {
-	dbName string
-	c      wire.DatabaseClientMethods
+	c wire.DatabaseClientMethods
 }
 
-func newSchemaManager(dbName string) schemaManagerImpl {
+func newSchemaManager(client wire.DatabaseClientMethods) schemaManagerImpl {
 	return schemaManagerImpl{
-		dbName: dbName,
-		c:      wire.DatabaseClient(dbName),
+		c: client,
 	}
 }
 
diff --git a/v23/syncbase/nosql/schema_test.go b/v23/syncbase/nosql/schema_test.go
index 6d7ed0d..78ebd95 100644
--- a/v23/syncbase/nosql/schema_test.go
+++ b/v23/syncbase/nosql/schema_test.go
@@ -19,10 +19,10 @@
 // This test as following steps:
 // 1) Call NoSQLDatabase() for a non existent db.
 // 2) Create the database, and verify if Schema got stored properly.
-// 3) Call UpgradeIfOutdated() to make sure that the method is no-op and is
+// 3) Call EnforceSchema() to make sure that the method is no-op and is
 //    able to read the schema from db.
 // 4) Call NoSQLDatabase() on the same db to create a new handle with an
-//    upgraded schema, call UpgradeIfOutdated() and check if SchemaUpgrader
+//    upgraded schema, call EnforceSchema() and check if SchemaUpgrader
 //    is called and if the new schema is stored appropriately.
 func TestSchemaCheck(t *testing.T) {
 	ctx, sName, cleanup := tu.SetupOrDie(nil)
@@ -35,9 +35,9 @@
 
 	// Verify that calling Upgrade on a non existing database does not throw
 	// errors.
-	_, err := db1.UpgradeIfOutdated(ctx)
+	err := db1.EnforceSchema(ctx)
 	if err != nil {
-		t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err)
+		t.Fatalf("db1.EnforceSchema() failed: %v", err)
 	}
 	if mockUpgrader.CallCount > 0 {
 		t.Fatal("Call to upgrader was not expected.")
@@ -53,12 +53,8 @@
 	}
 
 	// Make redundant call to Upgrade to verify that it is a no-op
-	result, err1 := db1.UpgradeIfOutdated(ctx)
-	if result {
-		t.Fatalf("db1.UpgradeIfOutdated() should not return true")
-	}
-	if err1 != nil {
-		t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err1)
+	if err := db1.EnforceSchema(ctx); err != nil {
+		t.Fatalf("db1.EnforceSchema() failed: %v", err)
 	}
 	if mockUpgrader.CallCount > 0 {
 		t.Fatal("Call to upgrader was not expected.")
@@ -73,13 +69,8 @@
 	}
 	schema.Metadata.Policy = policy
 	otherdb1 := a.NoSQLDatabase("db1", schema)
-	otherresult, othererr := otherdb1.UpgradeIfOutdated(ctx)
-
-	if !otherresult {
-		t.Fatalf("otherdb1.UpgradeIfOutdated() expected to return true")
-	}
-	if othererr != nil {
-		t.Fatalf("otherdb1.UpgradeIfOutdated() failed: %v", othererr)
+	if err := otherdb1.EnforceSchema(ctx); err != nil {
+		t.Fatalf("otherdb1.EnforceSchema() failed: %v", err)
 	}
 	if mockUpgrader.CallCount != 1 {
 		t.Fatalf("Unexpected number of calls to upgrader. Expected: %d, Actual: %d.", 1, mockUpgrader.CallCount)
@@ -226,7 +217,7 @@
 	// Upgrade schema version for underlying db using a different handle
 	schema2 := tu.DefaultSchema(1)
 	dbHandle2 := a.NoSQLDatabase("db1", schema2)
-	dbHandle2.UpgradeIfOutdated(ctx)
+	dbHandle2.EnforceSchema(ctx)
 
 	// Commit batch1, abort batch2, attempt writing a row using batch3.
 	// Each of these operations should fail.
diff --git a/v23/syncbase/util/util.go b/v23/syncbase/util/util.go
index 6eeaaaa..dda4c6d 100644
--- a/v23/syncbase/util/util.go
+++ b/v23/syncbase/util/util.go
@@ -64,6 +64,12 @@
 	return string(x)
 }
 
+// IsPrefix returns true if start and limit strings together represent a prefix
+// range. If true, start represents the prefix.
+func IsPrefix(start string, limit string) bool {
+	return PrefixRangeLimit(start) == limit
+}
+
 // AccessController provides access control for various syncbase objects.
 type AccessController interface {
 	// SetPermissions replaces the current Permissions for an object.
diff --git a/v23/syncbase/util/util_test.go b/v23/syncbase/util/util_test.go
index 321691a..2d6384a 100644
--- a/v23/syncbase/util/util_test.go
+++ b/v23/syncbase/util/util_test.go
@@ -61,3 +61,42 @@
 		}
 	}
 }
+
+func TestIsPrefix(t *testing.T) {
+	tests := []struct {
+		isPrefix bool
+		start  string
+		limit  string
+	}{
+		{true, "", ""},
+		{true, "a", "b"},
+		{true, "aa", "ab"},
+		{true, "\xfe", "\xff"},
+		{true, "a\xfe", "a\xff"},
+		{true, "aa\xfe", "aa\xff"},
+		{true, "a\xff", "b"},
+		{true, "aa\xff", "ab"},
+		{true, "a\xff\xff", "b"},
+		{true, "aa\xff\xff", "ab"},
+		{true, "\xff", ""},
+		{true, "\xff\xff", ""},
+
+		{false, "", "\x00"},
+		{false, "a", "aa"},
+		{false, "aa", "aa"},
+		{false, "\xfe", "\x00"},
+		{false, "a\xfe", "b\xfe"},
+		{false, "aa\xfe", "aa\x00"},
+		{false, "a\xff", "b\x00"},
+		{false, "aa\xff", "ab\x00"},
+		{false, "a\xff\xff", "a\xff\xff\xff"},
+		{false, "aa\xff\xff", "a"},
+		{false, "\xff", "\x00"},
+	}
+	for _, test := range tests {
+		result := util.IsPrefix(test.start, test.limit)
+		if result != test.isPrefix {
+			t.Errorf("%q, %q: got %v, want %v", test.start, test.limit, result, test.isPrefix)
+		}
+	}
+}
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 2818a8c..a9f503f 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -49,6 +49,11 @@
 	mu  sync.Mutex // protects the fields below
 	sns map[uint64]store.Snapshot
 	txs map[uint64]store.Transaction
+
+	// Active ConflictResolver connection from the app to this database.
+	// NOTE: For now, we assume there's only one open conflict resolution stream
+	// per database (typically, from the app that owns the database).
+	resolver wire.ConflictManagerStartConflictResolverServerCall
 }
 
 // databaseReq is a per-request object that handles Database RPCs.
diff --git a/x/ref/services/syncbase/server/nosql/database_crm.go b/x/ref/services/syncbase/server/nosql/database_crm.go
new file mode 100644
index 0000000..e1135a9
--- /dev/null
+++ b/x/ref/services/syncbase/server/nosql/database_crm.go
@@ -0,0 +1,20 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+)
+
+////////////////////////////////////////
+// ConflictManager RPC methods
+
+func (d *databaseReq) StartConflictResolver(ctx *context.T, call wire.ConflictManagerStartConflictResolverServerCall) error {
+	// Store the conflict resolver connection in the per-app, per-database
+	// singleton so that sync can access it.
+	d.database.resolver = call
+	return nil
+}