Merge "syncbase profile: add Snappy"
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index c1c6dcd..188ef16 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -12,6 +12,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
@@ -61,7 +62,15 @@
 	return data.Perms, util.FormatVersion(data.Version), nil
 }
 
-// TODO(sadovsky): Implement Glob.
+func (a *app) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+	// Check perms.
+	sn := a.s.st.NewSnapshot()
+	if err := util.Get(ctx, call, sn, a, &appData{}); err != nil {
+		sn.Close()
+		return nil, err
+	}
+	return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+}
 
 ////////////////////////////////////////
 // util.App methods
diff --git a/services/syncbase/server/db_info.go b/services/syncbase/server/db_info.go
index 6ccc664..e284fae 100644
--- a/services/syncbase/server/db_info.go
+++ b/services/syncbase/server/db_info.go
@@ -37,7 +37,7 @@
 }
 
 func (d *dbInfoLayer) StKey() string {
-	return util.JoinKeyParts(util.DbInfoPrefix, d.stKeyPart())
+	return util.JoinKeyParts(util.DbInfoPrefix, d.a.name, d.stKeyPart())
 }
 
 ////////////////////////////////////////
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index a6127cd..c676f97 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -8,8 +8,8 @@
 	"strings"
 
 	wire "v.io/syncbase/v23/services/syncbase"
+	pubutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/nosql"
-	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/verror"
@@ -37,7 +37,7 @@
 	// Validate all key atoms up front, so that we can avoid doing so in all our
 	// method implementations.
 	appName := parts[0]
-	if !util.ValidKeyAtom(appName) {
+	if !pubutil.ValidName(appName) {
 		return nil, nil, wire.NewErrInvalidName(nil, suffix)
 	}
 
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 03377fc..7ca9271 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -10,6 +10,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
@@ -34,7 +35,7 @@
 // Designed for use from within App.CreateNoSQLDatabase.
 func NewDatabase(ctx *context.T, call rpc.ServerCall, a util.App, name string, perms access.Permissions) (*database, error) {
 	if perms == nil {
-		return nil, verror.New(verror.ErrInternal, nil, "perms must be specified")
+		return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
 	}
 	// TODO(sadovsky): Make storage engine pluggable.
 	d := &database{
@@ -90,7 +91,15 @@
 	return data.Perms, util.FormatVersion(data.Version), nil
 }
 
-// TODO(sadovsky): Implement Glob.
+func (d *database) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+	// Check perms.
+	sn := d.st.NewSnapshot()
+	if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
+		sn.Close()
+		return nil, err
+	}
+	return util.Glob(ctx, call, pattern, sn, util.TablePrefix)
+}
 
 ////////////////////////////////////////
 // util.Database methods
diff --git a/services/syncbase/server/nosql/dispatcher.go b/services/syncbase/server/nosql/dispatcher.go
index 50737c5..db18382 100644
--- a/services/syncbase/server/nosql/dispatcher.go
+++ b/services/syncbase/server/nosql/dispatcher.go
@@ -9,6 +9,7 @@
 
 	wire "v.io/syncbase/v23/services/syncbase"
 	nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
+	pubutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
@@ -37,7 +38,7 @@
 	// Validate all key atoms up front, so that we can avoid doing so in all our
 	// method implementations.
 	for _, s := range parts {
-		if !util.ValidKeyAtom(s) {
+		if !pubutil.ValidName(s) {
 			return nil, nil, wire.NewErrInvalidName(nil, suffix)
 		}
 	}
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index c699e88..c26eb39 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -10,7 +10,6 @@
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
-	"v.io/v23/vdl"
 	"v.io/v23/verror"
 )
 
@@ -32,11 +31,11 @@
 ////////////////////////////////////////
 // RPC methods
 
-func (r *row) Get(ctx *context.T, call rpc.ServerCall) (*vdl.Value, error) {
+func (r *row) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
 	return r.get(ctx, call, r.t.d.st)
 }
 
-func (r *row) Put(ctx *context.T, call rpc.ServerCall, value *vdl.Value) error {
+func (r *row) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
 	return r.put(ctx, call, r.t.d.st, value)
 }
 
@@ -68,18 +67,18 @@
 // an authorization check (currently against the table perms).
 // Returns a VDL-compatible error.
 func (r *row) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
-	return util.Get(ctx, call, st, r.t.d, &databaseData{})
+	return util.Get(ctx, call, st, r.t, &tableData{})
 }
 
 // get reads data from the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) (*vdl.Value, error) {
+func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return nil, err
 	}
-	value := &vdl.Value{}
-	if err := util.GetObject(st, r.StKey(), value); err != nil {
+	value, err := st.Get([]byte(r.StKey()), nil)
+	if err != nil {
 		if _, ok := err.(*store.ErrUnknownKey); ok {
 			// We've already done an auth check, so here we can safely return NoExist
 			// rather than NoExistOrNoAccess.
@@ -93,11 +92,14 @@
 // put writes data to the storage engine.
 // Performs authorization check.
 // Returns a VDL-compatible error.
-func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value *vdl.Value) error {
+func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return err
 	}
-	return util.Put(ctx, call, st, r, value)
+	if err := st.Put([]byte(r.StKey()), value); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
 }
 
 // del deletes data from the storage engine.
@@ -107,5 +109,8 @@
 	if err := r.checkAccess(ctx, call, st); err != nil {
 		return err
 	}
-	return util.Delete(ctx, call, st, r)
+	if err := st.Delete([]byte(r.StKey())); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
 }
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 1983ba8..07198d2 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -9,6 +9,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
@@ -67,23 +68,64 @@
 	})
 }
 
-func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit string) error {
+func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, end string) error {
 	return verror.NewErrNotImplemented(ctx)
 }
 
+func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, end string) error {
+	sn := t.d.st.NewSnapshot()
+	defer sn.Close()
+	it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), start, end))
+	sender := call.SendStream()
+	key, value := []byte{}, []byte{}
+	for it.Advance() {
+		key = it.Key(key)
+		parts := util.SplitKeyParts(string(key))
+		sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: it.Value(value)})
+	}
+	if err := it.Err(); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
 func (t *table) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
-	return verror.NewErrNotImplemented(ctx)
+	if prefix != "" {
+		return verror.NewErrNotImplemented(ctx)
+	}
+	return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+		data := &tableData{}
+		return util.Update(ctx, call, st, t, data, func() error {
+			data.Perms = perms
+			return nil
+		})
+	})
 }
 
 func (t *table) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
-	return nil, verror.NewErrNotImplemented(ctx)
+	if key != "" {
+		return nil, verror.NewErrNotImplemented(ctx)
+	}
+	data := &tableData{}
+	if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+		return nil, err
+	}
+	return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
 }
 
 func (t *table) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
 	return verror.NewErrNotImplemented(ctx)
 }
 
