Merge branch 'master' into vtrace
diff --git a/discovery/.api b/discovery/.api
index 6fc8be6..0174f42 100644
--- a/discovery/.api
+++ b/discovery/.api
@@ -9,6 +9,7 @@
 pkg discovery, type AdvertiseCloser interface, Close()
 pkg discovery, type Advertiser interface { Advertise }
 pkg discovery, type Advertiser interface, Advertise(*context.T, *Service, []security.BlessingPattern) (<-chan struct{}, error)
+pkg discovery, type Attachments map[string][]byte
 pkg discovery, type Attributes map[string]string
 pkg discovery, type Closer interface { Close }
 pkg discovery, type Closer interface, Close()
@@ -23,6 +24,7 @@
 pkg discovery, type Scanner interface, Scan(*context.T, string) (<-chan Update, error)
 pkg discovery, type Service struct
 pkg discovery, type Service struct, Addrs []string
+pkg discovery, type Service struct, Attachments Attachments
 pkg discovery, type Service struct, Attrs Attributes
 pkg discovery, type Service struct, InstanceId string
 pkg discovery, type Service struct, InstanceName string
diff --git a/discovery/types.vdl b/discovery/types.vdl
index b915af8..b48dd1e 100644
--- a/discovery/types.vdl
+++ b/discovery/types.vdl
@@ -20,6 +20,11 @@
 	// The addresses (vanadium object names) that the service is served on.
 	// E.g., '/host:port/a/b/c', '/ns.dev.v.io:8101/blah/blah'.
 	Addrs []string
+	// The service attachments.
+	// E.g., {'thumbnail': binary_data }.
+	//
+	// WARNING: THIS FIELD IS NOT SUPPORTED YET.
+	Attachments Attachments
 }
 
 // Attributes represents service attributes as a key/value pair.
@@ -28,6 +33,15 @@
 // and should not start with '_' character.
 type Attributes map[string]string
 
+// Attachments represents service attachments as a key/value pair. Unlike
+// attributes, attachments are mostly for larger binary data and they are
+// not queryable. It is recommended to put as small attachments as possible
+// since it may require additional RPC calls causing delay in discovery.
+//
+// The key must be US-ASCII printable characters, excluding the '=' character
+// and should not start with '_' character.
+type Attachments map[string][]byte
+
 // Found represents a service that is discovered by scan.
 type Found struct {
 	Service Service
diff --git a/discovery/types.vdl.go b/discovery/types.vdl.go
index d3cec69..9f613b7 100644
--- a/discovery/types.vdl.go
+++ b/discovery/types.vdl.go
@@ -28,6 +28,11 @@
 	// The addresses (vanadium object names) that the service is served on.
 	// E.g., '/host:port/a/b/c', '/ns.dev.v.io:8101/blah/blah'.
 	Addrs []string
+	// The service attachments.
+	// E.g., {'thumbnail': binary_data }.
+	//
+	// WARNING: THIS FIELD IS NOT SUPPORTED YET.
+	Attachments Attachments
 }
 
 func (Service) __VDLReflect(struct {
@@ -46,6 +51,20 @@
 }) {
 }
 
