Merge "Make query_checker tests pass in go1.5."
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index ebeccf5..16182de 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -73,6 +73,10 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManager
+
+	// ConflictManager implements the API for registering resolvers, receiving
+	// conflicts and sending resolutions.
+	ConflictManager
 }
 
 // Table represents a collection of Rows.
@@ -234,6 +238,30 @@
 	SetSchemaMetadata(metadata SchemaMetadata) error {access.Write}
 }
 
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManager interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver() stream<ResolutionInfo, ConflictInfo> error {access.Write}
+}
+
 // BlobManager is the interface for blob operations.
 type BlobManager interface {
 	// API for resumable blob creation (append-only). After commit, a blob
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 2d3d4ba..a908cfa 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -1452,6 +1452,356 @@
 	},
 }
 
+// ConflictManagerClientMethods is the client interface
+// containing ConflictManager methods.
+//
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManagerClientMethods interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver(*context.T, ...rpc.CallOpt) (ConflictManagerStartConflictResolverClientCall, error)
+}
+
+// ConflictManagerClientStub adds universal methods to ConflictManagerClientMethods.
+type ConflictManagerClientStub interface {
+	ConflictManagerClientMethods
+	rpc.UniversalServiceMethods
+}
+
+// ConflictManagerClient returns a client stub for ConflictManager.
+func ConflictManagerClient(name string) ConflictManagerClientStub {
+	return implConflictManagerClientStub{name}
+}
+
+type implConflictManagerClientStub struct {
+	name string
+}
+
+func (c implConflictManagerClientStub) StartConflictResolver(ctx *context.T, opts ...rpc.CallOpt) (ocall ConflictManagerStartConflictResolverClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "StartConflictResolver", nil, opts...); err != nil {
+		return
+	}
+	ocall = &implConflictManagerStartConflictResolverClientCall{ClientCall: call}
+	return
+}
+
+// ConflictManagerStartConflictResolverClientStream is the client stream for ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverClientStream interface {
+	// RecvStream returns the receiver side of the ConflictManager.StartConflictResolver client stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() ConflictInfo
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the ConflictManager.StartConflictResolver client stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors
+		// encountered while sending, or if Send is called after Close or
+		// the stream has been canceled.  Blocks if there is no buffer
+		// space; will unblock when buffer space is available or after
+		// the stream has been canceled.
+		Send(item ResolutionInfo) error
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.
+		// This is an optional call - e.g. a client might call Close if it
+		// needs to continue receiving items from the server after it's
+		// done sending.  Returns errors encountered while closing, or if
+		// Close is called after the stream has been canceled.  Like Send,
+		// blocks if there is no buffer space available.
+		Close() error
+	}
+}
+
+// ConflictManagerStartConflictResolverClientCall represents the call returned from ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverClientCall interface {
+	ConflictManagerStartConflictResolverClientStream
+	// Finish performs the equivalent of SendStream().Close, then blocks until
+	// the server is done, and returns the positional return values for the call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implConflictManagerStartConflictResolverClientCall struct {
+	rpc.ClientCall
+	valRecv ConflictInfo
+	errRecv error
+}
+
+func (c *implConflictManagerStartConflictResolverClientCall) RecvStream() interface {
+	Advance() bool
+	Value() ConflictInfo
+	Err() error
+} {
+	return implConflictManagerStartConflictResolverClientCallRecv{c}
+}
+
+type implConflictManagerStartConflictResolverClientCallRecv struct {
+	c *implConflictManagerStartConflictResolverClientCall
+}
+
+func (c implConflictManagerStartConflictResolverClientCallRecv) Advance() bool {
+	c.c.valRecv = ConflictInfo{}
+	c.c.errRecv = c.c.Recv(&c.c.valRecv)
+	return c.c.errRecv == nil
+}
+func (c implConflictManagerStartConflictResolverClientCallRecv) Value() ConflictInfo {
+	return c.c.valRecv
+}
+func (c implConflictManagerStartConflictResolverClientCallRecv) Err() error {
+	if c.c.errRecv == io.EOF {
+		return nil
+	}
+	return c.c.errRecv
+}
+func (c *implConflictManagerStartConflictResolverClientCall) SendStream() interface {
+	Send(item ResolutionInfo) error
+	Close() error
+} {
+	return implConflictManagerStartConflictResolverClientCallSend{c}
+}
+
+type implConflictManagerStartConflictResolverClientCallSend struct {
+	c *implConflictManagerStartConflictResolverClientCall
+}
+
+func (c implConflictManagerStartConflictResolverClientCallSend) Send(item ResolutionInfo) error {
+	return c.c.Send(item)
+}
+func (c implConflictManagerStartConflictResolverClientCallSend) Close() error {
+	return c.c.CloseSend()
+}
+func (c *implConflictManagerStartConflictResolverClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
+// ConflictManagerServerMethods is the interface a server writer
+// implements for ConflictManager.
+//
+// ConflictManager interface provides all the methods necessary to handle
+// conflict resolution for a given database.
+type ConflictManagerServerMethods interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver(*context.T, ConflictManagerStartConflictResolverServerCall) error
+}
+
+// ConflictManagerServerStubMethods is the server interface containing
+// ConflictManager methods, as expected by rpc.Server.
+// The only difference between this interface and ConflictManagerServerMethods
+// is the streaming methods.
+type ConflictManagerServerStubMethods interface {
+	// StartConflictResolver registers a resolver for the database that is
+	// associated with this ConflictManager and creates a stream to receive
+	// conflicts and send resolutions.
+	// Batches of ConflictInfos will be sent over with the Continued field
+	// within the ConflictInfo representing the batch boundary. Client must
+	// respond with a batch of ResolutionInfos in the same fashion.
+	// A key is under conflict if two different values were written to it
+	// concurrently (in logical time), i.e. neither value is an ancestor of the
+	// other in the history graph.
+	// A key under conflict can be a part of a batch committed on local or
+	// remote or both syncbases. ConflictInfos for all keys in these two batches
+	// are grouped together. These keys may themselves be under conflict; the
+	// presented batch is a transitive closure of all batches containing keys
+	// under conflict.
+	// For example, for local batch {key1, key2} and remote batch {key1, key3},
+	// the batch sent for conflict resolution will be {key1, key2, key3}.
+	// If there was another concurrent batch {key2, key4}, then the batch sent
+	// for conflict resolution will be {key1, key2, key3, key4}.
+	StartConflictResolver(*context.T, *ConflictManagerStartConflictResolverServerCallStub) error
+}
+
+// ConflictManagerServerStub adds universal methods to ConflictManagerServerStubMethods.
+type ConflictManagerServerStub interface {
+	ConflictManagerServerStubMethods
+	// Describe the ConflictManager interfaces.
+	Describe__() []rpc.InterfaceDesc
+}
+
+// ConflictManagerServer returns a server stub for ConflictManager.
+// It converts an implementation of ConflictManagerServerMethods into
+// an object that may be used by rpc.Server.
+func ConflictManagerServer(impl ConflictManagerServerMethods) ConflictManagerServerStub {
+	stub := implConflictManagerServerStub{
+		impl: impl,
+	}
+	// Initialize GlobState; always check the stub itself first, to handle the
+	// case where the user has the Glob method defined in their VDL source.
+	if gs := rpc.NewGlobState(stub); gs != nil {
+		stub.gs = gs
+	} else if gs := rpc.NewGlobState(impl); gs != nil {
+		stub.gs = gs
+	}
+	return stub
+}
+
+type implConflictManagerServerStub struct {
+	impl ConflictManagerServerMethods
+	gs   *rpc.GlobState
+}
+
+func (s implConflictManagerServerStub) StartConflictResolver(ctx *context.T, call *ConflictManagerStartConflictResolverServerCallStub) error {
+	return s.impl.StartConflictResolver(ctx, call)
+}
+
+func (s implConflictManagerServerStub) Globber() *rpc.GlobState {
+	return s.gs
+}
+
+func (s implConflictManagerServerStub) Describe__() []rpc.InterfaceDesc {
+	return []rpc.InterfaceDesc{ConflictManagerDesc}
+}
+
+// ConflictManagerDesc describes the ConflictManager interface.
+var ConflictManagerDesc rpc.InterfaceDesc = descConflictManager
+
+// descConflictManager hides the desc to keep godoc clean.
+var descConflictManager = rpc.InterfaceDesc{
+	Name:    "ConflictManager",
+	PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+	Doc:     "// ConflictManager interface provides all the methods necessary to handle\n// conflict resolution for a given database.",
+	Methods: []rpc.MethodDesc{
+		{
+			Name: "StartConflictResolver",
+			Doc:  "// StartConflictResolver registers a resolver for the database that is\n// associated with this ConflictManager and creates a stream to receive\n// conflicts and send resolutions.\n// Batches of ConflictInfos will be sent over with the Continued field\n// within the ConflictInfo representing the batch boundary. Client must\n// respond with a batch of ResolutionInfos in the same fashion.\n// A key is under conflict if two different values were written to it\n// concurrently (in logical time), i.e. neither value is an ancestor of the\n// other in the history graph.\n// A key under conflict can be a part of a batch committed on local or\n// remote or both syncbases. ConflictInfos for all keys in these two batches\n// are grouped together. These keys may themselves be under conflict; the\n// presented batch is a transitive closure of all batches containing keys\n// under conflict.\n// For example, for local batch {key1, key2} and remote batch {key1, key3},\n// the batch sent for conflict resolution will be {key1, key2, key3}.\n// If there was another concurrent batch {key2, key4}, then the batch sent\n// for conflict resolution will be {key1, key2, key3, key4}.",
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+		},
+	},
+}
+
+// ConflictManagerStartConflictResolverServerStream is the server stream for ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverServerStream interface {
+	// RecvStream returns the receiver side of the ConflictManager.StartConflictResolver server stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() ResolutionInfo
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the ConflictManager.StartConflictResolver server stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors encountered
+		// while sending.  Blocks if there is no buffer space; will unblock when
+		// buffer space is available.
+		Send(item ConflictInfo) error
+	}
+}
+
+// ConflictManagerStartConflictResolverServerCall represents the context passed to ConflictManager.StartConflictResolver.
+type ConflictManagerStartConflictResolverServerCall interface {
+	rpc.ServerCall
+	ConflictManagerStartConflictResolverServerStream
+}
+
+// ConflictManagerStartConflictResolverServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements ConflictManagerStartConflictResolverServerCall.
+type ConflictManagerStartConflictResolverServerCallStub struct {
+	rpc.StreamServerCall
+	valRecv ResolutionInfo
+	errRecv error
+}
+
+// Init initializes ConflictManagerStartConflictResolverServerCallStub from rpc.StreamServerCall.
+func (s *ConflictManagerStartConflictResolverServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the ConflictManager.StartConflictResolver server stream.
+func (s *ConflictManagerStartConflictResolverServerCallStub) RecvStream() interface {
+	Advance() bool
+	Value() ResolutionInfo
+	Err() error
+} {
+	return implConflictManagerStartConflictResolverServerCallRecv{s}
+}
+
+type implConflictManagerStartConflictResolverServerCallRecv struct {
+	s *ConflictManagerStartConflictResolverServerCallStub
+}
+
+func (s implConflictManagerStartConflictResolverServerCallRecv) Advance() bool {
+	s.s.valRecv = ResolutionInfo{}
+	s.s.errRecv = s.s.Recv(&s.s.valRecv)
+	return s.s.errRecv == nil
+}
+func (s implConflictManagerStartConflictResolverServerCallRecv) Value() ResolutionInfo {
+	return s.s.valRecv
+}
+func (s implConflictManagerStartConflictResolverServerCallRecv) Err() error {
+	if s.s.errRecv == io.EOF {
+		return nil
+	}
+	return s.s.errRecv
+}
+
+// SendStream returns the send side of the ConflictManager.StartConflictResolver server stream.
+func (s *ConflictManagerStartConflictResolverServerCallStub) SendStream() interface {
+	Send(item ConflictInfo) error
+} {
+	return implConflictManagerStartConflictResolverServerCallSend{s}
+}
+
+type implConflictManagerStartConflictResolverServerCallSend struct {
+	s *ConflictManagerStartConflictResolverServerCallStub
+}
+
+func (s implConflictManagerStartConflictResolverServerCallSend) Send(item ConflictInfo) error {
+	return s.s.Send(item)
+}
+
 // DatabaseClientMethods is the client interface
 // containing Database methods.
 //
@@ -1539,6 +1889,9 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManagerClientMethods
+	// ConflictManager interface provides all the methods necessary to handle
+	// conflict resolution for a given database.
+	ConflictManagerClientMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
@@ -1579,7 +1932,7 @@
 
 // DatabaseClient returns a client stub for Database.
 func DatabaseClient(name string) DatabaseClientStub {
-	return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name)}
+	return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name), ConflictManagerClient(name)}
 }
 
 type implDatabaseClientStub struct {
@@ -1590,6 +1943,7 @@
 	SyncGroupManagerClientStub
 	BlobManagerClientStub
 	SchemaManagerClientStub
+	ConflictManagerClientStub
 }
 
 func (c implDatabaseClientStub) Create(ctx *context.T, i0 *SchemaMetadata, i1 access.Permissions, opts ...rpc.CallOpt) (err error) {
@@ -1786,6 +2140,9 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManagerServerMethods
+	// ConflictManager interface provides all the methods necessary to handle
+	// conflict resolution for a given database.
+	ConflictManagerServerMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
@@ -1899,6 +2256,9 @@
 	// SchemaManager implements the API for managing schema metadata attached
 	// to a Database.
 	SchemaManagerServerStubMethods
+	// ConflictManager interface provides all the methods necessary to handle
+	// conflict resolution for a given database.
+	ConflictManagerServerStubMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
@@ -1949,6 +2309,7 @@
 		SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
 		BlobManagerServerStub:      BlobManagerServer(impl),
 		SchemaManagerServerStub:    SchemaManagerServer(impl),
+		ConflictManagerServerStub:  ConflictManagerServer(impl),
 	}
 	// Initialize GlobState; always check the stub itself first, to handle the
 	// case where the user has the Glob method defined in their VDL source.