-// TODO(sadovsky): Implement Glob.
+func (t *table) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+	// Check perms.
+	sn := t.d.st.NewSnapshot()
+	if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
+		sn.Close()
+		return nil, err
+	}
+	return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.RowPrefix, t.name))
+}
 
 ////////////////////////////////////////
 // util.Layer methods
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index e268787..c8ea107 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -12,6 +12,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
@@ -34,7 +35,7 @@
 // Returns a VDL-compatible error.
 func NewService(ctx *context.T, call rpc.ServerCall, perms access.Permissions) (*service, error) {
 	if perms == nil {
-		return nil, verror.New(verror.ErrInternal, nil, "perms must be specified")
+		return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
 	}
 	// TODO(sadovsky): Make storage engine pluggable.
 	s := &service{
@@ -75,7 +76,15 @@
 	return data.Perms, util.FormatVersion(data.Version), nil
 }
 
-// TODO(sadovsky): Implement Glob.
+func (s *service) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+	// Check perms.
+	sn := s.st.NewSnapshot()
+	if err := util.Get(ctx, call, sn, s, &serviceData{}); err != nil {
+		sn.Close()
+		return nil, err
+	}
+	return util.Glob(ctx, call, pattern, sn, util.AppPrefix)
+}
 
 ////////////////////////////////////////
 // App management methods
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
new file mode 100644
index 0000000..98ffd48
--- /dev/null
+++ b/services/syncbase/server/util/glob.go
@@ -0,0 +1,77 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util
+
+import (
+	"strings"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
+	"v.io/x/ref/lib/glob"
+)
+
+// globPatternToPrefix takes "foo*" and returns "foo". It returns ErrBadArg for
+// inputs that are not valid glob patterns as well as for inputs that are valid
+// glob patterns but not valid prefixes.
+func globPatternToPrefix(pattern string) (string, error) {
+	if _, err := glob.Parse(pattern); err != nil {
+		return "", verror.NewErrBadArg(nil)
+	}
+	if pattern == "" {
+		return "", verror.NewErrBadArg(nil)
+	}
+	if pattern[len(pattern)-1] != '*' {
+		return "", verror.NewErrBadArg(nil)
+	}
+	res := pattern[:len(pattern)-1]
+	// Disallow chars and char sequences that have special meaning in glob, since
+	// our Glob() does not support these.
+	if strings.ContainsAny(res, "/*?[") {
+		return "", verror.NewErrBadArg(nil)
+	}
+	if strings.Contains(res, "...") {
+		return "", verror.NewErrBadArg(nil)
+	}
+	return res, nil
+}
+
+// TODO(sadovsky): It sucks that Glob must be implemented differently from other
+// streaming RPC handlers. I don't have much confidence that I've implemented
+// both types of streaming correctly.
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan naming.GlobReply, error) {
+	// TODO(sadovsky): Support glob with non-prefix pattern.
+	prefix, err := globPatternToPrefix(pattern)
+	if err != nil {
+		sn.Close()
+		if verror.ErrorID(err) == verror.ErrBadArg.ID {
+			return nil, verror.NewErrNotImplemented(ctx)
+		} else {
+			return nil, verror.New(verror.ErrInternal, ctx, err)
+		}
+	}
+	it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+	ch := make(chan naming.GlobReply)
+	go func() {
+		defer sn.Close()
+		defer close(ch)
+		key := []byte{}
+		for it.Advance() {
+			key = it.Key(key)
+			parts := SplitKeyParts(string(key))
+			ch <- naming.GlobReplyEntry{naming.MountEntry{Name: parts[len(parts)-1]}}
+		}
+		if err := it.Err(); err != nil {
+			vlog.VI(1).Infof("Glob(%q) failed: %v", pattern, err)
+			ch <- naming.GlobReplyError{naming.GlobError{
+				Error: verror.New(verror.ErrInternal, ctx, err),
+			}}
+		}
+	}()
+	return ch, nil
+}
diff --git a/services/syncbase/server/util/glob_test.go b/services/syncbase/server/util/glob_test.go
new file mode 100644
index 0000000..d9c8b71
--- /dev/null
+++ b/services/syncbase/server/util/glob_test.go
@@ -0,0 +1,54 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Note, we use package util rather than util_test so that we can access the
+// unexported name util.globPatternToPrefix.
+
+package util
+
+import (
+	"testing"
+
+	"v.io/v23/verror"
+)
+
+var (
+	errBadArg = string(verror.ErrBadArg.ID)
+)
+
+func TestGlobPatternToPrefix(t *testing.T) {
+	tests := []struct {
+		pattern, prefix string
+	}{
+		{"", errBadArg},
+		{"*", ""},
+		{"foo", errBadArg},
+		{"foo*", "foo"},
+		{"foo/", errBadArg},
+		{"foo/*", errBadArg},
+		{"foo/bar", errBadArg},
+		{"foo/bar*", errBadArg},
+		{".*", "."},
+		{"..*", ".."},
+		{"...*", errBadArg},
+		{"...foo*", errBadArg},
+		{"foo...foo*", errBadArg},
+		{"..foo*", "..foo"},
+		{"foo..foo*", "foo..foo"},
+		{"...", errBadArg},
+		{"*/...", errBadArg},
+		{"foo/...", errBadArg},
+		{"/", errBadArg},
+		{"/*", errBadArg},
+	}
+	for _, test := range tests {
+		prefix, err := globPatternToPrefix(test.pattern)
+		if err != nil {
+			prefix = string(verror.ErrorID(err))
+		}
+		if prefix != test.prefix {
+			t.Errorf("%q: got %q, want %q", test.pattern, prefix, test.prefix)
+		}
+	}
+}
diff --git a/services/syncbase/server/util/key_util.go b/services/syncbase/server/util/key_util.go
index 542a0ce..d5b6e36 100644
--- a/services/syncbase/server/util/key_util.go
+++ b/services/syncbase/server/util/key_util.go
@@ -4,23 +4,34 @@
 
 package util
 
-// Note, some of the code below is copied from
-// v.io/syncbase/v23/syncbase/util/util.go.
-
 import (
-	"regexp"
 	"strings"
+
+	"v.io/syncbase/v23/syncbase/util"
 )
 
-// TODO(sadovsky): Consider loosening. Perhaps model after MySQL:
-// http://dev.mysql.com/doc/refman/5.7/en/identifiers.html
-var keyAtomRegexp *regexp.Regexp = regexp.MustCompile("^[a-zA-Z0-9_.-]+$")
-
-func ValidKeyAtom(s string) bool {
-	return keyAtomRegexp.MatchString(s)
-}
-
+// JoinKeyParts builds keys for accessing data in the storage engine.
 func JoinKeyParts(parts ...string) string {
 	// TODO(sadovsky): Figure out which delimeter makes the most sense.
 	return strings.Join(parts, ":")
 }