+// Attachments represents service attachments as a key/value pair. Unlike
+// attributes, attachments are mostly for larger binary data and they are
+// not queryable. It is recommended to put as small attachments as possible
+// since it may require additional RPC calls causing delay in discovery.
+//
+// The key must be US-ASCII printable characters, excluding the '=' character
+// and should not start with '_' character.
+type Attachments map[string][]byte
+
+func (Attachments) __VDLReflect(struct {
+	Name string `vdl:"v.io/v23/discovery.Attachments"`
+}) {
+}
+
 // Found represents a service that is discovered by scan.
 type Found struct {
 	Service Service
@@ -108,6 +127,7 @@
 func init() {
 	vdl.Register((*Service)(nil))
 	vdl.Register((*Attributes)(nil))
+	vdl.Register((*Attachments)(nil))
 	vdl.Register((*Found)(nil))
 	vdl.Register((*Lost)(nil))
 	vdl.Register((*Update)(nil))
diff --git a/security/access/.api b/security/access/.api
index 294a21e..ab5cecd 100644
--- a/security/access/.api
+++ b/security/access/.api
@@ -5,7 +5,9 @@
 pkg access, const Write Tag
 pkg access, func AllTypicalTags() []Tag
 pkg access, func IsUnenforceablePatterns(error) []security.BlessingPattern
+pkg access, func NewAccessTagCaveat(...Tag) (security.Caveat, error)
 pkg access, func NewErrAccessListMatch(*context.T, []string, []security.RejectedBlessing) error
+pkg access, func NewErrAccessTagCaveatValidation(*context.T, []string, []Tag) error
 pkg access, func NewErrInvalidOpenAccessList(*context.T) error
 pkg access, func NewErrNoPermissions(*context.T, []string, []security.RejectedBlessing, string) error
 pkg access, func NewErrTooBig(*context.T) error
@@ -29,7 +31,9 @@
 pkg access, type AccessList struct, NotIn []string
 pkg access, type Permissions map[string]AccessList
 pkg access, type Tag string
+pkg access, var AccessTagCaveat security.CaveatDescriptor
 pkg access, var ErrAccessListMatch verror.IDAction
+pkg access, var ErrAccessTagCaveatValidation verror.IDAction
 pkg access, var ErrInvalidOpenAccessList verror.IDAction
 pkg access, var ErrNoPermissions verror.IDAction
 pkg access, var ErrTooBig verror.IDAction
diff --git a/security/access/caveat.go b/security/access/caveat.go
new file mode 100644
index 0000000..5acb2ca
--- /dev/null
+++ b/security/access/caveat.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package access
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/security"
+	"v.io/v23/vdl"
+)
+
+func init() { // Registers the validation function for AccessTagCaveat.
+	security.RegisterCaveatValidator(AccessTagCaveat, func(ctx *context.T, call security.Call, params []Tag) error {
+		wantT := TypicalTagType()
+		methodTags := call.MethodTags()
+		for _, mt := range methodTags {
+			if mt.Type() == wantT { // method tags of any other type are never compared
+				for _, ct := range params {
+					if mt.RawString() == vdl.ValueOf(ct).RawString() {
+						return nil // one matching tag is enough to validate
+					}
+				}
+			}
+		}
+		strs := make([]string, len(methodTags)) // no match: collect the method tags for the error
+		for i, mt := range methodTags {
+			strs[i] = mt.RawString() // NOTE(review): runs on tags of every type, not only wantT; confirm RawString is safe for non-string values
+		}
+		return NewErrAccessTagCaveatValidation(ctx, strs, params)
+	})
+}
+
+// NewAccessTagCaveat returns a Caveat that validates iff at least one tag on
+// the method being invoked has the typical tag type and equals one of 'tags'.
+func NewAccessTagCaveat(tags ...Tag) (security.Caveat, error) {
+	return security.NewCaveat(AccessTagCaveat, tags)
+}
diff --git a/security/access/caveat_test.go b/security/access/caveat_test.go
new file mode 100644
index 0000000..dc7586b
--- /dev/null
+++ b/security/access/caveat_test.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package access_test
+
+import (
+	"testing"
+
+	"v.io/v23/context"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	"v.io/v23/vdl"
+)
+
+func TestAccessTagCaveat(t *testing.T) { // Checks which method tags let a blessing carrying an access-tag caveat be recognized.
+	var (
+		server     = newPrincipal(t)
+		bserver, _ = server.BlessSelf("server")
+		caveat, _  = access.NewAccessTagCaveat(access.Debug, access.Resolve) // caveat lists the Debug and Resolve tags
+		bclient, _ = server.Bless(newPrincipal(t).PublicKey(), bserver, "debugger", caveat)
+		tests      = []struct {
+			MethodTags []*vdl.Value
+			OK         bool
+		}{
+			{nil, false}, // no method tags at all
+			{[]*vdl.Value{vdl.ValueOf(access.Debug)}, true},
+			{[]*vdl.Value{vdl.ValueOf(access.Resolve)}, true},
+			{[]*vdl.Value{vdl.ValueOf(access.Read), vdl.ValueOf(access.Debug)}, true},
+			{[]*vdl.Value{vdl.ValueOf(access.Read), vdl.ValueOf(access.Write)}, false},
+			{[]*vdl.Value{vdl.ValueOf("Debug"), vdl.ValueOf("Resolve")}, false}, // plain strings rather than access.Tag values
+		}
+	)
+	security.AddToRoots(server, bserver)
+	ctx, cancel := context.RootContext()
+	defer cancel()
+	for idx, test := range tests {
+		call := security.NewCall(&security.CallParams{
+			MethodTags:      test.MethodTags,
+			LocalPrincipal:  server,
+			RemoteBlessings: bclient,
+		})
+		got, rejected := security.RemoteBlessingNames(ctx, call)
+		if test.OK {
+			if len(got) != 1 || got[0] != "server:debugger" {
+				t.Errorf("Got (%v, %v), wanted ([%q], nil) for method tags %v (test case #%d)", got, rejected, "server:debugger", test.MethodTags, idx)
+			}
+		}
+		if !test.OK && len(got) != 0 {
+			t.Errorf("Got (%v, %v), wanted all blessings to be rejected for method tags %v (test case #%d)", got, rejected, test.MethodTags, idx)
+		}
+	}
+}
diff --git a/security/access/types.vdl b/security/access/types.vdl
index e72bbcf..9405064 100644
--- a/security/access/types.vdl
+++ b/security/access/types.vdl
@@ -100,6 +100,7 @@
 package access
 
 import "v.io/v23/security"
+import "v.io/v23/uniqueid"
 
 // AccessList represents a set of blessings that should be granted access.
 //
@@ -148,6 +149,13 @@
   Read    = Tag("Read")     // Operations that do not mutate the state of the object.
   Write   = Tag("Write")    // Operations that mutate the state of the object.
   Resolve = Tag("Resolve")  // Operations involving namespace navigation.
+
+  // AccessTagCaveat represents a caveat that validates iff the method being invoked has
+  // at least one of the tags listed in the caveat.
+  AccessTagCaveat  = security.CaveatDescriptor{
+    Id:        uniqueid.Id{0xef, 0xcd, 0xe3, 0x75, 0x14, 0x16, 0xc7, 0x3b, 0x18, 0x9c, 0xe8, 0x9c, 0xcc, 0x93, 0x80, 0x0},
+    ParamType: typeobject([]Tag),
+  }
 )
 
 // Note: For "bad version" errors, use verror.ErrBadVersion.