@@ -1967,6 +2328,7 @@
 	SyncGroupManagerServerStub
 	BlobManagerServerStub
 	SchemaManagerServerStub
+	ConflictManagerServerStub
 	gs *rpc.GlobState
 }
 
@@ -2003,7 +2365,7 @@
 }
 
 func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
-	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, watch.GlobWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc}
+	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, watch.GlobWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc, ConflictManagerDesc}
 }
 
 // DatabaseDesc describes the Database interface.
@@ -2020,6 +2382,7 @@
 		{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
 		{"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
 		{"SchemaManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SchemaManager implements the API for managing schema metadata attached\n// to a Database."},
+		{"ConflictManager", "v.io/syncbase/v23/services/syncbase/nosql", "// ConflictManager interface provides all the methods necessary to handle\n// conflict resolution for a given database."},
 	},
 	Methods: []rpc.MethodDesc{
 		{
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index eae8e9d..407e956 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -111,6 +111,158 @@
 	Defer
 }
 
+// ConflictInfo contains information to fully specify a conflict
+// for a key, providing the (local, remote, ancestor) tuple.
+// A key under conflict can be a part of a batch in local, remote or both
+// updates. Since the batches can have more than one key, all ConflictInfos
+// for the keys within the batches are grouped together into a single conflict
+// batch and sent as a stream with the Continued field representing conflict
+// batch boundaries.
+type ConflictInfo struct {
+	// Data is a unit chunk of ConflictInfo which can be sent over the conflict
+	// stream.
+	Data ConflictData
+
+	// Continued represents whether the batch of ConflictInfos has ended.
+	Continued bool
+}
+
+// ConflictData represents a unit of conflict data sent over the stream. It
+// can either contain information about a Batch or about an operation done
+// on a row.
+type ConflictData union {
+	Batch BatchInfo
+	Row RowInfo
+}
+
+type BatchInfo struct {
+	// Id is an identifier for a batch contained in a conflict. It is
+	// unique only in the context of a given conflict. Its purpose is solely to
+	// group one or more RowInfo objects together to represent a batch that
+	// was committed by the client.
+	Id uint16
+
+	// Hint is the hint provided by the client when this batch was committed.
+	Hint string
+
+	// Source states where the batch comes from.
+	Source BatchSource
+}
+
+// BatchSource represents where the batch was committed.
+type BatchSource enum {
+	// Local represents that the batch was committed on local syncbase.
+	Local
+
+	// Remote represents that the batch was committed on remote syncbase.
+	Remote
+}
+
+// RowInfo contains a single operation performed on a row (in case of read or
+// write) or a range or rows (in case of scan) along with a mapping to each
+// of the batches that this operation belongs to.
+// For example, if Row1 was updated on local syncbase conflicting with a write
+// on remote syncbase as part of two separate batches, then it will be
+// represented by a single RowInfo with Write Operation containing the
+// respective local and remote values along with the batch id for both batches
+// stored in the BatchIds field.
+type RowInfo struct {
+	// Op is a specific operation represented by RowInfo
+	Op Operation
+	// BatchIds contains ids of all batches that this RowInfo is a part of.
+	BatchIds []uint16
+}
+
+// Operation represents a specific operation on a row or a set of rows that is
+// a part of the conflict.
+type Operation union {
+	// Read represents a read operation performed on a specific row. For a given
+	// row key there can only be at max one Read operation within a conflict.
+	Read  RowOp
+
+	// Write represents a write operation performed on a specific row. For a
+	// given row key there can only be at max one Write operation within a
+	// conflict.
+	Write RowOp
+
+	// Scan represents a scan operation performed over a specific range of keys.
+	// For a given key range there can be at max one ScanOp within the Conflict.
+	Scan  ScanOp
+}
+
+// RowOp represents a read or write operation on a row corresponding to the
+// given key.
+type RowOp struct {
+	// The key under conflict.
+	Key string
+
+	// LocalValue contains the value read or written by local syncbase or nil.
+	LocalValue ?Value
+
+	// RemoteValue contains the value read or written by remote syncbase or nil.
+	RemoteValue ?Value
+
+	// AncestorValue contains the value for the key which is the lowest common
+	// ancestor of the two values represented by LocalValue and RemoteValue or
+	// nil if no ancestor exists or if the operation was read.
+	AncestorValue ?Value
+}
+
+// ScanOp provides details of a scan operation.
+type ScanOp struct {
+	// Start contains the starting key for a range scan.
+	Start string
+
+	// Limit contains the end key for a range scan.
+	Limit string
+}
+
+// Value contains the encoded bytes for a row's value stored in syncbase.
+type Value struct {
+	// VOM encoded bytes for a row's value
+	Bytes []byte
+
+	// Write timestamp for this value in nanoseconds.
+	// TODO(jlodhia): change the timestamp to vdl.time here, in commit timestamp
+	// and clock data.
+	WriteTs int64
+}
+
+// ValueSelection represents the value that was selected as the final resolution
+// for a conflict.
+type ValueSelection enum {
+	// Local should be used if local value is picked as the final result.
+	Local
+
+	// Remote should be used if remote value is picked as the final result.
+	Remote
+
+	// Other should be used if a new value different from local and remote is
+	// used as the final result.
+	Other
+}
+
+// ResolutionInfo contains the application’s reply to a conflict for a key,
+// providing the resolution value. The resolution may be over a group of keys
+// in which case the application must send a stream of ResolutionInfos with
+// the Continued field for the last ResolutionInfo representing the end of the
+// batch with a value false. ResolutionInfos sent as part of a batch will be
+// committed as a batch. If the commit fails, the Conflict will be re-sent.
+type ResolutionInfo struct {
+	// Key is the key under conflict.
+	Key	string
+
+	// Selection represents the value that was selected as resolution.
+	Selection ValueSelection
+
+	// Result is the resolved value for the key. This field should be used only
+	// if value of Selection field is 'Other'
+	Result ?Value
+
+	// Continued represents whether the batch of ResolutionInfos has ended.
+	Continued bool
+}
+
 // SchemaMetadata maintains metadata related to the schema of a given database.
 // There is one SchemaMetadata per database.
 type SchemaMetadata struct {
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index adb4e66..6a8e0c1 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -166,6 +166,336 @@
 }) {
 }
 
+// ConflictInfo contains information to fully specify a conflict
+// for a key, providing the (local, remote, ancestor) tuple.
+// A key under conflict can be a part of a batch in local, remote or both
+// updates. Since the batches can have more than one key, all ConflictInfos
+// for the keys within the batches are grouped together into a single conflict
+// batch and sent as a stream with the Continued field representing conflict
+// batch boundaries.
+type ConflictInfo struct {
+	// Data is a unit chunk of ConflictInfo which can be sent over the conflict
+	// stream.
+	Data ConflictData
+	// Continued represents whether the batch of ConflictInfos has ended.
+	Continued bool
+}
+
+func (ConflictInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ConflictInfo"`
+}) {
+}
+
+type (
+	// ConflictData represents any single field of the ConflictData union type.
+	//
+	// ConflictData represents a unit of conflict data sent over the stream. It
+	// can either contain information about a Batch or about an operation done
+	// on a row.
+	ConflictData interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the ConflictData union type.
+		__VDLReflect(__ConflictDataReflect)
+	}
+	// ConflictDataBatch represents field Batch of the ConflictData union type.
+	ConflictDataBatch struct{ Value BatchInfo }
+	// ConflictDataRow represents field Row of the ConflictData union type.
+	ConflictDataRow struct{ Value RowInfo }
+	// __ConflictDataReflect describes the ConflictData union type.
+	__ConflictDataReflect struct {
+		Name  string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ConflictData"`
+		Type  ConflictData
+		Union struct {
+			Batch ConflictDataBatch
+			Row   ConflictDataRow
+		}
+	}
+)
+
+func (x ConflictDataBatch) Index() int                         { return 0 }
+func (x ConflictDataBatch) Interface() interface{}             { return x.Value }
+func (x ConflictDataBatch) Name() string                       { return "Batch" }
+func (x ConflictDataBatch) __VDLReflect(__ConflictDataReflect) {}
+
+func (x ConflictDataRow) Index() int                         { return 1 }
+func (x ConflictDataRow) Interface() interface{}             { return x.Value }
+func (x ConflictDataRow) Name() string                       { return "Row" }
+func (x ConflictDataRow) __VDLReflect(__ConflictDataReflect) {}
+
+type BatchInfo struct {
+	// Id is an identifier for a batch contained in a conflict. It is
+	// unique only in the context of a given conflict. Its purpose is solely to
+	// group one or more RowInfo objects together to represent a batch that
+	// was committed by the client.
+	Id uint16
+	// Hint is the hint provided by the client when this batch was committed.
+	Hint string
+	// Source states where the batch comes from.
+	Source BatchSource
+}
+
+func (BatchInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BatchInfo"`
+}) {
+}
+
+// BatchSource represents where the batch was committed.
+type BatchSource int
+
+const (
+	BatchSourceLocal BatchSource = iota
+	BatchSourceRemote
+)
+
+// BatchSourceAll holds all labels for BatchSource.
+var BatchSourceAll = [...]BatchSource{BatchSourceLocal, BatchSourceRemote}
+
+// BatchSourceFromString creates a BatchSource from a string label.
+func BatchSourceFromString(label string) (x BatchSource, err error) {
+	err = x.Set(label)
+	return
+}
+
+// Set assigns label to x.
+func (x *BatchSource) Set(label string) error {
+	switch label {
+	case "Local", "local":
+		*x = BatchSourceLocal
+		return nil
+	case "Remote", "remote":
+		*x = BatchSourceRemote
+		return nil
+	}
+	*x = -1
+	return fmt.Errorf("unknown label %q in nosql.BatchSource", label)
+}
+
+// String returns the string label of x.
+func (x BatchSource) String() string {
+	switch x {
+	case BatchSourceLocal:
+		return "Local"
+	case BatchSourceRemote:
+		return "Remote"
+	}
+	return ""
+}
+
+func (BatchSource) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.BatchSource"`
+	Enum struct{ Local, Remote string }
+}) {
+}
+
+// RowInfo contains a single operation performed on a row (in case of read or
+// write) or a range or rows (in case of scan) along with a mapping to each
+// of the batches that this operation belongs to.
+// For example, if Row1 was updated on local syncbase conflicting with a write
+// on remote syncbase as part of two separate batches, then it will be
+// represented by a single RowInfo with Write Operation containing the
+// respective local and remote values along with the batch id for both batches
+// stored in the BatchIds field.
+type RowInfo struct {
+	// Op is a specific operation represented by RowInfo
+	Op Operation
+	// BatchIds contains ids of all batches that this RowInfo is a part of.
+	BatchIds []uint16
+}
+
+func (RowInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.RowInfo"`
+}) {
+}
+
+type (
+	// Operation represents any single field of the Operation union type.
+	//
+	// Operation represents a specific operation on a row or a set of rows that is
+	// a part of the conflict.
+	Operation interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the Operation union type.
+		__VDLReflect(__OperationReflect)
+	}
+	// OperationRead represents field Read of the Operation union type.
+	//
+	// Read represents a read operation performed on a specific row. For a given
+	// row key there can only be at max one Read operation within a conflict.
+	OperationRead struct{ Value RowOp }
+	// OperationWrite represents field Write of the Operation union type.
+	//
+	// Write represents a write operation performed on a specific row. For a
+	// given row key there can only be at max one Write operation within a
+	// conflict.
+	OperationWrite struct{ Value RowOp }
+	// OperationScan represents field Scan of the Operation union type.
+	//
+	// Scan represents a scan operation performed over a specific range of keys.
+	// For a given key range there can be at max one ScanOp within the Conflict.
+	OperationScan struct{ Value ScanOp }
+	// __OperationReflect describes the Operation union type.
+	__OperationReflect struct {
+		Name  string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Operation"`
+		Type  Operation
+		Union struct {
+			Read  OperationRead
+			Write OperationWrite
+			Scan  OperationScan
+		}
+	}
+)
+
+func (x OperationRead) Index() int                      { return 0 }
+func (x OperationRead) Interface() interface{}          { return x.Value }
+func (x OperationRead) Name() string                    { return "Read" }
+func (x OperationRead) __VDLReflect(__OperationReflect) {}
+
+func (x OperationWrite) Index() int                      { return 1 }
+func (x OperationWrite) Interface() interface{}          { return x.Value }
+func (x OperationWrite) Name() string                    { return "Write" }
+func (x OperationWrite) __VDLReflect(__OperationReflect) {}
+
+func (x OperationScan) Index() int                      { return 2 }
+func (x OperationScan) Interface() interface{}          { return x.Value }
+func (x OperationScan) Name() string                    { return "Scan" }
+func (x OperationScan) __VDLReflect(__OperationReflect) {}
+
+// RowOp represents a read or write operation on a row corresponding to the
+// given key.
+type RowOp struct {
+	// The key under conflict.
+	Key string
+	// LocalValue contains the value read or written by local syncbase or nil.
+	LocalValue *Value
+	// RemoteValue contains the value read or written by remote syncbase or nil.
+	RemoteValue *Value
+	// AncestorValue contains the value for the key which is the lowest common
+	// ancestor of the two values represented by LocalValue and RemoteValue or
+	// nil if no ancestor exists or if the operation was read.
+	AncestorValue *Value
+}
+
+func (RowOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.RowOp"`
+}) {
+}
+
+// ScanOp provides details of a scan operation.
+type ScanOp struct {
+	// Start contains the starting key for a range scan.
+	Start string
+	// Limit contains the end key for a range scan.
+	Limit string
+}
+
+func (ScanOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ScanOp"`
+}) {
+}
+
+// Value contains the encoded bytes for a row's value stored in syncbase.
+type Value struct {
+	// VOM encoded bytes for a row's value
+	Bytes []byte
+	// Write timestamp for this value in nanoseconds.
+	// TODO(jlodhia): change the timestamp to vdl.time here, in commit timestamp
+	// and clock data.
+	WriteTs int64
+}
+
+func (Value) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.Value"`
+}) {
+}
+
+// ValueSelection represents the value that was selected as the final resolution
+// for a conflict.
+type ValueSelection int
+
+const (
+	ValueSelectionLocal ValueSelection = iota
+	ValueSelectionRemote
+	ValueSelectionOther
+)
+
+// ValueSelectionAll holds all labels for ValueSelection.
+var ValueSelectionAll = [...]ValueSelection{ValueSelectionLocal, ValueSelectionRemote, ValueSelectionOther}
+
+// ValueSelectionFromString creates a ValueSelection from a string label.
+func ValueSelectionFromString(label string) (x ValueSelection, err error) {
+	err = x.Set(label)
+	return
+}
+
+// Set assigns label to x.
+func (x *ValueSelection) Set(label string) error {
+	switch label {
+	case "Local", "local":
+		*x = ValueSelectionLocal
+		return nil
+	case "Remote", "remote":
+		*x = ValueSelectionRemote
+		return nil
+	case "Other", "other":
+		*x = ValueSelectionOther
+		return nil
+	}
+	*x = -1
+	return fmt.Errorf("unknown label %q in nosql.ValueSelection", label)
+}
+
+// String returns the string label of x.
+func (x ValueSelection) String() string {
+	switch x {
+	case ValueSelectionLocal:
+		return "Local"
+	case ValueSelectionRemote:
+		return "Remote"
+	case ValueSelectionOther:
+		return "Other"
+	}
+	return ""
+}
+
+func (ValueSelection) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ValueSelection"`
+	Enum struct{ Local, Remote, Other string }
+}) {
+}
+
+// ResolutionInfo contains the application’s reply to a conflict for a key,
+// providing the resolution value. The resolution may be over a group of keys
+// in which case the application must send a stream of ResolutionInfos with
+// the Continued field for the last ResolutionInfo representing the end of the
+// batch with a value false. ResolutionInfos sent as part of a batch will be
+// committed as a batch. If the commit fails, the Conflict will be re-sent.
+type ResolutionInfo struct {
+	// Key is the key under conflict.
+	Key string
+	// Selection represents the value that was selected as resolution.
+	Selection ValueSelection
+	// Result is the resolved value for the key. This field should be used only
+	// if value of Selection field is 'Other'
+	Result *Value
+	// Continued represents whether the batch of ResolutionInfos has ended.
+	Continued bool
+}
+
+func (ResolutionInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResolutionInfo"`
+}) {
+}
+
 // SchemaMetadata maintains metadata related to the schema of a given database.
 // There is one SchemaMetadata per database.
 type SchemaMetadata struct {
@@ -319,6 +649,17 @@
 	vdl.Register((*SyncGroupSpec)(nil))
 	vdl.Register((*SyncGroupMemberInfo)(nil))
 	vdl.Register((*ResolverType)(nil))
+	vdl.Register((*ConflictInfo)(nil))
+	vdl.Register((*ConflictData)(nil))
+	vdl.Register((*BatchInfo)(nil))
+	vdl.Register((*BatchSource)(nil))
+	vdl.Register((*RowInfo)(nil))
+	vdl.Register((*Operation)(nil))
+	vdl.Register((*RowOp)(nil))
+	vdl.Register((*ScanOp)(nil))
+	vdl.Register((*Value)(nil))
+	vdl.Register((*ValueSelection)(nil))
+	vdl.Register((*ResolutionInfo)(nil))
 	vdl.Register((*SchemaMetadata)(nil))
 	vdl.Register((*CrPolicy)(nil))
 	vdl.Register((*CrRule)(nil))
diff --git a/v23/syncbase/nosql/cr_connection_test.go b/v23/syncbase/nosql/cr_connection_test.go
new file mode 100644
index 0000000..b99c15d
--- /dev/null
+++ b/v23/syncbase/nosql/cr_connection_test.go
@@ -0,0 +1,138 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase/nosql/crtestutil"
+	"v.io/v23/context"
+)
+
+type resolverImpl1 struct {
+}
+
+func (ri *resolverImpl1) OnConflict(ctx *context.T, conflict *Conflict) (r Resolution) {
+	return
+}
+
+// TestCrConnectionClose tests if db.Close() correctly shuts down the goroutine
+// that runs conflict resolution code in an infinite loop.
+// Setup:
+//   - A mutex lock is used to cause Advance() on the receive stream to block.
+//     This lock is held by the test before starting off the CR thread.
+//   - ctx.Cancel() is simulated by releasing the mutex lock. This is done right
+//     after calling db.Close()
+//   - The mutex is held again by the test to cause the CR thread to block on
+//     Advance() in case the CR thead did not shutdown. This will lead to
+//     failure of the test.
+// Expected result:
+//   - The CR thread should not get/remain blocked on Advance() after db.Close()
+func TestCrConnectionClose(t *testing.T) {
+	db := NewDatabase("parentName", "db1", getSchema(&resolverImpl1{}))
+	advance := func(st *crtestutil.State) bool {
+		fmt.Println("Advance() for ConflictStreamImpl called")
+		st.IsBlocked = true
+		st.Mu.Lock()
+		defer st.Mu.Unlock()
+		fmt.Println("Advance() for ConflictStreamImpl returning")
+		st.IsBlocked = false
+		return false
+	}
+
+	st := &crtestutil.State{}
+	crStream := &crtestutil.CrStreamImpl{
+		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+		R: &crtestutil.ResolutionStreamImpl{St: st},
+	}
+	db.c = crtestutil.MockDbClient(db.c, crStream)
+	db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+	ctx, _ := context.RootContext()
+	st.Mu.Lock()                                // causes Advance() to block
+	db.EnforceSchema(ctx)                       // kicks off the CR thread
+	for i := 0; i < 100 && !st.IsBlocked; i++ { // wait till Advance() call is blocked
+		time.Sleep(time.Millisecond)
+	}
+	db.Close()
+	st.Mu.Unlock() // simulate ctx cancel()
+	for i := 0; i < 100 && st.IsBlocked; i++ {
+		time.Sleep(time.Millisecond)
+	}
+
+	st.Mu.Lock() // causes Advance() to block if called again
+	// let conflict resolution routine complete its cleanup.
+	// If the code does not work as intended then it will end up reconnecting
+	// the stream and blocking on the mutex
+	time.Sleep(time.Millisecond)
+	if st.IsBlocked {
+		t.Error("Error: The conflict resolution routine did not die")
+	}
+}
+
+// TestCrConnectionReestablish tests if the CR thread reestablishes a broken
+// connection stream with syncbase.
+// Setup:
+//   - Advance() is allowed to return immediately 3 times with return value
+//     false signifying 3 consecutive breaks in the connection.
+//   - A mutex lock is used to cause Advance() on the recieve stream to block
+//     on the 4th attempt. This lock is held by the test before starting off the
+//     CR thread. This simulates a stable connection between CR and syncbase.
+// Expected result:
+//   - Advance() should get blocked after some time.
+//   - st.AdvanceCount should be equal to 3
+func TestCrConnectionReestablish(t *testing.T) {
+	db := NewDatabase("parentName", "db1", getSchema(&resolverImpl1{}))
+	advance := func(st *crtestutil.State) bool {
+		fmt.Println("Advance() for ConflictStreamImpl called")
+		st.AdvanceCount++
+		if st.AdvanceCount > 2 {
+			// Connection stays stable after 3 attempts
+			st.IsBlocked = true
+			st.Mu.Lock()
+			defer st.Mu.Unlock()
+		}
+		fmt.Println("Advance() for ConflictStreamImpl returning")
+		st.IsBlocked = false
+		return false
+	}
+
+	st := &crtestutil.State{}
+	crStream := &crtestutil.CrStreamImpl{
+		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+		R: &crtestutil.ResolutionStreamImpl{St: st},
+	}
+	db.c = crtestutil.MockDbClient(db.c, crStream)
+	db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+	ctx, _ := context.RootContext()
+	st.Mu.Lock() // causes Advance() to block
+	db.EnforceSchema(ctx)
+	for i := 0; i < 100 && !st.IsBlocked; i++ {
+		time.Sleep(time.Millisecond) // wait till Advance() call is blocked
+	}
+
+	if !st.IsBlocked {
+		t.Error("Error: Advance() did not block")
+	}
+	if st.AdvanceCount != 3 {
+		t.Errorf("Error: Advance() expected to be called 3 times, instead called %d times", st.AdvanceCount)
+	}
+
+	// Shutdown the cr routine
+	db.Close()
+	st.Mu.Unlock()
+}
+
+func getSchema(cr ConflictResolver) *Schema {
+	return &Schema{
+		Metadata: wire.SchemaMetadata{},
+		Upgrader: nil,
+		Resolver: cr,
+	}
+}
diff --git a/v23/syncbase/nosql/cr_test.go b/v23/syncbase/nosql/cr_test.go
new file mode 100644
index 0000000..f9618d9
--- /dev/null
+++ b/v23/syncbase/nosql/cr_test.go
@@ -0,0 +1,605 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	"bytes"
+	"strings"
+	"testing"
+	"time"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase/nosql/crtestutil"
+	"v.io/v23/context"
+	"v.io/v23/vom"
+)
+
+// TODO(jlodhia): extract this test (along with the crtest helpers) to
+// nosql_test, or alternatively, move the crtest helper to an internal dir.
+
+
+////////////////////////////////////////
+// Client Code
+
+// Client constants
+const (
+	TaskIdPrefix = "task:"
+	ListIdPrefix = "list:"
+
+	List1 = "list:1"
+
+	Task1 = "task:1:1"
+	Task2 = "task:1:2"
+
+	HintListDelete = "ListDelete"
+
+	HintTextEdit   = "TaskTextEdit"
+	HintDone       = "TaskDoneToggle"
+	HintTaskDelete = "TaskDelete"
+	HintTaskAdd    = "TaskAdd"
+
+	TextNew = "EditedText"
+)
+
+// Client data structures
+type Task struct {
+	Text   string
+	Done   bool
+	DoneTs int64
+	ListId string
+}
+
+type List struct {
+	Name      string
+	TaskCount int16
+	DoneCount int16
+}
+
+// Client conflict reolution impl
+type ConflictResolverImpl struct {
+}
+
+func (ri *ConflictResolverImpl) OnConflict(ctx *context.T, conflict *Conflict) Resolution {
+	taskRows := findRowsByType(conflict, TaskIdPrefix)
+	result := ri.handleTasks(ctx, taskRows)
+
+	listRows := findRowsByType(conflict, ListIdPrefix)
+	ri.handleLists(ctx, listRows, result)
+	return Resolution{ResultSet: result}
+}
+
+// handleTasks does a merge of fields within each task under conflict.
+// It handles conflict on fields as follows:
+// Text: Last writer wins
+// Done: Depends on DoneTs
+// DoneTs: The side that is not equal to ancestor and is the lower of the two
+//         wins. This means that if a user marks a task done on one device
+//         and then later marks the same task done on another device that has
+//         not synced yet, the done timestamp that is selected will be from the
+//         former action.
+func (ri *ConflictResolverImpl) handleTasks(ctx *context.T, tasks []ConflictRow) map[string]ResolvedRow {
+	result := map[string]ResolvedRow{}
+	for _, t := range tasks {
+		if isRowAdded(t) {
+			result[t.Key] = handleRowAdd(t)
+			continue
+		}
+		if isRowDeleted(t) {
+			result[t.Key] = ResolvedRow{Key: t.Key, Result: nil}
+			continue
+		}
+
+		var localTask, remoteTask, ancestorTask, resolvedTask Task
+		t.LocalValue.Get(&localTask)
+		t.RemoteValue.Get(&remoteTask)
+		t.AncestorValue.Get(&ancestorTask)
+
+		// prime the reolved task with local version for simplicity.
+		resolvedTask = localTask
+
+		// check field text for conflict
+		if localTask.Text != remoteTask.Text {
+			if remoteTask.Text != ancestorTask.Text {
+				if (localTask.Text == ancestorTask.Text) || (t.RemoteValue.WriteTs.After(t.LocalValue.WriteTs)) {
+					resolvedTask.Text = remoteTask.Text
+				}
+			}
+		}
+
+		// check field done for conflict
+		if localTask.DoneTs != remoteTask.DoneTs {
+			if remoteTask.DoneTs != ancestorTask.DoneTs {
+				if (localTask.DoneTs == ancestorTask.DoneTs) || (remoteTask.DoneTs < localTask.DoneTs) {
+					resolvedTask.DoneTs = remoteTask.DoneTs
+					resolvedTask.Done = remoteTask.Done
+				}
+			}
+		}
+		resValue, _ := NewValue(ctx, &resolvedTask)
+		result[t.Key] = ResolvedRow{Key: t.Key, Result: resValue}
+	}
+	return result
+}
+
+// handleLists does a merge of fields within each list under conflict. If a list
+// is deleted, it looks at the resolution map, finds all tasks that belong to
+// the list and changes the resolution to delete for those tasks.
+// It handles conflict on fields as follows:
+// Name: Last writer wins
+// TaskCount: Add both counts and subtract the ancestor count.
+// DoneCount: Add both counts and subtract the ancestor count.
+// NOTE: The above counters are resolved in a naive way for simplicity of this
+// test. There are many other ways to represent the counters in a more correct
+// way.
+func (ri *ConflictResolverImpl) handleLists(ctx *context.T, lists []ConflictRow, resolution map[string]ResolvedRow) {
+	for _, l := range lists {
+		if isRowAdded(l) {
+			resolution[l.Key] = handleRowAdd(l)
+			continue
+		}
+		if isRowDeleted(l) {
+			resolution[l.Key] = ResolvedRow{Key: l.Key, Result: nil}
+			deleteTasksForList(resolution, l.Key)
+			continue
+		}
+
+		var localList, remoteList, ancestorList, resolvedList List
+		l.LocalValue.Get(&localList)
+		l.RemoteValue.Get(&remoteList)
+		l.AncestorValue.Get(&ancestorList)
+
+		// prime the reolved task with local version for simplicity.
+		resolvedList = localList
+
+		// check field Name for conflict
+		if localList.Name != remoteList.Name {
+			if remoteList.Name != ancestorList.Name {
+				if (localList.Name == ancestorList.Name) || (l.RemoteValue.WriteTs.After(l.LocalValue.WriteTs)) {
+					resolvedList.Name = remoteList.Name
+				}
+			}
+		}
+
+		// check field TaskCount for conflict
+		if localList.TaskCount != remoteList.TaskCount {
+			if remoteList.TaskCount != ancestorList.TaskCount {
+				if localList.TaskCount == ancestorList.TaskCount {
+					resolvedList.TaskCount = remoteList.TaskCount
+				} else {
+					resolvedList.TaskCount = localList.TaskCount + remoteList.TaskCount - ancestorList.TaskCount
+				}
+			}
+		}
+
+		// check field DoneCount for conflict
+		if localList.DoneCount != remoteList.DoneCount {
+			if remoteList.DoneCount != ancestorList.DoneCount {
+				if localList.DoneCount == ancestorList.DoneCount {
+					resolvedList.DoneCount = remoteList.DoneCount
+				} else {
+					resolvedList.DoneCount = localList.DoneCount + remoteList.DoneCount - ancestorList.DoneCount
+				}
+			}
+		}
+		resValue, _ := NewValue(ctx, &resolvedList)
+		resolution[l.Key] = ResolvedRow{Key: l.Key, Result: resValue}
+	}
+}
+
+func deleteTasksForList(resolution map[string]ResolvedRow, listKey string) {
+	// find key prefix for tasks belonging to list
+	taskKeyPrefix := strings.Replace(listKey, ListIdPrefix, TaskIdPrefix, 1) + ":"
+	for k, v := range resolution {
+		if strings.HasPrefix(k, taskKeyPrefix) {
+			v.Result = nil
+			resolution[k] = v
+		}
+	}
+}
+
+func handleRowAdd(r ConflictRow) ResolvedRow {
+	var resolvedTask *Value
+	// first one to add wins
+	if r.RemoteValue == nil {
+		resolvedTask = r.LocalValue
+	} else if r.LocalValue == nil {
+		resolvedTask = r.RemoteValue
+	} else if r.LocalValue.WriteTs.Before(r.RemoteValue.WriteTs) {
+		resolvedTask = r.LocalValue
+	} else {
+		resolvedTask = r.RemoteValue
+	}
+	return ResolvedRow{Key: r.Key, Result: resolvedTask}
+}
+
+func isRowDeleted(r ConflictRow) bool {
+	return r.AncestorValue != nil &&
+		(r.LocalValue == nil || r.RemoteValue == nil)
+}
+
+func isRowAdded(r ConflictRow) bool {
+	return r.AncestorValue == nil &&
+		(r.LocalValue != nil || r.RemoteValue != nil)
+}
+
+func findRowsByType(conflict *Conflict, typePrefix string) []ConflictRow {
+	rows := []ConflictRow{}
+	if conflict.WriteSet != nil {
+		for k, v := range conflict.WriteSet.ByKey {
+			if strings.HasPrefix(k, typePrefix) {
+				rows = append(rows, v)
+			}
+		}
+	}
+	return rows
+}
+
+////////////////////////////////////////
+// Tests
+
+func TestConflictResolutionSingleObject(t *testing.T) {
+	mockConflit, expResult := simpleConflictStream(t)
+	RunTest(t, mockConflit, expResult, false)
+}
+
+func TestConflictResolutionAddDelete(t *testing.T) {
+	mockConflit, expResult := addDeleteConflictStream(t)
+	RunTest(t, mockConflit, expResult, true)
+}
+
+func TestConflictResolutionIntersectingConflicts(t *testing.T) {
+	mockConflict, expResult := twoIntersectingBatchesConflictStream(t)
+	RunTest(t, mockConflict, expResult, true)
+}
+
+func TestConflictResolutionListDelete(t *testing.T) {
+	mockConflit, expResult := listDeleteConflictsWithTaskAdd(t)
+	RunTest(t, mockConflit, expResult, true)
+}
+
+func RunTest(t *testing.T, mockConflit []wire.ConflictInfo, expResult map[string]wire.ResolutionInfo, singleBatch bool) {
+	db := NewDatabase("parentName", "db1", getSchema(&ConflictResolverImpl{}))
+	advance := func(st *crtestutil.State) bool {
+		if st.ValIndex >= len(mockConflit) {
+			st.IsBlocked = true
+			st.Mu.Lock()
+			defer st.Mu.Unlock()
+		}
+		st.IsBlocked = false
+		st.Val = mockConflit[st.ValIndex]
+		st.ValIndex++
+		return true
+	}
+
+	st := &crtestutil.State{}
+	crStream := &crtestutil.CrStreamImpl{
+		C: &crtestutil.ConflictStreamImpl{St: st, AdvanceFn: advance},
+		R: &crtestutil.ResolutionStreamImpl{St: st},
+	}
+	db.c = crtestutil.MockDbClient(db.c, crStream)
+	db.crState.reconnectWaitTime = 10 * time.Millisecond
+
+	ctx, _ := context.RootContext()
+	st.Mu.Lock() // causes Advance() to block
+	db.EnforceSchema(ctx)
+	for i := 0; i < 100 && (len(st.Result) != len(expResult)); i++ {
+		time.Sleep(time.Millisecond) // wait till Advance() call is blocked
+	}
+	if len(st.Result) != len(expResult) {
+		t.Errorf("\n Unexpected number of results. Expected: %d, actual: %d", len(expResult), len(st.Result))
+	}
+	for i, result := range st.Result {
+		compareResult(t, expResult[result.Key], result)
+		if singleBatch {
+			if result.Continued != (i < len(st.Result)-1) {
+				t.Error("\nUnexpected value for continued in single batch")
+			}
+		} else {
+			if result.Continued != false {
+				t.Error("\nUnexpected value for continued in multi batch")
+			}
+		}
+	}
+}
+
+////////////////////////////////////////
+// Test helpers to create test data
+
+// Consider a database with a list List1 with two tasks Task1, Task2. The
+// following helper methods create conflicts with these three objects in various
+// ways.
+
+// simpleConflictStream prepares a conflict stream with two conflicts, one
+// each for Task1 and Task2. No batch is involved and Task1 is independent of
+// Task2. Assume that List does not get updated for these task updates.
+// Local syncbase changes Text field for Task1 and marks Task2 done.
+// Remote syncbase marks Task1 done and Task2 done.
+// Expected result is a merge for Task1 and Task2.
+func simpleConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Conflict for task1
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"OriginalText", false, -1, List1}), 1, // ancestor
+		encode(&Task{TextNew, false, -1, List1}), 3, // local
+		encode(&Task{"OriginalText", true, 20, List1}), 2) // remote
+	c1 := makeConflictInfo(row1, false)
+
+	// Expected result
+	r1 := makeResolution(Task1, encode(&Task{TextNew, true, 20, List1}), wire.ValueSelectionOther)
+
+	// Conflict for task2
+	row2 := makeRowInfo(Task2,
+		encode(&Task{"Text1", false, -1, List1}), 1, // ancestor
+		encode(&Task{"Text1", true, 100, List1}), 3, // local
+		encode(&Task{"Text1", true, 20, List1}), 2) // remote
+	c2 := makeConflictInfo(row2, false)
+
+	// Expected result
+	r2 := makeResolution(Task2, encode(&Task{"Text1", true, 20, List1}),
+		wire.ValueSelectionOther) // for simplicity the test resolver does not optimize
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+
+	return []wire.ConflictInfo{c1, c2}, respMap
+}
+
+// addDeleteConflictStream prepares a conflict stream with one conflict
+// containing Task1, Task2 and List1. List contains a task count and hence
+// addition and deletion of task leads to update to the list.
+// Local syncbase deletes Task1 and updates List1 in a batch.
+// Remote syncbase adds Task2 and updates List1 in a batch.
+// Expected result is Task1 gets deleted, Task2 gets added and List1 shows
+// the correct taskcount after the two operations.
+func addDeleteConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Batch1 is local and contains task1 and list1
+	b1 := makeConflictBatch(1, HintTaskDelete, wire.BatchSourceLocal, true)
+
+	// Batch2 is remote and contains task2 and list1
+	b2 := makeConflictBatch(2, HintTaskAdd, wire.BatchSourceRemote, true)
+
+	// Deletion for task1 on local syncbase
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1, // ancestor
+		nil, -1, // local
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1) // remote
+	row1.BatchIds = []uint16{1}
+	c1 := makeConflictInfo(row1, true)
+
+
+	// Expected result
+	r1 := makeResolution(Task1, nil, wire.ValueSelectionOther)
+
+	// Addition for task2 on remote syncbase
+	row2 := makeRowInfo(Task2,
+		nil, -1, // ancestor value
+		nil, -1, // local value
+		encode(&Task{"AddedTask", false, -1, List1}), 2) // remote value
+	row2.BatchIds = []uint16{2}
+	c2 := makeConflictInfo(row2, true)
+
+	// Expected result
+	r2 := makeResolution(Task2, encode(&Task{"AddedTask", false, -1, List1}), wire.ValueSelectionRemote)
+
+	// Update to List on both sided on filed TaskCount
+	listName := "Groceries"
+	row3 := makeRowInfo(List1,
+		encode(&List{listName, 8, 0}), 25, // ancestor value
+		encode(&List{listName, 7, 0}), 33, // local value
+		encode(&List{listName, 9, 0}), 44) // remote value
+	row3.BatchIds = []uint16{1, 2}
+	c3 := makeConflictInfo(row3, false)
+
+	// Expected result
+	r3 := makeResolution(List1, encode(&List{listName, 8, 0}), wire.ValueSelectionOther)
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+	respMap[r3.Key] = r3
+
+	return []wire.ConflictInfo{b1, b2, c1, c2, c3}, respMap
+}
+
+// twoIntersectingBatchesConflictStream prepares a conflict stream with one
+// conflict containing Task1, Task2 and List1. Here we assume that List keeps
+// a count of done tasks.
+// Local syncbase marks Task1 done and updates done count on List1. Later it
+// it also marks Task2 done and updates done count on List1.
+// Remote syncbase updates field Text on Task1 followed by an update to field
+// Text on Task2 (not in a batch).
+// Expected result is a merge for Task1 and Task2 and Local version of List1
+// being accepted.
+func twoIntersectingBatchesConflictStream(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Batch1 is local and contains task1 and list1
+	b1 := makeConflictBatch(1, HintDone, wire.BatchSourceLocal, true)
+
+	// Batch2 is local and contains task2 and list1
+	b2 := makeConflictBatch(2, HintDone, wire.BatchSourceLocal, true)
+
+	// Batch3 is remote and contains task1
+	b3 := makeConflictBatch(3, HintTextEdit, wire.BatchSourceRemote, true)
+
+	// Batch4 is remote and contains task2
+	b4 := makeConflictBatch(4, HintTextEdit, wire.BatchSourceRemote, true)
+
+	// For task1, mark done on local and edit text on remote
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"TaskOrig", false, -1, List1}), 1, // ancestor
+		encode(&Task{"TaskOrig", true, 204, List1}), 5, // local
+		encode(&Task{"TaskEdit", false, -1, List1}), 3) // remote
+	row1.BatchIds = []uint16{1, 3}
+	c1 := makeConflictInfo(row1, true)
+
+	// Expected result
+	r1 := makeResolution(Task1, encode(&Task{"TaskEdit", true, 204, List1}), wire.ValueSelectionOther)
+
+	// For task2, mark done on local and edit text on remote
+	row2 := makeRowInfo(Task2,
+		encode(&Task{"TaskOrig", false, -1, List1}), 2, // ancestor
+		encode(&Task{"TaskOrig", true, 204, List1}), 5, // local
+		encode(&Task{"TaskEdit", false, -1, List1}), 3) // remote
+	row2.BatchIds = []uint16{2, 4}
+	c2 := makeConflictInfo(row2, true)
+
+	// Expected result
+	r2 := makeResolution(Task2, encode(&Task{"TaskEdit", true, 204, List1}), wire.ValueSelectionOther)
+
+	// Update to List on Local for updating TaskDoneCount
+	listName := "Groceries"
+	row3 := makeRowInfo(List1,
+		encode(&List{listName, 8, 0}), 1, // ancestor value
+		encode(&List{listName, 8, 2}), 5, // local value
+		encode(&List{listName, 8, 0}), 3) // remote value
+	row3.BatchIds = []uint16{1, 2}
+	c3 := makeConflictInfo(row3, false)
+
+	// Expected result
+	r3 := makeResolution(List1, encode(&List{listName, 8, 2}), wire.ValueSelectionOther)
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+	respMap[r3.Key] = r3
+
+	return []wire.ConflictInfo{b1, b2, b3, b4, c1, c2, c3}, respMap
+}
+
+// listDeleteConflictsWithTaskAdd prepares a conflict stream with one
+// conflict containing Task1, Task2 and List1. Assume Task2 does not exist yet.
+// Local syncbase deletes List1 and Task1 as a batch.
+// Remote syncbase adds Task2 along with an update to List1 as a batch.
+// Expected result is deletion of List1, Task1 and Task2.
+func listDeleteConflictsWithTaskAdd(t *testing.T) ([]wire.ConflictInfo, map[string]wire.ResolutionInfo) {
+	// Batch1 is local and contains task1 and list1
+	b1 := makeConflictBatch(1, HintListDelete, wire.BatchSourceLocal, true)
+
+	// Batch2 is remote and contains task2 and list1
+	b2 := makeConflictBatch(2, HintTaskAdd, wire.BatchSourceRemote, true)
+
+	// Delition for list1 on local syncbase
+	listName := "Groceries"
+	row0 := makeRowInfo(List1,
+		encode(&List{listName, 1, 0}), 1, // ancestor value
+		nil, 5, // local value
+		encode(&List{listName, 2, 0}), 3) // remote value
+	row0.BatchIds = []uint16{1, 2}
+	c0 := makeConflictInfo(row0, true)
+
+	// Expected result
+	r0 := makeResolution(List1, nil, wire.ValueSelectionOther)
+
+	// Deletion for task1 on local syncbase along with list1
+	row1 := makeRowInfo(Task1,
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1, // ancestor
+		nil, -1, // local
+		encode(&Task{"TaskToBeRemoved", false, -1, List1}), 1) // remote
+	row1.BatchIds = []uint16{1}
+	c1 := makeConflictInfo(row1, true)
+
+	// Expected result
+	r1 := makeResolution(Task1, nil, wire.ValueSelectionOther)
+
+	// Addition for task2 on remote syncbase
+	row2 := makeRowInfo(Task2,
+		nil, -1, // ancestor value
+		nil, -1, // local value
+		encode(&Task{"AddedTask", false, -1, List1}), 2) // remote value
+	row2.BatchIds = []uint16{2}
+	c2 := makeConflictInfo(row2, false)
+
+	// Expected result
+	r2 := makeResolution(Task2, nil, wire.ValueSelectionOther)
+
+	respMap := map[string]wire.ResolutionInfo{}
+	respMap[r0.Key] = r0
+	respMap[r1.Key] = r1
+	respMap[r2.Key] = r2
+
+	return []wire.ConflictInfo{b1, b2, c0, c1, c2}, respMap
+}
+
+func makeResolution(key string, result []byte, selection wire.ValueSelection) wire.ResolutionInfo {
+	r := wire.ResolutionInfo{}
+	r.Key = key
+	if result != nil {
+		r.Result = &wire.Value{
+			Bytes:   result,
+			WriteTs: -1,
+		}
+	}
+	r.Selection = selection
+	return r
+}
+
+func makeConflictBatch(id uint16, hint string, source wire.BatchSource, continued bool) wire.ConflictInfo {
+	batch := wire.BatchInfo{Id: id, Hint: hint, Source: source}
+	return wire.ConflictInfo{Data: wire.ConflictDataBatch{Value: batch}, Continued: continued}
+}
+
+func makeConflictInfo(row wire.RowInfo, continued bool) wire.ConflictInfo {
+	return wire.ConflictInfo{
+		Data: wire.ConflictDataRow{Value: row},
+		Continued: continued,
+	}
+}
+
+func makeRowInfo(key string, ancestor []byte, ats int64, local []byte, lts int64, remote []byte, rts int64) wire.RowInfo {
+	op := wire.RowOp{}
+	op.Key = key
+
+	if ancestor != nil {
+		op.AncestorValue = &wire.Value{
+			Bytes:   ancestor,
+			WriteTs: ats,
+		}
+	}
+
+	if local != nil {
+		op.LocalValue = &wire.Value{
+			Bytes:   local,
+			WriteTs: lts,
+		}
+	}
+
+	if remote != nil {
+		op.RemoteValue = &wire.Value{
+			Bytes:   remote,
+			WriteTs: rts,
+		}
+	}
+	return wire.RowInfo{
+		Op: wire.OperationWrite{Value: op},
+	}
+}
+
+func compareResult(t *testing.T, expected wire.ResolutionInfo, actual wire.ResolutionInfo) {
+	if actual.Key != expected.Key {
+		t.Error("Key does not match")
+	}
+	if actual.Selection != expected.Selection {
+		t.Errorf("Key: %s", expected.Key)
+		t.Errorf("Expected selection: %v, actual selection: %v", expected.Selection, actual.Selection)
+	}
+	if (expected.Result == nil) && (actual.Result != nil) {
+		t.Errorf("Key: %s", expected.Key)
+		t.Error("Result expected to be nil but found non nil")
+	}
+	if expected.Result != nil {
+		if actual.Result == nil {
+			t.Errorf("Key: %s", expected.Key)
+			t.Error("Result found nil")
+		}
+		if bytes.Compare(actual.Result.Bytes, expected.Result.Bytes) != 0 {
+			t.Errorf("Key: %s", expected.Key)
+			t.Error("Result bytes do not match")
+		}
+	}
+}
+
+func encode(value interface{}) []byte {
+	v, _ := vom.Encode(value)
+	return v
+}
diff --git a/v23/syncbase/nosql/crtestutil/mock_dbclient.go b/v23/syncbase/nosql/crtestutil/mock_dbclient.go
new file mode 100644
index 0000000..99e27ee
--- /dev/null
+++ b/v23/syncbase/nosql/crtestutil/mock_dbclient.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package crtestutil
+
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+)
+
+type MockWireDatabaseClient struct {
+	wire.DatabaseClientMethods
+	crs *CrStreamImpl
+}
+
+func MockDbClient(c wire.DatabaseClientMethods, crs *CrStreamImpl) *MockWireDatabaseClient {
+	return &MockWireDatabaseClient{c, crs}
+}
+
+func (wclient *MockWireDatabaseClient) GetSchemaMetadata(ctx *context.T, opts ...rpc.CallOpt) (sm wire.SchemaMetadata, err error) {
+	err = verror.NewErrNoExist(ctx)
+	return
+}
+
+func (wclient *MockWireDatabaseClient) SetSchemaMetadata(ctx *context.T, i0 wire.SchemaMetadata, opts ...rpc.CallOpt) error {
+	return nil
+}
+
+func (wclient *MockWireDatabaseClient) StartConflictResolver(ctx *context.T, opts ...rpc.CallOpt) (wire.ConflictManagerStartConflictResolverClientCall, error) {
+	return wclient.crs, nil
+}
diff --git a/v23/syncbase/nosql/crtestutil/mock_stream.go b/v23/syncbase/nosql/crtestutil/mock_stream.go
new file mode 100644
index 0000000..3bb8f4d
--- /dev/null
+++ b/v23/syncbase/nosql/crtestutil/mock_stream.go
@@ -0,0 +1,119 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package crtestutil
+
+import (
+	"sync"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+)
+
+var _ wire.ConflictManagerStartConflictResolverClientCall = (*CrStreamImpl)(nil)
+
+type State struct {
+	// ConflictStream state
+	IsBlocked    bool
+	Mu           sync.Mutex
+	Val          wire.ConflictInfo
+	AdvanceCount int
+	ValIndex     int
+	// ResolutionStream state variables
+	Result []wire.ResolutionInfo
+}
+
+type CrStreamImpl struct {
+	C ConflictStream
+	R ResolutionStream
+}
+
+func (s *CrStreamImpl) RecvStream() interface {
+	Advance() bool
+	Value() wire.ConflictInfo
+	Err() error
+} {
+	return recvStreamImpl{s.C}
+}
+
+func (s *CrStreamImpl) SendStream() interface {
+	Send(item wire.ResolutionInfo) error
+	Close() error
+} {
+	return sendStreamImpl{s.R}
+}
+
+func (s *CrStreamImpl) Finish() error {
+	return nil
+}
+
+type recvStreamImpl struct {
+	c ConflictStream
+}
+
+func (rs recvStreamImpl) Advance() bool {
+	return rs.c.Advance()
+}
+func (rs recvStreamImpl) Value() wire.ConflictInfo {
+	return rs.c.Value()
+}
+func (rs recvStreamImpl) Err() error {
+	return rs.c.Err()
+}
+
+type sendStreamImpl struct {
+	r ResolutionStream
+}
+
+func (ss sendStreamImpl) Send(item wire.ResolutionInfo) error {
+	return ss.r.Send(item)
+}
+func (c sendStreamImpl) Close() error {
+	return nil
+}
+
+type ConflictStream interface {
+	Advance() bool
+	Value() wire.ConflictInfo
+	Err() error
+}
+
+type ResolutionStream interface {
+	Send(item wire.ResolutionInfo) error
+}
+
+type ConflictStreamImpl struct {
+	St        *State
+	AdvanceFn func(*State) bool
+}
+
+func (cs *ConflictStreamImpl) Advance() bool {
+	return cs.AdvanceFn(cs.St)
+}
+func (cs *ConflictStreamImpl) Value() wire.ConflictInfo {
+	return cs.St.Val
+}
+func (cs *ConflictStreamImpl) Err() error {
+	return &TestError{"Stream broken"}
+}
+
+type ResolutionStreamImpl struct {
+	St *State
+}
+
+func (rs *ResolutionStreamImpl) Send(item wire.ResolutionInfo) error {
+	if rs.St.Result == nil {
+		rs.St.Result = []wire.ResolutionInfo{item}
+		return nil
+	}
+	rs.St.Result = append(rs.St.Result, item)
+	return nil
+}
+
+type TestError struct {
+	str string
+}
+
+func (e *TestError) Error() string {
+	return e.str
+}
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index d48f450..42b5270 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -5,6 +5,9 @@
 package nosql
 
 import (
+	"sync"
+	"time"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/util"
 	"v.io/v23/context"
@@ -15,6 +18,12 @@
 	"v.io/x/lib/vlog"
 )
 