+
+// SplitKeyParts is the inverse of JoinKeyParts.
+func SplitKeyParts(key string) []string {
+	return strings.Split(key, ":")
+}
+
+// ScanPrefixArgs returns args for sn.Scan() for the specified prefix.
+func ScanPrefixArgs(stKeyPrefix, prefix string) ([]byte, []byte) {
+	return ScanRangeArgs(stKeyPrefix, util.PrefixRangeStart(prefix), util.PrefixRangeEnd(prefix))
+}
+
+// ScanRangeArgs returns args for sn.Scan() for the specified range.
+// If end is "", all rows with keys >= start are included.
+func ScanRangeArgs(stKeyPrefix, start, end string) ([]byte, []byte) {
+	fullStart, fullEnd := JoinKeyParts(stKeyPrefix, start), JoinKeyParts(stKeyPrefix, end)
+	if end == "" {
+		fullEnd = util.PrefixRangeEnd(fullEnd)
+	}
+	return []byte(fullStart), []byte(fullEnd)
+}
diff --git a/services/syncbase/server/util/key_util_test.go b/services/syncbase/server/util/key_util_test.go
new file mode 100644
index 0000000..1bc1148
--- /dev/null
+++ b/services/syncbase/server/util/key_util_test.go
@@ -0,0 +1,83 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util_test
+
+import (
+	"reflect"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+)
+
+type kpt struct {
+	parts []string
+	key   string
+}
+
+var keyPartTests []kpt = []kpt{
+	{[]string{"a", "b"}, "a:b"},
+	{[]string{"aa", "bb"}, "aa:bb"},
+	{[]string{"a", "b", "c"}, "a:b:c"},
+}
+
+func TestJoinKeyParts(t *testing.T) {
+	for _, test := range keyPartTests {
+		got, want := util.JoinKeyParts(test.parts...), test.key
+		if !reflect.DeepEqual(got, want) {
+			t.Errorf("%v: got %q, want %q", test.parts, got, want)
+		}
+	}
+}
+
+func TestSplitKeyParts(t *testing.T) {
+	for _, test := range keyPartTests {
+		got, want := util.SplitKeyParts(test.key), test.parts
+		if !reflect.DeepEqual(got, want) {
+			t.Errorf("%q: got %v, want %v", test.key, got, want)
+		}
+	}
+}
+
+func TestScanPrefixArgs(t *testing.T) {
+	tests := []struct {
+		stKeyPrefix, prefix, wantStart, wantEnd string
+	}{
+		{"x", "", "x:", "x;"},
+		{"x", "a", "x:a", "x:b"},
+		{"x", "a\xff", "x:a\xff", "x:b"},
+	}
+	for _, test := range tests {
+		start, end := util.ScanPrefixArgs(test.stKeyPrefix, test.prefix)
+		gotStart, gotEnd := string(start), string(end)
+		if gotStart != test.wantStart {
+			t.Errorf("{%q, %q} start: got %q, want %q", test.stKeyPrefix, test.prefix, gotStart, test.wantStart)
+		}
+		if gotEnd != test.wantEnd {
+			t.Errorf("{%q, %q} end: got %q, want %q", test.stKeyPrefix, test.prefix, gotEnd, test.wantEnd)
+		}
+	}
+}
+
+func TestScanRangeArgs(t *testing.T) {
+	tests := []struct {
+		stKeyPrefix, start, end, wantStart, wantEnd string
+	}{
+		{"x", "", "", "x:", "x;"},   // end "" means "no limit"
+		{"x", "a", "", "x:a", "x;"}, // end "" means "no limit"
+		{"x", "a", "b", "x:a", "x:b"},
+		{"x", "a", "a", "x:a", "x:a"}, // empty range
+		{"x", "b", "a", "x:b", "x:a"}, // empty range
+	}
+	for _, test := range tests {
+		start, end := util.ScanRangeArgs(test.stKeyPrefix, test.start, test.end)
+		gotStart, gotEnd := string(start), string(end)
+		if gotStart != test.wantStart {
+			t.Errorf("{%q, %q, %q} start: got %q, want %q", test.stKeyPrefix, test.start, test.end, gotStart, test.wantStart)
+		}
+		if gotEnd != test.wantEnd {
+			t.Errorf("{%q, %q, %q} end: got %q, want %q", test.stKeyPrefix, test.start, test.end, gotEnd, test.wantEnd)
+		}
+	}
+}
diff --git a/services/syncbase/store/benchmark/benchmark.go b/services/syncbase/store/benchmark/benchmark.go
index a4700e6..c794fa9 100644
--- a/services/syncbase/store/benchmark/benchmark.go
+++ b/services/syncbase/store/benchmark/benchmark.go
@@ -110,7 +110,7 @@
 func ReadSequential(b *testing.B, config *Config) {
 	WriteSequential(b, config)
 	b.ResetTimer()
-	s, _ := config.St.Scan([]byte("0"), []byte("z"))
+	s := config.St.Scan([]byte("0"), []byte("z"))
 	var key, value []byte
 	for i := 0; i < b.N; i++ {
 		if !s.Advance() {
diff --git a/services/syncbase/store/invalid_types.go b/services/syncbase/store/invalid_types.go
new file mode 100644
index 0000000..05bfc13
--- /dev/null
+++ b/services/syncbase/store/invalid_types.go
@@ -0,0 +1,97 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package store
+
+// InvalidSnapshot is a store.Snapshot for which all methods return errors.
+type InvalidSnapshot struct {
+	// Error is the error returned by every method call.
+	Error error
+}
+
+// Close implements the store.Snapshot interface.
+func (s *InvalidSnapshot) Close() error {
+	return s.Error
+}
+
+// Get implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Get(key, valbuf []byte) ([]byte, error) {
+	return valbuf, s.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (s *InvalidSnapshot) Scan(start, end []byte) Stream {
+	return &InvalidStream{s.Error}
+}
+
+// InvalidStream is a store.Stream for which all methods return errors.
+type InvalidStream struct {
+	// Error is the error returned by every method call.
+	Error error
+}
+
+// Advance implements the store.Stream interface.
+func (s *InvalidStream) Advance() bool {
+	return false
+}
+
+// Key implements the store.Stream interface.
+func (s *InvalidStream) Key(keybuf []byte) []byte {
+	panic(s.Error)
+}
+
+// Value implements the store.Stream interface.
+func (s *InvalidStream) Value(valbuf []byte) []byte {
+	panic(s.Error)
+}
+
+// Err implements the store.Stream interface.
+func (s *InvalidStream) Err() error {
+	return s.Error
+}
+
+// Cancel implements the store.Stream interface.
+func (s *InvalidStream) Cancel() {
+}
+
+// InvalidTransaction is a store.Transaction for which all methods return errors.
+type InvalidTransaction struct {
+	// Error is the error returned by every method call.
+	Error error
+}
+
+// ResetForRetry implements the store.Transaction interface.
+func (tx *InvalidTransaction) ResetForRetry() {
+	panic(tx.Error)
+}
+
+// Get implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Get(key, valbuf []byte) ([]byte, error) {
+	return valbuf, tx.Error
+}
+
+// Scan implements the store.StoreReader interface.
+func (tx *InvalidTransaction) Scan(start, end []byte) Stream {
+	return &InvalidStream{tx.Error}
+}
+
+// Put implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Put(key, value []byte) error {
+	return tx.Error
+}
+
+// Delete implements the store.StoreWriter interface.
+func (tx *InvalidTransaction) Delete(key []byte) error {
+	return tx.Error
+}
+
+// Commit implements the store.Transaction interface.
+func (tx *InvalidTransaction) Commit() error {
+	return tx.Error
+}
+
+// Abort implements the store.Transaction interface.
+func (tx *InvalidTransaction) Abort() error {
+	return tx.Error
+}
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index a173992..6f89ac8 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,22 +11,29 @@
 // #include "syncbase_leveldb.h"
 import "C"
 import (
+	"errors"
 	"sync"
 	"unsafe"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
+var (
+	errClosedStore = errors.New("closed store")
+)
+
 // db is a wrapper around LevelDB that implements the store.Store interface.
-// TODO(rogulenko): ensure thread safety.
 type db struct {
+	// mu protects cDb.
+	mu  sync.RWMutex
 	cDb *C.leveldb_t
 	// Default read/write options.
 	readOptions  *C.leveldb_readoptions_t
 	writeOptions *C.leveldb_writeoptions_t
+	err          error
 	// Used to prevent concurrent transactions.
 	// TODO(rogulenko): improve concurrency.
-	mu sync.Mutex
+	txmu sync.Mutex
 }
 
 var _ store.Store = (*db)(nil)
@@ -58,9 +65,18 @@
 
 // Close implements the store.Store interface.
 func (d *db) Close() error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if d.err != nil {
+		return d.err
+	}
 	C.leveldb_close(d.cDb)
+	d.cDb = nil
 	C.leveldb_readoptions_destroy(d.readOptions)
+	d.readOptions = nil
 	C.leveldb_writeoptions_destroy(d.writeOptions)
+	d.writeOptions = nil
+	d.err = errors.New("closed store")
 	return nil
 }
 
@@ -81,8 +97,13 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (d *db) Scan(start, end []byte) (store.Stream, error) {
-	return newStream(d, start, end, d.readOptions), nil
+func (d *db) Scan(start, end []byte) store.Stream {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return &store.InvalidStream{d.err}
+	}
+	return newStream(d, start, end, d.readOptions)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -103,26 +124,44 @@
 
 // NewTransaction implements the store.Store interface.
 func (d *db) NewTransaction() store.Transaction {
+	// txmu is held until the transaction is successfully committed or aborted.
+	d.txmu.Lock()
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		d.txmu.Unlock()
+		return &store.InvalidTransaction{d.err}
+	}
 	return newTransaction(d)
 }
 
 // NewSnapshot implements the store.Store interface.
 func (d *db) NewSnapshot() store.Snapshot {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return &store.InvalidSnapshot{d.err}
+	}
 	return newSnapshot(d)
 }
 
 // getWithOpts returns the value for the given key.
 // cOpts may contain a pointer to a snapshot.
 func (d *db) getWithOpts(key, valbuf []byte, cOpts *C.leveldb_readoptions_t) ([]byte, error) {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.err != nil {
+		return valbuf, d.err
+	}
 	var cError *C.char
 	var valLen C.size_t
 	cStr, cLen := cSlice(key)
 	val := C.leveldb_get(d.cDb, cOpts, cStr, cLen, &valLen, &cError)
 	if err := goError(cError); err != nil {
-		return nil, err
+		return valbuf, err
 	}
 	if val == nil {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
+		return valbuf, &store.ErrUnknownKey{Key: string(key)}
 	}
 	defer C.leveldb_free(unsafe.Pointer(val))
 	return store.CopyBytes(valbuf, goBytes(val, valLen)), nil
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index fccad5f..666cc40 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -19,27 +19,37 @@
 }
 
 func TestStream(t *testing.T) {
-	db, dbPath := newDB()
-	defer destroyDB(db, dbPath)
-	test.RunStreamTest(t, db)
+	runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+	runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+	runTest(t, test.RunStoreStateTest)
 }
 
 func TestReadWriteBasic(t *testing.T) {
-	st, path := newDB()
-	defer destroyDB(st, path)
-	test.RunReadWriteBasicTest(t, st)
+	runTest(t, test.RunReadWriteBasicTest)
 }
 
 func TestReadWriteRandom(t *testing.T) {
-	st, path := newDB()
-	defer destroyDB(st, path)
-	test.RunReadWriteRandomTest(t, st)
+	runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+	runTest(t, test.RunTransactionStateTest)
 }
 
 func TestTransactionsWithGet(t *testing.T) {
-	st, path := newDB()
-	defer destroyDB(st, path)
-	test.RunTransactionsWithGetTest(t, st)
+	runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+	st, dbPath := newDB()
+	defer destroyDB(st, dbPath)
+	f(t, st)
 }
 
 func newDB() (store.Store, string) {
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index afa9f25..7b82c7c 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -7,15 +7,21 @@
 // #include "leveldb/c.h"
 import "C"
 import (
+	"errors"
+	"sync"
+
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
 // snapshot is a wrapper around LevelDB snapshot that implements
 // the store.Snapshot interface.
 type snapshot struct {
+	// mu protects the state of the snapshot.
+	mu        sync.RWMutex
 	d         *db
 	cSnapshot *C.leveldb_snapshot_t
 	cOpts     *C.leveldb_readoptions_t
+	err       error
 }
 
 var _ store.Snapshot = (*snapshot)(nil)
@@ -26,25 +32,43 @@
 	C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
 	C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
 	return &snapshot{
-		d,
-		cSnapshot,
-		cOpts,
+		d:         d,
+		cSnapshot: cSnapshot,
+		cOpts:     cOpts,
 	}
 }
 
 // Close implements the store.Snapshot interface.
 func (s *snapshot) Close() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.err != nil {
+		return s.err
+	}
 	C.leveldb_readoptions_destroy(s.cOpts)
+	s.cOpts = nil
 	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
+	s.cSnapshot = nil
+	s.err = errors.New("closed snapshot")
 	return nil
 }
 
 // Get implements the store.StoreReader interface.
 func (s *snapshot) Get(key, valbuf []byte) ([]byte, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if s.err != nil {
+		return valbuf, s.err
+	}
 	return s.d.getWithOpts(key, valbuf, s.cOpts)
 }
 
 // Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
