Merge branch 'master' into vtrace
diff --git a/discovery/.api b/discovery/.api
index 6fc8be6..0174f42 100644
--- a/discovery/.api
+++ b/discovery/.api
@@ -9,6 +9,7 @@
pkg discovery, type AdvertiseCloser interface, Close()
pkg discovery, type Advertiser interface { Advertise }
pkg discovery, type Advertiser interface, Advertise(*context.T, *Service, []security.BlessingPattern) (<-chan struct{}, error)
+pkg discovery, type Attachments map[string][]byte
pkg discovery, type Attributes map[string]string
pkg discovery, type Closer interface { Close }
pkg discovery, type Closer interface, Close()
@@ -23,6 +24,7 @@
pkg discovery, type Scanner interface, Scan(*context.T, string) (<-chan Update, error)
pkg discovery, type Service struct
pkg discovery, type Service struct, Addrs []string
+pkg discovery, type Service struct, Attachments Attachments
pkg discovery, type Service struct, Attrs Attributes
pkg discovery, type Service struct, InstanceId string
pkg discovery, type Service struct, InstanceName string
diff --git a/discovery/types.vdl b/discovery/types.vdl
index b915af8..b48dd1e 100644
--- a/discovery/types.vdl
+++ b/discovery/types.vdl
@@ -20,6 +20,11 @@
// The addresses (vanadium object names) that the service is served on.
// E.g., '/host:port/a/b/c', '/ns.dev.v.io:8101/blah/blah'.
Addrs []string
+ // The service attachments.
+ // E.g., {'thumbnail': binary_data }.
+ //
+ // WARNING: THIS FIELD IS NOT SUPPORTED YET.
+ Attachments Attachments
}
// Attributes represents service attributes as a key/value pair.
@@ -28,6 +33,15 @@
// and should not start with '_' character.
type Attributes map[string]string
+// Attachments represents service attachments as a key/value pair. Unlike
+// attributes, attachments are mostly for larger binary data and they are
+// not queryable. It is recommended to keep attachments as small as possible
+// since fetching them may require additional RPC calls, delaying discovery.
+//
+// The key must be US-ASCII printable characters, excluding the '=' character
+// and should not start with '_' character.
+type Attachments map[string][]byte
+
// Found represents a service that is discovered by scan.
type Found struct {
Service Service
diff --git a/discovery/types.vdl.go b/discovery/types.vdl.go
index d3cec69..9f613b7 100644
--- a/discovery/types.vdl.go
+++ b/discovery/types.vdl.go
@@ -28,6 +28,11 @@
// The addresses (vanadium object names) that the service is served on.
// E.g., '/host:port/a/b/c', '/ns.dev.v.io:8101/blah/blah'.
Addrs []string
+ // The service attachments.
+ // E.g., {'thumbnail': binary_data }.
+ //
+ // WARNING: THIS FIELD IS NOT SUPPORTED YET.
+ Attachments Attachments
}
func (Service) __VDLReflect(struct {
@@ -46,6 +51,20 @@
}) {
}
+// Attachments represents service attachments as a key/value pair. Unlike
+// attributes, attachments are mostly for larger binary data and they are
+// not queryable. It is recommended to keep attachments as small as possible
+// since fetching them may require additional RPC calls, delaying discovery.
+//
+// The key must be US-ASCII printable characters, excluding the '=' character
+// and should not start with '_' character.
+type Attachments map[string][]byte
+
+func (Attachments) __VDLReflect(struct {
+ Name string `vdl:"v.io/v23/discovery.Attachments"`
+}) {
+}
+
// Found represents a service that is discovered by scan.
type Found struct {
Service Service
@@ -108,6 +127,7 @@
func init() {
vdl.Register((*Service)(nil))
vdl.Register((*Attributes)(nil))
+ vdl.Register((*Attachments)(nil))
vdl.Register((*Found)(nil))
vdl.Register((*Lost)(nil))
vdl.Register((*Update)(nil))
diff --git a/security/access/.api b/security/access/.api
index 294a21e..ab5cecd 100644
--- a/security/access/.api
+++ b/security/access/.api
@@ -5,7 +5,9 @@
pkg access, const Write Tag
pkg access, func AllTypicalTags() []Tag
pkg access, func IsUnenforceablePatterns(error) []security.BlessingPattern
+pkg access, func NewAccessTagCaveat(...Tag) (security.Caveat, error)
pkg access, func NewErrAccessListMatch(*context.T, []string, []security.RejectedBlessing) error
+pkg access, func NewErrAccessTagCaveatValidation(*context.T, []string, []Tag) error
pkg access, func NewErrInvalidOpenAccessList(*context.T) error
pkg access, func NewErrNoPermissions(*context.T, []string, []security.RejectedBlessing, string) error
pkg access, func NewErrTooBig(*context.T) error
@@ -29,7 +31,9 @@
pkg access, type AccessList struct, NotIn []string
pkg access, type Permissions map[string]AccessList
pkg access, type Tag string
+pkg access, var AccessTagCaveat security.CaveatDescriptor
pkg access, var ErrAccessListMatch verror.IDAction
+pkg access, var ErrAccessTagCaveatValidation verror.IDAction
pkg access, var ErrInvalidOpenAccessList verror.IDAction
pkg access, var ErrNoPermissions verror.IDAction
pkg access, var ErrTooBig verror.IDAction
diff --git a/security/access/caveat.go b/security/access/caveat.go
new file mode 100644
index 0000000..5acb2ca
--- /dev/null
+++ b/security/access/caveat.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package access
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/security"
+ "v.io/v23/vdl"
+)
+
+func init() {
+ security.RegisterCaveatValidator(AccessTagCaveat, func(ctx *context.T, call security.Call, params []Tag) error {
+ wantT := TypicalTagType()
+ methodTags := call.MethodTags()
+ for _, mt := range methodTags {
+ if mt.Type() == wantT {
+ for _, ct := range params {
+ if mt.RawString() == vdl.ValueOf(ct).RawString() {
+ return nil
+ }
+ }
+ }
+ }
+ strs := make([]string, len(methodTags))
+ for i, mt := range methodTags {
+ strs[i] = mt.RawString()
+ }
+ return NewErrAccessTagCaveatValidation(ctx, strs, params)
+ })
+}
+
+// NewAccessTagCaveat returns a Caveat that will validate iff the intersection
+// of the tags on the method being invoked and those in 'tags' is non-empty.
+func NewAccessTagCaveat(tags ...Tag) (security.Caveat, error) {
+ return security.NewCaveat(AccessTagCaveat, tags)
+}
diff --git a/security/access/caveat_test.go b/security/access/caveat_test.go
new file mode 100644
index 0000000..dc7586b
--- /dev/null
+++ b/security/access/caveat_test.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package access_test
+
+import (
+ "testing"
+
+ "v.io/v23/context"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
+ "v.io/v23/vdl"
+)
+
+func TestAccessTagCaveat(t *testing.T) {
+ var (
+ server = newPrincipal(t)
+ bserver, _ = server.BlessSelf("server")
+ caveat, _ = access.NewAccessTagCaveat(access.Debug, access.Resolve)
+ bclient, _ = server.Bless(newPrincipal(t).PublicKey(), bserver, "debugger", caveat)
+ tests = []struct {
+ MethodTags []*vdl.Value
+ OK bool
+ }{
+ {nil, false},
+ {[]*vdl.Value{vdl.ValueOf(access.Debug)}, true},
+ {[]*vdl.Value{vdl.ValueOf(access.Resolve)}, true},
+ {[]*vdl.Value{vdl.ValueOf(access.Read), vdl.ValueOf(access.Debug)}, true},
+ {[]*vdl.Value{vdl.ValueOf(access.Read), vdl.ValueOf(access.Write)}, false},
+ {[]*vdl.Value{vdl.ValueOf("Debug"), vdl.ValueOf("Resolve")}, false},
+ }
+ )
+ security.AddToRoots(server, bserver)
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ for idx, test := range tests {
+ call := security.NewCall(&security.CallParams{
+ MethodTags: test.MethodTags,
+ LocalPrincipal: server,
+ RemoteBlessings: bclient,
+ })
+ got, rejected := security.RemoteBlessingNames(ctx, call)
+ if test.OK {
+ if len(got) != 1 || got[0] != "server:debugger" {
+ t.Errorf("Got (%v, %v), wanted ([%q], nil) for method tags %v (test case #%d)", got, rejected, "server:debugger", test.MethodTags, idx)
+ }
+ }
+ if !test.OK && len(got) != 0 {
+ t.Errorf("Got (%v, %v), wanted all blessings to be rejected for method tags %v (test case #%d)", got, rejected, test.MethodTags, idx)
+ }
+ }
+}
diff --git a/security/access/types.vdl b/security/access/types.vdl
index e72bbcf..9405064 100644
--- a/security/access/types.vdl
+++ b/security/access/types.vdl
@@ -100,6 +100,7 @@
package access
import "v.io/v23/security"
+import "v.io/v23/uniqueid"
// AccessList represents a set of blessings that should be granted access.
//
@@ -148,6 +149,13 @@
Read = Tag("Read") // Operations that do not mutate the state of the object.
Write = Tag("Write") // Operations that mutate the state of the object.
Resolve = Tag("Resolve") // Operations involving namespace navigation.
+
+ // AccessTagCaveat represents a caveat that validates iff the method being invoked has
+ // at least one of the tags listed in the caveat.
+ AccessTagCaveat = security.CaveatDescriptor{
+ Id: uniqueid.Id{0xef, 0xcd, 0xe3, 0x75, 0x14, 0x16, 0xc7, 0x3b, 0x18, 0x9c, 0xe8, 0x9c, 0xcc, 0x93, 0x80, 0x0},
+ ParamType: typeobject([]Tag),
+ }
)
// Note: For "bad version" errors, use verror.ErrBadVersion.
@@ -162,4 +170,8 @@
UnenforceablePatterns(rejectedPatterns []security.BlessingPattern){"en":"AccessList contains the following invalid or unrecognized patterns in the In list: {rejectedPatterns}"}
InvalidOpenAccessList(){"en": "AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists"}
+
+ AccessTagCaveatValidation(methodTags []string, caveatTags []Tag){
+ "en": "access tags on method ({methodTags}) do not include any of the ones in the caveat ({caveatTags}), or the method is using a different tag type",
+ }
)
diff --git a/security/access/types.vdl.go b/security/access/types.vdl.go
index d0133d2..2ce46f5 100644
--- a/security/access/types.vdl.go
+++ b/security/access/types.vdl.go
@@ -111,6 +111,7 @@
// VDL user imports
"v.io/v23/security"
+ "v.io/v23/uniqueid"
)
// AccessList represents a set of blessings that should be granted access.
@@ -184,13 +185,38 @@
const Resolve = Tag("Resolve") // Operations involving namespace navigation.
+// AccessTagCaveat represents a caveat that validates iff the method being invoked has
+// at least one of the tags listed in the caveat.
+var AccessTagCaveat = security.CaveatDescriptor{
+ Id: uniqueid.Id{
+ 239,
+ 205,
+ 227,
+ 117,
+ 20,
+ 22,
+ 199,
+ 59,
+ 24,
+ 156,
+ 232,
+ 156,
+ 204,
+ 147,
+ 128,
+ 0,
+ },
+ ParamType: vdl.TypeOf([]Tag(nil)),
+}
+
var (
// The AccessList is too big. Use groups to represent large sets of principals.
- ErrTooBig = verror.Register("v.io/v23/security/access.TooBig", verror.NoRetry, "{1:}{2:} AccessList is too big")
- ErrNoPermissions = verror.Register("v.io/v23/security/access.NoPermissions", verror.NoRetry, "{1:}{2:} {3} does not have {5} access (rejected blessings: {4})")
- ErrAccessListMatch = verror.Register("v.io/v23/security/access.AccessListMatch", verror.NoRetry, "{1:}{2:} {3} does not match the access list (rejected blessings: {4})")
- ErrUnenforceablePatterns = verror.Register("v.io/v23/security/access.UnenforceablePatterns", verror.NoRetry, "{1:}{2:} AccessList contains the following invalid or unrecognized patterns in the In list: {3}")
- ErrInvalidOpenAccessList = verror.Register("v.io/v23/security/access.InvalidOpenAccessList", verror.NoRetry, "{1:}{2:} AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists")
+ ErrTooBig = verror.Register("v.io/v23/security/access.TooBig", verror.NoRetry, "{1:}{2:} AccessList is too big")
+ ErrNoPermissions = verror.Register("v.io/v23/security/access.NoPermissions", verror.NoRetry, "{1:}{2:} {3} does not have {5} access (rejected blessings: {4})")
+ ErrAccessListMatch = verror.Register("v.io/v23/security/access.AccessListMatch", verror.NoRetry, "{1:}{2:} {3} does not match the access list (rejected blessings: {4})")
+ ErrUnenforceablePatterns = verror.Register("v.io/v23/security/access.UnenforceablePatterns", verror.NoRetry, "{1:}{2:} AccessList contains the following invalid or unrecognized patterns in the In list: {3}")
+ ErrInvalidOpenAccessList = verror.Register("v.io/v23/security/access.InvalidOpenAccessList", verror.NoRetry, "{1:}{2:} AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists")
+ ErrAccessTagCaveatValidation = verror.Register("v.io/v23/security/access.AccessTagCaveatValidation", verror.NoRetry, "{1:}{2:} access tags on method ({3}) do not include any of the ones in the caveat ({4}), or the method is using a different tag type")
)
func init() {
@@ -199,6 +225,7 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAccessListMatch.ID), "{1:}{2:} {3} does not match the access list (rejected blessings: {4})")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnenforceablePatterns.ID), "{1:}{2:} AccessList contains the following invalid or unrecognized patterns in the In list: {3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidOpenAccessList.ID), "{1:}{2:} AccessList with the pattern ... in its In list must have no other patterns in the In or NotIn lists")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAccessTagCaveatValidation.ID), "{1:}{2:} access tags on method ({3}) do not include any of the ones in the caveat ({4}), or the method is using a different tag type")
}
// NewErrTooBig returns an error with the ErrTooBig ID.
@@ -225,3 +252,8 @@
func NewErrInvalidOpenAccessList(ctx *context.T) error {
return verror.New(ErrInvalidOpenAccessList, ctx)
}
+
+// NewErrAccessTagCaveatValidation returns an error with the ErrAccessTagCaveatValidation ID.
+func NewErrAccessTagCaveatValidation(ctx *context.T, methodTags []string, caveatTags []Tag) error {
+ return verror.New(ErrAccessTagCaveatValidation, ctx, methodTags, caveatTags)
+}
diff --git a/security/caveat.go b/security/caveat.go
index 380dce6..2603154 100644
--- a/security/caveat.go
+++ b/security/caveat.go
@@ -221,21 +221,13 @@
// NewExpiryCaveat returns a Caveat that validates iff the current time is before t.
func NewExpiryCaveat(t time.Time) (Caveat, error) {
- c, err := NewCaveat(ExpiryCaveat, t)
- if err != nil {
- return c, err
- }
- return c, nil
+ return NewCaveat(ExpiryCaveat, t)
}
// NewMethodCaveat returns a Caveat that validates iff the method being invoked by
// the peer is listed in an argument to this function.
func NewMethodCaveat(method string, additionalMethods ...string) (Caveat, error) {
- c, err := NewCaveat(MethodCaveat, append(additionalMethods, method))
- if err != nil {
- return c, err
- }
- return c, nil
+ return NewCaveat(MethodCaveat, append(additionalMethods, method))
}
// NewPublicKeyCaveat returns a third-party caveat, i.e., the returned
@@ -297,10 +289,10 @@
return nil
}
-func (c *publicKeyThirdPartyCaveatParam) discharger(cxt *context.T) (PublicKey, error) {
+func (c *publicKeyThirdPartyCaveatParam) discharger(ctx *context.T) (PublicKey, error) {
key, err := UnmarshalPublicKey(c.DischargerKey)
if err != nil {
- return nil, verror.New(errCantUnmarshalDischargeKey, cxt, fmt.Sprintf("%T", *c), err)
+ return nil, verror.New(errCantUnmarshalDischargeKey, ctx, fmt.Sprintf("%T", *c), err)
}
return key, nil
}
@@ -317,9 +309,9 @@
return hash.sum(msg)
}
-func (d *publicKeyDischarge) verify(cxt *context.T, key PublicKey) error {
+func (d *publicKeyDischarge) verify(ctx *context.T, key PublicKey) error {
if !bytes.Equal(d.Signature.Purpose, dischargePurpose) {
- return verror.New(errInapproriateDischargeSignature, cxt, d.ThirdPartyCaveatId, d.Signature.Purpose)
+ return verror.New(errInapproriateDischargeSignature, ctx, d.ThirdPartyCaveatId, d.Signature.Purpose)
}
digest := d.digest(key.hash())
cachekey, err := d.signatureCacheKey(digest, key, d.Signature)
@@ -327,7 +319,7 @@
return nil
}
if !d.Signature.Verify(key, digest) {
- return verror.New(errBadDischargeSignature, cxt, d.ThirdPartyCaveatId)
+ return verror.New(errBadDischargeSignature, ctx, d.ThirdPartyCaveatId)
}
dischargeSignatureCache.cache([][]byte{cachekey})
return nil
diff --git a/syncbase/featuretests/cr_v23_test.go b/syncbase/featuretests/cr_v23_test.go
index b7f84b1..52365d8 100644
--- a/syncbase/featuretests/cr_v23_test.go
+++ b/syncbase/featuretests/cr_v23_test.go
@@ -18,6 +18,7 @@
"v.io/v23/verror"
"v.io/x/ref/lib/v23test"
"v.io/x/ref/services/syncbase/server/util"
+ "v.io/x/ref/test/testutil"
)
// Tests the conflict resolution configuration rules.
@@ -33,10 +34,6 @@
// TODO(jlodhia): Add more rules based on value type and combination of key
// prefix and value type once its implemented.
func TestV23CRRuleConfig(t *testing.T) {
- // TODO(jlodhia): Re-enable test after following issue is resolved.
- // https://github.com/vanadium/issues/issues/1027
- t.Skip()
-
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, v23test.Opts{})
defer sh.Cleanup()
@@ -52,7 +49,8 @@
ok(t, updateData(client1Ctx, "s1", 0, 10, "concurrentUpdate"))
schemaKeyPrefix := "foo0"
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, 2, func() {
+ expectedOnConflictCallCount := 1
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaKeyPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -157,10 +155,6 @@
// 2) 5 rows written as a single batch on both syncbases resulting into a
// single conflict for the batch.
func TestV23CRAppResolved(t *testing.T) {
- // TODO(jlodhia): Re-enable test after following issue is resolved.
- // https://github.com/vanadium/issues/issues/1027
- t.Skip()
-
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, v23test.Opts{})
defer sh.Cleanup()
@@ -180,9 +174,8 @@
schemaPrefix := "foo"
keyPrefix := "foo"
- // TODO(jlodhia): change the expected num conflicts from 12 to 6 once
- // sync's cr code handles duplicate resolutions internally.
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 12, func() {
+ expectedOnConflictCallCount := 6
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -228,9 +221,8 @@
schemaPrefix := "foo1"
keyPrefix := "foo"
- // TODO(jlodhia): change the expected num conflicts from 2 to 1 once
- // sync's cr code handles duplicate resolutions internally.
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+ expectedOnConflictCallCount := 1
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -282,9 +274,8 @@
schemaPrefix := "foo"
keyPrefix := "foo"
- // TODO(jlodhia): change the expected num conflicts from 2 to 1 once
- // sync's cr code handles duplicate resolutions internally.
- runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, 2, func() {
+ expectedOnConflictCallCount := 1
+ runWithAppBasedResolver(t, client0Ctx, client1Ctx, schemaPrefix, expectedOnConflictCallCount, func() {
// Re enable sync between the two syncbases and wait for a bit to let the
// syncbases sync and call conflict resolution.
ok(t, toggleSync(client0Ctx, "s0", sgName, "root:s0;root:s1"))
@@ -321,7 +312,29 @@
// harder to follow. It would be better to define two helper functions: one
// to do the stuff before fn(), and another to do the stuff after fn(). (Also,
// switching to channel-based signalling should simplify things substantially.)
-func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, maxCallCount int, fn func()) {
+func runWithAppBasedResolver(t *testing.T, client0Ctx, client1Ctx *context.T, schemaPrefix string, expectedCallCount int, fn func()) {
+ // 1) During tests, S0 and S1 may sync with each other at the same time. Hence
+ // they end up doing conflict resolution for the same objects on both sides,
+ // creating new resolved objects independently. These resolved versions
+ // get synced again, creating another set of conflicts and causing 2X the
+ // number of OnConflict calls. TODO(jlodhia): Once Sync handles duplicate
+ // resolutions internally, this will no longer be true.
+ //
+ // 2) When the CPU load goes high, the watcher can fall behind in updating
+ // sync dag with the latest updates. This leads to scenarios where S0's dag
+ // may catch up with all N updates and send them to S1 while S1's watcher
+ // has only caught up to N - k updates. This means that S1 will only
+ // recognize N - k conflicts instead of N conflicts. These conflicts result
+ // in up to N - k OnConflict calls. When S1 tries to commit the resolved
+ // updates, the store rejects the commit on account of k objects it has
+ // not yet caught up on. Initiator will sleep for 1 sec and then retry
+ // processing the N objects received from S0, causing up to N more
+ // OnConflict calls and finally succeeding in committing the result.
+ //
+ // Due to (1) and (2), the total number of calls to OnConflict can be up to
+ // 3X the expected call count.
+ maxCallCount := 3 * expectedCallCount
+
// Create and hold a conflict resolution connection on s0 and s1 to receive
// future conflicts. The expected call count is 2 * the number of batches
// because each batch is being concurrently resolved on s0 and s1 creating new
@@ -443,18 +456,15 @@
r := tb.Row(key)
want := valuePrefix + key
- // Wait up to 5 seconds for the correct key and value to appear.
- sleepTimeMs, maxAttempts := 50, 100
var value string
- for i := 0; i < maxAttempts; i++ {
+ return testutil.RetryFor(10 * time.Second, func() error {
if err := r.Get(ctx, &value); (err == nil) && (value == want) {
- return nil
+ return nil // Value found.
} else if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
return fmt.Errorf("Syncbase Error while fetching key %v: %v", key, err)
}
- time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
- }
- return fmt.Errorf("Timed out waiting for value %v but found %v after %d milliseconds.", want, value, maxAttempts*sleepTimeMs)
+ return testutil.TryAgain(fmt.Errorf("Value expected: %v, found: %v", want, value))
+ })
}
func endTest(ctx *context.T, syncbaseName, prefix, signalKey string) error {
@@ -478,9 +488,7 @@
r := tb.Row(signalKey)
var end bool
- sleepTimeMs, maxAttempts := 50, 100 // Max wait time of 5 seconds.
- for cnt := 0; cnt < maxAttempts; cnt++ {
- time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
+ return testutil.RetryFor(10 * time.Second, func() error {
if err := r.Get(ctx, &end); err != nil {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
return fmt.Errorf("r.Get() for endkey failed: %v", err)
@@ -489,8 +497,8 @@
if end {
return nil
}
- }
- return fmt.Errorf("Timed out waiting for signal %v after %d milliseconds.", signalKey, maxAttempts*sleepTimeMs)
+ return testutil.TryAgain(fmt.Errorf("Signal %v not received", signalKey))
+ })
}
////////////////////////////////////////////////////////
diff --git a/syncbase/nosql/database.go b/syncbase/nosql/database.go
index c491203..9deef44 100644
--- a/syncbase/nosql/database.go
+++ b/syncbase/nosql/database.go
@@ -296,7 +296,7 @@
childCtx := context.WithValue(ctx, reconnectionCount, count)
// listenForConflicts is a blocking method that returns only when the
// conflict stream is broken.
- if err := d.listenForConflicts(childCtx); err != nil {
+ if err := d.listenForConflicts(childCtx); err != nil && !d.crState.isDisconnected() {
vlog.Errorf("Conflict resolution connection ended with error: %v", err)
}
@@ -349,7 +349,7 @@
count++
ri := toResolutionInfo(v, count != size)
if err := stream.Send(ri); err != nil {
- vlog.Error("Error while sending resolution")
+ vlog.Errorf("Error while sending resolution: %v", err)
return err
}
}