+const (
+	// Wait time before we try to reconnect a broken conflict resolution stream.
+	waitBeforeReconnectInMillis = 2 * time.Second
+	reconnectionCount = "rcc"
+)
+
 func NewDatabase(parentFullName, relativeName string, schema *Schema) *database {
 	fullName := naming.Join(parentFullName, relativeName)
 	return &database{
@@ -23,6 +32,9 @@
 		fullName:       fullName,
 		name:           relativeName,
 		schema:         schema,
+		crState: conflictResolutionState{
+			reconnectWaitTime: waitBeforeReconnectInMillis,
+		},
 	}
 }
 
@@ -32,6 +44,31 @@
 	fullName       string
 	name           string
 	schema         *Schema
+	crState        conflictResolutionState
+}
+
+// conflictResolutionState maintains data about the connection of
+// conflict resolution stream with syncbase. It provides a way to disconnect
+// an existing open stream.
+type conflictResolutionState struct {
+	mu                sync.Mutex // guards access to all fields in this struct
+	crContext         *context.T
+	cancelFn          context.CancelFunc
+	isClosed          bool
+	reconnectWaitTime time.Duration
+}
+
+func (crs *conflictResolutionState) disconnect() {
+	crs.mu.Lock()
+	defer crs.mu.Unlock()
+	crs.isClosed = true
+	crs.cancelFn()
+}
+
+func (crs *conflictResolutionState) isDisconnected() bool {
+	crs.mu.Lock()
+	defer crs.mu.Unlock()
+	return crs.isClosed
 }
 
 var _ Database = (*database)(nil)