-	return newStream(s.d, start, end, s.cOpts), nil
+func (s *snapshot) Scan(start, end []byte) store.Stream {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if s.err != nil {
+		return &store.InvalidStream{s.err}
+	}
+	return newStream(s.d, start, end, s.cOpts)
 }
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index f348567..4705e59 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -9,10 +9,10 @@
 import "C"
 import (
 	"bytes"
+	"errors"
 	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
 )
 
 // stream is a wrapper around LevelDB iterator that implements
@@ -39,7 +39,6 @@
 
 func newStream(d *db, start, end []byte, cOpts *C.leveldb_readoptions_t) *stream {
 	cStr, size := cSlice(start)
-	// TODO(rogulenko): check if (db.cDb != nil) under a db-scoped mutex.
 	cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
 	return &stream{
 		cIter: cIter,
@@ -57,7 +56,7 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.hasValue = false
-	if s.cIter == nil {
+	if s.err != nil {
 		return false
 	}
 	// The C iterator starts out initialized, pointing at the first value; we
@@ -67,7 +66,7 @@
 	} else {
 		C.syncbase_leveldb_iter_next(s.cIter)
 	}
-	if s.cIter.is_valid != 0 && bytes.Compare(s.end, s.cKey()) > 0 {
+	if s.cIter.is_valid != 0 && (len(s.end) == 0 || bytes.Compare(s.cKey(), s.end) < 0) {
 		s.hasValue = true
 		s.key = s.cKey()
 		s.value = s.cVal()
@@ -113,10 +112,9 @@
 func (s *stream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.cIter == nil {
+	if s.err != nil {
 		return
 	}
-	s.err = verror.New(verror.ErrCanceled, nil)
 	// s.hasValue might be false if Advance was never called.
 	if s.hasValue {
 		// We copy the key and the value from the C heap to the Go heap before
@@ -124,6 +122,7 @@
 		s.key = store.CopyBytes(nil, s.cKey())
 		s.value = store.CopyBytes(nil, s.cVal())
 	}
+	s.err = errors.New("canceled stream")
 	s.destroyLeveldbIter()
 }
 
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index d358f51..12c3a65 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -7,62 +7,85 @@
 // #include "leveldb/c.h"
 import "C"
 import (
+	"errors"
+	"sync"
+
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
 
 // transaction is a wrapper around LevelDB WriteBatch that implements
 // the store.Transaction interface.
-// TODO(rogulenko): handle incorrect usage.
 type transaction struct {
+	mu       sync.Mutex
 	d        *db
 	snapshot store.Snapshot
 	batch    *C.leveldb_writebatch_t
 	cOpts    *C.leveldb_writeoptions_t
+	err      error
 }
 
 var _ store.Transaction = (*transaction)(nil)
 
 func newTransaction(d *db) *transaction {
-	// The lock is held until the transaction is successfully
-	// committed or aborted.
-	d.mu.Lock()
 	return &transaction{
-		d,
-		d.NewSnapshot(),
-		C.leveldb_writebatch_create(),
-		d.writeOptions,
+		d:        d,
+		snapshot: d.NewSnapshot(),
+		batch:    C.leveldb_writebatch_create(),
+		cOpts:    d.writeOptions,
 	}
 }
 
 // close frees allocated C objects and releases acquired locks.
 func (tx *transaction) close() {
-	tx.d.mu.Unlock()
-	tx.snapshot.Close()
+	tx.d.txmu.Unlock()
 	C.leveldb_writebatch_destroy(tx.batch)
+	tx.batch = nil
 	if tx.cOpts != tx.d.writeOptions {
 		C.leveldb_writeoptions_destroy(tx.cOpts)
 	}
+	tx.cOpts = nil
 }
 
 // ResetForRetry implements the store.Transaction interface.
 func (tx *transaction) ResetForRetry() {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.batch == nil {
+		panic(tx.err)
+	}
 	tx.snapshot.Close()
 	tx.snapshot = tx.d.NewSnapshot()
+	tx.err = nil
 	C.leveldb_writebatch_clear(tx.batch)
 }
 
 // Get implements the store.StoreReader interface.
 func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return valbuf, tx.err
+	}
 	return tx.snapshot.Get(key, valbuf)
 }
 
 // Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return &store.InvalidStream{tx.err}
+	}
 	return tx.snapshot.Scan(start, end)
 }
 
 // Put implements the store.StoreWriter interface.
 func (tx *transaction) Put(key, value []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return tx.err
+	}
 	cKey, cKeyLen := cSlice(key)
 	cVal, cValLen := cSlice(value)
 	C.leveldb_writebatch_put(tx.batch, cKey, cKeyLen, cVal, cValLen)
@@ -71,6 +94,11 @@
 
 // Delete implements the store.StoreWriter interface.
 func (tx *transaction) Delete(key []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return tx.err
+	}
 	cKey, cKeyLen := cSlice(key)
 	C.leveldb_writebatch_delete(tx.batch, cKey, cKeyLen)
 	return nil
@@ -78,17 +106,32 @@
 
 // Commit implements the store.Transaction interface.
 func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.err != nil {
+		return tx.err
+	}
+	tx.d.mu.Lock()
+	defer tx.d.mu.Unlock()
 	var cError *C.char
 	C.leveldb_write(tx.d.cDb, tx.cOpts, tx.batch, &cError)
 	if err := goError(cError); err != nil {
+		tx.err = errors.New("already attempted to commit transaction")
 		return err
 	}
+	tx.err = errors.New("committed transaction")
 	tx.close()
 	return nil
 }
 
 // Abort implements the store.Transaction interface.
 func (tx *transaction) Abort() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	if tx.batch == nil {
+		return tx.err
+	}
+	tx.err = errors.New("aborted transaction")
 	tx.close()
 	return nil
 }
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index 180cd14..fc7cf0a 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -52,21 +52,21 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if err := s.error(); err != nil {
-		return nil, err
+		return valbuf, err
 	}
 	value, ok := s.data[string(key)]
 	if !ok {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
+		return valbuf, &store.ErrUnknownKey{Key: string(key)}
 	}
 	return store.CopyBytes(valbuf, value), nil
 }
 
 // Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) (store.Stream, error) {