@@ -162,4 +170,8 @@
 	UnenforceablePatterns(rejectedPatterns []security.BlessingPattern){"en":"AccessList contains the following invalid or unrecognized patterns in the In list: {rejectedPatterns}"}
 
 	InvalidOpenAccessList(){"en": "AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists"}
+
+	AccessTagCaveatValidation(methodTags []string, caveatTags []Tag){
+		"en": "access tags on method ({methodTags}) do not include any of the ones in the caveat ({caveatTags}), or the method is using a different tag type",
+	}
 )
diff --git a/security/access/types.vdl.go b/security/access/types.vdl.go
index d0133d2..2ce46f5 100644
--- a/security/access/types.vdl.go
+++ b/security/access/types.vdl.go
@@ -111,6 +111,7 @@
 
 	// VDL user imports
 	"v.io/v23/security"
+	"v.io/v23/uniqueid"
 )
 
 // AccessList represents a set of blessings that should be granted access.
@@ -184,13 +185,38 @@
 
 const Resolve = Tag("Resolve") // Operations involving namespace navigation.
 
+// AccessTagCaveat represents a caveat that validates iff the method being invoked has
+// at least one of the tags listed in the caveat.
+var AccessTagCaveat = security.CaveatDescriptor{
+	Id: uniqueid.Id{
+		239,
+		205,
+		227,
+		117,
+		20,
+		22,
+		199,
+		59,
+		24,
+		156,
+		232,
+		156,
+		204,
+		147,
+		128,
+		0,
+	},
+	ParamType: vdl.TypeOf([]Tag(nil)),
+}
+
 var (
 	// The AccessList is too big.  Use groups to represent large sets of principals.
-	ErrTooBig                = verror.Register("v.io/v23/security/access.TooBig", verror.NoRetry, "{1:}{2:} AccessList is too big")
-	ErrNoPermissions         = verror.Register("v.io/v23/security/access.NoPermissions", verror.NoRetry, "{1:}{2:} {3} does not have {5} access (rejected blessings: {4})")
-	ErrAccessListMatch       = verror.Register("v.io/v23/security/access.AccessListMatch", verror.NoRetry, "{1:}{2:} {3} does not match the access list (rejected blessings: {4})")
-	ErrUnenforceablePatterns = verror.Register("v.io/v23/security/access.UnenforceablePatterns", verror.NoRetry, "{1:}{2:} AccessList contains the following invalid or unrecognized patterns in the In list: {3}")
-	ErrInvalidOpenAccessList = verror.Register("v.io/v23/security/access.InvalidOpenAccessList", verror.NoRetry, "{1:}{2:} AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists")
+	ErrTooBig                    = verror.Register("v.io/v23/security/access.TooBig", verror.NoRetry, "{1:}{2:} AccessList is too big")
+	ErrNoPermissions             = verror.Register("v.io/v23/security/access.NoPermissions", verror.NoRetry, "{1:}{2:} {3} does not have {5} access (rejected blessings: {4})")
+	ErrAccessListMatch           = verror.Register("v.io/v23/security/access.AccessListMatch", verror.NoRetry, "{1:}{2:} {3} does not match the access list (rejected blessings: {4})")
+	ErrUnenforceablePatterns     = verror.Register("v.io/v23/security/access.UnenforceablePatterns", verror.NoRetry, "{1:}{2:} AccessList contains the following invalid or unrecognized patterns in the In list: {3}")
+	ErrInvalidOpenAccessList     = verror.Register("v.io/v23/security/access.InvalidOpenAccessList", verror.NoRetry, "{1:}{2:} AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists")
+	ErrAccessTagCaveatValidation = verror.Register("v.io/v23/security/access.AccessTagCaveatValidation", verror.NoRetry, "{1:}{2:} access tags on method ({3}) do not include any of the ones in the caveat ({4}), or the method is using a different tag type")
 )
 
 func init() {
@@ -199,6 +225,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAccessListMatch.ID), "{1:}{2:} {3} does not match the access list (rejected blessings: {4})")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnenforceablePatterns.ID), "{1:}{2:} AccessList contains the following invalid or unrecognized patterns in the In list: {3}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidOpenAccessList.ID), "{1:}{2:} AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAccessTagCaveatValidation.ID), "{1:}{2:} access tags on method ({3}) do not include any of the ones in the caveat ({4}), or the method is using a different tag type")
 }
 
 // NewErrTooBig returns an error with the ErrTooBig ID.