@@ -168,17 +205,44 @@
 	return createBlob(ctx, d.fullName)
 }
 
-// UpgradeIfOutdated implements Database.UpgradeIfOutdated.
-func (d *database) UpgradeIfOutdated(ctx *context.T) (bool, error) {
+// EnforceSchema implements Database.EnforceSchema.
+func (d *database) EnforceSchema(ctx *context.T) error {
 	var schema *Schema = d.schema
 	if schema == nil {
-		return false, verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
+		return verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
 	}
 
 	if schema.Metadata.Version < 0 {
-		return false, verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
+		return verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
 	}
 
+	if needsResolver(d.schema.Metadata) && d.schema.Resolver == nil {
+		return verror.New(verror.ErrBadState, ctx, "ResolverTypeAppResolves cannot be used in CrRule without providing a ConflictResolver in Schema.")
+	}
+
+	if _, err := d.upgradeIfOutdated(ctx); err != nil {
+		return err
+	}
+
+	if d.schema.Resolver == nil {
+		return nil
+	}
+
+	childCtx, cancelFn := context.WithCancel(ctx)
+	d.crState.crContext = childCtx
+	d.crState.cancelFn = cancelFn
+
+	go d.establishConflictResolution(childCtx)
+	return nil
+}
+
+// Close implements Database.Close.
+func (d *database) Close() {
+	d.crState.disconnect()
+}
+
+func (d *database) upgradeIfOutdated(ctx *context.T) (bool, error) {
+	var schema *Schema = d.schema
 	schemaMgr := d.getSchemaManager()
 	currMeta, err := schemaMgr.getSchemaMetadata(ctx)
 	if err != nil {
@@ -222,8 +286,182 @@
 	return true, nil
 }
 
+func (d *database) establishConflictResolution(ctx *context.T) {
+	count := 0
+	for {
+		count++
+		vlog.Infof("Starting a new conflict resolution connection. Re-Connection count: %d", count)
+		childCtx := context.WithValue(ctx, reconnectionCount, count)
+		// listenForConflicts is a blocking method which returns only when the
+		// conflict stream is broken.
+		if err := d.listenForConflicts(childCtx); err != nil {
+			vlog.Errorf("Conflict resolution connection ended with error: %v", err)
+		}
+
+		// Check if database is closed and if we need to shutdown conflict
+		// resolution.
+		if d.crState.isDisconnected() {
+			vlog.Infof("Shutting down conflict resolution connection.")
+			break
+		}
+
+		// The connection might have broken because syncbase service went down.
+		// Sleep for a few seconds to allow syncbase to come back up.
+		time.Sleep(d.crState.reconnectWaitTime)
+	}
+}
+
+func (d *database) listenForConflicts(ctx *context.T) error {
+	resolver, err := d.c.StartConflictResolver(ctx)
+	if err != nil {
+		return err
+	}
+	conflictStream := resolver.RecvStream()
+	resolutionStream := resolver.SendStream()
+	var c *Conflict = &Conflict{}
+	for conflictStream.Advance() {
+		row := conflictStream.Value()
+		addRowToConflict(c, &row)
+		if !row.Continued {
+			resolution := d.schema.Resolver.OnConflict(ctx, c)
+			if err := sendResolution(resolutionStream, resolution); err != nil {
+				return err
+			}
+			c = &Conflict{}  // create a new conflict object for the next batch
+		}
+	}
+	if err := conflictStream.Err(); err != nil {
+		return err
+	}
+	return resolver.Finish()
+}
+
+// TODO(jlodhia): Should we check if the Resolution received addresses all
+// conflicts in write set?
+func sendResolution(stream interface {
+	Send(item wire.ResolutionInfo) error
+}, resolution Resolution) error {
+	size := len(resolution.ResultSet)
+	count := 0
+	for _, v := range resolution.ResultSet {
+		count++
+		ri := toResolutionInfo(v, count != size)
+		if err := stream.Send(ri); err != nil {
+			vlog.Error("Error while sending resolution")
+			return err
+		}
+	}
+	return nil
+}
+
+func addRowToConflict(c *Conflict, ci *wire.ConflictInfo) {
+	switch v := ci.Data.(type) {
+	case wire.ConflictDataBatch:
+		if c.Batches == nil {
+			c.Batches = map[uint16]wire.BatchInfo{}
+		}
+		c.Batches[v.Value.Id] = v.Value
+	case wire.ConflictDataRow:
+		rowInfo := v.Value
+		switch op := rowInfo.Op.(type) {
+		case wire.OperationWrite:
+			if c.WriteSet == nil {
+				c.WriteSet = &ConflictRowSet{map[string]ConflictRow{}, map[uint16][]ConflictRow{}}
+			}
+			cr := toConflictRow(op.Value, rowInfo.BatchIds)
+			c.WriteSet.ByKey[cr.Key] = cr
+			for _, bid := range rowInfo.BatchIds {
+				c.WriteSet.ByBatch[bid] = append(c.WriteSet.ByBatch[bid], cr)
+			}
+		case wire.OperationRead:
+			if c.ReadSet == nil {
+				c.ReadSet = &ConflictRowSet{map[string]ConflictRow{}, map[uint16][]ConflictRow{}}
+			}
+			cr := toConflictRow(op.Value, rowInfo.BatchIds)
+			c.ReadSet.ByKey[cr.Key] = cr
+			for _, bid := range rowInfo.BatchIds {
+				c.ReadSet.ByBatch[bid] = append(c.ReadSet.ByBatch[bid], cr)
+			}
+		case wire.OperationScan:
+			if c.ScanSet == nil {
+				c.ScanSet = &ConflictScanSet{map[uint16][]wire.ScanOp{}}
+			}
+			for _, bid := range rowInfo.BatchIds {
+				c.ScanSet.ByBatch[bid] = append(c.ScanSet.ByBatch[bid], op.Value)
+			}
+		}
+	}
+}
+
+func toConflictRow(op wire.RowOp, batchIds []uint16) ConflictRow {
+	var local, remote, ancestor *Value
+	if op.LocalValue != nil {
+		local = &Value{
+			val:       op.LocalValue.Bytes,
+			WriteTs:   toTime(op.LocalValue.WriteTs),
+			selection: wire.ValueSelectionLocal,
+		}
+	}
+	if op.RemoteValue != nil {
+		remote = &Value{
+			val:       op.RemoteValue.Bytes,
+			WriteTs:   toTime(op.RemoteValue.WriteTs),
+			selection: wire.ValueSelectionRemote,
+		}
+	}
+	if op.AncestorValue != nil {
+		ancestor = &Value{
+			val:       op.AncestorValue.Bytes,
+			WriteTs:   toTime(op.AncestorValue.WriteTs),
+			selection: wire.ValueSelectionOther,
+		}
+	}
+	return ConflictRow{
+		Key:           op.Key,
+		LocalValue:    local,
+		RemoteValue:   remote,
+		AncestorValue: ancestor,
+		BatchIds:      batchIds,
+	}
+}
+
+// TODO(jlodhia): remove this method once time is stored as time.Time instead
+// of int64
+func toTime(unixNanos int64) time.Time {
+	return time.Unix(
+		unixNanos / 1e9,  // seconds
+		unixNanos % 1e9)  // nanoseconds
+}
+
+func toResolutionInfo(r ResolvedRow, lastRow bool) wire.ResolutionInfo {
+	sel := wire.ValueSelectionOther
+	resVal := (*wire.Value)(nil)
+	if r.Result != nil {
+		sel = r.Result.selection
+		resVal = &wire.Value{
+			Bytes:   r.Result.val,
+			WriteTs: r.Result.WriteTs.UnixNano(),  // this timestamp is ignored by syncbase
+		}
+	}
+	return wire.ResolutionInfo{
+		Key:       r.Key,
+		Selection: sel,
+		Result:    resVal,
+		Continued: lastRow,
+	}
+}
+
+func needsResolver(metadata wire.SchemaMetadata) bool {
+	for _, rule := range metadata.Policy.Rules {
+		if rule.Resolver == wire.ResolverTypeAppResolves {
+			return true
+		}
+	}
+	return false
+}
+
 func (d *database) getSchemaManager() schemaManagerImpl {
-	return newSchemaManager(d.fullName)
+	return newSchemaManager(d.c)
 }
 
 func (d *database) schemaVersion() int32 {
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 622f82d..d9ae93f 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -6,12 +6,15 @@
 package nosql
 
 import (
+	"time"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/util"
 	"v.io/v23/context"
 	"v.io/v23/security/access"
 	"v.io/v23/services/watch"
 	"v.io/v23/vdl"
+	"v.io/v23/verror"
 	"v.io/v23/vom"
 )
 
@@ -45,6 +48,10 @@
 	// time of the RPC, and will not reflect subsequent writes to keys not yet
 	// reached by the stream.
 	Exec(ctx *context.T, query string) ([]string, ResultStream, error)
+
+	// Close cleans up any state associated with this client handle including
+	// shutting down any open conflict resolution stream.
+	Close()
 }
 
 // Database represents a collection of Tables. Batches, queries, sync, watch,
@@ -131,14 +138,15 @@
 	// Blob returns a handle to the blob with the given BlobRef.
 	Blob(br wire.BlobRef) Blob
 
-	// This method compares the current schema version of the database with the
-	// schema version provided while creating this database handle. If the
-	// current database schema version is lower, then the SchemaUpdater is
+	// EnforceSchema compares the current schema version of the database
+	// with the schema version provided while creating this database handle. If
+	// the current database schema version is lower, then the SchemaUpdater is
 	// called. If SchemaUpdater is successful this method stores the new schema
 	// metadata in database.
-	// Note: schema can be nil, in which case this method skips schema check
-	// and the caller is responsible for maintaining schema sanity.
-	UpgradeIfOutdated(ctx *context.T) (bool, error)
+	// This method also registers a conflict resolver with syncbase to receive
+	// conflicts. Note: schema can be nil, in which case this method skips
+	// schema check and the caller is responsible for maintaining schema sanity.
+	EnforceSchema(ctx *context.T) error
 }
 
 // BatchDatabase is a handle to a set of reads and writes to the database that
@@ -550,4 +558,114 @@
 type Schema struct {
 	Metadata wire.SchemaMetadata
 	Upgrader SchemaUpgrader
+	Resolver ConflictResolver
+}
+
+// ConflictResolver interface allows the App to define resolution of conflicts
+// that it requested to handle.
+type ConflictResolver interface {
+	OnConflict(ctx *context.T, conflict *Conflict) Resolution
+}
+
+// Conflict contains information to fully specify a conflict. Since syncbase
+// supports batches there can be one or more rows within the batch that has a
+// conflict. Each of these rows will be sent together as part of a single
+// conflict. Each row contains an Id of the batch to which it belongs,
+// enabling the client to group together rows that are part of a batch. Note
+// that a single row can be part of more than one batch.
+//
+// WriteSet contains rows that were written.
+// ReadSet contains rows that were read within a batch corresponding to a row
+// within the write set.
+// ScanSet contains scans performed within a batch corresponding to a row
+// within the write set.
+// Batches is a map of unique ids to BatchInfo objects. The id is unique only in
+// the context of a given conflict and is otherwise meaningless.
+type Conflict struct {
+	ReadSet       *ConflictRowSet
+	WriteSet      *ConflictRowSet
+	ScanSet       *ConflictScanSet
+	Batches       map[uint16]wire.BatchInfo
+}
+
+// ConflictRowSet contains a set of rows under conflict. It provides two different
+// ways to access the same set.
+// ByKey is a map of ConflictRows keyed by the row key.
+// ByBatch is a map of []ConflictRows keyed by batch id. This map lets the client
+// access all ConflictRows within this set that contain a given hint.
+type ConflictRowSet struct {
+	ByKey  map[string]ConflictRow
+	ByBatch map[uint16][]ConflictRow
+}
+
+// ConflictScanSet contains a set of scans under conflict.
+// ByBatch is a map of array of ScanOps keyed by batch id.
+type ConflictScanSet struct {
+	ByBatch map[uint16][]wire.ScanOp
+}
+
+// ConflictRow represents a row under conflict.
+// Key is the key for the row.
+// LocalValue is the value present in the local db.
+// RemoteValue is the value received via sync.
+// AncestorValue is the value for the key which is the lowest common
+// ancestor of the two values represented by LocalValue and RemoteValue.
+// AncestorValue is nil if the ConflictRow is a part of the read set.
+// BatchIds is a list of ids of all the batches that this row belongs to.
+type ConflictRow struct {
+	Key           string
+	LocalValue    *Value
+	RemoteValue   *Value
+	AncestorValue *Value
+	BatchIds      []uint16
+}
+
+// Resolution contains the application’s reply to a conflict. It must contain a
+// resolved value for each conflict row within the WriteSet of the given
+// conflict.
+// ResultSet is a map of row key to ResolvedRow.
+type Resolution struct {
+	ResultSet map[string]ResolvedRow
+	// TODO(jlodhia): Hint []string
+}
+
+// ResolvedRow represents a result of resolution of a row under conflict.
+// Key is the key for the row.
+// Selection represents the value that was selected for resolution.
+// Value is the resolved value for the key. This field should be used only
+// if value of Selection field is 'Other'.
+type ResolvedRow struct {
+	Key    string
+	Result *Value
+}
+
+// Value contains a specific version of data for the row under conflict along
+// with the write timestamp and hints associated with the version.
+// WriteTs is the write timestamp for this value.
+type Value struct {
+	val       []byte
+	WriteTs   time.Time
+	selection wire.ValueSelection
+}
+
+// Get takes a reference to an instance of a type that is expected to be
+// represented by Value.
+func (v *Value) Get(value interface{}) error {
+	return vom.Decode(v.val, value)
+}
+
+// NewValue creates a new Value to be added to Resolution.
+func NewValue(ctx *context.T, data interface{}) (*Value, error) {
+	if data == nil {
+		return nil, verror.New(verror.ErrBadArg, ctx, "data cannot be nil")
+	}
+	bytes, err := vom.Encode(data)
+	if err != nil {
+		return nil, err
+	}
+	return &Value{
+		val:       bytes,
+		WriteTs:   time.Now(),  // ignored by syncbase
+		selection: wire.ValueSelectionOther,
+	}, nil
 }
diff --git a/v23/syncbase/nosql/schema.go b/v23/syncbase/nosql/schema.go
index 9f52a1d..8dd480d 100644
--- a/v23/syncbase/nosql/schema.go
+++ b/v23/syncbase/nosql/schema.go
@@ -21,14 +21,12 @@
 // Implementation of SchemaManager (Not part of public client API)
 
 type schemaManagerImpl struct {
-	dbName string
-	c      wire.DatabaseClientMethods
+	c wire.DatabaseClientMethods
 }
 
-func newSchemaManager(dbName string) schemaManagerImpl {
+func newSchemaManager(client wire.DatabaseClientMethods) schemaManagerImpl {
 	return schemaManagerImpl{
-		dbName: dbName,
-		c:      wire.DatabaseClient(dbName),
+		c: client,
 	}
 }
 
diff --git a/v23/syncbase/nosql/schema_test.go b/v23/syncbase/nosql/schema_test.go
index 6d7ed0d..78ebd95 100644
--- a/v23/syncbase/nosql/schema_test.go
+++ b/v23/syncbase/nosql/schema_test.go
@@ -19,10 +19,10 @@
 // This test as following steps:
 // 1) Call NoSQLDatabase() for a non existent db.
 // 2) Create the database, and verify if Schema got stored properly.
-// 3) Call UpgradeIfOutdated() to make sure that the method is no-op and is
+// 3) Call EnforceSchema() to make sure that the method is no-op and is
 //    able to read the schema from db.
 // 4) Call NoSQLDatabase() on the same db to create a new handle with an
-//    upgraded schema, call UpgradeIfOutdated() and check if SchemaUpgrader
+//    upgraded schema, call EnforceSchema() and check if SchemaUpgrader
 //    is called and if the new schema is stored appropriately.
 func TestSchemaCheck(t *testing.T) {
 	ctx, sName, cleanup := tu.SetupOrDie(nil)
@@ -35,9 +35,9 @@
 
 	// Verify that calling Upgrade on a non existing database does not throw
 	// errors.
-	_, err := db1.UpgradeIfOutdated(ctx)
+	err := db1.EnforceSchema(ctx)
 	if err != nil {
-		t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err)
+		t.Fatalf("db1.EnforceSchema() failed: %v", err)
 	}
 	if mockUpgrader.CallCount > 0 {
 		t.Fatal("Call to upgrader was not expected.")
@@ -53,12 +53,8 @@
 	}
 
 	// Make redundant call to Upgrade to verify that it is a no-op
-	result, err1 := db1.UpgradeIfOutdated(ctx)
-	if result {
-		t.Fatalf("db1.UpgradeIfOutdated() should not return true")
-	}
-	if err1 != nil {
-		t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err1)
+	if err := db1.EnforceSchema(ctx); err != nil {
+		t.Fatalf("db1.EnforceSchema() failed: %v", err)
 	}
 	if mockUpgrader.CallCount > 0 {
 		t.Fatal("Call to upgrader was not expected.")
@@ -73,13 +69,8 @@
 	}
 	schema.Metadata.Policy = policy
 	otherdb1 := a.NoSQLDatabase("db1", schema)
-	otherresult, othererr := otherdb1.UpgradeIfOutdated(ctx)
-
-	if !otherresult {
-		t.Fatalf("otherdb1.UpgradeIfOutdated() expected to return true")
-	}
-	if othererr != nil {
-		t.Fatalf("otherdb1.UpgradeIfOutdated() failed: %v", othererr)
+	if err := otherdb1.EnforceSchema(ctx); err != nil {
+		t.Fatalf("otherdb1.EnforceSchema() failed: %v", err)
 	}
 	if mockUpgrader.CallCount != 1 {
 		t.Fatalf("Unexpected number of calls to upgrader. Expected: %d, Actual: %d.", 1, mockUpgrader.CallCount)
@@ -226,7 +217,7 @@
 	// Upgrade schema version for underlying db using a different handle
 	schema2 := tu.DefaultSchema(1)
 	dbHandle2 := a.NoSQLDatabase("db1", schema2)
-	dbHandle2.UpgradeIfOutdated(ctx)
+	dbHandle2.EnforceSchema(ctx)
 
 	// Commit batch1, abort batch2, attempt writing a row using batch3.
 	// Each of these operations should fail.
diff --git a/v23/syncbase/util/util.go b/v23/syncbase/util/util.go
index 6eeaaaa..dda4c6d 100644
--- a/v23/syncbase/util/util.go
+++ b/v23/syncbase/util/util.go
@@ -64,6 +64,12 @@
 	return string(x)
 }
 
+// IsPrefix returns true if start and limit strings together represent a prefix
+// range. If true, start represents the prefix.
+func IsPrefix(start string, limit string) bool {
+	return PrefixRangeLimit(start) == limit
+}
+
 // AccessController provides access control for various syncbase objects.
 type AccessController interface {
 	// SetPermissions replaces the current Permissions for an object.
diff --git a/v23/syncbase/util/util_test.go b/v23/syncbase/util/util_test.go
index 321691a..2d6384a 100644
--- a/v23/syncbase/util/util_test.go
+++ b/v23/syncbase/util/util_test.go
@@ -61,3 +61,42 @@
 		}
 	}
 }
+
+func TestIsPrefix(t *testing.T) {
+	tests := []struct {
+		isPrefix bool
+		start  string
+		limit  string
+	}{
+		{true, "", ""},
+		{true, "a", "b"},
+		{true, "aa", "ab"},
+		{true, "\xfe", "\xff"},
+		{true, "a\xfe", "a\xff"},
+		{true, "aa\xfe", "aa\xff"},
+		{true, "a\xff", "b"},
+		{true, "aa\xff", "ab"},
+		{true, "a\xff\xff", "b"},
+		{true, "aa\xff\xff", "ab"},
+		{true, "\xff", ""},
+		{true, "\xff\xff", ""},
+
+		{false, "", "\x00"},
+		{false, "a", "aa"},
+		{false, "aa", "aa"},
+		{false, "\xfe", "\x00"},
+		{false, "a\xfe", "b\xfe"},
+		{false, "aa\xfe", "aa\x00"},
+		{false, "a\xff", "b\x00"},
+		{false, "aa\xff", "ab\x00"},
+		{false, "a\xff\xff", "a\xff\xff\xff"},
+		{false, "aa\xff\xff", "a"},
+		{false, "\xff", "\x00"},
+	}
+	for _, test := range tests {
+		result := util.IsPrefix(test.start, test.limit)
+		if result != test.isPrefix {
+			t.Errorf("%q, %q: got %v, want %v", test.start, test.limit, result, test.isPrefix)
+		}
+	}
+}
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 2818a8c..a9f503f 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -49,6 +49,11 @@
 	mu  sync.Mutex // protects the fields below
 	sns map[uint64]store.Snapshot
 	txs map[uint64]store.Transaction
+
+	// Active ConflictResolver connection from the app to this database.
+	// NOTE: For now, we assume there's only one open conflict resolution stream
+	// per database (typically, from the app that owns the database).
+	resolver wire.ConflictManagerStartConflictResolverServerCall
 }
 
 // databaseReq is a per-request object that handles Database RPCs.
diff --git a/x/ref/services/syncbase/server/nosql/database_crm.go b/x/ref/services/syncbase/server/nosql/database_crm.go
new file mode 100644
index 0000000..e1135a9
--- /dev/null
+++ b/x/ref/services/syncbase/server/nosql/database_crm.go
@@ -0,0 +1,20 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+)
+
+////////////////////////////////////////
+// ConflictManager RPC methods
+
+func (d *databaseReq) StartConflictResolver(ctx *context.T, call wire.ConflictManagerStartConflictResolverServerCall) error {
+	// Store the conflict resolver connection in the per-app, per-database
+	// singleton so that sync can access it.
+	d.database.resolver = call
+	return nil
+}