+func (s *snapshot) Scan(start, end []byte) store.Stream {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if err := s.error(); err != nil {
-		return nil, err
+		return &store.InvalidStream{err}
 	}
-	return newStream(s, start, end), nil
+	return newStream(s, start, end)
 }
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index 36a0758..e844b20 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -40,16 +40,16 @@
 	defer st.mu.Unlock()
 	value, ok := st.data[string(key)]
 	if !ok {
-		return nil, &store.ErrUnknownKey{Key: string(key)}
+		return valbuf, &store.ErrUnknownKey{Key: string(key)}
 	}
 	return store.CopyBytes(valbuf, value), nil
 }
 
 // Scan implements the store.StoreReader interface.
-func (st *memstore) Scan(start, end []byte) (store.Stream, error) {
+func (st *memstore) Scan(start, end []byte) store.Stream {
 	st.mu.Lock()
 	defer st.mu.Unlock()
-	return newStream(newSnapshot(st), start, end), nil
+	return newStream(newSnapshot(st), start, end)
 }
 
 // Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 1ba6e85..4006ce7 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -8,6 +8,7 @@
 	"runtime"
 	"testing"
 
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/test"
 )
 
@@ -16,27 +17,39 @@
 }
 
 func TestStream(t *testing.T) {
-	st := New()
-	defer st.Close()
-	test.RunStreamTest(t, st)
+	runTest(t, test.RunStreamTest)
+}
+
+func TestSnapshot(t *testing.T) {
+	runTest(t, test.RunSnapshotTest)
+}
+
+func TestStoreState(t *testing.T) {
+	// TODO(rogulenko): Enable this test once memstore.Close causes memstore to
+	// disallow subsequent operations.
+	// runTest(t, test.RunStoreStateTest)
 }
 
 func TestReadWriteBasic(t *testing.T) {
-	st := New()
-	defer st.Close()
-	test.RunReadWriteBasicTest(t, st)
+	runTest(t, test.RunReadWriteBasicTest)
 }
 
 func TestReadWriteRandom(t *testing.T) {
-	st := New()
-	defer st.Close()
-	test.RunReadWriteRandomTest(t, st)
+	runTest(t, test.RunReadWriteRandomTest)
+}
+
+func TestTransactionState(t *testing.T) {
+	runTest(t, test.RunTransactionStateTest)
 }
 
 func TestTransactionsWithGet(t *testing.T) {
-	st := New()
-	defer st.Close()
 	// TODO(sadovsky): Enable this test once we've added a retry loop to
 	// RunInTransaction. Without that, concurrency makes the test fail.
-	//test.RunTransactionsWithGetTest(t, st)
+	// runTest(t, test.RunTransactionsWithGetTest)
+}
+
+func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
+	st := New()
+	defer st.Close()
+	f(t, st)
 }
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index f1614eb..221d33c 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -5,11 +5,11 @@
 package memstore
 
 import (
+	"errors"
 	"sort"
 	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
 )
 
 type stream struct {
@@ -26,7 +26,7 @@
 func newStream(sn *snapshot, start, end []byte) *stream {
 	keys := []string{}
 	for k := range sn.data {
-		if k >= string(start) && k < string(end) {
+		if k >= string(start) && (len(end) == 0 || k < string(end)) {
 			keys = append(keys, k)
 		}
 	}
@@ -89,5 +89,5 @@
 func (s *stream) Cancel() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.err = verror.New(verror.ErrCanceled, nil)
+	s.err = errors.New("canceled stream")
 }
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 3ef45aa..56dcbdd 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -80,19 +80,19 @@
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if err := tx.error(); err != nil {
-		return nil, err
+		return valbuf, err
 	}
 	return tx.sn.Get(key, valbuf)
 }
 
 // Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) (store.Stream, error) {
+func (tx *transaction) Scan(start, end []byte) store.Stream {
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if err := tx.error(); err != nil {
-		return nil, err
+		return &store.InvalidStream{err}
 	}
-	return newStream(tx.sn, start, end), nil
+	return newStream(tx.sn, start, end)
 }
 
 // Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 926b66d..22a5c99 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -12,11 +12,12 @@
 	// sub-slice of valbuf if valbuf was large enough to hold the entire value.
 	// Otherwise, a newly allocated slice will be returned. It is valid to pass a
 	// nil valbuf.
-	// Fails if the given key is unknown (ErrUnknownKey).
+	// If the given key is unknown, valbuf is returned unchanged and the function
+	// fails with ErrUnknownKey.
 	Get(key, valbuf []byte) ([]byte, error)
 
 	// Scan returns all rows with keys in range [start, end).
-	Scan(start, end []byte) (Stream, error)
+	Scan(start, end []byte) Stream
 }
 
 // StoreWriter writes data to a CRUD-capable storage engine.