@@ -225,3 +252,8 @@
 func NewErrInvalidOpenAccessList(ctx *context.T) error {
 	return verror.New(ErrInvalidOpenAccessList, ctx)
 }
+
+// NewErrAccessTagCaveatValidation returns an error with the ErrAccessTagCaveatValidation ID.
+func NewErrAccessTagCaveatValidation(ctx *context.T, methodTags []string, caveatTags []Tag) error {
+	return verror.New(ErrAccessTagCaveatValidation, ctx, methodTags, caveatTags)
+}
diff --git a/security/caveat.go b/security/caveat.go
index 380dce6..2603154 100644
--- a/security/caveat.go
+++ b/security/caveat.go
@@ -221,21 +221,13 @@
 
 // NewExpiryCaveat returns a Caveat that validates iff the current time is before t.
 func NewExpiryCaveat(t time.Time) (Caveat, error) {
-	c, err := NewCaveat(ExpiryCaveat, t)
-	if err != nil {
-		return c, err
-	}
-	return c, nil
+	return NewCaveat(ExpiryCaveat, t)
 }
 
 // NewMethodCaveat returns a Caveat that validates iff the method being invoked by
 // the peer is listed in an argument to this function.
 func NewMethodCaveat(method string, additionalMethods ...string) (Caveat, error) {
-	c, err := NewCaveat(MethodCaveat, append(additionalMethods, method))
-	if err != nil {
-		return c, err
-	}
-	return c, nil
+	return NewCaveat(MethodCaveat, append(additionalMethods, method))
 }
 
 // NewPublicKeyCaveat returns a third-party caveat, i.e., the returned
@@ -297,10 +289,10 @@
 	return nil
 }
 
-func (c *publicKeyThirdPartyCaveatParam) discharger(cxt *context.T) (PublicKey, error) {
+func (c *publicKeyThirdPartyCaveatParam) discharger(ctx *context.T) (PublicKey, error) {
 	key, err := UnmarshalPublicKey(c.DischargerKey)
 	if err != nil {
-		return nil, verror.New(errCantUnmarshalDischargeKey, cxt, fmt.Sprintf("%T", *c), err)
+		return nil, verror.New(errCantUnmarshalDischargeKey, ctx, fmt.Sprintf("%T", *c), err)
 	}
 	return key, nil
 }
@@ -317,9 +309,9 @@
 	return hash.sum(msg)
 }
 
