Merge "syncbase profile: add Snappy"
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index c1c6dcd..188ef16 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -12,6 +12,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
@@ -61,7 +62,15 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-// TODO(sadovsky): Implement Glob.
+func (a *app) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+ // Check perms.
+ sn := a.s.st.NewSnapshot()
+ if err := util.Get(ctx, call, sn, a, &appData{}); err != nil {
+ sn.Close()
+ return nil, err
+ }
+ return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+}
////////////////////////////////////////
// util.App methods
diff --git a/services/syncbase/server/db_info.go b/services/syncbase/server/db_info.go
index 6ccc664..e284fae 100644
--- a/services/syncbase/server/db_info.go
+++ b/services/syncbase/server/db_info.go
@@ -37,7 +37,7 @@
}
func (d *dbInfoLayer) StKey() string {
- return util.JoinKeyParts(util.DbInfoPrefix, d.stKeyPart())
+ return util.JoinKeyParts(util.DbInfoPrefix, d.a.name, d.stKeyPart())
}
////////////////////////////////////////
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index a6127cd..c676f97 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -8,8 +8,8 @@
"strings"
wire "v.io/syncbase/v23/services/syncbase"
+ pubutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/nosql"
- "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
@@ -37,7 +37,7 @@
// Validate all key atoms up front, so that we can avoid doing so in all our
// method implementations.
appName := parts[0]
- if !util.ValidKeyAtom(appName) {
+ if !pubutil.ValidName(appName) {
return nil, nil, wire.NewErrInvalidName(nil, suffix)
}
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 03377fc..7ca9271 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -10,6 +10,7 @@
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
@@ -34,7 +35,7 @@
// Designed for use from within App.CreateNoSQLDatabase.
func NewDatabase(ctx *context.T, call rpc.ServerCall, a util.App, name string, perms access.Permissions) (*database, error) {
if perms == nil {
- return nil, verror.New(verror.ErrInternal, nil, "perms must be specified")
+ return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
// TODO(sadovsky): Make storage engine pluggable.
d := &database{
@@ -90,7 +91,15 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-// TODO(sadovsky): Implement Glob.
+func (d *database) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+ // Check perms.
+ sn := d.st.NewSnapshot()
+ if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
+ sn.Close()
+ return nil, err
+ }
+ return util.Glob(ctx, call, pattern, sn, util.TablePrefix)
+}
////////////////////////////////////////
// util.Database methods
diff --git a/services/syncbase/server/nosql/dispatcher.go b/services/syncbase/server/nosql/dispatcher.go
index 50737c5..db18382 100644
--- a/services/syncbase/server/nosql/dispatcher.go
+++ b/services/syncbase/server/nosql/dispatcher.go
@@ -9,6 +9,7 @@
wire "v.io/syncbase/v23/services/syncbase"
nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
+ pubutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23/rpc"
"v.io/v23/security"
@@ -37,7 +38,7 @@
// Validate all key atoms up front, so that we can avoid doing so in all our
// method implementations.
for _, s := range parts {
- if !util.ValidKeyAtom(s) {
+ if !pubutil.ValidName(s) {
return nil, nil, wire.NewErrInvalidName(nil, suffix)
}
}
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index c699e88..c26eb39 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -10,7 +10,6 @@
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
- "v.io/v23/vdl"
"v.io/v23/verror"
)
@@ -32,11 +31,11 @@
////////////////////////////////////////
// RPC methods
-func (r *row) Get(ctx *context.T, call rpc.ServerCall) (*vdl.Value, error) {
+func (r *row) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
return r.get(ctx, call, r.t.d.st)
}
-func (r *row) Put(ctx *context.T, call rpc.ServerCall, value *vdl.Value) error {
+func (r *row) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
return r.put(ctx, call, r.t.d.st, value)
}
@@ -68,18 +67,18 @@
// an authorization check (currently against the table perms).
// Returns a VDL-compatible error.
func (r *row) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
- return util.Get(ctx, call, st, r.t.d, &databaseData{})
+ return util.Get(ctx, call, st, r.t, &tableData{})
}
// get reads data from the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) (*vdl.Value, error) {
+func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
if err := r.checkAccess(ctx, call, st); err != nil {
return nil, err
}
- value := &vdl.Value{}
- if err := util.GetObject(st, r.StKey(), value); err != nil {
+ value, err := st.Get([]byte(r.StKey()), nil)
+ if err != nil {
if _, ok := err.(*store.ErrUnknownKey); ok {
// We've already done an auth check, so here we can safely return NoExist
// rather than NoExistOrNoAccess.
@@ -93,11 +92,14 @@
// put writes data to the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value *vdl.Value) error {
+func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
- return util.Put(ctx, call, st, r, value)
+ if err := st.Put([]byte(r.StKey()), value); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
// del deletes data from the storage engine.
@@ -107,5 +109,8 @@
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
- return util.Delete(ctx, call, st, r)
+ if err := st.Delete([]byte(r.StKey())); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 1983ba8..07198d2 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -9,6 +9,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
@@ -67,23 +68,64 @@
})
}
-func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit string) error {
+func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, end string) error {
return verror.NewErrNotImplemented(ctx)
}
+func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, end string) error {
+ sn := t.d.st.NewSnapshot()
+ defer sn.Close()
+ it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), start, end))
+ sender := call.SendStream()
+ key, value := []byte{}, []byte{}
+ for it.Advance() {
+ key = it.Key(key)
+ parts := util.SplitKeyParts(string(key))
+ sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: it.Value(value)})
+ }
+ if err := it.Err(); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
func (t *table) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
- return verror.NewErrNotImplemented(ctx)
+ if prefix != "" {
+ return verror.NewErrNotImplemented(ctx)
+ }
+ return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+ data := &tableData{}
+ return util.Update(ctx, call, st, t, data, func() error {
+ data.Perms = perms
+ return nil
+ })
+ })
}
func (t *table) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
- return nil, verror.NewErrNotImplemented(ctx)
+ if key != "" {
+ return nil, verror.NewErrNotImplemented(ctx)
+ }
+ data := &tableData{}
+ if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+ return nil, err
+ }
+ return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
}
func (t *table) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
return verror.NewErrNotImplemented(ctx)
}
-// TODO(sadovsky): Implement Glob.
+func (t *table) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+ // Check perms.
+ sn := t.d.st.NewSnapshot()
+ if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
+ sn.Close()
+ return nil, err
+ }
+ return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.RowPrefix, t.name))
+}
////////////////////////////////////////
// util.Layer methods
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index e268787..c8ea107 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -12,6 +12,7 @@
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
@@ -34,7 +35,7 @@
// Returns a VDL-compatible error.
func NewService(ctx *context.T, call rpc.ServerCall, perms access.Permissions) (*service, error) {
if perms == nil {
- return nil, verror.New(verror.ErrInternal, nil, "perms must be specified")
+ return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
// TODO(sadovsky): Make storage engine pluggable.
s := &service{
@@ -75,7 +76,15 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-// TODO(sadovsky): Implement Glob.
+func (s *service) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+ // Check perms.
+ sn := s.st.NewSnapshot()
+ if err := util.Get(ctx, call, sn, s, &serviceData{}); err != nil {
+ sn.Close()
+ return nil, err
+ }
+ return util.Glob(ctx, call, pattern, sn, util.AppPrefix)
+}
////////////////////////////////////////
// App management methods
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
new file mode 100644
index 0000000..98ffd48
--- /dev/null
+++ b/services/syncbase/server/util/glob.go
@@ -0,0 +1,77 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util
+
+import (
+ "strings"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/lib/glob"
+)
+
+// globPatternToPrefix takes "foo*" and returns "foo". It returns ErrBadArg for
+// inputs that are not valid glob patterns as well as for inputs that are valid
+// glob patterns but not valid prefixes.
+func globPatternToPrefix(pattern string) (string, error) {
+ if _, err := glob.Parse(pattern); err != nil {
+ return "", verror.NewErrBadArg(nil)
+ }
+ if pattern == "" {
+ return "", verror.NewErrBadArg(nil)
+ }
+ if pattern[len(pattern)-1] != '*' {
+ return "", verror.NewErrBadArg(nil)
+ }
+ res := pattern[:len(pattern)-1]
+ // Disallow chars and char sequences that have special meaning in glob, since
+ // our Glob() does not support these.
+ if strings.ContainsAny(res, "/*?[") {
+ return "", verror.NewErrBadArg(nil)
+ }
+ if strings.Contains(res, "...") {
+ return "", verror.NewErrBadArg(nil)
+ }
+ return res, nil
+}
+
+// TODO(sadovsky): It sucks that Glob must be implemented differently from other
+// streaming RPC handlers. I don't have much confidence that I've implemented
+// both types of streaming correctly.
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan naming.GlobReply, error) {
+ // TODO(sadovsky): Support glob with non-prefix pattern.
+ prefix, err := globPatternToPrefix(pattern)
+ if err != nil {
+ sn.Close()
+ if verror.ErrorID(err) == verror.ErrBadArg.ID {
+ return nil, verror.NewErrNotImplemented(ctx)
+ } else {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ }
+ it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+ ch := make(chan naming.GlobReply)
+ go func() {
+ defer sn.Close()
+ defer close(ch)
+ key := []byte{}
+ for it.Advance() {
+ key = it.Key(key)
+ parts := SplitKeyParts(string(key))
+ ch <- naming.GlobReplyEntry{naming.MountEntry{Name: parts[len(parts)-1]}}
+ }
+ if err := it.Err(); err != nil {
+ vlog.VI(1).Infof("Glob(%q) failed: %v", pattern, err)
+ ch <- naming.GlobReplyError{naming.GlobError{
+ Error: verror.New(verror.ErrInternal, ctx, err),
+ }}
+ }
+ }()
+ return ch, nil
+}
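For reference, a minimal consumer-side sketch (not part of this change) of how a caller might drain the channel returned by util.Glob above. The drainGlob helper and its signature are hypothetical, and it assumes the generated wrapper field on naming.GlobReplyEntry / naming.GlobReplyError is named Value:

	package example

	import (
		"fmt"

		"v.io/syncbase/x/ref/services/syncbase/server/util"
		"v.io/syncbase/x/ref/services/syncbase/store"
		"v.io/v23/context"
		"v.io/v23/naming"
		"v.io/v23/rpc"
	)

	// drainGlob consumes the glob reply channel and returns the first error, if any.
	func drainGlob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) error {
		ch, err := util.Glob(ctx, call, pattern, sn, stKeyPrefix)
		if err != nil {
			return err
		}
		for reply := range ch {
			switch r := reply.(type) {
			case naming.GlobReplyEntry:
				// Name holds the last key part, as sent by util.Glob above.
				fmt.Println(r.Value.Name)
			case naming.GlobReplyError:
				// Assumed wrapper field name; carries the wrapped store error.
				return r.Value.Error
			}
		}
		return nil
	}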
diff --git a/services/syncbase/server/util/glob_test.go b/services/syncbase/server/util/glob_test.go
new file mode 100644
index 0000000..d9c8b71
--- /dev/null
+++ b/services/syncbase/server/util/glob_test.go
@@ -0,0 +1,54 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Note, we use package util rather than util_test so that we can access the
+// unexported name util.globPatternToPrefix.
+
+package util
+
+import (
+ "testing"
+
+ "v.io/v23/verror"
+)
+
+var (
+ errBadArg = string(verror.ErrBadArg.ID)
+)
+
+func TestGlobPatternToPrefix(t *testing.T) {
+ tests := []struct {
+ pattern, prefix string
+ }{
+ {"", errBadArg},
+ {"*", ""},
+ {"foo", errBadArg},
+ {"foo*", "foo"},
+ {"foo/", errBadArg},
+ {"foo/*", errBadArg},
+ {"foo/bar", errBadArg},
+ {"foo/bar*", errBadArg},
+ {".*", "."},
+ {"..*", ".."},
+ {"...*", errBadArg},
+ {"...foo*", errBadArg},
+ {"foo...foo*", errBadArg},
+ {"..foo*", "..foo"},
+ {"foo..foo*", "foo..foo"},
+ {"...", errBadArg},
+ {"*/...", errBadArg},
+ {"foo/...", errBadArg},
+ {"/", errBadArg},
+ {"/*", errBadArg},
+ }
+ for _, test := range tests {
+ prefix, err := globPatternToPrefix(test.pattern)
+ if err != nil {
+ prefix = string(verror.ErrorID(err))
+ }
+ if prefix != test.prefix {
+ t.Errorf("%q: got %q, want %q", test.pattern, prefix, test.prefix)
+ }
+ }
+}
diff --git a/services/syncbase/server/util/key_util.go b/services/syncbase/server/util/key_util.go
index 542a0ce..d5b6e36 100644
--- a/services/syncbase/server/util/key_util.go
+++ b/services/syncbase/server/util/key_util.go
@@ -4,23 +4,34 @@
package util
-// Note, some of the code below is copied from
-// v.io/syncbase/v23/syncbase/util/util.go.
-
import (
- "regexp"
"strings"
+
+ "v.io/syncbase/v23/syncbase/util"
)
-// TODO(sadovsky): Consider loosening. Perhaps model after MySQL:
-// http://dev.mysql.com/doc/refman/5.7/en/identifiers.html
-var keyAtomRegexp *regexp.Regexp = regexp.MustCompile("^[a-zA-Z0-9_.-]+$")
-
-func ValidKeyAtom(s string) bool {
- return keyAtomRegexp.MatchString(s)
-}
-
+// JoinKeyParts builds keys for accessing data in the storage engine.
func JoinKeyParts(parts ...string) string {
// TODO(sadovsky): Figure out which delimiter makes the most sense.
return strings.Join(parts, ":")
}
+
+// SplitKeyParts is the inverse of JoinKeyParts.
+func SplitKeyParts(key string) []string {
+ return strings.Split(key, ":")
+}
+
+// ScanPrefixArgs returns args for sn.Scan() for the specified prefix.
+func ScanPrefixArgs(stKeyPrefix, prefix string) ([]byte, []byte) {
+ return ScanRangeArgs(stKeyPrefix, util.PrefixRangeStart(prefix), util.PrefixRangeEnd(prefix))
+}
+
+// ScanRangeArgs returns args for sn.Scan() for the specified range.
+// If end is "", all rows with keys >= start are included.
+func ScanRangeArgs(stKeyPrefix, start, end string) ([]byte, []byte) {
+ fullStart, fullEnd := JoinKeyParts(stKeyPrefix, start), JoinKeyParts(stKeyPrefix, end)
+ if end == "" {
+ fullEnd = util.PrefixRangeEnd(fullEnd)
+ }
+ return []byte(fullStart), []byte(fullEnd)
+}
diff --git a/services/syncbase/server/util/key_util_test.go b/services/syncbase/server/util/key_util_test.go
new file mode 100644
index 0000000..1bc1148
--- /dev/null
+++ b/services/syncbase/server/util/key_util_test.go
@@ -0,0 +1,83 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util_test
+
+import (
+ "reflect"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+)
+
+type kpt struct {
+ parts []string
+ key string
+}
+
+var keyPartTests []kpt = []kpt{
+ {[]string{"a", "b"}, "a:b"},
+ {[]string{"aa", "bb"}, "aa:bb"},
+ {[]string{"a", "b", "c"}, "a:b:c"},
+}
+
+func TestJoinKeyParts(t *testing.T) {
+ for _, test := range keyPartTests {
+ got, want := util.JoinKeyParts(test.parts...), test.key
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("%v: got %q, want %q", test.parts, got, want)
+ }
+ }
+}
+
+func TestSplitKeyParts(t *testing.T) {
+ for _, test := range keyPartTests {
+ got, want := util.SplitKeyParts(test.key), test.parts
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("%q: got %v, want %v", test.key, got, want)
+ }
+ }
+}
+
+func TestScanPrefixArgs(t *testing.T) {
+ tests := []struct {
+ stKeyPrefix, prefix, wantStart, wantEnd string
+ }{
+ {"x", "", "x:", "x;"},
+ {"x", "a", "x:a", "x:b"},
+ {"x", "a\xff", "x:a\xff", "x:b"},
+ }
+ for _, test := range tests {
+ start, end := util.ScanPrefixArgs(test.stKeyPrefix, test.prefix)
+ gotStart, gotEnd := string(start), string(end)
+ if gotStart != test.wantStart {
+ t.Errorf("{%q, %q} start: got %q, want %q", test.stKeyPrefix, test.prefix, gotStart, test.wantStart)
+ }
+ if gotEnd != test.wantEnd {
+ t.Errorf("{%q, %q} end: got %q, want %q", test.stKeyPrefix, test.prefix, gotEnd, test.wantEnd)
+ }
+ }
+}
+
+func TestScanRangeArgs(t *testing.T) {
+ tests := []struct {
+ stKeyPrefix, start, end, wantStart, wantEnd string
+ }{
+ {"x", "", "", "x:", "x;"}, // end "" means "no limit"
+ {"x", "a", "", "x:a", "x;"}, // end "" means "no limit"
+ {"x", "a", "b", "x:a", "x:b"},
+ {"x", "a", "a", "x:a", "x:a"}, // empty range
+ {"x", "b", "a", "x:b", "x:a"}, // empty range
+ }
+ for _, test := range tests {
+ start, end := util.ScanRangeArgs(test.stKeyPrefix, test.start, test.end)
+ gotStart, gotEnd := string(start), string(end)
+ if gotStart != test.wantStart {
+ t.Errorf("{%q, %q, %q} start: got %q, want %q", test.stKeyPrefix, test.start, test.end, gotStart, test.wantStart)
+ }
+ if gotEnd != test.wantEnd {
+ t.Errorf("{%q, %q, %q} end: got %q, want %q", test.stKeyPrefix, test.start, test.end, gotEnd, test.wantEnd)
+ }
+ }
+}
diff --git a/services/syncbase/store/benchmark/benchmark.go b/services/syncbase/store/benchmark/benchmark.go
index a4700e6..c794fa9 100644
--- a/services/syncbase/store/benchmark/benchmark.go
+++ b/services/syncbase/store/benchmark/benchmark.go
@@ -110,7 +110,7 @@
func ReadSequential(b *testing.B, config *Config) {
WriteSequential(b, config)
b.ResetTimer()
- s, _ := config.St.Scan([]byte("0"), []byte("z"))
+ s := config.St.Scan([]byte("0"), []byte("z"))
var key, value []byte
for i := 0; i < b.N; i++ {
if !s.Advance() {
diff --git a/services/syncbase/store/invalid_types.go b/services/syncbase/store/invalid_types.go
new file mode 100644
index 0000000..05bfc13
--- /dev/null
+++ b/services/syncbase/store/invalid_types.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// InvalidSnapshot is a store.Snapshot for which all methods return errors.
+type InvalidSnapshot struct {
+ // Error is the error returned by every method call.
+ Error error
+}
+
+// Close implements the store.Snapshot interface.
+func (s *InvalidSnapshot) Close() error {
+ return s.Error
+}
+
+// Get implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Get(key, valbuf []byte) ([]byte, error) {
+ return valbuf, s.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Scan(start, end []byte) Stream {
+ return &InvalidStream{s.Error}
+}
+
+// InvalidStream is a store.Stream for which all methods return errors.
+type InvalidStream struct {
+ // Error is the error returned by every method call.
+ Error error
+}
+
+// Advance implements the store.Stream interface.
+func (s *InvalidStream) Advance() bool {
+ return false
+}
+
+// Key implements the store.Stream interface.
+func (s *InvalidStream) Key(keybuf []byte) []byte {
+ panic(s.Error)
+}
+
+// Value implements the store.Stream interface.
+func (s *InvalidStream) Value(valbuf []byte) []byte {
+ panic(s.Error)
+}
+
+// Err implements the store.Stream interface.
+func (s *InvalidStream) Err() error {
+ return s.Error
+}
+
+// Cancel implements the store.Stream interface.
+func (s *InvalidStream) Cancel() {
+}
+
+// InvalidTransaction is a store.Transaction for which all methods return errors.
+type InvalidTransaction struct {
+ // Error is the error returned by every method call.
+ Error error
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *InvalidTransaction) ResetForRetry() {
+ panic(tx.Error)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Get(key, valbuf []byte) ([]byte, error) {
+ return valbuf, tx.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Scan(start, end []byte) Stream {
+ return &InvalidStream{tx.Error}
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Put(key, value []byte) error {
+ return tx.Error
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Delete(key []byte) error {
+ return tx.Error
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *InvalidTransaction) Commit() error {
+ return tx.Error
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *InvalidTransaction) Abort() error {
+ return tx.Error
+}
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index a173992..6f89ac8 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,22 +11,29 @@
// #include "syncbase_leveldb.h"
import "C"
import (
+ "errors"
"sync"
"unsafe"
"v.io/syncbase/x/ref/services/syncbase/store"
)
+var (
+ errClosedStore = errors.New("closed store")
+)
+
// db is a wrapper around LevelDB that implements the store.Store interface.
-// TODO(rogulenko): ensure thread safety.
type db struct {
+ // mu protects the state of the db.
+ mu sync.RWMutex
cDb *C.leveldb_t
// Default read/write options.
readOptions *C.leveldb_readoptions_t
writeOptions *C.leveldb_writeoptions_t
+ err error
// Used to prevent concurrent transactions.
// TODO(rogulenko): improve concurrency.
- mu sync.Mutex
+ txmu sync.Mutex
}
var _ store.Store = (*db)(nil)
@@ -58,9 +65,18 @@
// Close implements the store.Store interface.
func (d *db) Close() error {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ if d.err != nil {
+ return d.err
+ }
C.leveldb_close(d.cDb)
+ d.cDb = nil
C.leveldb_readoptions_destroy(d.readOptions)
+ d.readOptions = nil
C.leveldb_writeoptions_destroy(d.writeOptions)
+ d.writeOptions = nil
+ d.err = errClosedStore
return nil
}
@@ -81,8 +97,13 @@
}
// Scan implements the store.StoreReader interface.
-func (d *db) Scan(start, end []byte) (store.Stream, error) {
- return newStream(d, start, end, d.readOptions), nil
+func (d *db) Scan(start, end []byte) store.Stream {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return &store.InvalidStream{d.err}
+ }
+ return newStream(d, start, end, d.readOptions)
}
// Put implements the store.StoreWriter interface.
@@ -103,26 +124,44 @@
// NewTransaction implements the store.Store interface.
func (d *db) NewTransaction() store.Transaction {
+ // txmu is held until the transaction is successfully committed or aborted.
+ d.txmu.Lock()
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ d.txmu.Unlock()
+ return &store.InvalidTransaction{d.err}
+ }
return newTransaction(d)
}
// NewSnapshot implements the store.Store interface.
func (d *db) NewSnapshot() store.Snapshot {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return &store.InvalidSnapshot{d.err}
+ }
return newSnapshot(d)
}
// getWithOpts returns the value for the given key.
// cOpts may contain a pointer to a snapshot.
func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.err != nil {
+ return valbuf, d.err
+ }
var cError *C.char
var valLen C.size_t
cStr, cLen := cSlice(key)
val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
if err := goError(cError); err != nil {
- return nil, err
+ return valbuf, err
}
if val == nil {
- return nil, &store.ErrUnknownKey{Key: string(key)}
+ return valbuf, &store.ErrUnknownKey{Key: string(key)}
}
defer C.leveldb_free(unsafe.Pointer(val))
return store.CopyBytes(valbuf, goBytes(val, valLen)), nil
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index fccad5f..666cc40 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -19,27 +19,37 @@
}
func TestStream(t *testing.T) {
- db, dbPath := newDB()
- defer destroyDB(db, dbPath)
- test.RunStreamTest(t, db)
+ runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+ runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+ runTest(t, test.RunStoreStateTest)
}
func TestReadWriteBasic(t *testing.T) {
- st, path := newDB()
- defer destroyDB(st, path)
- test.RunReadWriteBasicTest(t, st)
+ runTest(t, test.RunReadWriteBasicTest)
}
func TestReadWriteRandom(t *testing.T) {
- st, path := newDB()
- defer destroyDB(st, path)
- test.RunReadWriteRandomTest(t, st)
+ runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+ runTest(t, test.RunTransactionStateTest)
}
func TestTransactionsWithGet(t *testing.T) {
- st, path := newDB()
- defer destroyDB(st, path)
- test.RunTransactionsWithGetTest(t, st)
+ runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+ st, dbPath := newDB()
+ defer destroyDB(st, dbPath)
+ f(t, st)
}
func newDB() (store.Store, string) {
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index afa9f25..7b82c7c 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -7,15 +7,21 @@
// #include "leveldb/c.h"
import "C"
import (
+ "errors"
+ "sync"
+
"v.io/syncbase/x/ref/services/syncbase/store"
)
// snapshot is a wrapper around LevelDB snapshot that implements
// the store.Snapshot interface.
type snapshot struct {
+ // mu protects the state of the snapshot.
+ mu sync.RWMutex
d *db
cSnapshot *C.leveldb_snapshot_t
cOpts *C.leveldb_readoptions_t
+ err error
}
var _ store.Snapshot = (*snapshot)(nil)
@@ -26,25 +32,43 @@
C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
return &snapshot{
- d,
- cSnapshot,
- cOpts,
+ d: d,
+ cSnapshot: cSnapshot,
+ cOpts: cOpts,
}
}
// Close implements the store.Snapshot interface.
func (s *snapshot) Close() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.err != nil {
+ return s.err
+ }
C.leveldb_readoptions_destroy(s.cOpts)
+ s.cOpts = nil
C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
+ s.cSnapshot = nil
+ s.err = errors.New("closed snapshot")
return nil
}
// Get implements the store.StoreReader interface.
func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if s.err != nil {
+ return valbuf, s.err
+ }
return s.d.getWithOpts(key, valbuf, s.cOpts)
}
// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
- return newStream(s.d, start, end, s.cOpts), nil
+func (s *snapshot) Scan(start, end []byte) store.Stream {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if s.err != nil {
+ return &store.InvalidStream{s.err}
+ }
+ return newStream(s.d, start, end, s.cOpts)
}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index f348567..4705e59 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -9,10 +9,10 @@
import "C"
import (
"bytes"
+ "errors"
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
)
// stream is a wrapper around LevelDB iterator that implements
@@ -39,7 +39,6 @@
func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
cStr, size := cSlice(start)
- // TODO(rogulenko): check if (db.cDb != nil) under a db-scoped mutex.
cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
return &stream{
cIter: cIter,
@@ -57,7 +56,7 @@
s.mu.Lock()
defer s.mu.Unlock()
s.hasValue = false
- if s.cIter == nil {
+ if s.err != nil {
return false
}
// The C iterator starts out initialized, pointing at the first value; we
@@ -67,7 +66,7 @@
} else {
C.syncbase_leveldb_iter_next(s.cIter)
}
- if s.cIter.is_valid != 0 && bytes.Compare(s.end, s.cKey()) > 0 {
+ if s.cIter.is_valid != 0 && (len(s.end) == 0 || bytes.Compare(s.cKey(), s.end) < 0) {
s.hasValue = true
s.key = s.cKey()
s.value = s.cVal()
@@ -113,10 +112,9 @@
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.cIter == nil {
+ if s.err != nil {
return
}
- s.err = verror.New(verror.ErrCanceled, nil)
// s.hasValue might be false if Advance was never called.
if s.hasValue {
// We copy the key and the value from the C heap to the Go heap before
@@ -124,6 +122,7 @@
s.key = store.CopyBytes(nil, s.cKey())
s.value = store.CopyBytes(nil, s.cVal())
}
+ s.err = errors.New("canceled stream")
s.destroyLeveldbIter()
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index d358f51..12c3a65 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -7,62 +7,85 @@
// #include "leveldb/c.h"
import "C"
import (
+ "errors"
+ "sync"
+
"v.io/syncbase/x/ref/services/syncbase/store"
)
// transaction is a wrapper around LevelDB WriteBatch that implements
// the store.Transaction interface.
-// TODO(rogulenko): handle incorrect usage.
type transaction struct {
+ mu sync.Mutex
d *db
snapshot store.Snapshot
batch *C.leveldb_writebatch_t
cOpts *C.leveldb_writeoptions_t
+ err error
}
var _ store.Transaction = (*transaction)(nil)
func newTransaction(d *db) *transaction {
- // The lock is held until the transaction is successfully
- // committed or aborted.
- d.mu.Lock()
return &transaction{
- d,
- d.NewSnapshot(),
- C.leveldb_writebatch_create(),
- d.writeOptions,
+ d: d,
+ snapshot: d.NewSnapshot(),
+ batch: C.leveldb_writebatch_create(),
+ cOpts: d.writeOptions,
}
}
// close frees allocated C objects and releases acquired locks.
func (tx *transaction) close() {
- tx.d.mu.Unlock()
- tx.snapshot.Close()
+ tx.d.txmu.Unlock()
C.leveldb_writebatch_destroy(tx.batch)
+ tx.batch = nil
if tx.cOpts != tx.d.writeOptions {
C.leveldb_writeoptions_destroy(tx.cOpts)
}
+ tx.cOpts = nil
}
// ResetForRetry implements the store.Transaction interface.
func (tx *transaction) ResetForRetry() {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.batch == nil {
+ panic(tx.err)
+ }
tx.snapshot.Close()
tx.snapshot = tx.d.NewSnapshot()
+ tx.err = nil
C.leveldb_writebatch_clear(tx.batch)
}
// Get implements the store.StoreReader interface.
func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return valbuf, tx.err
+ }
return tx.snapshot.Get(key, valbuf)
}
// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return &store.InvalidStream{tx.err}
+ }
return tx.snapshot.Scan(start, end)
}
// Put implements the store.StoreWriter interface.
func (tx *transaction) Put(key, value []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return tx.err
+ }
cKey, cKeyLen := cSlice(key)
cVal, cValLen := cSlice(value)
C.leveldb_writebatch_put(tx.batch, cKey, cKeyLen, cVal, cValLen)
@@ -71,6 +94,11 @@
// Delete implements the store.StoreWriter interface.
func (tx *transaction) Delete(key []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return tx.err
+ }
cKey, cKeyLen := cSlice(key)
C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
return nil
@@ -78,17 +106,32 @@
// Commit implements the store.Transaction interface.
func (tx *transaction) Commit() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.err != nil {
+ return tx.err
+ }
+ tx.d.mu.Lock()
+ defer tx.d.mu.Unlock()
var cError *C.char
C.leveldb_write(tx.d.cDb, tx.cOpts, tx.batch, &cError)
if err := goError(cError); err != nil {
+ tx.err = errors.New("already attempted to commit transaction")
return err
}
+ tx.err = errors.New("committed transaction")
tx.close()
return nil
}
// Abort implements the store.Transaction interface.
func (tx *transaction) Abort() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ if tx.batch == nil {
+ return tx.err
+ }
+ tx.err = errors.New("aborted transaction")
tx.close()
return nil
}
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index 180cd14..fc7cf0a 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -52,21 +52,21 @@
s.mu.Lock()
defer s.mu.Unlock()
if err := s.error(); err != nil {
- return nil, err
+ return valbuf, err
}
value, ok := s.data[string(key)]
if !ok {
- return nil, &store.ErrUnknownKey{Key: string(key)}
+ return valbuf, &store.ErrUnknownKey{Key: string(key)}
}
return store.CopyBytes(valbuf, value), nil
}
// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+func (s *snapshot) Scan(start, end []byte) store.Stream {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.error(); err != nil {
- return nil, err
+ return &store.InvalidStream{err}
}
- return newStream(s, start, end), nil
+ return newStream(s, start, end)
}
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index 36a0758..e844b20 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -40,16 +40,16 @@
defer st.mu.Unlock()
value, ok := st.data[string(key)]
if !ok {
- return nil, &store.ErrUnknownKey{Key: string(key)}
+ return valbuf, &store.ErrUnknownKey{Key: string(key)}
}
return store.CopyBytes(valbuf, value), nil
}
// Scan implements the store.StoreReader interface.
-func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
+func (st *memstore) Scan(start, end []byte) store.Stream {
st.mu.Lock()
defer st.mu.Unlock()
- return newStream(newSnapshot(st), start, end), nil
+ return newStream(newSnapshot(st), start, end)
}
// Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 1ba6e85..4006ce7 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -8,6 +8,7 @@
"runtime"
"testing"
+ "v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/test"
)
@@ -16,27 +17,39 @@
}
func TestStream(t *testing.T) {
- st := New()
- defer st.Close()
- test.RunStreamTest(t, st)
+ runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+ runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+ // TODO(rogulenko): Enable this test once memstore.Close causes memstore to
+ // disallow subsequent operations.
+ // runTest(t, test.RunStoreStateTest)
}
func TestReadWriteBasic(t *testing.T) {
- st := New()
- defer st.Close()
- test.RunReadWriteBasicTest(t, st)
+ runTest(t, test.RunReadWriteBasicTest)
}
func TestReadWriteRandom(t *testing.T) {
- st := New()
- defer st.Close()
- test.RunReadWriteRandomTest(t, st)
+ runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+ runTest(t, test.RunTransactionStateTest)
}
func TestTransactionsWithGet(t *testing.T) {
- st := New()
- defer st.Close()
// TODO(sadovsky): Enable this test once we've added a retry loop to
// RunInTransaction. Without that, concurrency makes the test fail.
- //test.RunTransactionsWithGetTest(t, st)
+ // runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+ st := New()
+ defer st.Close()
+ f(t, st)
}
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index f1614eb..221d33c 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -5,11 +5,11 @@
package memstore
import (
+ "errors"
"sort"
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
)
type stream struct {
@@ -26,7 +26,7 @@
func newStream(sn *snapshot, start, end []byte) *stream {
keys := []string{}
for k := range sn.data {
- if k >= string(start) && k < string(end) {
+ if k >= string(start) && (len(end) == 0 || k < string(end)) {
keys = append(keys, k)
}
}
@@ -89,5 +89,5 @@
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
- s.err = verror.New(verror.ErrCanceled, nil)
+ s.err = errors.New("canceled stream")
}
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 3ef45aa..56dcbdd 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -80,19 +80,19 @@
tx.mu.Lock()
defer tx.mu.Unlock()
if err := tx.error(); err != nil {
- return nil, err
+ return valbuf, err
}
return tx.sn.Get(key, valbuf)
}
// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
tx.mu.Lock()
defer tx.mu.Unlock()
if err := tx.error(); err != nil {
- return nil, err
+ return &store.InvalidStream{err}
}
- return newStream(tx.sn, start, end), nil
+ return newStream(tx.sn, start, end)
}
// Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 926b66d..22a5c99 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -12,11 +12,12 @@
// sub-slice of valbuf if valbuf was large enough to hold the entire value.
// Otherwise, a newly allocated slice will be returned. It is valid to pass a
// nil valbuf.
- // Fails if the given key is unknown (ErrUnknownKey).
+ // If the given key is unknown, valbuf is returned unchanged and the function
+ // fails with ErrUnknownKey.
Get(key, valbuf []byte) ([]byte, error)
// Scan returns all rows with keys in range [start, end).
- Scan(start, end []byte) (Stream, error)
+ Scan(start, end []byte) Stream
}
// StoreWriter writes data to a CRUD-capable storage engine.
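To make the revised StoreReader contract concrete, here is a minimal usage sketch (illustrative only, not part of this change). The dump helper is hypothetical; it reuses valbuf across Get calls, treats ErrUnknownKey as "missing key, valbuf returned unchanged", and checks Err() after draining the Stream returned by Scan, since Scan itself no longer returns an error:

	package example

	import (
		"fmt"

		"v.io/syncbase/x/ref/services/syncbase/store"
	)

	// dump prints the values for the given keys, then everything in [start, end).
	func dump(st store.StoreReader, keys [][]byte, start, end []byte) error {
		var valbuf []byte
		for _, k := range keys {
			var err error
			valbuf, err = st.Get(k, valbuf) // valbuf is reused between calls
			if err != nil {
				if _, ok := err.(*store.ErrUnknownKey); ok {
					continue // missing key; valbuf came back unchanged
				}
				return err
			}
			fmt.Printf("%s: %s\n", k, valbuf)
		}
		it := st.Scan(start, end)
		var key, value []byte
		for it.Advance() {
			key, value = it.Key(key), it.Value(value)
			fmt.Printf("%s: %s\n", key, value)
		}
		// Errors (e.g. from a closed store's InvalidStream) surface here.
		return it.Err()
	}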
diff --git a/services/syncbase/store/test/snapshot.go b/services/syncbase/store/test/snapshot.go
new file mode 100644
index 0000000..0276fd1
--- /dev/null
+++ b/services/syncbase/store/test/snapshot.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "strings"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunSnapshotTest verifies store.Snapshot operations.
+func RunSnapshotTest(t *testing.T, st store.Store) {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+ snapshot := st.NewSnapshot()
+ key2, value2 := []byte("key2"), []byte("value2")
+ st.Put(key2, value2)
+
+ // Test Get and Scan.
+ verifyGet(t, snapshot, key1, value1)
+ verifyGet(t, snapshot, key2, nil)
+ s := snapshot.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
+ verifyAdvance(t, s, nil, nil)
+
+ // Test functions after Close.
+ if err := snapshot.Close(); err != nil {
+ t.Fatalf("can't close the snapshot: %v", err)
+ }
+ expectedErr := "closed snapshot"
+ if err := snapshot.Close(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ s = snapshot.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, nil, nil)
+ if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+}
diff --git a/services/syncbase/store/test/store.go b/services/syncbase/store/test/store.go
new file mode 100644
index 0000000..a0b150f
--- /dev/null
+++ b/services/syncbase/store/test/store.go
@@ -0,0 +1,212 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "fmt"
+ "math/rand"
+ "strings"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type operation int
+
+const (
+ Put operation = 0
+ Delete operation = 1
+)
+
+type testStep struct {
+ op operation
+ key int
+}
+
+func randomBytes(rnd *rand.Rand, length int) []byte {
+ var res []byte
+ for i := 0; i < length; i++ {
+ res = append(res, '0'+byte(rnd.Intn(10)))
+ }
+ return res
+}
+
+// storeState is the in-memory representation of the store state.
+type storeState struct {
+ // We assume that the database has keys [0..size).
+ size int
+ rnd *rand.Rand
+ memtable map[string][]byte
+}
+
+func newStoreState(size int) *storeState {
+ return &storeState{
+ size,
+ rand.New(rand.NewSource(239017)),
+ make(map[string][]byte),
+ }
+}
+
+func (s *storeState) clone() *storeState {
+ other := &storeState{
+ s.size,
+ s.rnd,
+ make(map[string][]byte),
+ }
+ for k, v := range s.memtable {
+ other.memtable[k] = v
+ }
+ return other
+}
+
+// lowerBound returns the smallest key in the store that is not less than the
+// provided key. If there is no such key, returns size.
+func (s *storeState) lowerBound(key int) int {
+ for key < s.size {
+ if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
+ return key
+ }
+ key++
+ }
+ return key
+}
+
+// verify checks that various read operations on store.Store and memtable return
+// the same results.
+func (s *storeState) verify(t *testing.T, st store.StoreReader) {
+ // Verify Get().
+ for i := 0; i < s.size; i++ {
+ keystr := fmt.Sprintf("%05d", i)
+ answer, ok := s.memtable[keystr]
+ if ok {
+ verifyGet(t, st, []byte(keystr), answer)
+ } else {
+ verifyGet(t, st, []byte(keystr), nil)
+ }
+ }
+ // Verify 10 random Scan() calls.
+ for i := 0; i < 10; i++ {
+ start, end := s.rnd.Intn(s.size), s.rnd.Intn(s.size)
+ if start > end {
+ start, end = end, start
+ }
+ end++
+ stream := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
+ for start = s.lowerBound(start); start < end; start = s.lowerBound(start + 1) {
+ keystr := fmt.Sprintf("%05d", start)
+ verifyAdvance(t, stream, []byte(keystr), s.memtable[keystr])
+ }
+ verifyAdvance(t, stream, nil, nil)
+ }
+}
+
+// runReadWriteTest verifies read/write/snapshot operations.
+func runReadWriteTest(t *testing.T, st store.Store, size int, steps []testStep) {
+ s := newStoreState(size)
+ // We verify database state no more than ~100 times to prevent the test from
+ // being slow.
+ frequency := (len(steps) + 99) / 100
+ var states []*storeState
+ var snapshots []store.Snapshot
+ for i, step := range steps {
+ if step.key < 0 || step.key >= s.size {
+ t.Fatalf("invalid test step %v", step)
+ }
+ switch step.op {
+ case Put:
+ key := fmt.Sprintf("%05d", step.key)
+ value := randomBytes(s.rnd, 100)
+ s.memtable[key] = value
+ st.Put([]byte(key), value)
+ case Delete:
+ key := fmt.Sprintf("%05d", step.key)
+ if _, ok := s.memtable[key]; ok {
+ delete(s.memtable, key)
+ st.Delete([]byte(key))
+ }
+ default:
+ t.Fatalf("invalid test step %v", step)
+ }
+ if i%frequency == 0 {
+ s.verify(t, st)
+ states = append(states, s.clone())
+ snapshots = append(snapshots, st.NewSnapshot())
+ }
+ }
+ s.verify(t, st)
+ for i := 0; i < len(states); i++ {
+ states[i].verify(t, snapshots[i])
+ snapshots[i].Close()
+ }
+}
+
+// RunReadWriteBasicTest runs a basic test that verifies reads, writes and
+// snapshots.
+func RunReadWriteBasicTest(t *testing.T, st store.Store) {
+ runReadWriteTest(t, st, 3, []testStep{
+ testStep{Put, 1},
+ testStep{Put, 2},
+ testStep{Delete, 1},
+ testStep{Put, 1},
+ testStep{Put, 2},
+ })
+}
+
+// RunReadWriteRandomTest runs a randomly generated test that verifies reads,
+// writes and snapshots.
+func RunReadWriteRandomTest(t *testing.T, st store.Store) {
+ rnd := rand.New(rand.NewSource(239017))
+ var testcase []testStep
+ size := 50
+ for i := 0; i < 10000; i++ {
+ testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
+ }
+ runReadWriteTest(t, st, size, testcase)
+}
+
+// RunStoreStateTest verifies operations that modify the state of a store.Store.
+func RunStoreStateTest(t *testing.T, st store.Store) {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+ key2 := []byte("key2")
+
+ // Test Get and Scan.
+ verifyGet(t, st, key1, value1)
+ verifyGet(t, st, key2, nil)
+ s := st.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
+ verifyAdvance(t, s, nil, nil)
+
+ // Test functions after Close.
+ if err := st.Close(); err != nil {
+ t.Fatalf("can't close the store: %v", err)
+ }
+ expectedErr := "closed store"
+ if err := st.Close(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ s = st.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, nil, nil)
+ if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ snapshot := st.NewSnapshot()
+ if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ tx := st.NewTransaction()
+ if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if _, err := st.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := st.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := st.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+}
diff --git a/services/syncbase/store/test/stream.go b/services/syncbase/store/test/stream.go
index 295d683..4caeb7c 100644
--- a/services/syncbase/store/test/stream.go
+++ b/services/syncbase/store/test/stream.go
@@ -6,10 +6,10 @@
import (
"bytes"
+ "strings"
"testing"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/v23/verror"
)
// RunStreamTest verifies store.Stream operations.
@@ -21,25 +21,14 @@
key3, value3 := []byte("key3"), []byte("value3")
st.Put(key3, value3)
- s, _ := st.Scan([]byte("a"), []byte("z"))
- if !s.Advance() {
- t.Fatalf("can't advance the stream")
- }
- var key, value []byte
- for i := 0; i < 2; i++ {
- if key = s.Key(key); !bytes.Equal(key, key1) {
- t.Fatalf("unexpected key: got %q, want %q", key, key1)
- }
- if value = s.Value(value); !bytes.Equal(value, value1) {
- t.Fatalf("unexpected value: got %q, want %q", value, value1)
- }
- }
-
+ s := st.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
if !s.Advance() {
t.Fatalf("can't advance the stream")
}
s.Cancel()
for i := 0; i < 2; i++ {
+ var key, value []byte
if key = s.Key(key); !bytes.Equal(key, key2) {
t.Fatalf("unexpected key: got %q, want %q", key, key2)
}
@@ -47,11 +36,8 @@
t.Fatalf("unexpected value: got %q, want %q", value, value2)
}
}
-
- if s.Advance() {
- t.Fatalf("advance returned true unexpectedly")
- }
- if verror.ErrorID(s.Err()) != verror.ErrCanceled.ID {
+ verifyAdvance(t, s, nil, nil)
+ if !strings.Contains(s.Err().Error(), "canceled stream") {
t.Fatalf("unexpected steam error: %v", s.Err())
}
}
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
deleted file mode 100644
index 0643bae..0000000
--- a/services/syncbase/store/test/test.go
+++ /dev/null
@@ -1,276 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// TODO(rogulenko): add more tests.
-
-package test
-
-import (
- "bytes"
- "fmt"
- "math/rand"
- "reflect"
- "strconv"
- "sync"
- "testing"
-
- "v.io/syncbase/x/ref/services/syncbase/store"
-)
-
-type operation int
-
-const (
- Put operation = 0
- Delete operation = 1
-)
-
-type testStep struct {
- op operation
- key int
-}
-
-func randomBytes(rnd *rand.Rand, length int) []byte {
- var res []byte
- for i := 0; i < length; i++ {
- res = append(res, '0'+byte(rnd.Intn(10)))
- }
- return res
-}
-
-// storeState is the in-memory representation of the store state.
-type storeState struct {
- // We assume that the database has keys [0..size).
- size int
- rnd *rand.Rand
- memtable map[string][]byte
-}
-
-func newStoreState(size int) *storeState {
- return &storeState{
- size,
- rand.New(rand.NewSource(239017)),
- make(map[string][]byte),
- }
-}
-
-func (s *storeState) clone() *storeState {
- other := &storeState{
- s.size,
- s.rnd,
- make(map[string][]byte),
- }
- for k, v := range s.memtable {
- other.memtable[k] = v
- }
- return other
-}
-
-// nextKey returns the smallest key in the store that is not less than the
-// provided key. If there is no such key, returns size.
-func (s *storeState) lowerBound(key int) int {
- for key < s.size {
- if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
- return key
- }
- key++
- }
- return key
-}
-
-// verify checks that various read operations on store.Store and memtable return
-// the same results.
-func (s *storeState) verify(t *testing.T, st store.StoreReader) {
- var key, value []byte
- var err error
- // Verify Get().
- for i := 0; i < s.size; i++ {
- keystr := fmt.Sprintf("%05d", i)
- answer, ok := s.memtable[keystr]
- key = []byte(keystr)
- value, err = st.Get(key, value)
- if ok {
- if err != nil || !bytes.Equal(value, answer) {
- t.Fatalf("unexpected get result for %q: got {%q, %v}, want {%q, nil}", keystr, value, err, answer)
- }
- } else {
- if !reflect.DeepEqual(&store.ErrUnknownKey{Key: keystr}, err) {
- t.Fatalf("unexpected get error for key %q: %v", keystr, err)
- }
- }
- }
- // Verify 10 random Scan() calls.
- for i := 0; i < 10; i++ {
- start, end := s.rnd.Intn(s.size), s.rnd.Intn(s.size)
- if start > end {
- start, end = end, start
- }
- end++
- stream, err := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
- if err != nil {
- t.Fatalf("can't create stream: %v", err)
- }
- for stream.Advance() {
- start = s.lowerBound(start)
- keystr := fmt.Sprintf("%05d", start)
- key, value = stream.Key(key), stream.Value(value)
- if string(key) != keystr {
- t.Fatalf("unexpected key during scan: got %q, want %q", key, keystr)
- }
- if !bytes.Equal(value, s.memtable[keystr]) {
- t.Fatalf("unexpected value during scan: got %q, want %q", value, s.memtable[keystr])
- }
- start++
- }
- if start = s.lowerBound(start); start < end {
- t.Fatalf("stream ended unexpectedly")
- }
- }
-}
-
-// runReadWriteTest verifies read/write/snapshot operations.
-func runReadWriteTest(t *testing.T, st store.Store, size int, steps []testStep) {
- s := newStoreState(size)
- // We verify database state no more than ~100 times to prevent the test from
- // being slow.
- frequency := (len(steps) + 99) / 100
- var states []*storeState
- var snapshots []store.Snapshot
- for i, step := range steps {
- if step.key < 0 || step.key >= s.size {
- t.Fatalf("invalid test step %v", step)
- }
- switch step.op {
- case Put:
- key := fmt.Sprintf("%05d", step.key)
- value := randomBytes(s.rnd, 100)
- s.memtable[key] = value
- st.Put([]byte(key), value)
- case Delete:
- key := fmt.Sprintf("%05d", step.key)
- if _, ok := s.memtable[key]; ok {
- delete(s.memtable, key)
- st.Delete([]byte(key))
- }
- default:
- t.Fatalf("invalid test step %v", step)
- }
- if i%frequency == 0 {
- s.verify(t, st)
- states = append(states, s.clone())
- snapshots = append(snapshots, st.NewSnapshot())
- }
- }
- s.verify(t, st)
- for i := 0; i < len(states); i++ {
- states[i].verify(t, snapshots[i])
- snapshots[i].Close()
- }
-}
-
-// RunReadWriteBasicTest runs a basic test that verifies reads, writes and
-// snapshots.
-func RunReadWriteBasicTest(t *testing.T, st store.Store) {
- runReadWriteTest(t, st, 3, []testStep{
- testStep{Put, 1},
- testStep{Put, 2},
- testStep{Delete, 1},
- testStep{Put, 1},
- testStep{Put, 2},
- })
-}
-
-// RunReadWriteRandomTest runs a random-generated test that verifies reads,
-// writes and snapshots.
-func RunReadWriteRandomTest(t *testing.T, st store.Store) {
- rnd := rand.New(rand.NewSource(239017))
- var testcase []testStep
- size := 50
- for i := 0; i < 10000; i++ {
- testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
- }
- runReadWriteTest(t, st, size, testcase)
-}
-
-// RunTransactionsWithGetTest tests transactions that use Put and Get
-// operations.
-// NOTE: consider setting GOMAXPROCS to something greater than 1.
-func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
- // Invariant: value mapped to n is sum of values of 0..n-1.
- // Each of k transactions takes m distinct random values from 0..n-1, adds 1
- // to each and m to value mapped to n.
- // The correctness of sums is checked after all transactions have been
- // committed.
- n, m, k := 10, 3, 100
- for i := 0; i <= n; i++ {
- if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
- t.Fatalf("can't write to database")
- }
- }
- var wg sync.WaitGroup
- wg.Add(k)
- // TODO(sadovsky): This configuration creates huge resource contention.
- // Perhaps we should add some random sleep's to reduce the contention.
- for i := 0; i < k; i++ {
- go func() {
- rnd := rand.New(rand.NewSource(239017 * int64(i)))
- perm := rnd.Perm(n)
- if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- for j := 0; j <= m; j++ {
- var keystr string
- if j < m {
- keystr = fmt.Sprintf("%05d", perm[j])
- } else {
- keystr = fmt.Sprintf("%05d", n)
- }
- key := []byte(keystr)
- val, err := st.Get(key, nil)
- if err != nil {
- return fmt.Errorf("can't get key %q: %v", key, err)
- }
- intValue, err := strconv.ParseInt(string(val), 10, 64)
- if err != nil {
- return fmt.Errorf("can't parse int from %q: %v", val, err)
- }
- var newValue int64
- if j < m {
- newValue = intValue + 1
- } else {
- newValue = intValue + int64(m)
- }
- if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
- return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
- }
- }
- return nil
- }); err != nil {
- panic(fmt.Errorf("can't commit transaction: %v", err))
- }
- wg.Done()
- }()
- }
- wg.Wait()
- var sum int64
- for j := 0; j <= n; j++ {
- keystr := fmt.Sprintf("%05d", j)
- key := []byte(keystr)
- val, err := st.Get(key, nil)
- if err != nil {
- t.Fatalf("can't get key %q: %v", key, err)
- }
- intValue, err := strconv.ParseInt(string(val), 10, 64)
- if err != nil {
- t.Fatalf("can't parse int from %q: %v", val, err)
- }
- if j < n {
- sum += intValue
- } else {
- if intValue != int64(m*k) {
- t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
- }
- }
- }
- if sum != int64(m*k) {
- t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
- }
-}
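
For context, here is a minimal, illustrative sketch of how a store implementation's own test file might drive the shared runners added in store/test/transaction.go below. It is not part of this change; the memstore.New() constructor and the memstore_test package layout are assumptions made purely for illustration.

package memstore_test

import (
	"testing"

	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
	"v.io/syncbase/x/ref/services/syncbase/store/test"
)

// Each test delegates to a shared runner from the test package.
// memstore.New() is assumed to return a store.Store; substitute the real
// constructor of the engine under test.
func TestTransactionState(t *testing.T) {
	test.RunTransactionStateTest(t, memstore.New())
}

func TestTransactionsWithGet(t *testing.T) {
	test.RunTransactionsWithGetTest(t, memstore.New())
}
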
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
new file mode 100644
index 0000000..e078e49
--- /dev/null
+++ b/services/syncbase/store/test/transaction.go
@@ -0,0 +1,154 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "fmt"
+ "math/rand"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunTransactionStateTest verifies operations that modify the state of a
+// store.Transaction.
+func RunTransactionStateTest(t *testing.T, st store.Store) {
+ finalizeFunctions := []func(t *testing.T, tx store.Transaction) string{
+ func(t *testing.T, tx store.Transaction) string {
+ if err := tx.Abort(); err != nil {
+ t.Fatalf("can't abort the transaction: %v", err)
+ }
+ return "aborted transaction"
+ },
+ func(t *testing.T, tx store.Transaction) string {
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("can't commit the transaction: %v", err)
+ }
+ return "committed transaction"
+ },
+ }
+ for _, fn := range finalizeFunctions {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+ key2 := []byte("key2")
+ tx := st.NewTransaction()
+
+ // Test Get and Scan.
+ verifyGet(t, tx, key1, value1)
+ verifyGet(t, tx, key2, nil)
+ s := tx.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, key1, value1)
+ verifyAdvance(t, s, nil, nil)
+
+ // Test that operations fail once the transaction is aborted or committed.
+ expectedErr := fn(t, tx)
+ if err := tx.Abort(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := tx.Commit(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ s = tx.Scan([]byte("a"), []byte("z"))
+ verifyAdvance(t, s, nil, nil)
+ if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := tx.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ if err := tx.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+ t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+ }
+ }
+}
+
+// RunTransactionsWithGetTest tests transactions that use Put and Get
+// operations.
+// NOTE: consider setting GOMAXPROCS to something greater than 1.
+func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
+ // Invariant: value mapped to n is sum of values of 0..n-1.
+ // Each of the k transactions picks m distinct random keys from 0..n-1, adds 1
+ // to each of their values, and adds m to the value mapped to n.
+ // The correctness of sums is checked after all transactions have been
+ // committed.
+ n, m, k := 10, 3, 100
+ for i := 0; i <= n; i++ {
+ if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
+ t.Fatalf("can't write to database")
+ }
+ }
+ var wg sync.WaitGroup
+ wg.Add(k)
+ // TODO(sadovsky): This configuration creates huge resource contention.
+ // Perhaps we should add some random sleeps to reduce the contention.
+ for i := 0; i < k; i++ {
+ go func(i int) {
+ rnd := rand.New(rand.NewSource(239017 * int64(i)))
+ perm := rnd.Perm(n)
+ if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ for j := 0; j <= m; j++ {
+ var keystr string
+ if j < m {
+ keystr = fmt.Sprintf("%05d", perm[j])
+ } else {
+ keystr = fmt.Sprintf("%05d", n)
+ }
+ key := []byte(keystr)
+ val, err := st.Get(key, nil)
+ if err != nil {
+ return fmt.Errorf("can't get key %q: %v", key, err)
+ }
+ intValue, err := strconv.ParseInt(string(val), 10, 64)
+ if err != nil {
+ return fmt.Errorf("can't parse int from %q: %v", val, err)
+ }
+ var newValue int64
+ if j < m {
+ newValue = intValue + 1
+ } else {
+ newValue = intValue + int64(m)
+ }
+ if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+ return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
+ }
+ }
+ return nil
+ }); err != nil {
+ panic(fmt.Errorf("can't commit transaction: %v", err))
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ var sum int64
+ for j := 0; j <= n; j++ {
+ keystr := fmt.Sprintf("%05d", j)
+ key := []byte(keystr)
+ val, err := st.Get(key, nil)
+ if err != nil {
+ t.Fatalf("can't get key %q: %v", key, err)
+ }
+ intValue, err := strconv.ParseInt(string(val), 10, 64)
+ if err != nil {
+ t.Fatalf("can't parse int from %q: %v", val, err)
+ }
+ if j < n {
+ sum += intValue
+ } else {
+ if intValue != int64(m*k) {
+ t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
+ }
+ }
+ }
+ if sum != int64(m*k) {
+ t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+ }
+}
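
The transaction contract that RunTransactionStateTest above checks, restated as ordinary client code. This is an illustrative sketch only (the useFinalizedTransaction helper is hypothetical); the exact error text is store-specific.

package test

import (
	"fmt"

	"v.io/syncbase/x/ref/services/syncbase/store"
)

// useFinalizedTransaction shows the expected lifecycle: once Commit (or Abort)
// has been called, every further operation on the transaction must fail.
func useFinalizedTransaction(st store.Store) error {
	tx := st.NewTransaction()
	if err := tx.Put([]byte("k"), []byte("v")); err != nil {
		tx.Abort()
		return err
	}
	if err := tx.Commit(); err != nil {
		return err
	}
	// tx is finalized; reads, writes and scans should now return errors
	// instead of touching the store.
	if _, err := tx.Get([]byte("k"), nil); err == nil {
		return fmt.Errorf("Get on a committed transaction unexpectedly succeeded")
	}
	return nil
}
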
diff --git a/services/syncbase/store/test/util.go b/services/syncbase/store/test/util.go
new file mode 100644
index 0000000..3f8ca38
--- /dev/null
+++ b/services/syncbase/store/test/util.go
@@ -0,0 +1,68 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "bytes"
+ "reflect"
+ "runtime/debug"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// verifyGet verifies that st.Get(key) == value. If value is nil, verifies that
+// the key is not found.
+func verifyGet(t *testing.T, st store.StoreReader, key, value []byte) {
+ valbuf := []byte("tmp")
+ var err error
+ if value != nil {
+ if valbuf, err = st.Get(key, valbuf); err != nil {
+ Fatalf(t, "can't get value of %q: %v", key, err)
+ }
+ if !bytes.Equal(valbuf, value) {
+ Fatalf(t, "unexpected value: got %q, want %q", valbuf, value)
+ }
+ } else {
+ valbuf, err = st.Get(key, valbuf)
+ if !reflect.DeepEqual(&store.ErrUnknownKey{Key: string(key)}, err) {
+ Fatalf(t, "unexpected get error for key %q: %v", key, err)
+ }
+ valcopy := []byte("tmp")
+ // Verify that valbuf is not modified if the key is not found.
+ if !bytes.Equal(valbuf, valcopy) {
+ Fatalf(t, "unexpected value: got %q, want %q", valbuf, valcopy)
+ }
+ }
+}
+
+// verifyAdvance verifies the next key/value pair of the provided stream.
+// If key is nil, verifies that the next Advance call on the stream returns false.
+func verifyAdvance(t *testing.T, s store.Stream, key, value []byte) {
+ ok := s.Advance()
+ if key == nil {
+ if ok {
+ Fatalf(t, "advance returned true unexpectedly")
+ }
+ return
+ }
+ if !ok {
+ Fatalf(t, "can't advance the stream")
+ }
+ var k, v []byte
+ for i := 0; i < 2; i++ {
+ if k = s.Key(k); !bytes.Equal(k, key) {
+ Fatalf(t, "unexpected key: got %q, want %q", k, key)
+ }
+ if v = s.Value(v); !bytes.Equal(v, value) {
+ Fatalf(t, "unexpected value: got %q, want %q", v, value)
+ }
+ }
+}
+
+func Fatalf(t *testing.T, format string, args ...interface{}) {
+ debug.PrintStack()
+ t.Fatalf(format, args...)
+}
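
The double Key/Value read in verifyAdvance exercises the buffer-reuse behavior of store.Stream. As a hedged sketch, this is the consumer pattern those helpers model (the dumpRange function is hypothetical, not part of this change):

package test

import (
	"fmt"

	"v.io/syncbase/x/ref/services/syncbase/store"
)

// dumpRange iterates over [start, limit), reusing key/value buffers across
// iterations, and checks Err once Advance returns false.
func dumpRange(st store.StoreReader, start, limit []byte) error {
	s := st.Scan(start, limit)
	var k, v []byte
	for s.Advance() {
		k, v = s.Key(k), s.Value(v) // buffers may be reused or reallocated
		fmt.Printf("%q -> %q\n", k, v)
	}
	return s.Err()
}
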
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index d36b260..82a8350 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -25,6 +25,7 @@
// CopyBytes copies elements from a source slice into a destination slice.
// The returned slice may be a sub-slice of dst if dst was large enough to hold
// src. Otherwise, a newly allocated slice will be returned.
+// TODO(rogulenko): add some tests.
func CopyBytes(dst, src []byte) []byte {
if cap(dst) < len(src) {
newlen := cap(dst)*2 + 2
@@ -33,6 +34,7 @@
}
dst = make([]byte, newlen)
}
+ dst = dst[:len(src)]
copy(dst, src)
- return dst[:len(src)]
+ return dst
}
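
The CopyBytes change above matters when dst has enough capacity for src but a shorter length: copy() only fills min(len(dst), len(src)) bytes, so re-slicing dst to len(src) before copying is required. A small illustrative example (not part of this change):

package main

import (
	"fmt"

	"v.io/syncbase/x/ref/services/syncbase/store"
)

func main() {
	// dst has capacity 16 but length 1; before the fix, only one byte of src
	// would have been copied into the returned slice.
	dst := make([]byte, 1, 16)
	src := []byte("abcdef")
	fmt.Printf("%q\n", store.CopyBytes(dst, src)) // "abcdef"
}
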
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index 38ab2de..35f935a 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -12,10 +12,12 @@
"flag"
"v.io/v23"
+ "v.io/v23/security"
"v.io/v23/security/access"
"v.io/x/lib/vlog"
"v.io/syncbase/x/ref/services/syncbase/server"
+ "v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
_ "v.io/x/ref/runtime/factories/generic"
)
@@ -25,6 +27,18 @@
name = flag.String("name", "", "Name to mount at.")
)
+// defaultPerms returns a permissions object that grants all permissions to the
+// provided blessing patterns.
+func defaultPerms(blessingPatterns []security.BlessingPattern) access.Permissions {
+ perms := access.Permissions{}
+ for _, tag := range access.AllTypicalTags() {
+ for _, bp := range blessingPatterns {
+ perms.Add(bp, string(tag))
+ }
+ }
+ return perms
+}
+
func main() {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -37,8 +51,19 @@
vlog.Fatal("s.Listen() failed: ", err)
}
- // TODO(sadovsky): Use a real Permissions.
- service, err := server.NewService(nil, nil, access.Permissions{})
+ perms, err := securityflag.PermissionsFromFlag()
+ if err != nil {
+ vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
+ }
+
+ if perms != nil {
+ vlog.Info("Using permissions from command line flag.")
+ } else {
+ vlog.Info("No permissions flag provided. Giving local principal all permissions.")
+ perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
+ }
+
+ service, err := server.NewService(nil, nil, perms)
if err != nil {
vlog.Fatal("server.NewService() failed: ", err)
}