diff --git a/services/syncbase/store/test/snapshot.go b/services/syncbase/store/test/snapshot.go
new file mode 100644
index 0000000..0276fd1
--- /dev/null
+++ b/services/syncbase/store/test/snapshot.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"strings"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunSnapshotTest verifies store.Snapshot operations.
+func RunSnapshotTest(t *testing.T, st store.Store) {
+	key1, value1 := []byte("key1"), []byte("value1")
+	st.Put(key1, value1)
+	snapshot := st.NewSnapshot()
+	key2, value2 := []byte("key2"), []byte("value2")
+	st.Put(key2, value2)
+
+	// Test Get and Scan.
+	verifyGet(t, snapshot, key1, value1)
+	verifyGet(t, snapshot, key2, nil)
+	s := snapshot.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, key1, value1)
+	verifyAdvance(t, s, nil, nil)
+
+	// Test functions after Close.
+	if err := snapshot.Close(); err != nil {
+		t.Fatalf("can't close the snapshot: %v", err)
+	}
+	expectedErr := "closed snapshot"
+	if err := snapshot.Close(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	s = snapshot.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, nil, nil)
+	if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+}
diff --git a/services/syncbase/store/test/store.go b/services/syncbase/store/test/store.go
new file mode 100644
index 0000000..a0b150f
--- /dev/null
+++ b/services/syncbase/store/test/store.go
@@ -0,0 +1,212 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"fmt"
+	"math/rand"
+	"strings"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+type operation int
+
+const (
+	Put    operation = 0
+	Delete operation = 1
+)
+
+type testStep struct {
+	op  operation
+	key int
+}
+
+func randomBytes(rnd *rand.Rand, length int) []byte {
+	var res []byte
+	for i := 0; i < length; i++ {
+		res = append(res, '0'+byte(rnd.Intn(10)))
+	}
+	return res
+}
+
+// storeState is the in-memory representation of the store state.
+type storeState struct {
+	// We assume that the database has keys [0..size).
+	size     int
+	rnd      *rand.Rand
+	memtable map[string][]byte
+}
+
+func newStoreState(size int) *storeState {
+	return &storeState{
+		size,
+		rand.New(rand.NewSource(239017)),
+		make(map[string][]byte),
+	}
+}
+
+func (s *storeState) clone() *storeState {
+	other := &storeState{
+		s.size,
+		s.rnd,
+		make(map[string][]byte),
+	}
+	for k, v := range s.memtable {
+		other.memtable[k] = v
+	}
+	return other
+}
+
+// nextKey returns the smallest key in the store that is not less than the
+// provided key. If there is no such key, returns size.
+func (s *storeState) lowerBound(key int) int {
+	for key < s.size {
+		if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
+			return key
+		}
+		key++
+	}
+	return key
+}
+
+// verify checks that various read operations on store.Store and memtable return
+// the same results.
+func (s *storeState) verify(t *testing.T, st store.StoreReader) {
+	// Verify Get().
+	for i := 0; i < s.size; i++ {
+		keystr := fmt.Sprintf("%05d", i)
+		answer, ok := s.memtable[keystr]
+		if ok {
+			verifyGet(t, st, []byte(keystr), answer)
+		} else {
+			verifyGet(t, st, []byte(keystr), nil)
+		}
+	}
+	// Verify 10 random Scan() calls.
+	for i := 0; i < 10; i++ {
+		start, end := s.rnd.Intn(s.size), s.rnd.Intn(s.size)
+		if start > end {
+			start, end = end, start
+		}
+		end++
+		stream := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
+		for start = s.lowerBound(start); start < end; start = s.lowerBound(start + 1) {
+			keystr := fmt.Sprintf("%05d", start)
+			verifyAdvance(t, stream, []byte(keystr), s.memtable[keystr])
+		}
+		verifyAdvance(t, stream, nil, nil)
+	}
+}
+
+// runReadWriteTest verifies read/write/snapshot operations.
+func runReadWriteTest(t *testing.T, st store.Store, size int, steps []testStep) {
+	s := newStoreState(size)
+	// We verify database state no more than ~100 times to prevent the test from
+	// being slow.
+	frequency := (len(steps) + 99) / 100
+	var states []*storeState
+	var snapshots []store.Snapshot
+	for i, step := range steps {
+		if step.key < 0 || step.key >= s.size {
+			t.Fatalf("invalid test step %v", step)
+		}
+		switch step.op {
+		case Put:
+			key := fmt.Sprintf("%05d", step.key)
+			value := randomBytes(s.rnd, 100)
+			s.memtable[key] = value
+			st.Put([]byte(key), value)
+		case Delete:
+			key := fmt.Sprintf("%05d", step.key)
+			if _, ok := s.memtable[key]; ok {
+				delete(s.memtable, key)
+				st.Delete([]byte(key))
+			}
+		default:
+			t.Fatalf("invalid test step %v", step)
+		}
+		if i%frequency == 0 {
+			s.verify(t, st)
+			states = append(states, s.clone())
+			snapshots = append(snapshots, st.NewSnapshot())
+		}
+	}
+	s.verify(t, st)
+	for i := 0; i < len(states); i++ {
+		states[i].verify(t, snapshots[i])
+		snapshots[i].Close()
+	}
+}
+
+// RunReadWriteBasicTest runs a basic test that verifies reads, writes and
+// snapshots.
+func RunReadWriteBasicTest(t *testing.T, st store.Store) {
+	runReadWriteTest(t, st, 3, []testStep{
+		testStep{Put, 1},
+		testStep{Put, 2},
+		testStep{Delete, 1},
+		testStep{Put, 1},
+		testStep{Put, 2},
+	})
+}
+
+// RunReadWriteRandomTest runs a random-generated test that verifies reads,
+// writes and snapshots.
+func RunReadWriteRandomTest(t *testing.T, st store.Store) {
+	rnd := rand.New(rand.NewSource(239017))
+	var testcase []testStep
+	size := 50
+	for i := 0; i < 10000; i++ {
+		testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
+	}
+	runReadWriteTest(t, st, size, testcase)
+}
+
+// RunStoreStateTest verifies operations that modify the state of a store.Store.
+func RunStoreStateTest(t *testing.T, st store.Store) {
+	key1, value1 := []byte("key1"), []byte("value1")
+	st.Put(key1, value1)
+	key2 := []byte("key2")
+
+	// Test Get and Scan.
+	verifyGet(t, st, key1, value1)
+	verifyGet(t, st, key2, nil)
+	s := st.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, key1, value1)
+	verifyAdvance(t, s, nil, nil)
+
+	// Test functions after Close.
+	if err := st.Close(); err != nil {
+		t.Fatalf("can't close the store: %v", err)
+	}
+	expectedErr := "closed store"
+	if err := st.Close(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	s = st.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, nil, nil)
+	if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	snapshot := st.NewSnapshot()
+	if _, err := snapshot.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	tx := st.NewTransaction()
+	if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if _, err := st.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if err := st.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+	if err := st.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+	}
+}
diff --git a/services/syncbase/store/test/stream.go b/services/syncbase/store/test/stream.go
index 295d683..4caeb7c 100644
--- a/services/syncbase/store/test/stream.go
+++ b/services/syncbase/store/test/stream.go
@@ -6,10 +6,10 @@
 
 import (
 	"bytes"
+	"strings"
 	"testing"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/v23/verror"
 )
 
 // RunStreamTest verifies store.Stream operations.
@@ -21,25 +21,14 @@
 	key3, value3 := []byte("key3"), []byte("value3")
 	st.Put(key3, value3)
 
-	s, _ := st.Scan([]byte("a"), []byte("z"))
-	if !s.Advance() {
-		t.Fatalf("can't advance the stream")
-	}
-	var key, value []byte
-	for i := 0; i < 2; i++ {
-		if key = s.Key(key); !bytes.Equal(key, key1) {
-			t.Fatalf("unexpected key: got %q, want %q", key, key1)
-		}
-		if value = s.Value(value); !bytes.Equal(value, value1) {
-			t.Fatalf("unexpected value: got %q, want %q", value, value1)
-		}
-	}
-
+	s := st.Scan([]byte("a"), []byte("z"))
+	verifyAdvance(t, s, key1, value1)
 	if !s.Advance() {
 		t.Fatalf("can't advance the stream")
 	}
 	s.Cancel()
 	for i := 0; i < 2; i++ {
+		var key, value []byte
 		if key = s.Key(key); !bytes.Equal(key, key2) {
 			t.Fatalf("unexpected key: got %q, want %q", key, key2)
 		}
@@ -47,11 +36,8 @@
 			t.Fatalf("unexpected value: got %q, want %q", value, value2)
 		}
 	}
-
-	if s.Advance() {
-		t.Fatalf("advance returned true unexpectedly")
-	}
-	if verror.ErrorID(s.Err()) != verror.ErrCanceled.ID {
+	verifyAdvance(t, s, nil, nil)
+	if !strings.Contains(s.Err().Error(), "canceled stream") {
 		t.Fatalf("unexpected steam error: %v", s.Err())
 	}
 }