-func (d *publicKeyDischarge) verify(cxt *context.T, key PublicKey) error {
+func (d *publicKeyDischarge) verify(ctx *context.T, key PublicKey) error {
 	if !bytes.Equal(d.Signature.Purpose, dischargePurpose) {
-		return verror.New(errInapproriateDischargeSignature, cxt, d.ThirdPartyCaveatId, d.Signature.Purpose)
+		return verror.New(errInapproriateDischargeSignature, ctx, d.ThirdPartyCaveatId, d.Signature.Purpose)
 	}
 	digest := d.digest(key.hash())
 	cachekey, err := d.signatureCacheKey(digest, key, d.Signature)
@@ -327,7 +319,7 @@
 		return nil
 	}
 	if !d.Signature.Verify(key, digest) {
-		return verror.New(errBadDischargeSignature, cxt, d.ThirdPartyCaveatId)
+		return verror.New(errBadDischargeSignature, ctx, d.ThirdPartyCaveatId)
 	}
 	dischargeSignatureCache.cache([][]byte{cachekey})
 	return nil
diff --git a/syncbase/featuretests/cr_v23_test.go b/syncbase/featuretests/cr_v23_test.go
index b7f84b1..52365d8 100644
--- a/syncbase/featuretests/cr_v23_test.go
+++ b/syncbase/featuretests/cr_v23_test.go
@@ -18,6 +18,7 @@
 	"v.io/v23/verror"
 	"v.io/x/ref/lib/v23test"
 	"v.io/x/ref/services/syncbase/server/util"
+	"v.io/x/ref/test/testutil"
 )
 
 // Tests the conflict resolution configuration rules.