diff --git a/services/syncbase/store/test/test.go b/services/syncbase/store/test/test.go
deleted file mode 100644
index 0643bae..0000000
--- a/services/syncbase/store/test/test.go
+++ /dev/null
@@ -1,276 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// TODO(rogulenko): add more tests.
-
-package test
-
-import (
-	"bytes"
-	"fmt"
-	"math/rand"
-	"reflect"
-	"strconv"
-	"sync"
-	"testing"
-
-	"v.io/syncbase/x/ref/services/syncbase/store"
-)
-
-type operation int
-
-const (
-	Put    operation = 0
-	Delete operation = 1
-)
-
-type testStep struct {
-	op  operation
-	key int
-}
-
-func randomBytes(rnd *rand.Rand, length int) []byte {
-	var res []byte
-	for i := 0; i < length; i++ {
-		res = append(res, '0'+byte(rnd.Intn(10)))
-	}
-	return res
-}
-
-// storeState is the in-memory representation of the store state.
-type storeState struct {
-	// We assume that the database has keys [0..size).
-	size     int
-	rnd      *rand.Rand
-	memtable map[string][]byte
-}
-
-func newStoreState(size int) *storeState {
-	return &storeState{
-		size,
-		rand.New(rand.NewSource(239017)),
-		make(map[string][]byte),
-	}
-}
-
-func (s *storeState) clone() *storeState {
-	other := &storeState{
-		s.size,
-		s.rnd,
-		make(map[string][]byte),
-	}
-	for k, v := range s.memtable {
-		other.memtable[k] = v
-	}
-	return other
-}
-
-// nextKey returns the smallest key in the store that is not less than the
-// provided key. If there is no such key, returns size.
-func (s *storeState) lowerBound(key int) int {
-	for key < s.size {
-		if _, ok := s.memtable[fmt.Sprintf("%05d", key)]; ok {
-			return key
-		}
-		key++
-	}
-	return key
-}
-
-// verify checks that various read operations on store.Store and memtable return
-// the same results.
-func (s *storeState) verify(t *testing.T, st store.StoreReader) {
-	var key, value []byte
-	var err error
-	// Verify Get().
-	for i := 0; i < s.size; i++ {
-		keystr := fmt.Sprintf("%05d", i)
-		answer, ok := s.memtable[keystr]
-		key = []byte(keystr)
-		value, err = st.Get(key, value)
-		if ok {
-			if err != nil || !bytes.Equal(value, answer) {
-				t.Fatalf("unexpected get result for %q: got {%q, %v}, want {%q, nil}", keystr, value, err, answer)
-			}
-		} else {
-			if !reflect.DeepEqual(&store.ErrUnknownKey{Key: keystr}, err) {
-				t.Fatalf("unexpected get error for key %q: %v", keystr, err)
-			}
-		}
-	}
-	// Verify 10 random Scan() calls.
-	for i := 0; i < 10; i++ {
-		start, end := s.rnd.Intn(s.size), s.rnd.Intn(s.size)
-		if start > end {
-			start, end = end, start
-		}
-		end++
-		stream, err := st.Scan([]byte(fmt.Sprintf("%05d", start)), []byte(fmt.Sprintf("%05d", end)))
-		if err != nil {
-			t.Fatalf("can't create stream: %v", err)
-		}
-		for stream.Advance() {
-			start = s.lowerBound(start)
-			keystr := fmt.Sprintf("%05d", start)
-			key, value = stream.Key(key), stream.Value(value)
-			if string(key) != keystr {
-				t.Fatalf("unexpected key during scan: got %q, want %q", key, keystr)
-			}
-			if !bytes.Equal(value, s.memtable[keystr]) {
-				t.Fatalf("unexpected value during scan: got %q, want %q", value, s.memtable[keystr])
-			}
-			start++
-		}
-		if start = s.lowerBound(start); start < end {
-			t.Fatalf("stream ended unexpectedly")
-		}
-	}
-}
-
-// runReadWriteTest verifies read/write/snapshot operations.
-func runReadWriteTest(t *testing.T, st store.Store, size int, steps []testStep) {
-	s := newStoreState(size)
-	// We verify database state no more than ~100 times to prevent the test from
-	// being slow.
-	frequency := (len(steps) + 99) / 100
-	var states []*storeState
-	var snapshots []store.Snapshot
-	for i, step := range steps {
-		if step.key < 0 || step.key >= s.size {
-			t.Fatalf("invalid test step %v", step)
-		}
-		switch step.op {
-		case Put:
-			key := fmt.Sprintf("%05d", step.key)
-			value := randomBytes(s.rnd, 100)
-			s.memtable[key] = value
-			st.Put([]byte(key), value)
-		case Delete:
-			key := fmt.Sprintf("%05d", step.key)
-			if _, ok := s.memtable[key]; ok {
-				delete(s.memtable, key)
-				st.Delete([]byte(key))
-			}
-		default:
-			t.Fatalf("invalid test step %v", step)
-		}
-		if i%frequency == 0 {
-			s.verify(t, st)
-			states = append(states, s.clone())
-			snapshots = append(snapshots, st.NewSnapshot())
-		}
-	}
-	s.verify(t, st)
-	for i := 0; i < len(states); i++ {
-		states[i].verify(t, snapshots[i])
-		snapshots[i].Close()
-	}
-}
-
-// RunReadWriteBasicTest runs a basic test that verifies reads, writes and
-// snapshots.
-func RunReadWriteBasicTest(t *testing.T, st store.Store) {
-	runReadWriteTest(t, st, 3, []testStep{
-		testStep{Put, 1},
-		testStep{Put, 2},
-		testStep{Delete, 1},
-		testStep{Put, 1},
-		testStep{Put, 2},
-	})
-}
-
-// RunReadWriteRandomTest runs a random-generated test that verifies reads,
-// writes and snapshots.
-func RunReadWriteRandomTest(t *testing.T, st store.Store) {
-	rnd := rand.New(rand.NewSource(239017))
-	var testcase []testStep
-	size := 50
-	for i := 0; i < 10000; i++ {
-		testcase = append(testcase, testStep{operation(rnd.Intn(2)), rnd.Intn(size)})
-	}
-	runReadWriteTest(t, st, size, testcase)
-}
-
-// RunTransactionsWithGetTest tests transactions that use Put and Get
-// operations.
-// NOTE: consider setting GOMAXPROCS to something greater than 1.
-func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
-	// Invariant: value mapped to n is sum of values of 0..n-1.
-	// Each of k transactions takes m distinct random values from 0..n-1, adds 1
-	// to each and m to value mapped to n.
-	// The correctness of sums is checked after all transactions have been
-	// committed.
-	n, m, k := 10, 3, 100
-	for i := 0; i <= n; i++ {
-		if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
-			t.Fatalf("can't write to database")
-		}
-	}
-	var wg sync.WaitGroup
-	wg.Add(k)
-	// TODO(sadovsky): This configuration creates huge resource contention.
-	// Perhaps we should add some random sleep's to reduce the contention.
-	for i := 0; i < k; i++ {
-		go func() {
-			rnd := rand.New(rand.NewSource(239017 * int64(i)))
-			perm := rnd.Perm(n)
-			if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-				for j := 0; j <= m; j++ {
-					var keystr string
-					if j < m {
-						keystr = fmt.Sprintf("%05d", perm[j])
-					} else {
-						keystr = fmt.Sprintf("%05d", n)
-					}
-					key := []byte(keystr)
-					val, err := st.Get(key, nil)
-					if err != nil {
-						return fmt.Errorf("can't get key %q: %v", key, err)
-					}
-					intValue, err := strconv.ParseInt(string(val), 10, 64)
-					if err != nil {
-						return fmt.Errorf("can't parse int from %q: %v", val, err)
-					}
-					var newValue int64
-					if j < m {
-						newValue = intValue + 1
-					} else {
-						newValue = intValue + int64(m)
-					}
-					if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
-						return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
-					}
-				}
-				return nil
-			}); err != nil {
-				panic(fmt.Errorf("can't commit transaction: %v", err))
-			}
-			wg.Done()
-		}()
-	}
-	wg.Wait()
-	var sum int64
-	for j := 0; j <= n; j++ {
-		keystr := fmt.Sprintf("%05d", j)
-		key := []byte(keystr)
-		val, err := st.Get(key, nil)
-		if err != nil {
-			t.Fatalf("can't get key %q: %v", key, err)
-		}
-		intValue, err := strconv.ParseInt(string(val), 10, 64)
-		if err != nil {
-			t.Fatalf("can't parse int from %q: %v", val, err)
-		}
-		if j < n {
-			sum += intValue
-		} else {
-			if intValue != int64(m*k) {
-				t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
-			}
-		}
-	}
-	if sum != int64(m*k) {
-		t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
-	}
-}
diff --git a/services/syncbase/store/test/transaction.go b/services/syncbase/store/test/transaction.go
new file mode 100644
index 0000000..e078e49
--- /dev/null
+++ b/services/syncbase/store/test/transaction.go
@@ -0,0 +1,154 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"strings"
+	"sync"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// RunTransactionTest verifies operations that modify the state of a
+// store.Transaction.
+func RunTransactionStateTest(t *testing.T, st store.Store) {
+	abortFunctions := []func(t *testing.T, tx store.Transaction) string{
+		func(t *testing.T, tx store.Transaction) string {
+			if err := tx.Abort(); err != nil {
+				t.Fatalf("can't abort the transaction: %v", err)
+			}
+			return "aborted transaction"
+		},
+		func(t *testing.T, tx store.Transaction) string {
+			if err := tx.Commit(); err != nil {
+				t.Fatalf("can't commit the transaction: %v", err)
+			}
+			return "committed transaction"
+		},
+	}
+	for _, fn := range abortFunctions {
+		key1, value1 := []byte("key1"), []byte("value1")
+		st.Put(key1, value1)
+		key2 := []byte("key2")
+		tx := st.NewTransaction()
+
+		// Test Get and Scan.
+		verifyGet(t, tx, key1, value1)
+		verifyGet(t, tx, key2, nil)
+		s := tx.Scan([]byte("a"), []byte("z"))
+		verifyAdvance(t, s, key1, value1)
+		verifyAdvance(t, s, nil, nil)
+
+		// Test functions after Abort.
+		expectedErr := fn(t, tx)
+		if err := tx.Abort(); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if err := tx.Commit(); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		s = tx.Scan([]byte("a"), []byte("z"))
+		verifyAdvance(t, s, nil, nil)
+		if err := s.Err(); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if _, err := tx.Get(key1, nil); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if err := tx.Put(key1, value1); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+		if err := tx.Delete(key1); !strings.Contains(err.Error(), expectedErr) {
+			t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
+		}
+	}
+}
+
+// RunTransactionsWithGetTest tests transactions that use Put and Get
+// operations.
+// NOTE: consider setting GOMAXPROCS to something greater than 1.
+func RunTransactionsWithGetTest(t *testing.T, st store.Store) {
+	// Invariant: value mapped to n is sum of values of 0..n-1.
+	// Each of k transactions takes m distinct random values from 0..n-1, adds 1
+	// to each and m to value mapped to n.
+	// The correctness of sums is checked after all transactions have been
+	// committed.
+	n, m, k := 10, 3, 100
+	for i := 0; i <= n; i++ {
+		if err := st.Put([]byte(fmt.Sprintf("%05d", i)), []byte{'0'}); err != nil {
+			t.Fatalf("can't write to database")
+		}
+	}
+	var wg sync.WaitGroup
+	wg.Add(k)
+	// TODO(sadovsky): This configuration creates huge resource contention.
+	// Perhaps we should add some random sleep's to reduce the contention.
+	for i := 0; i < k; i++ {
+		go func() {
+			rnd := rand.New(rand.NewSource(239017 * int64(i)))
+			perm := rnd.Perm(n)
+			if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+				for j := 0; j <= m; j++ {
+					var keystr string
+					if j < m {
+						keystr = fmt.Sprintf("%05d", perm[j])
+					} else {
+						keystr = fmt.Sprintf("%05d", n)
+					}
+					key := []byte(keystr)
+					val, err := st.Get(key, nil)
+					if err != nil {
+						return fmt.Errorf("can't get key %q: %v", key, err)
+					}
+					intValue, err := strconv.ParseInt(string(val), 10, 64)
+					if err != nil {
+						return fmt.Errorf("can't parse int from %q: %v", val, err)
+					}
+					var newValue int64
+					if j < m {
+						newValue = intValue + 1
+					} else {
+						newValue = intValue + int64(m)
+					}
+					if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+						return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
+					}
+				}
+				return nil
+			}); err != nil {
+				panic(fmt.Errorf("can't commit transaction: %v", err))
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	var sum int64
+	for j := 0; j <= n; j++ {
+		keystr := fmt.Sprintf("%05d", j)
+		key := []byte(keystr)
+		val, err := st.Get(key, nil)
+		if err != nil {
+			t.Fatalf("can't get key %q: %v", key, err)
+		}
+		intValue, err := strconv.ParseInt(string(val), 10, 64)
+		if err != nil {
+			t.Fatalf("can't parse int from %q: %v", val, err)
+		}
+		if j < n {
+			sum += intValue
+		} else {
+			if intValue != int64(m*k) {
+				t.Fatalf("invalid sum value in the database: got %d, want %d", intValue, m*k)
+			}
+		}
+	}
+	if sum != int64(m*k) {
+		t.Fatalf("invalid sum of values in the database: got %d, want %d", sum, m*k)
+	}
+}
diff --git a/services/syncbase/store/test/util.go b/services/syncbase/store/test/util.go
new file mode 100644
index 0000000..3f8ca38
--- /dev/null
+++ b/services/syncbase/store/test/util.go
@@ -0,0 +1,68 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+	"bytes"
+	"reflect"
+	"runtime/debug"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// verifyGet verifies that st.Get(key) == value. If value is nil, verifies that
+// the key is not found.
+func verifyGet(t *testing.T, st store.StoreReader, key, value []byte) {
+	valbuf := []byte("tmp")
+	var err error
+	if value != nil {
+		if valbuf, err = st.Get(key, valbuf); err != nil {
+			Fatalf(t, "can't get value of %q: %v", key, err)
+		}
+		if !bytes.Equal(valbuf, value) {
+			Fatalf(t, "unexpected value: got %q, want %q", valbuf, value)
+		}
+	} else {
+		valbuf, err = st.Get(key, valbuf)
+		if !reflect.DeepEqual(&store.ErrUnknownKey{Key: string(key)}, err) {
+			Fatalf(t, "unexpected get error for key %q: %v", key, err)
+		}
+		valcopy := []byte("tmp")
+		// Verify that valbuf is not modified if the key is not found.
+		if !bytes.Equal(valbuf, valcopy) {
+			Fatalf(t, "unexpected value: got %q, want %q", valbuf, valcopy)
+		}
+	}
+}
+
+// verifyGet verifies the next key/value pair of the provided stream.
+// If key is nil, verifies that next Advance call on the stream returns false.
+func verifyAdvance(t *testing.T, s store.Stream, key, value []byte) {
+	ok := s.Advance()
+	if key == nil {
+		if ok {
+			Fatalf(t, "advance returned true unexpectedly")
+		}
+		return
+	}
+	if !ok {
+		Fatalf(t, "can't advance the stream")
+	}
+	var k, v []byte
+	for i := 0; i < 2; i++ {
+		if k = s.Key(k); !bytes.Equal(k, key) {
+			Fatalf(t, "unexpected key: got %q, want %q", k, key)
+		}
+		if v = s.Value(v); !bytes.Equal(v, value) {
+			Fatalf(t, "unexpected value: got %q, want %q", v, value)
+		}
+	}
+}
+
+func Fatalf(t *testing.T, format string, args ...interface{}) {
+	debug.PrintStack()
+	t.Fatalf(format, args...)
+}
diff --git a/services/syncbase/store/util.go b/services/syncbase/store/util.go
index d36b260..82a8350 100644
--- a/services/syncbase/store/util.go
+++ b/services/syncbase/store/util.go
@@ -25,6 +25,7 @@
 // CopyBytes copies elements from a source slice into a destination slice.
 // The returned slice may be a sub-slice of dst if dst was large enough to hold
 // src. Otherwise, a newly allocated slice will be returned.
+// TODO(rogulenko): add some tests.
 func CopyBytes(dst, src []byte) []byte {
 	if cap(dst) < len(src) {
 		newlen := cap(dst)*2 + 2
@@ -33,6 +34,7 @@
 		}
 		dst = make([]byte, newlen)
 	}
+	dst = dst[:len(src)]
 	copy(dst, src)
-	return dst[:len(src)]
+	return dst
 }
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index 38ab2de..35f935a 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -12,10 +12,12 @@
 	"flag"
 
 	"v.io/v23"
+	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/x/lib/vlog"
 
 	"v.io/syncbase/x/ref/services/syncbase/server"
+	"v.io/x/ref/lib/security/securityflag"
 	"v.io/x/ref/lib/signals"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
@@ -25,6 +27,18 @@
 	name = flag.String("name", "", "Name to mount at.")
 )
 
+// defaultPerms returns a permissions object that grants all permissions to the
+// provided blessing patterns.
+func defaultPerms(blessingPatterns []security.BlessingPattern) access.Permissions {
+	perms := access.Permissions{}
+	for _, tag := range access.AllTypicalTags() {
+		for _, bp := range blessingPatterns {
+			perms.Add(bp, string(tag))
+		}
+	}
+	return perms
+}
+
 func main() {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
@@ -37,8 +51,19 @@
 		vlog.Fatal("s.Listen() failed: ", err)
 	}
 
-	// TODO(sadovsky): Use a real Permissions.
-	service, err := server.NewService(nil, nil, access.Permissions{})
+	perms, err := securityflag.PermissionsFromFlag()
+	if err != nil {
+		vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
+	}
+
+	if perms != nil {
+		vlog.Info("Using permissions from command line flag.")
+	} else {
+		vlog.Info("No permissions flag provided. Giving local principal all permissions.")
+		perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
+	}
+
+	service, err := server.NewService(nil, nil, perms)
 	if err != nil {
 		vlog.Fatal("server.NewService() failed: ", err)
 	}