@@ -33,10 +34,6 @@
 // TODO(jlodhia): Add more rules based on value type and combination of key
 // prefix and value type once its implemented.
 func TestV23CRRuleConfig(t *testing.T) {
-	// TODO(jlodhia): Re-enable test after following issue is resolved.
-	// https://github.com/vanadium/issues/issues/1027
-	t.Skip()
-
 	v23test.SkipUnlessRunningIntegrationTests(t)
 	sh := v23test.NewShell(t, v23test.Opts{})
 	defer sh.Cleanup()
@@ -52,7 +49,8 @@
 	ok(t, updateData(client1Ctx, "s1", 0, 10, "concurrentUpdate"))
 
 	schemaKeyPrefix := "foo0"
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, 2, func() {
+	expectedOnConflictCallCount := 1
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -157,10 +155,6 @@
 // 2) 5 rows written as a single batch on both syncbases resulting into a
 //    single conflict for the batch.
 func TestV23CRAppResolved(t *testing.T) {
-	// TODO(jlodhia): Re-enable test after following issue is resolved.
-	// https://github.com/vanadium/issues/issues/1027
-	t.Skip()
-
 	v23test.SkipUnlessRunningIntegrationTests(t)
 	sh := v23test.NewShell(t, v23test.Opts{})
 	defer sh.Cleanup()
@@ -180,9 +174,8 @@
 
 	schemaPrefix := "foo"
 	keyPrefix := "foo"
-	// TODO(jlodhia): change the expected num conflicts from 12 to 6 once
-	// sync's cr code handles duplicate resolutions internally.
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 12, func() {
+	expectedOnConflictCallCount := 6
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -228,9 +221,8 @@
 
 	schemaPrefix := "foo1"
 	keyPrefix := "foo"
-	// TODO(jlodhia): change the expected num conflicts from 2 to 1 once
-	// sync's cr code handles duplicate resolutions internally.
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+	expectedOnConflictCallCount := 1
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -282,9 +274,8 @@
 
 	schemaPrefix := "foo"
 	keyPrefix := "foo"
-	// TODO(jlodhia): change the expected num conflicts from 2 to 1 once
-	// sync's cr code handles duplicate resolutions internally.
-	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+	expectedOnConflictCallCount := 1
+	runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
 		// Re enable sync between the two syncbases and wait for a bit to let the
 		// syncbases sync and call conflict resolution.
 		ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -321,7 +312,29 @@
 // harder to follow. It would be better to define two helper functions: one
 // to do the stuff before fn(), and another to do the stuff after fn(). (Also,
 // switching to channel-based signalling should simplify things substantially.)
-func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, maxCallCount int, fn func()) {
+func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, expectedCallCount int, fn func()) {
+	// 1) During tests, S0 and S1 may sync with each other at the same time. Hence
+	// they end up doing conflict resolution for the same objects on both sides
+	// creating new resolved objects independently. These resolved versions
+	// get synced again creating another set of conflicts causing 2X the amount
+	// of OnConflict calls. TODO(jlodhia): Once Sync handles duplicate
+	// resolutions internally, this will no longer be true.
+	//
+	// 2) When the CPU load goes high, the watcher can fall behind in updating
+	// sync dag with the latest updates. This leads to scenarios where S0's dag
+	// may catch up with all N updates and send them to S1 while S1's watcher
+	// has only caught up to N - k updates. This means that S1 will only
+	// recognize N - k conflicts instead of N conflicts. These conflicts result
+	// in up to N - k OnConflict calls. When S1 tries to commit the resolved
+	// updates, the store rejects the commit on account of k objects it has
+	// not yet caught up on. Initiator will sleep for 1 sec and then retry
+	// processing the N objects received from S0, causing up to N more
+	// OnConflict calls and finally succeeding in committing the result.
+	//
+	// Due to (1) and (2), the total number of calls to OnConflict can be up to
+	// 3X the expected call count.
+	maxCallCount := 3 * expectedCallCount
+
 	// Create and hold a conflict resolution connection on s0 and s1 to receive
 	// future conflicts. The expected call count is 2 * the number of batches
 	// because each batch is being concurrently resolved on s0 and s1 creating new
@@ -443,18 +456,15 @@
 	r := tb.Row(key)
 	want := valuePrefix + key
 
-	// Wait up to 5 seconds for the correct key and value to appear.
-	sleepTimeMs, maxAttempts := 50, 100
 	var value string
-	for i := 0; i < maxAttempts; i++ {
+	return testutil.RetryFor(10 * time.Second, func() error {
 		if err := r.Get(ctx, &value); (err == nil) && (value == want) {
-			return nil
+			return nil  // Value found.
 		} else if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
 			return fmt.Errorf("Syncbase Error while fetching key %v: %v", key, err)
 		}
-		time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
-	}
-	return fmt.Errorf("Timed out waiting for value %v but found %v after %d milliseconds.", want, value, maxAttempts*sleepTimeMs)
+		return testutil.TryAgain(fmt.Errorf("Value expected: %v, found: %v", want, value))
+	})
 }
 
 func endTest(ctx *context.T, syncbaseName, prefix, signalKey string) error {
@@ -478,9 +488,7 @@
 	r := tb.Row(signalKey)
 
 	var end bool
-	sleepTimeMs, maxAttempts := 50, 100 // Max wait time of 5 seconds.
-	for cnt := 0; cnt < maxAttempts; cnt++ {
-		time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
+	return testutil.RetryFor(10 * time.Second, func() error {
 		if err := r.Get(ctx, &end); err != nil {
 			if verror.ErrorID(err) != verror.ErrNoExist.ID {
 				return fmt.Errorf("r.Get() for endkey failed: %v", err)
@@ -489,8 +497,8 @@
 		if end {
 			return nil
 		}
-	}
-	return fmt.Errorf("Timed out waiting for signal %v after %d milliseconds.", signalKey, maxAttempts*sleepTimeMs)
+		return testutil.TryAgain(fmt.Errorf("Signal %v not received", signalKey))
+	})
 }
 
 ////////////////////////////////////////////////////////
diff --git a/syncbase/nosql/database.go b/syncbase/nosql/database.go
index c491203..9deef44 100644
--- a/syncbase/nosql/database.go
+++ b/syncbase/nosql/database.go
@@ -296,7 +296,7 @@
 		childCtx := context.WithValue(ctx, reconnectionCount, count)
 		// listenForConflicts is a blocking method that returns only when the
 		// conflict stream is broken.
-		if err := d.listenForConflicts(childCtx); err != nil {
+		if err := d.listenForConflicts(childCtx); err != nil && !d.crState.isDisconnected() {
 			vlog.Errorf("Conflict resolution connection ended with error: %v", err)
 		}
 
@@ -349,7 +349,7 @@
 		count++
 		ri := toResolutionInfo(v, count != size)
 		if err := stream.Send(ri); err != nil {
-			vlog.Error("Error while sending resolution")
+			vlog.Errorf("Error while sending resolution: %v", err)
 			return err
 		}
 	}