Merge "syncbase/vsync: Integrate log and gen vector functionality (Part 2/2)."
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 8803897..c990976 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -67,10 +67,7 @@
Delete() error {access.Write}
// Delete deletes all rows in the given half-open range [start, limit). If
- // limit is "", all rows with keys >= start are included. If the last row that
- // is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
- // pair is removed.
- // TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+ // limit is "", all rows with keys >= start are included.
DeleteRowRange(start, limit []byte) error {access.Write}
// Scan returns all rows in the given half-open range [start, limit). If limit
@@ -78,6 +75,13 @@
// from a consistent snapshot taken at the time of the Scan RPC.
Scan(start, limit []byte) stream<_, KeyValue> error {access.Read}
+ // GetPermissions returns an array of (prefix, perms) pairs. The array is
+ // sorted from longest prefix to shortest, so element zero is the one that
+ // applies to the row with the given key. The last element is always the
+ // prefix "" which represents the table's permissions -- the array will always
+ // have at least one element.
+ GetPermissions(key string) ([]PrefixPermissions | error) {access.Admin}
+
// SetPermissions sets the permissions for all current and future rows with
// the given prefix. If the prefix overlaps with an existing prefix, the
// longest prefix that matches a row applies. For example:
@@ -85,18 +89,8 @@
// SetPermissions(ctx, Prefix("a/b/c"), perms2)
// The permissions for row "a/b/1" are perms1, and the permissions for row
// "a/b/c/1" are perms2.
- //
- // SetPermissions will fail if called with a prefix that does not match any
- // rows.
SetPermissions(prefix string, perms access.Permissions) error {access.Admin}
- // GetPermissions returns an array of (prefix, perms) pairs. The array is
- // sorted from longest prefix to shortest, so element zero is the one that
- // applies to the row with the given key. The last element is always the
- // prefix "" which represents the table's permissions -- the array will always
- // have at least one element.
- GetPermissions(key string) ([]PrefixPermissions | error) {access.Admin}
-
// DeletePermissions deletes the permissions for the specified prefix. Any
// rows covered by this prefix will use the next longest prefix's permissions
// (see the array returned by GetPermissions).
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 5319420..2af0a60 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -954,15 +954,18 @@
// Delete deletes this Table.
Delete(*context.T, ...rpc.CallOpt) error
// Delete deletes all rows in the given half-open range [start, limit). If
- // limit is "", all rows with keys >= start are included. If the last row that
- // is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
- // pair is removed.
- // TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+ // limit is "", all rows with keys >= start are included.
DeleteRowRange(ctx *context.T, start []byte, limit []byte, opts ...rpc.CallOpt) error
// Scan returns all rows in the given half-open range [start, limit). If limit
// is "", all rows with keys >= start are included. The returned stream reads
// from a consistent snapshot taken at the time of the Scan RPC.
Scan(ctx *context.T, start []byte, limit []byte, opts ...rpc.CallOpt) (TableScanClientCall, error)
+ // GetPermissions returns an array of (prefix, perms) pairs. The array is
+ // sorted from longest prefix to shortest, so element zero is the one that
+ // applies to the row with the given key. The last element is always the
+ // prefix "" which represents the table's permissions -- the array will always
+ // have at least one element.
+ GetPermissions(ctx *context.T, key string, opts ...rpc.CallOpt) ([]PrefixPermissions, error)
// SetPermissions sets the permissions for all current and future rows with
// the given prefix. If the prefix overlaps with an existing prefix, the
// longest prefix that matches a row applies. For example:
@@ -970,16 +973,7 @@
// SetPermissions(ctx, Prefix("a/b/c"), perms2)
// The permissions for row "a/b/1" are perms1, and the permissions for row
// "a/b/c/1" are perms2.
- //
- // SetPermissions will fail if called with a prefix that does not match any
- // rows.
SetPermissions(ctx *context.T, prefix string, perms access.Permissions, opts ...rpc.CallOpt) error
- // GetPermissions returns an array of (prefix, perms) pairs. The array is
- // sorted from longest prefix to shortest, so element zero is the one that
- // applies to the row with the given key. The last element is always the
- // prefix "" which represents the table's permissions -- the array will always
- // have at least one element.
- GetPermissions(ctx *context.T, key string, opts ...rpc.CallOpt) ([]PrefixPermissions, error)
// DeletePermissions deletes the permissions for the specified prefix. Any
// rows covered by this prefix will use the next longest prefix's permissions
// (see the array returned by GetPermissions).
@@ -1025,13 +1019,13 @@
return
}
-func (c implTableClientStub) SetPermissions(ctx *context.T, i0 string, i1 access.Permissions, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "SetPermissions", []interface{}{i0, i1}, nil, opts...)
+func (c implTableClientStub) GetPermissions(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 []PrefixPermissions, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "GetPermissions", []interface{}{i0}, []interface{}{&o0}, opts...)
return
}
-func (c implTableClientStub) GetPermissions(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 []PrefixPermissions, err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "GetPermissions", []interface{}{i0}, []interface{}{&o0}, opts...)
+func (c implTableClientStub) SetPermissions(ctx *context.T, i0 string, i1 access.Permissions, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "SetPermissions", []interface{}{i0, i1}, nil, opts...)
return
}
@@ -1121,15 +1115,18 @@
// Delete deletes this Table.
Delete(*context.T, rpc.ServerCall) error
// Delete deletes all rows in the given half-open range [start, limit). If
- // limit is "", all rows with keys >= start are included. If the last row that
- // is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
- // pair is removed.
- // TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+ // limit is "", all rows with keys >= start are included.
DeleteRowRange(ctx *context.T, call rpc.ServerCall, start []byte, limit []byte) error
// Scan returns all rows in the given half-open range [start, limit). If limit
// is "", all rows with keys >= start are included. The returned stream reads
// from a consistent snapshot taken at the time of the Scan RPC.
Scan(ctx *context.T, call TableScanServerCall, start []byte, limit []byte) error
+ // GetPermissions returns an array of (prefix, perms) pairs. The array is
+ // sorted from longest prefix to shortest, so element zero is the one that
+ // applies to the row with the given key. The last element is always the
+ // prefix "" which represents the table's permissions -- the array will always
+ // have at least one element.
+ GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
// SetPermissions sets the permissions for all current and future rows with
// the given prefix. If the prefix overlaps with an existing prefix, the
// longest prefix that matches a row applies. For example:
@@ -1137,16 +1134,7 @@
// SetPermissions(ctx, Prefix("a/b/c"), perms2)
// The permissions for row "a/b/1" are perms1, and the permissions for row
// "a/b/c/1" are perms2.
- //
- // SetPermissions will fail if called with a prefix that does not match any
- // rows.
SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error
- // GetPermissions returns an array of (prefix, perms) pairs. The array is
- // sorted from longest prefix to shortest, so element zero is the one that
- // applies to the row with the given key. The last element is always the
- // prefix "" which represents the table's permissions -- the array will always
- // have at least one element.
- GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
// DeletePermissions deletes the permissions for the specified prefix. Any
// rows covered by this prefix will use the next longest prefix's permissions
// (see the array returned by GetPermissions).
@@ -1164,15 +1152,18 @@
// Delete deletes this Table.
Delete(*context.T, rpc.ServerCall) error
// Delete deletes all rows in the given half-open range [start, limit). If
- // limit is "", all rows with keys >= start are included. If the last row that
- // is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
- // pair is removed.
- // TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
+ // limit is "", all rows with keys >= start are included.
DeleteRowRange(ctx *context.T, call rpc.ServerCall, start []byte, limit []byte) error
// Scan returns all rows in the given half-open range [start, limit). If limit
// is "", all rows with keys >= start are included. The returned stream reads
// from a consistent snapshot taken at the time of the Scan RPC.
Scan(ctx *context.T, call *TableScanServerCallStub, start []byte, limit []byte) error
+ // GetPermissions returns an array of (prefix, perms) pairs. The array is
+ // sorted from longest prefix to shortest, so element zero is the one that
+ // applies to the row with the given key. The last element is always the
+ // prefix "" which represents the table's permissions -- the array will always
+ // have at least one element.
+ GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
// SetPermissions sets the permissions for all current and future rows with
// the given prefix. If the prefix overlaps with an existing prefix, the
// longest prefix that matches a row applies. For example:
@@ -1180,16 +1171,7 @@
// SetPermissions(ctx, Prefix("a/b/c"), perms2)
// The permissions for row "a/b/1" are perms1, and the permissions for row
// "a/b/c/1" are perms2.
- //
- // SetPermissions will fail if called with a prefix that does not match any
- // rows.
SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error
- // GetPermissions returns an array of (prefix, perms) pairs. The array is
- // sorted from longest prefix to shortest, so element zero is the one that
- // applies to the row with the given key. The last element is always the
- // prefix "" which represents the table's permissions -- the array will always
- // have at least one element.
- GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]PrefixPermissions, error)
// DeletePermissions deletes the permissions for the specified prefix. Any
// rows covered by this prefix will use the next longest prefix's permissions
// (see the array returned by GetPermissions).
@@ -1241,14 +1223,14 @@
return s.impl.Scan(ctx, call, i0, i1)
}
-func (s implTableServerStub) SetPermissions(ctx *context.T, call rpc.ServerCall, i0 string, i1 access.Permissions) error {
- return s.impl.SetPermissions(ctx, call, i0, i1)
-}
-
func (s implTableServerStub) GetPermissions(ctx *context.T, call rpc.ServerCall, i0 string) ([]PrefixPermissions, error) {
return s.impl.GetPermissions(ctx, call, i0)
}
+func (s implTableServerStub) SetPermissions(ctx *context.T, call rpc.ServerCall, i0 string, i1 access.Permissions) error {
+ return s.impl.SetPermissions(ctx, call, i0, i1)
+}
+
func (s implTableServerStub) DeletePermissions(ctx *context.T, call rpc.ServerCall, i0 string) error {
return s.impl.DeletePermissions(ctx, call, i0)
}
@@ -1285,7 +1267,7 @@
},
{
Name: "DeleteRowRange",
- Doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included. If the last row that\n// is covered by a prefix from SetPermissions is deleted, that (prefix, perms)\n// pair is removed.\n// TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.",
+ Doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.",
InArgs: []rpc.ArgDesc{
{"start", ``}, // []byte
{"limit", ``}, // []byte
@@ -1302,15 +1284,6 @@
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
{
- Name: "SetPermissions",
- Doc: "// SetPermissions sets the permissions for all current and future rows with\n// the given prefix. If the prefix overlaps with an existing prefix, the\n// longest prefix that matches a row applies. For example:\n// SetPermissions(ctx, Prefix(\"a/b\"), perms1)\n// SetPermissions(ctx, Prefix(\"a/b/c\"), perms2)\n// The permissions for row \"a/b/1\" are perms1, and the permissions for row\n// \"a/b/c/1\" are perms2.\n//\n// SetPermissions will fail if called with a prefix that does not match any\n// rows.",
- InArgs: []rpc.ArgDesc{
- {"prefix", ``}, // string
- {"perms", ``}, // access.Permissions
- },
- Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
- },
- {
Name: "GetPermissions",
Doc: "// GetPermissions returns an array of (prefix, perms) pairs. The array is\n// sorted from longest prefix to shortest, so element zero is the one that\n// applies to the row with the given key. The last element is always the\n// prefix \"\" which represents the table's permissions -- the array will always\n// have at least one element.",
InArgs: []rpc.ArgDesc{
@@ -1322,6 +1295,15 @@
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
},
{
+ Name: "SetPermissions",
+ Doc: "// SetPermissions sets the permissions for all current and future rows with\n// the given prefix. If the prefix overlaps with an existing prefix, the\n// longest prefix that matches a row applies. For example:\n// SetPermissions(ctx, Prefix(\"a/b\"), perms1)\n// SetPermissions(ctx, Prefix(\"a/b/c\"), perms2)\n// The permissions for row \"a/b/1\" are perms1, and the permissions for row\n// \"a/b/c/1\" are perms2.",
+ InArgs: []rpc.ArgDesc{
+ {"prefix", ``}, // string
+ {"perms", ``}, // access.Permissions
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
+ },
+ {
Name: "DeletePermissions",
Doc: "// DeletePermissions deletes the permissions for the specified prefix. Any\n// rows covered by this prefix will use the next longest prefix's permissions\n// (see the array returned by GetPermissions).",
InArgs: []rpc.ArgDesc{
diff --git a/v23/syncbase/nosql/client_test.go b/v23/syncbase/nosql/client_test.go
index 435ee50..38f66d2 100644
--- a/v23/syncbase/nosql/client_test.go
+++ b/v23/syncbase/nosql/client_test.go
@@ -19,6 +19,7 @@
)
// TODO(sadovsky): Finish writing tests.
+// TODO(rogulenko): Test perms checking for Glob and Exec.
// Tests various Name, FullName, and Key methods.
func TestNameAndKey(t *testing.T) {
@@ -177,14 +178,160 @@
// Tests that Table.{Set,Get,Delete}Permissions methods work as expected.
func TestTablePerms(t *testing.T) {
- // TODO(sadovsky): Implement.
+ clientACtx, sName, cleanup, sp, ctx := tu.SetupOrDieCustom("clientA", "server", nil)
+ defer cleanup()
+ clientBCtx := tu.NewClient("clientB", "server", ctx, sp)
+ a := tu.CreateApp(t, clientACtx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, clientACtx, a, "d")
+ tb := tu.CreateTable(t, clientACtx, d, "tb")
+
+ // Permission objects.
+ aAndB := tu.DefaultPerms("server/clientA", "server/clientB")
+ aOnly := tu.DefaultPerms("server/clientA")
+ bOnly := tu.DefaultPerms("server/clientB")
+
+ // Set initial permissions.
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix(""), aAndB); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix"), aAndB); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix"), aAndB); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix_a"), aOnly); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_b"), bOnly); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+
+ // Checks A has no access to 'prefix_b' and vice versa.
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix_b"), aOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix("prefix_b_suffix"), aOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_a"), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_a_suffix"), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+ }
+
+ // Check GetPermissions.
+ wantPerms := []nosql.PrefixPermissions{
+ nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+ }
+ if got, _ := tb.GetPermissions(clientACtx, ""); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "abc"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ wantPerms = []nosql.PrefixPermissions{
+ nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix"), Perms: aAndB},
+ nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_c"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ wantPerms = []nosql.PrefixPermissions{
+ nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix_a"), Perms: aOnly},
+ nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix"), Perms: aAndB},
+ nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_a"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_a_suffix"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ wantPerms = []nosql.PrefixPermissions{
+ nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix_b"), Perms: bOnly},
+ nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix"), Perms: aAndB},
+ nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_b"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_b_suffix"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+
+ // Delete some prefix permissions and check again.
+ // Check that A can't delete permissions of B.
+ if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix_b")); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.DeletePermissions() should have failed: %v", err)
+ }
+ if err := tb.DeletePermissions(clientBCtx, nosql.Prefix("prefix_a")); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.DeletePermissions() should have failed: %v", err)
+ }
+ // Delete 'prefix' and 'prefix_a'
+ if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix")); err != nil {
+ t.Fatalf("tb.DeletePermissions() failed: %v", err)
+ }
+ if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix_a")); err != nil {
+ t.Fatalf("tb.DeletePermissions() failed: %v", err)
+ }
+ // Check DeletePermissions is idempotent.
+ if err := tb.DeletePermissions(clientACtx, nosql.Prefix("prefix")); err != nil {
+ t.Fatalf("tb.DeletePermissions() failed: %v", err)
+ }
+
+ // Check GetPermissions again.
+ wantPerms = []nosql.PrefixPermissions{
+ nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+ }
+ if got, _ := tb.GetPermissions(clientACtx, ""); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_a"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_a_suffix"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ wantPerms = []nosql.PrefixPermissions{
+ nosql.PrefixPermissions{Prefix: nosql.Prefix("prefix_b"), Perms: bOnly},
+ nosql.PrefixPermissions{Prefix: nosql.Prefix(""), Perms: aAndB},
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_b"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+ if got, _ := tb.GetPermissions(clientACtx, "prefix_b_suffix"); !reflect.DeepEqual(got, wantPerms) {
+ t.Fatalf("Unexpected permissions: got %v, want %v", got, wantPerms)
+ }
+
+ // Remove B from table-level permissions and check B has no access.
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix(""), aOnly); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+ if _, err := tb.GetPermissions(clientBCtx, ""); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.GetPermissions() should have failed: %v", err)
+ }
+ if _, err := tb.GetPermissions(clientBCtx, "prefix_b"); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.GetPermissions() should have failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientBCtx, nosql.Prefix(""), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientBCtx, nosql.Prefix("prefix_b"), bOnly); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.SetPermissions() should have failed: %v", err)
+ }
}
////////////////////////////////////////
// Tests involving rows
-// TODO(sadovsky): Test perms-checking in all test functions below.
-
type Foo struct {
I int
S string
@@ -365,3 +512,68 @@
t.Fatalf("r.Get() should have failed: %v", err)
}
}
+
+// Test permission checking in Row.{Get,Put,Delete} and
+// Table.{Scan, DeleteRowRange}.
+func TestRowPermissions(t *testing.T) {
+ clientACtx, sName, cleanup, sp, ctx := tu.SetupOrDieCustom("clientA", "server", nil)
+ defer cleanup()
+ clientBCtx := tu.NewClient("clientB", "server", ctx, sp)
+ a := tu.CreateApp(t, clientACtx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, clientACtx, a, "d")
+ tb := tu.CreateTable(t, clientACtx, d, "tb")
+
+ // Permission objects.
+ aAndB := tu.DefaultPerms("server/clientA", "server/clientB")
+ aOnly := tu.DefaultPerms("server/clientA")
+ bOnly := tu.DefaultPerms("server/clientB")
+
+ // Set initial permissions.
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix(""), aAndB); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientACtx, nosql.Prefix("a"), aOnly); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+ if err := tb.SetPermissions(clientBCtx, nosql.Prefix("b"), bOnly); err != nil {
+ t.Fatalf("tb.SetPermissions() failed: %v", err)
+ }
+
+ // Add some key-value pairs.
+ ra := tb.Row("afoo")
+ rb := tb.Row("bfoo")
+ if err := ra.Put(clientACtx, Foo{}); err != nil {
+ t.Fatalf("ra.Put() failed: %v", err)
+ }
+ if err := rb.Put(clientBCtx, Foo{}); err != nil {
+ t.Fatalf("rb.Put() failed: %v", err)
+ }
+
+ // Check A doesn't have access to 'b'.
+ if err := rb.Get(clientACtx, &Foo{}); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("rb.Get() should have failed: %v", err)
+ }
+ if err := rb.Put(clientACtx, Foo{}); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("rb.Put() should have failed: %v", err)
+ }
+ if err := rb.Delete(clientACtx); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("rb.Delete() should have failed: %v", err)
+ }
+ // Test Table.Delete and Scan.
+ if err := tb.Delete(clientACtx, nosql.Prefix("")); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("tb.Delete should have failed: %v", err)
+ }
+ s := tb.Scan(clientACtx, nosql.Prefix(""))
+ if !s.Advance() {
+ t.Fatalf("Stream should have advanced: %v", s.Err())
+ }
+ if s.Key() != "afoo" {
+ t.Fatalf("Unexpected key: got %q, want %q", s.Key(), "afoo")
+ }
+ if s.Advance() {
+ t.Fatalf("Stream advanced unexpectedly")
+ }
+ if err := s.Err(); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("Unexpected stream error: %v", err)
+ }
+}
diff --git a/v23/syncbase/nosql/internal/query/demo/db/db.go b/v23/syncbase/nosql/internal/query/demo/db/db.go
index 71434a2..8a96f0b 100644
--- a/v23/syncbase/nosql/internal/query/demo/db/db.go
+++ b/v23/syncbase/nosql/internal/query/demo/db/db.go
@@ -7,7 +7,6 @@
import (
"errors"
"fmt"
- "strings"
"time"
"v.io/syncbase/v23/syncbase/nosql/query_db"
@@ -43,10 +42,10 @@
}
type keyValueStreamImpl struct {
- table table
- cursor int
- prefixes []string
- prefixCursor int
+ table table
+ cursor int
+ keyRanges query_db.KeyRanges
+ rangeCursor int
}
func (kvs *keyValueStreamImpl) Advance() bool {
@@ -55,16 +54,17 @@
if kvs.cursor >= len(kvs.table.rows) {
return false
}
- for kvs.prefixCursor < len(kvs.prefixes) {
- // does it match any prefix
- if kvs.prefixes[kvs.prefixCursor] == "" || strings.HasPrefix(kvs.table.rows[kvs.cursor].key, kvs.prefixes[kvs.prefixCursor]) {
+ for kvs.rangeCursor < len(kvs.keyRanges) {
+ // does it match any keyRange (or is the keyRange the 0-255 wildcard)?
+ if (kvs.keyRanges[kvs.rangeCursor].Start == string([]byte{0}) && kvs.keyRanges[kvs.rangeCursor].Limit == string([]byte{255})) ||
+ (kvs.table.rows[kvs.cursor].key >= kvs.keyRanges[kvs.rangeCursor].Start && kvs.table.rows[kvs.cursor].key <= kvs.keyRanges[kvs.rangeCursor].Limit) {
return true
}
- // Keys and prefixes are both sorted low to high, so we can increment
- // prefixCursor if the prefix is < the key.
- if kvs.prefixes[kvs.prefixCursor] < kvs.table.rows[kvs.cursor].key {
- kvs.prefixCursor++
- if kvs.prefixCursor >= len(kvs.prefixes) {
+ // Keys and keyRanges are both sorted low to high, so we can increment
+ // rangeCursor if the keyRange.Limit is < the key.
+ if kvs.keyRanges[kvs.rangeCursor].Limit < kvs.table.rows[kvs.cursor].key {
+ kvs.rangeCursor++
+ if kvs.rangeCursor >= len(kvs.keyRanges) {
return false
}
} else {
@@ -86,11 +86,11 @@
func (kvs *keyValueStreamImpl) Cancel() {
}
-func (t table) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t table) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
var keyValueStreamImpl keyValueStreamImpl
keyValueStreamImpl.table = t
keyValueStreamImpl.cursor = -1
- keyValueStreamImpl.prefixes = prefixes
+ keyValueStreamImpl.keyRanges = keyRanges
return &keyValueStreamImpl, nil
}
diff --git a/v23/syncbase/nosql/internal/query/demo/demo.go b/v23/syncbase/nosql/internal/query/demo/demo.go
index b19e0cc..e369dc4 100644
--- a/v23/syncbase/nosql/internal/query/demo/demo.go
+++ b/v23/syncbase/nosql/internal/query/demo/demo.go
@@ -24,7 +24,7 @@
var (
format = flag.String("format", "table",
- "Output format. 'table': human-readable table; 'csv': comma-separated values, use -csv-delimiter to control the delimiter.")
+ "Output format. 'table': human-readable table; 'csv': comma-separated values, use -csv-delimiter to control the delimiter; 'json': JSON arrays.")
csvDelimiter = flag.String("csv-delimiter", ",", "Delimiter to use when printing data as CSV (e.g. \"\t\", \",\")")
)
@@ -63,10 +63,16 @@
if err := writer.WriteTable(os.Stdout, columnNames, rs); err != nil {
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
}
- } else {
+ } else if *format == "csv" {
if err := writer.WriteCSV(os.Stdout, columnNames, rs, *csvDelimiter); err != nil {
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
}
+ } else if *format == "json" {
+ if err := writer.WriteJson(os.Stdout, columnNames, rs); err != nil {
+ fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
+ }
+ } else {
+ panic(fmt.Sprintf("invalid format flag value: %v", *format))
}
}
}
@@ -75,8 +81,8 @@
shutdown := db.InitDB()
defer shutdown()
- if *format != "table" && *format != "csv" {
- fmt.Fprintf(os.Stderr, "Unsupported -format %q. Must be one of 'table' or 'csv'.\n", *format)
+ if *format != "table" && *format != "csv" && *format != "json" {
+ fmt.Fprintf(os.Stderr, "Unsupported -format %q. Must be one of 'table', 'csv', or 'json'.\n", *format)
os.Exit(-1)
}
@@ -103,10 +109,11 @@
break
}
} else {
- if q == "" || strings.ToLower(q) == "exit" || strings.ToLower(q) == "quit" {
+ tq := strings.ToLower(strings.TrimSpace(q))
+ if tq == "" || tq == "exit" || tq == "quit" {
break
}
- if strings.ToLower(q) == "dump" {
+ if tq == "dump" {
dumpDB(d)
} else {
queryExec(d, q)
diff --git a/v23/syncbase/nosql/internal/query/demo/writer/writer.go b/v23/syncbase/nosql/internal/query/demo/writer/writer.go
index 3e2d997..20d61fe 100644
--- a/v23/syncbase/nosql/internal/query/demo/writer/writer.go
+++ b/v23/syncbase/nosql/internal/query/demo/writer/writer.go
@@ -5,6 +5,7 @@
package writer
import (
+ "encoding/json"
"errors"
"fmt"
"io"
@@ -138,6 +139,34 @@
return str
}
+// WriteJson formats the result as a JSON array of arrays (rows) of values.
+func WriteJson(out io.Writer, columnNames []string, rs query.ResultStream) error {
+ io.WriteString(out, "[\n")
+ io.WriteString(out, " [")
+ delim := ""
+ for _, cName := range columnNames {
+ str, err := json.Marshal(cName)
+ if err != nil {
+ panic(fmt.Sprintf("JSON marshalling failed for column name: %v", err))
+ }
+ io.WriteString(out, fmt.Sprintf("%s%s", delim, str))
+ delim = ", "
+ }
+ io.WriteString(out, "]\n")
+ for rs.Advance() {
+ io.WriteString(out, fmt.Sprintf(" ,["))
+ delim := ""
+ for _, column := range rs.Result() {
+ str := toJson(column)
+ io.WriteString(out, fmt.Sprintf("%s%s", delim, str))
+ delim = ", "
+ }
+ io.WriteString(out, "]\n")
+ }
+ io.WriteString(out, "]\n")
+ return rs.Err()
+}
+
// Converts VDL value to readable yet parseable string representation.
// If nested is not set, strings outside composites are left unquoted.
// TODO(ivanpi): Handle cycles and improve non-tree DAG handling.
@@ -248,3 +277,129 @@
}
return begin + strings.Join(elems, sep) + end
}
+
+// Converts VDL value to JSON representation.
+func toJson(val *vdl.Value) string {
+ jf := toJsonFriendly(val)
+ jOut, err := json.Marshal(jf)
+ if err != nil {
+ panic(fmt.Sprintf("JSON marshalling failed: %v", err))
+ }
+ return string(jOut)
+}
+
+// Converts VDL value to Go type compatible with json.Marshal().
+func toJsonFriendly(val *vdl.Value) interface{} {
+ switch val.Type() {
+ case vdl.TypeOf(vtime.Time{}), vdl.TypeOf(vtime.Duration{}):
+ s, err := toStringNative(val)
+ if err != nil {
+ panic(fmt.Sprintf("toStringNative failed for builtin time type: %v", err))
+ }
+ return s
+ default:
+ // fall through to Kind switch
+ }
+ switch val.Kind() {
+ case vdl.Bool:
+ return val.Bool()
+ case vdl.Byte:
+ return val.Byte()
+ case vdl.Uint16, vdl.Uint32, vdl.Uint64:
+ return val.Uint()
+ case vdl.Int16, vdl.Int32, vdl.Int64:
+ return val.Int()
+ case vdl.Float32, vdl.Float64:
+ return val.Float()
+ case vdl.Complex64, vdl.Complex128:
+ // Go doesn't support marshalling complex values, we need to stringify.
+ c := val.Complex()
+ return fmt.Sprintf("%v+%vi", real(c), imag(c))
+ case vdl.String:
+ return val.RawString()
+ case vdl.Enum:
+ return val.EnumLabel()
+ case vdl.Array, vdl.List:
+ arr := make([]interface{}, val.Len())
+ for i, _ := range arr {
+ arr[i] = toJsonFriendly(val.Index(i))
+ }
+ return arr
+ case vdl.Any, vdl.Optional:
+ if val.IsNil() {
+ return nil
+ }
+ return toJsonFriendly(val.Elem())
+ case vdl.Struct:
+ // TODO(ivanpi): Consider lowercasing field names.
+ return toOrderedMap(val.Type().NumField(), func(i int) (string, interface{}) {
+ return val.Type().Field(i).Name, toJsonFriendly(val.StructField(i))
+ })
+ case vdl.Union:
+ // TODO(ivanpi): Consider lowercasing field name.
+ ui, uv := val.UnionField()
+ return toOrderedMap(1, func(_ int) (string, interface{}) {
+ return val.Type().Field(ui).Name, toJsonFriendly(uv)
+ })
+ case vdl.Set:
+ // TODO(ivanpi): vdl.SortValuesAsString() used for predictable output ordering.
+ // Use a more sensible sort for numbers etc.
+ keys := vdl.SortValuesAsString(val.Keys())
+ return toOrderedMap(len(keys), func(i int) (string, interface{}) {
+ return toString(keys[i], false), true
+ })
+ case vdl.Map:
+ // TODO(ivanpi): vdl.SortValuesAsString() used for predictable output ordering.
+ // Use a more sensible sort for numbers etc.
+ keys := vdl.SortValuesAsString(val.Keys())
+ return toOrderedMap(len(keys), func(i int) (string, interface{}) {
+ return toString(keys[i], false), toJsonFriendly(val.MapIndex(keys[i]))
+ })
+ case vdl.TypeObject:
+ return val.String()
+ default:
+ panic(fmt.Sprintf("unknown Kind %s", val.Kind()))
+ }
+}
+
+// Serializes to JSON object, preserving key order.
+// Native Go map will serialize to JSON object with sorted keys, which is
+// unexpected behaviour for a struct.
+type orderedMap []orderedMapElem
+
+type orderedMapElem struct {
+ Key string
+ Val interface{}
+}
+
+var _ json.Marshaler = (*orderedMap)(nil)
+
+// Builds an orderedMap with n elements, obtaining the key and value of element
+// i using elemToKeyVal(i).
+func toOrderedMap(n int, elemToKeyVal func(i int) (string, interface{})) orderedMap {
+ om := make(orderedMap, n)
+ for i, _ := range om {
+ om[i].Key, om[i].Val = elemToKeyVal(i)
+ }
+ return om
+}
+
+// Serializes orderedMap to JSON object, preserving key order.
+func (om orderedMap) MarshalJSON() (_ []byte, rerr error) {
+ defer func() {
+ if r := recover(); r != nil {
+ rerr = fmt.Errorf("orderedMap: %v", r)
+ }
+ }()
+ return []byte(listToString("{", ",", "}", len(om), func(i int) string {
+ keyJson, err := json.Marshal(om[i].Key)
+ if err != nil {
+ panic(err)
+ }
+ valJson, err := json.Marshal(om[i].Val)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("%s:%s", keyJson, valJson)
+ })), nil
+}
diff --git a/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go b/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go
index 3dd5b95..3c8c642 100644
--- a/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go
+++ b/v23/syncbase/nosql/internal/query/demo/writer/writer_test.go
@@ -6,6 +6,7 @@
import (
"bytes"
+ "encoding/json"
"testing"
"time"
@@ -348,3 +349,136 @@
}
}
}
+
+func TestWriteJson(t *testing.T) {
+ type testCase struct {
+ columns []string
+ rows [][]interface{}
+ // To make the test cases easier to read, output should have a leading
+ // newline.
+ output string
+ }
+ tests := []testCase{
+ { // Basic.
+ []string{"c\n1", "c鶊2"},
+ [][]interface{}{
+ {5, "foo\nbar"},
+ {6, "bar\tfoo"},
+ },
+ `
+[
+ ["c\n1", "c鶊2"]
+ ,[5, "foo\nbar"]
+ ,[6, "bar\tfoo"]
+]
+`,
+ },
+ { // Numbers.
+ []string{"byte", "uint16", "uint32", "uint64", "int16", "int32", "int64",
+ "float32", "float64", "complex64", "complex128"},
+ [][]interface{}{
+ {
+ byte(12), uint16(1234), uint32(5678), uint64(999888777666), int16(9876), int32(876543), int64(128),
+ float32(3.14159), float64(2.71828182846), complex64(123.0 + 7.0i), complex128(456.789 + 10.1112i),
+ },
+ {
+ byte(9), uint16(99), uint32(999), uint64(9999999), int16(9), int32(99), int64(88),
+ float32(1.41421356237), float64(1.73205080757), complex64(9.87 + 7.65i), complex128(4.32 + 1.0i),
+ },
+ },
+ `
+[
+ ["byte", "uint16", "uint32", "uint64", "int16", "int32", "int64", "float32", "float64", "complex64", "complex128"]
+ ,[12, 1234, 5678, 999888777666, 9876, 876543, 128, 3.141590118408203, 2.71828182846, "123+7i", "456.789+10.1112i"]
+ ,[9, 99, 999, 9999999, 9, 99, 88, 1.4142135381698608, 1.73205080757, "9.869999885559082+7.650000095367432i", "4.32+1i"]
+]
+`,
+ },
+ { // Empty result.
+ []string{},
+ [][]interface{}{},
+ `
+[
+ []
+]
+`,
+ },
+ { // Empty values.
+ []string{"blank", "empty", "nil"},
+ [][]interface{}{
+ {struct{}{}, []string{}, nil},
+ {},
+ },
+ `
+[
+ ["blank", "empty", "nil"]
+ ,[{}, [], null]
+ ,[]
+]
+`,
+ },
+ {
+ []string{"c1"},
+ [][]interface{}{
+ {db.Customer{"John Smith", 1, true, db.AddressInfo{"1 Main St.", "Palo Alto", "CA", "94303"}, db.CreditReport{Agency: db.CreditAgencyEquifax, Report: db.AgencyReportEquifaxReport{db.EquifaxCreditReport{'A'}}}}},
+ {db.Invoice{1, 1000, 42, db.AddressInfo{"1 Main St.", "Palo Alto", "CA", "94303"}}},
+ },
+ `
+[
+ ["c1"]
+ ,[{"Name":"John Smith","Id":1,"Active":true,"Address":{"Street":"1 Main St.","City":"Palo Alto","State":"CA","Zip":"94303"},"Credit":{"Agency":"Equifax","Report":{"EquifaxReport":{"Rating":65}}}}]
+ ,[{"CustId":1,"InvoiceNum":1000,"Amount":42,"ShipTo":{"Street":"1 Main St.","City":"Palo Alto","State":"CA","Zip":"94303"}}]
+]
+`,
+ },
+ {
+ []string{"nil", "composite", "typeobj"},
+ [][]interface{}{
+ {
+ nil,
+ db.Composite{db.Array2String{"foo", "bar"}, []int32{1, 2}, map[int32]struct{}{1: struct{}{}, 2: struct{}{}}, map[string]int32{"foo": 1, "bar": 2}},
+ vdl.TypeOf(map[string]struct{}{}),
+ },
+ },
+ `
+[
+ ["nil", "composite", "typeobj"]
+ ,[null, {"Arr":["foo","bar"],"ListInt":[1,2],"MySet":{"1":true,"2":true},"Map":{"bar":2,"foo":1}}, "typeobject(set[string])"]
+]
+`,
+ },
+ {
+ []string{"c1"},
+ [][]interface{}{
+ {
+ db.Recursive{nil, &db.Times{time.Unix(123456789, 42244224), time.Duration(1337)}, map[db.Array2String]db.Recursive{
+ db.Array2String{"a", "棎鶊鵱"}: db.Recursive{},
+ db.Array2String{"x", "y"}: db.Recursive{vdl.ValueOf(db.CreditReport{Agency: db.CreditAgencyExperian, Report: db.AgencyReportExperianReport{db.ExperianCreditReport{db.ExperianRatingGood}}}), nil, nil},
+ }},
+ },
+ },
+ `
+[
+ ["c1"]
+ ,[{"Any":null,"Maybe":{"Stamp":"1973-11-29 21:33:09.042244224 +0000 UTC","Interval":"1.337µs"},"Rec":{"[\"a\", \"棎鶊鵱\"]":{"Any":null,"Maybe":null,"Rec":{}},"[\"x\", \"y\"]":{"Any":{"Agency":"Experian","Report":{"ExperianReport":{"Rating":"Good"}}},"Maybe":null,"Rec":{}}}}]
+]
+`,
+ },
+ }
+ for _, test := range tests {
+ var b bytes.Buffer
+ if err := writer.WriteJson(&b, test.columns, newResultStream(test.rows)); err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ continue
+ }
+ var decoded interface{}
+ if err := json.Unmarshal(b.Bytes(), &decoded); err != nil {
+ t.Errorf("Got invalid JSON: %v", err)
+ }
+ // Add a leading newline to the output to match the leading newline
+ // in our test cases.
+ if got, want := "\n"+b.String(), test.output; got != want {
+ t.Errorf("Wrong output:\nGOT: %q\nWANT:%q", got, want)
+ }
+ }
+}
diff --git a/v23/syncbase/nosql/internal/query/query.go b/v23/syncbase/nosql/internal/query/query.go
index da87502..bcbb3cf 100644
--- a/v23/syncbase/nosql/internal/query/query.go
+++ b/v23/syncbase/nosql/internal/query/query.go
@@ -65,18 +65,18 @@
return projection
}
-// Given a query (i.e.,, select statement), return the key prefixes needed to satisfy the query.
-// A return of a single empty string ([]string{ "" }) means fetch all keys.
-func CompileKeyPrefixes(w *query_parser.WhereClause) []string {
+// Given a query (i.e.,, select statement), return the key ranges needed to satisfy the query.
+// A return of a single element array of string([]byte{0}), string([]byte{255}) means fetch all.
+func CompileKeyRanges(w *query_parser.WhereClause) query_db.KeyRanges {
// First determine if every key needs to be fetched. To do this, evaluate the
// where clause substituting false for every key expression and true for every
// other (type for value) expression. If the where clause evaluates to true,
// it is possible for a row to be selected without any dependence on the contents
// of the key. In that case, all keys must be fetched.
if w == nil || CheckIfAllKeysMustBeFetched(w.Expr) {
- return []string{""}
+ return query_db.KeyRanges{query_db.KeyRange{string([]byte{0}), string([]byte{255})}}
} else {
- return query_checker.CompileKeyPrefixes(w)
+ return query_checker.CompileKeyRanges(w)
}
}
@@ -176,8 +176,8 @@
}
func execSelect(db query_db.Database, s *query_parser.SelectStatement) ([]string, ResultStream, error) {
- prefixes := CompileKeyPrefixes(s.Where)
- keyValueStream, err := s.From.Table.DBTable.Scan(prefixes)
+ keyRanges := CompileKeyRanges(s.Where)
+ keyValueStream, err := s.From.Table.DBTable.Scan(keyRanges)
if err != nil {
return nil, nil, syncql.NewErrScanError(db.GetContext(), s.Off, err)
}
diff --git a/v23/syncbase/nosql/internal/query/query_checker/query_checker.go b/v23/syncbase/nosql/internal/query/query_checker/query_checker.go
index ce807c0..81bfea3 100644
--- a/v23/syncbase/nosql/internal/query/query_checker/query_checker.go
+++ b/v23/syncbase/nosql/internal/query/query_checker/query_checker.go
@@ -107,10 +107,18 @@
e.Operand2.Prefix = prefix
// Compute the regular expression now to to check for errors.
// Save the regex (testing) and the compiled regex (for later use in evaluation).
- regex, compRegex, err := computeRegex(db, e.Operand2.Off, e.Operand2.Str)
+ regex, compRegex, foundWildcard, err := computeRegex(db, e.Operand2.Off, e.Operand2.Str)
if err != nil {
return err
}
+ // Optimization: If like argument contains no wildcards, convert the expression to equals.
+ if !foundWildcard {
+ e.Operator.Type = query_parser.Equal
+ // Since this is no longer a like expression, we need to unescape
+ // any escaped chars (i.e., "\\", "\_" and "\%" become
+ // "\", "_" and "%", respectively).
+ e.Operand2.Str = unescapeLikeExpression(e.Operand2.Str)
+ }
e.Operand2.Regex = regex
e.Operand2.CompRegex = compRegex
}
@@ -227,68 +235,85 @@
// Convert Like expression to a regex. That is, convert:
// % to .*?
// _ to .
+// Unescape '\\', '\%' and '\_' to '\', '%' and '_', respectively.
// Escape everything that would be incorrectly interpreted as a regex.
-// Note: \% and \_ are used to escape % and _, respectively.
-func computeRegex(db query_db.Database, off int64, s string) (string, *regexp.Regexp, error) {
- // Escape everything, this will escape too much as like wildcards can
- // also be escaped by a backslash (\%, \_, \\).
- escaped := regexp.QuoteMeta(s)
- // Change all unescaped '%' chars to ".*?" and all unescaped '_' chars to '.'.
- var buf bytes.Buffer
- buf.WriteString("^")
- backslash_level := 0
- for _, c := range escaped {
- switch backslash_level {
- case 0:
- switch c {
- case '%':
- buf.WriteString(".*?")
- case '_':
- buf.WriteString(".")
- case '\\':
- backslash_level++
- default:
- buf.WriteString(string(c))
+//
+// The approach this function takes is to collect characters to be escaped
+// into toBeEscapedBuf. When a wildcard is encountered, first toBeEscapedBuf
+// is escaped and written to the regex buffer, next the wildcard is translated
+// to regex (either ".*?" or ".") and written to the regex buffer.
+// At the end, any remaining chars in toBeEscapedBuf are written.
+//
+// Return values are:
+// 1. string: uncompiled regular expression
+// 2. *Regexp: compiled regular expression
+// 3. bool: true if wildcards were found (if false, like is converted to equal)
+// 4. error: non-nil if error encountered
+func computeRegex(db query_db.Database, off int64, s string) (string, *regexp.Regexp, bool, error) {
+ var buf bytes.Buffer // buffer for return regex
+ var toBeEscapedBuf bytes.Buffer // buffer to hold characters waiting to be escaped
+
+ buf.WriteString("^") // '^<regex_str>$'
+ escapedMode := false
+ foundWildcard := false
+
+ for _, c := range s {
+ switch c {
+ case '%', '_':
+ if escapedMode {
+ toBeEscapedBuf.WriteString(string(c))
+ } else {
+ // Write out any chars waiting to be escaped, then
+ // write ".*?' or '.'.
+ buf.WriteString(regexp.QuoteMeta(toBeEscapedBuf.String()))
+ toBeEscapedBuf.Reset()
+ if c == '%' {
+ buf.WriteString(".*?")
+ } else {
+ buf.WriteString(".")
+ }
+ foundWildcard = true
}
- case 1:
- switch c {
- case '\\':
- // backslashes become double backslashes because of the
- // QuoteMeta above. Let's see what's next.
- backslash_level++
- default:
- // In this case, QuoteMeta is escaping a regex character. We
- // need to honor the escape (e.g., \*, \[, \]).
- buf.WriteString("\\")
- buf.WriteString(string(c))
- backslash_level = 0
+ escapedMode = false
+ case '\\':
+ if escapedMode {
+ toBeEscapedBuf.WriteString(string(c))
}
- case 2:
- switch c {
- case '\\':
- // We've hit a third backslash.
- // Write out the first \ (escaped as \\).
- // Set backslash_level to 1 since we've encountered
- // another backslash and need to see what follows.
- buf.WriteString("\\\\")
- backslash_level = 1
- default:
- // The user wrote \% or \_.
- // It was escaped by QuoteMeta to \\% or \\_.
- // Since the % or _ was escaped, just write it so regex can
- // treat it like any character.
- buf.WriteString(string(c))
- backslash_level = 0
- }
+ escapedMode = !escapedMode
+ default:
+ toBeEscapedBuf.WriteString(string(c))
}
}
- buf.WriteString("$")
+ // Write any remaining chars in toBeEscapedBuf.
+ buf.WriteString(regexp.QuoteMeta(toBeEscapedBuf.String()))
+ buf.WriteString("$") // '^<regex_str>$'
regex := buf.String()
compRegex, err := regexp.Compile(regex)
if err != nil {
- return "", nil, syncql.NewErrErrorCompilingRegularExpression(db.GetContext(), off, regex, err)
+ return "", nil, false, syncql.NewErrErrorCompilingRegularExpression(db.GetContext(), off, regex, err)
}
- return regex, compRegex, nil
+ return regex, compRegex, foundWildcard, nil
+}
+
+// Unescape '\\', '\%' and '\_' to '\', '%' and '_', respectively.
+func unescapeLikeExpression(s string) string {
+ var buf bytes.Buffer // buffer for returned unescaped string
+
+ escapedMode := false
+
+ for _, c := range s {
+ switch c {
+ case '\\':
+ if escapedMode {
+ buf.WriteString(string(c))
+ }
+ escapedMode = !escapedMode
+ default:
+ buf.WriteString(string(c))
+ escapedMode = false
+ }
+ }
+ return buf.String()
}
func IsLogicalOperator(o *query_parser.BinaryOperator) bool {
@@ -323,53 +348,97 @@
return o.Type == query_parser.TypExpr
}
-// Compile a list of key prefixes to fetch with scan. Prefixes are returned in sorted order
-// and do not overlap (e.g., prefixes of "ab" and "abc" would be combined as "ab").
-// A single empty string (array len of 1) is returned if all keys are to be fetched.
-// Used by query package. In query_checker package so prefixes can be tested.
-func CompileKeyPrefixes(where *query_parser.WhereClause) []string {
- if where == nil {
- return []string{""}
- } else {
- // Collect all key string literal operands.
- p := collectKeyPrefixes(where.Expr)
- // Sort
- sort.Strings(p)
- // Elminate overlaps
- var p2 []string
- for i, s := range p {
- // Skip over prefixes that are already covered by prior key prefixes.
- if i != 0 && strings.HasPrefix(s, p2[len(p2)-1]) {
- continue
- }
- p2 = append(p2, s)
+// Function copied from syncbase.
+func computeKeyRangeForPrefix(prefix string) query_db.KeyRange {
+ if prefix == "" {
+ return query_db.KeyRange{string([]byte{0}), string([]byte{255})}
+ }
+ limit := []byte(prefix)
+ for len(limit) > 0 {
+ if limit[len(limit)-1] == 255 {
+ limit = limit[:len(limit)-1] // chop off trailing \x00
+ } else {
+ limit[len(limit)-1] += 1 // add 1
+ break // no carry
}
- return p2
+ }
+ return query_db.KeyRange{prefix, string(limit)}
+}
+
+// The limit for a single value range is simply a zero byte appended.
+// In this way, only the single 'start' value will be returned (or nothing if that single
+// value is not present).
+func computeKeyRangeForSingleValue(start string) query_db.KeyRange {
+ limit := []byte(start)
+ limit = append(limit, 0)
+ return query_db.KeyRange{start, string(limit)}
+}
+
+// Compute a list of key ranges to be used by query_db's Table.Scan implementation.
+func CompileKeyRanges(where *query_parser.WhereClause) query_db.KeyRanges {
+ if where == nil {
+ return query_db.KeyRanges{computeKeyRangeForPrefix("")}
+ }
+ var keyRanges query_db.KeyRanges
+ collectKeyRanges(where.Expr, &keyRanges)
+ return keyRanges
+}
+
+func collectKeyRanges(expr *query_parser.Expression, keyRanges *query_db.KeyRanges) {
+ if IsKey(expr.Operand1) {
+ if expr.Operator.Type == query_parser.Like {
+ addKeyRange(computeKeyRangeForPrefix(expr.Operand2.Prefix), keyRanges)
+ } else { // OpEqual
+ addKeyRange(computeKeyRangeForSingleValue(expr.Operand2.Str), keyRanges)
+ }
+ }
+ if IsExpr(expr.Operand1) {
+ collectKeyRanges(expr.Operand1.Expr, keyRanges)
+ }
+ if IsExpr(expr.Operand2) {
+ collectKeyRanges(expr.Operand2.Expr, keyRanges)
}
}
-// Collect all operand2 string literals where operand1 of the expression is ident "k".
-func collectKeyPrefixes(expr *query_parser.Expression) []string {
- var prefixes []string
- if IsKey(expr.Operand1) {
- if expr.Operator.Type == query_parser.Like {
- prefixes = append(prefixes, expr.Operand2.Prefix)
- } else { // OpEqual
- prefixes = append(prefixes, expr.Operand2.Str)
- }
- return prefixes
- }
- if IsExpr(expr.Operand1) {
- for _, p := range collectKeyPrefixes(expr.Operand1.Expr) {
- prefixes = append(prefixes, p)
+func addKeyRange(keyRange query_db.KeyRange, keyRanges *query_db.KeyRanges) {
+ handled := false
+ // Is there an overlap with an existing range?
+ for i, r := range *keyRanges {
+ // In the following if,
+ // the first paren expr is true if the start of the range to be added is contained in r
+ // the second paren expr is true if the limit of the range to be added is contained in r
+ // the third paren expr is true if the range to be added entirely contains r
+ if (keyRange.Start >= r.Start && keyRange.Start <= r.Limit) ||
+ (keyRange.Limit >= r.Start && keyRange.Limit <= r.Limit) ||
+ (keyRange.Start <= r.Start && keyRange.Limit >= r.Limit) {
+ // keyRange overlaps with existing range at keyRanges[i]
+ // set newKeyRange to a range that ecompasses both
+ var newKeyRange query_db.KeyRange
+ if keyRange.Start < r.Start {
+ newKeyRange.Start = keyRange.Start
+ } else {
+ newKeyRange.Start = r.Start
+ }
+ if keyRange.Limit > r.Limit {
+ newKeyRange.Limit = keyRange.Limit
+ } else {
+ newKeyRange.Limit = r.Limit
+ }
+ // The new range may overlap with other ranges in keyRanges
+ // delete the current range and call addKeyRange again
+ // This recursion will continue until no ranges overlap.
+ *keyRanges = append((*keyRanges)[:i], (*keyRanges)[i+1:]...)
+ addKeyRange(newKeyRange, keyRanges)
+ handled = true // we don't want to add keyRange below
+ break
}
}
- if IsExpr(expr.Operand2) {
- for _, p := range collectKeyPrefixes(expr.Operand2.Expr) {
- prefixes = append(prefixes, p)
- }
+ // no overlap, just add it
+ if !handled {
+ *keyRanges = append(*keyRanges, keyRange)
}
- return prefixes
+ // sort before returning
+ sort.Sort(*keyRanges)
}
// Check limit clause. Limit must be >= 1.
diff --git a/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go b/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go
index 16dd2d7..2ea1196 100644
--- a/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go
+++ b/v23/syncbase/nosql/internal/query/query_checker/query_checker_test.go
@@ -41,11 +41,11 @@
defer shutdown()
}
-func (t invoiceTable) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t invoiceTable) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
return nil, errors.New("unimplemented")
}
-func (t customerTable) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t customerTable) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
return nil, errors.New("unimplemented")
}
@@ -66,9 +66,9 @@
query string
}
-type keyPrefixesTest struct {
- query string
- keyPrefixes []string
+type keyRangesTest struct {
+ query string
+ keyRanges query_db.KeyRanges
}
type regularExpressionsTest struct {
@@ -121,35 +121,69 @@
}
}
-func TestKeyPrefixes(t *testing.T) {
- basic := []keyPrefixesTest{
+func appendZeroByte(start string) string {
+ limit := []byte(start)
+ limit = append(limit, 0)
+ return string(limit)
+
+}
+
+func TestKeyRanges(t *testing.T) {
+ basic := []keyRangesTest{
{
"select k, v from Customer",
- []string{""},
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
},
{
"select k, v from Customer where t = \"Foo.Bar\" and k like \"abc%\" limit 100 offset 200",
- []string{"abc"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"abc", "abd"},
+ },
},
{
"select k, v from \n Customer where k like \"002%\" or k like \"001%\" or k like \"%\"",
- []string{""},
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
},
{
"select k, v from Customer where k = \"Foo.Bar\" and k like \"abc%\" limit 100 offset 200",
- []string{"Foo.Bar", "abc"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"Foo.Bar", appendZeroByte("Foo.Bar")},
+ query_db.KeyRange{"abc", "abd"},
+ },
},
{
+ // Note: 'like "Foo"' is optimized to '= "Foo"
"select k, v from Customer where k = \"Foo.Bar\" or k like \"Foo\" or k like \"abc%\" limit 100 offset 200",
- []string{"Foo", "abc"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"Foo", appendZeroByte("Foo")},
+ query_db.KeyRange{"Foo.Bar", appendZeroByte("Foo.Bar")},
+ query_db.KeyRange{"abc", "abd"},
+ },
},
{
"select k, v from Customer where k like \"Foo\\%Bar\" or k like \"abc%\" limit 100 offset 200",
- []string{"Foo%Bar", "abc"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"Foo%Bar", appendZeroByte("Foo%Bar")},
+ query_db.KeyRange{"abc", "abd"},
+ },
},
{
"select k, v from Customer where k like \"Foo\\\\%Bar\" or k like \"abc%\" limit 100 offset 200",
- []string{"Foo\\", "abc"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"Foo\\", "Foo]"},
+ query_db.KeyRange{"abc", "abd"},
+ },
+ },
+ {
+ "select k, v from Customer where k like \"Foo\\\\\\%Bar\" or k like \"abc%\" limit 100 offset 200",
+ query_db.KeyRanges{
+ query_db.KeyRange{"Foo\\%Bar", appendZeroByte("Foo\\%Bar")},
+ query_db.KeyRange{"abc", "abd"},
+ },
},
}
@@ -164,9 +198,9 @@
}
switch sel := (*s).(type) {
case query_parser.SelectStatement:
- prefixes := query_checker.CompileKeyPrefixes(sel.Where)
- if !reflect.DeepEqual(test.keyPrefixes, prefixes) {
- t.Errorf("query: %s;\nGOT %v\nWANT %v", test.query, prefixes, test.keyPrefixes)
+ keyRanges := query_checker.CompileKeyRanges(sel.Where)
+ if !reflect.DeepEqual(test.keyRanges, keyRanges) {
+ t.Errorf("query: %s;\nGOT %v\nWANT %v", test.query, keyRanges, test.keyRanges)
}
default:
t.Errorf("query: %s;\nGOT %v\nWANT query_parser.SelectStatement", test.query, *s)
diff --git a/v23/syncbase/nosql/internal/query/test/query_test.go b/v23/syncbase/nosql/internal/query/test/query_test.go
index 86e395e..00498a6 100644
--- a/v23/syncbase/nosql/internal/query/test/query_test.go
+++ b/v23/syncbase/nosql/internal/query/test/query_test.go
@@ -8,7 +8,6 @@
"errors"
"fmt"
"reflect"
- "strings"
"testing"
"time"
@@ -36,10 +35,10 @@
}
type keyValueStreamImpl struct {
- table table
- cursor int
- prefixes []string
- prefixCursor int
+ table table
+ cursor int
+ keyRanges query_db.KeyRanges
+ keyRangesCursor int
}
func (kvs *keyValueStreamImpl) Advance() bool {
@@ -48,16 +47,16 @@
if kvs.cursor >= len(kvs.table.rows) {
return false
}
- for kvs.prefixCursor < len(kvs.prefixes) {
- // does it match any prefix
- if kvs.prefixes[kvs.prefixCursor] == "" || strings.HasPrefix(kvs.table.rows[kvs.cursor].key, kvs.prefixes[kvs.prefixCursor]) {
+ for kvs.keyRangesCursor < len(kvs.keyRanges) {
+ // does it match any keyRange (or is the keyRange the 0-255 wildcard)?
+ if (kvs.keyRanges[kvs.keyRangesCursor].Start == string([]byte{0}) && kvs.keyRanges[kvs.keyRangesCursor].Limit == string([]byte{255})) || (kvs.table.rows[kvs.cursor].key >= kvs.keyRanges[kvs.keyRangesCursor].Start && kvs.table.rows[kvs.cursor].key <= kvs.keyRanges[kvs.keyRangesCursor].Limit) {
return true
}
- // Keys and prefixes are both sorted low to high, so we can increment
- // prefixCursor if the prefix is < the key.
- if kvs.prefixes[kvs.prefixCursor] < kvs.table.rows[kvs.cursor].key {
- kvs.prefixCursor++
- if kvs.prefixCursor >= len(kvs.prefixes) {
+ // Keys and keyRanges are both sorted low to high, so we can increment
+ // keyRangesCursor if the keyRange.Limit is < the key.
+ if kvs.keyRanges[kvs.keyRangesCursor].Limit < kvs.table.rows[kvs.cursor].key {
+ kvs.keyRangesCursor++
+ if kvs.keyRangesCursor >= len(kvs.keyRanges) {
return false
}
} else {
@@ -79,11 +78,11 @@
func (kvs *keyValueStreamImpl) Cancel() {
}
-func (t table) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t table) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
var keyValueStreamImpl keyValueStreamImpl
keyValueStreamImpl.table = t
keyValueStreamImpl.cursor = -1
- keyValueStreamImpl.prefixes = prefixes
+ keyValueStreamImpl.keyRanges = keyRanges
return &keyValueStreamImpl, nil
}
@@ -221,10 +220,10 @@
db.tables = append(db.tables, fooTable)
}
-type keyPrefixesTest struct {
- query string
- keyPrefixes []string
- err error
+type keyRangesTest struct {
+ query string
+ keyRanges query_db.KeyRanges
+ err error
}
type evalWhereUsingOnlyKeyTest struct {
@@ -983,120 +982,206 @@
}
}
-func TestKeyPrefixes(t *testing.T) {
- basic := []keyPrefixesTest{
+func appendZeroByte(start string) string {
+ limit := []byte(start)
+ limit = append(limit, 0)
+ return string(limit)
+
+}
+
+func plusOne(start string) string {
+ limit := []byte(start)
+ for len(limit) > 0 {
+ if limit[len(limit)-1] == 255 {
+ limit = limit[:len(limit)-1] // chop off trailing \x00
+ } else {
+ limit[len(limit)-1] += 1 // add 1
+ break // no carry
+ }
+ }
+ return string(limit)
+}
+
+func TestKeyRanges(t *testing.T) {
+ basic := []keyRangesTest{
{
- // Need all keys (single prefix of "").
+ // Need all keys
"select k, v from Customer",
- []string{""},
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
nil,
},
{
- // Need all keys (single prefix of "").
- " sElEcT k, v from \n Customer WhErE k lIkE \"002%\" oR k LiKe \"001%\" or k lIkE \"%\"",
- []string{""},
+ // Keys 001 and 003
+ " select k, v from Customer where k = \"001\" or k = \"003\"",
+ query_db.KeyRanges{
+ query_db.KeyRange{"001", appendZeroByte("001")},
+ query_db.KeyRange{"003", appendZeroByte("003")},
+ },
+ nil,
+ },
+ {
+ // Need all keys
+ "select k, v from Customer where k like \"%\" or k like \"001%\" or k like \"002%\"",
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
+ nil,
+ },
+ {
+ // Need all keys, likes in where clause in different order
+ "select k, v from Customer where k like \"002%\" or k like \"001%\" or k like \"%\"",
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
nil,
},
{
// All selected rows will have key prefix of "abc".
"select k, v from Customer where t = \"Foo.Bar\" and k like \"abc%\"",
- []string{"abc"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"abc", plusOne("abc")},
+ },
nil,
},
{
- // Need all keys (single prefix of "").
+ // Need all keys
"select k, v from Customer where t = \"Foo.Bar\" or k like \"abc%\"",
- []string{""},
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
nil,
},
{
- // Need all keys (single prefix of "").
+ // Need all keys
"select k, v from Customer where k like \"abc%\" or v.zip = \"94303\"",
- []string{""},
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
nil,
},
{
// All selected rows will have key prefix of "foo".
"select k, v from Customer where t = \"Foo.Bar\" and k like \"foo_bar\"",
- []string{"foo"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foo", plusOne("foo")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "baz" or "foo".
+ // All selected rows will have key == "baz" or prefix of "foo".
"select k, v from Customer where k like \"foo_bar\" or k = \"baz\"",
- []string{"baz", "foo"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"baz", appendZeroByte("baz")},
+ query_db.KeyRange{"foo", plusOne("foo")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "fo".
+ // All selected rows will have key == "fo" or prefix of "foo".
"select k, v from Customer where k like \"foo_bar\" or k = \"fo\"",
- []string{"fo"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"fo", appendZeroByte("fo")},
+ query_db.KeyRange{"foo", plusOne("foo")},
+ },
+ nil,
+ },
+ {
+ // All selected rows will have prefix of "fo".
+ // k == foo is a subset of above prefix
+ "select k, v from Customer where k like \"fo_bar\" or k = \"foo\"",
+ query_db.KeyRanges{
+ query_db.KeyRange{"fo", plusOne("fo")},
+ },
nil,
},
{
// All selected rows will have key prefix of "foo".
"select k, v from Customer where k like \"foo%bar\"",
- []string{"foo"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foo", plusOne("foo")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "foo\bar".
+ // Select "foo\bar" row.
"select k, v from Customer where k like \"foo\\\\bar\"",
- []string{"foo\\bar"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foo\\bar", appendZeroByte("foo\\bar")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "foo%bar".
+ // Select "foo%bar" row.
"select k, v from Customer where k like \"foo\\%bar\"",
- []string{"foo%bar"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foo%bar", appendZeroByte("foo%bar")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "foo\%bar".
+ // Select "foo\%bar" row.
"select k, v from Customer where k like \"foo\\\\\\%bar\"",
- []string{"foo\\%bar"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foo\\%bar", appendZeroByte("foo\\%bar")},
+ },
nil,
},
{
- // Need all keys (single prefix of "").
+ // Need all keys
"select k, v from Customer where k like \"%foo\"",
- []string{""},
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
nil,
},
{
- // Need all keys (single prefix of "").
+ // Need all keys
"select k, v from Customer where k like \"_foo\"",
- []string{""},
+ query_db.KeyRanges{
+ query_db.KeyRange{string([]byte{0}), string([]byte{255})},
+ },
nil,
},
{
- // All selected rows will have key prefix of "foo_bar".
+ // Select "foo_bar" row.
"select k, v from Customer where k like \"foo\\_bar\"",
- []string{"foo_bar"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foo_bar", appendZeroByte("foo_bar")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "foobar%".
+ // Select "foobar%" row.
"select k, v from Customer where k like \"foobar\\%\"",
- []string{"foobar%"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foobar%", appendZeroByte("foobar%")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "foobar_".
+ // Select "foobar_" row.
"select k, v from Customer where k like \"foobar\\_\"",
- []string{"foobar_"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"foobar_", appendZeroByte("foobar_")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "\%_".
+ // Select "\%_" row.
"select k, v from Customer where k like \"\\\\\\%\\_\"",
- []string{"\\%_"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"\\%_", appendZeroByte("\\%_")},
+ },
nil,
},
{
- // All selected rows will have key prefix of "%_abc\".
+ // Select "%_abc\" row.
"select k, v from Customer where k = \"%_abc\\\"",
- []string{"%_abc\\"},
+ query_db.KeyRanges{
+ query_db.KeyRange{"%_abc\\", appendZeroByte("%_abc\\")},
+ },
nil,
},
}
@@ -1114,9 +1199,9 @@
if semErr == nil {
switch sel := (*s).(type) {
case query_parser.SelectStatement:
- keyPrefixes := query.CompileKeyPrefixes(sel.Where)
- if !reflect.DeepEqual(test.keyPrefixes, keyPrefixes) {
- t.Errorf("query: %s;\nGOT %v\nWANT %v", test.query, keyPrefixes, test.keyPrefixes)
+ keyRanges := query.CompileKeyRanges(sel.Where)
+ if !reflect.DeepEqual(test.keyRanges, keyRanges) {
+ t.Errorf("query: %s;\nGOT %v\nWANT %v", test.query, keyRanges, test.keyRanges)
}
default:
t.Errorf("query: %s; got %v, want query_parser.SelectStatement", test.query, reflect.TypeOf(*s))
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index cf3124e..bd0903b 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -151,11 +151,8 @@
Put(ctx *context.T, key string, value interface{}) error
// Delete deletes all rows in the given half-open range [start, limit). If
- // limit is "", all rows with keys >= start are included. If the last row that
- // is covered by a prefix from SetPermissions is deleted, that (prefix, perms)
- // pair is removed.
+ // limit is "", all rows with keys >= start are included.
// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
- // TODO(sadovsky): Automatic GC interacts poorly with sync. Revisit this API.
Delete(ctx *context.T, r RowRange) error
// Scan returns all rows in the given half-open range [start, limit). If limit
@@ -164,6 +161,13 @@
// See helpers nosql.Prefix(), nosql.Range(), nosql.SingleRow().
Scan(ctx *context.T, r RowRange) Stream
+ // GetPermissions returns an array of (prefix, perms) pairs. The array is
+ // sorted from longest prefix to shortest, so element zero is the one that
+ // applies to the row with the given key. The last element is always the
+ // prefix "" which represents the table's permissions -- the array will always
+ // have at least one element.
+ GetPermissions(ctx *context.T, key string) ([]PrefixPermissions, error)
+
// SetPermissions sets the permissions for all current and future rows with
// the given prefix. If the prefix overlaps with an existing prefix, the
// longest prefix that matches a row applies. For example:
@@ -171,18 +175,8 @@
// SetPermissions(ctx, Prefix("a/b/c"), perms2)
// The permissions for row "a/b/1" are perms1, and the permissions for row
// "a/b/c/1" are perms2.
- //
- // SetPermissions will fail if called with a prefix that does not match any
- // rows.
SetPermissions(ctx *context.T, prefix PrefixRange, perms access.Permissions) error
- // GetPermissions returns an array of (prefix, perms) pairs. The array is
- // sorted from longest prefix to shortest, so element zero is the one that
- // applies to the row with the given key. The last element is always the
- // prefix "" which represents the table's permissions -- the array will always
- // have at least one element.
- GetPermissions(ctx *context.T, key string) ([]PrefixPermissions, error)
-
// DeletePermissions deletes the permissions for the specified prefix. Any
// rows covered by this prefix will use the next longest prefix's permissions
// (see the array returned by GetPermissions).
diff --git a/v23/syncbase/nosql/query_db/query_db.go b/v23/syncbase/nosql/query_db/query_db.go
index d6634dc..c1eba0c 100644
--- a/v23/syncbase/nosql/query_db/query_db.go
+++ b/v23/syncbase/nosql/query_db/query_db.go
@@ -19,7 +19,7 @@
// of the prefixes arguments.
// Note: an empty string prefix (""), matches all keys.
// The prefixes argument will be sorted (low to high).
- Scan(prefixes []string) (KeyValueStream, error)
+ Scan(keyRanges KeyRanges) (KeyValueStream, error)
}
type KeyValueStream interface {
@@ -50,3 +50,28 @@
// return false. Cancel does not block.
Cancel()
}
+
+type KeyRange struct {
+ Start string
+ Limit string
+}
+
+type KeyRanges []KeyRange
+
+// Implement sort interface for KeyRanges.
+func (keyRanges KeyRanges) Len() int {
+ return len(keyRanges)
+}
+
+func (keyRanges KeyRanges) Less(i, j int) bool {
+ return keyRanges[i].Start < keyRanges[j].Start
+}
+
+func (keyRanges KeyRanges) Swap(i, j int) {
+ saveStart := keyRanges[i].Start
+ saveLimit := keyRanges[i].Limit
+ keyRanges[i].Start = keyRanges[j].Start
+ keyRanges[i].Limit = keyRanges[j].Limit
+ keyRanges[j].Start = saveStart
+ keyRanges[j].Limit = saveLimit
+}
diff --git a/v23/syncbase/nosql/table.go b/v23/syncbase/nosql/table.go
index c77905c..67aa074 100644
--- a/v23/syncbase/nosql/table.go
+++ b/v23/syncbase/nosql/table.go
@@ -70,11 +70,6 @@
return newStream(cancel, call)
}
-// SetPermissions implements Table.SetPermissions.
-func (t *table) SetPermissions(ctx *context.T, prefix PrefixRange, perms access.Permissions) error {
- return t.c.SetPermissions(ctx, prefix.Prefix(), perms)
-}
-
// GetPermissions implements Table.GetPermissions.
func (t *table) GetPermissions(ctx *context.T, key string) ([]PrefixPermissions, error) {
wirePermsList, err := t.c.GetPermissions(ctx, key)
@@ -85,6 +80,11 @@
return permsList, err
}
+// SetPermissions implements Table.SetPermissions.
+func (t *table) SetPermissions(ctx *context.T, prefix PrefixRange, perms access.Permissions) error {
+ return t.c.SetPermissions(ctx, prefix.Prefix(), perms)
+}
+
// DeletePermissions implements Table.DeletePermissions.
func (t *table) DeletePermissions(ctx *context.T, prefix PrefixRange) error {
return t.c.DeletePermissions(ctx, prefix.Prefix())
diff --git a/v23/syncbase/testutil/layer.go b/v23/syncbase/testutil/layer.go
index 14e8497..ccfe62d 100644
--- a/v23/syncbase/testutil/layer.go
+++ b/v23/syncbase/testutil/layer.go
@@ -34,7 +34,7 @@
if err := self.Create(ctx, nil); err != nil {
t.Fatalf("self.Create() failed: %v", err)
}
- if gotPerms, wantPerms := getPermsOrDie(t, ctx, self), defaultPerms(); !reflect.DeepEqual(gotPerms, wantPerms) {
+ if gotPerms, wantPerms := getPermsOrDie(t, ctx, self), DefaultPerms("server/client"); !reflect.DeepEqual(gotPerms, wantPerms) {
t.Errorf("Perms do not match: got %v, want %v", gotPerms, wantPerms)
}
@@ -60,7 +60,7 @@
}
// Test that create fails if the parent perms disallow access.
- perms = defaultPerms()
+ perms = DefaultPerms("server/client")
perms.Blacklist("server/client", string(access.Write))
if err := parent.SetPermissions(ctx, perms, ""); err != nil {
t.Fatalf("parent.SetPermissions() failed: %v", err)
@@ -107,7 +107,7 @@
if err := self2.Create(ctx, nil); err != nil {
t.Fatalf("self2.Create() failed: %v", err)
}
- perms := defaultPerms()
+ perms := DefaultPerms("server/client")
perms.Blacklist("server/client", string(access.Write))
if err := self2.SetPermissions(ctx, perms, ""); err != nil {
t.Fatalf("self2.SetPermissions() failed: %v", err)
@@ -117,7 +117,7 @@
}
// Test that delete succeeds even if the parent perms disallow access.
- perms = defaultPerms()
+ perms = DefaultPerms("server/client")
perms.Blacklist("server/client", string(access.Write))
if err := parent.SetPermissions(ctx, perms, ""); err != nil {
t.Fatalf("parent.SetPermissions() failed: %v", err)
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index 0783e54..c5df00f 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -6,6 +6,7 @@
package testutil
import (
+ "fmt"
"io/ioutil"
"os"
"reflect"
@@ -18,13 +19,13 @@
"v.io/syncbase/x/ref/services/syncbase/server"
"v.io/v23"
"v.io/v23/context"
- "v.io/v23/naming"
- "v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/v23/vdl"
"v.io/v23/verror"
"v.io/x/lib/vlog"
+ "v.io/x/ref/lib/flags"
+ "v.io/x/ref/lib/xrpc"
tsecurity "v.io/x/ref/test/testutil"
)
@@ -67,6 +68,12 @@
}
func SetupOrDieCustom(client, server string, perms access.Permissions) (clientCtx *context.T, serverName string, cleanup func(), sp security.Principal, ctx *context.T) {
+ // TODO(mattr): Instead of SetDefaultHostPort the arguably more correct thing
+ // would be to call v.io/x/ref/test.Init() from the test packages that import
+ // the profile. Note you should only call that from the package that imports
+ // the profile, not from libraries like this. Also, it would be better if
+ // v23.Init was test.V23Init().
+ flags.SetDefaultHostPort("127.0.0.1:0")
ctx, shutdown := v23.Init()
sp = tsecurity.NewPrincipal(server)
@@ -77,7 +84,7 @@
vlog.Fatal("v23.WithPrincipal() failed: ", err)
}
- serverName, stopServer := newServer(serverCtx, perms)
+ serverName, stopServer := newServer(client, server, serverCtx, perms)
cleanup = func() {
stopServer()
shutdown()
@@ -85,10 +92,12 @@
return
}
-func defaultPerms() access.Permissions {
+func DefaultPerms(patterns ...string) access.Permissions {
perms := access.Permissions{}
for _, tag := range access.AllTypicalTags() {
- perms.Add(security.BlessingPattern("server/client"), string(tag))
+ for _, pattern := range patterns {
+ perms.Add(security.BlessingPattern(pattern), string(tag))
+ }
}
return perms
}
@@ -175,18 +184,9 @@
return perms
}
-func newServer(ctx *context.T, perms access.Permissions) (string, func()) {
- s, err := v23.NewServer(ctx)
- if err != nil {
- vlog.Fatal("v23.NewServer() failed: ", err)
- }
- eps, err := s.Listen(rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}})
- if err != nil {
- vlog.Fatal("s.Listen() failed: ", err)
- }
-
+func newServer(clientName, serverName string, ctx *context.T, perms access.Permissions) (string, func()) {
if perms == nil {
- perms = defaultPerms()
+ perms = DefaultPerms(fmt.Sprintf("%s/%s", serverName, clientName))
}
rootDir, err := ioutil.TempDir("", "syncbase")
if err != nil {
@@ -200,13 +200,11 @@
if err != nil {
vlog.Fatal("server.NewService() failed: ", err)
}
- d := server.NewDispatcher(service)
-
- if err := s.ServeDispatcher("", d); err != nil {
- vlog.Fatal("s.ServeDispatcher() failed: ", err)
+ s, err := xrpc.NewDispatchingServer(ctx, "", server.NewDispatcher(service))
+ if err != nil {
+ vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
}
-
- name := naming.JoinAddressName(eps[0].String(), "")
+ name := s.Status().Endpoints[0].Name()
return name, func() {
s.Stop()
os.RemoveAll(rootDir)
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
index 8e173d1..43cc542 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore.go
@@ -41,6 +41,7 @@
import "sync"
import "time"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
import "v.io/v23/context"
import "v.io/v23/verror"
@@ -247,22 +248,13 @@
// -----------------------------------------------------------
-// A BlockOrFile represents a vector of bytes, and contains either a data block
-// (as a []byte), or a (file name, size, offset) triple.
-type BlockOrFile struct {
- Block []byte // If FileName is empty, the bytes represented.
- FileName string // If non-empty, the name of the file containing the bytes.
- Size int64 // If FileName is non-empty, the number of bytes (or -1 for "all")
- Offset int64 // If FileName is non-empty, the offset of the relevant bytes within the file.
-}
-
// addFragment() ensures that the store *fscabs contains a fragment comprising
// the catenation of the byte vectors named by item[..].block and the contents
// of the files named by item[..].filename. The block field is ignored if
// fileName!="". The fragment is not physically added if already present.
// The fragment is added to the fragment list of the descriptor *desc.
func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash,
- desc *blobDesc, item ...BlockOrFile) (fileName string, size int64, err error) {
+ desc *blobDesc, item ...localblobstore.BlockOrFile) (fileName string, size int64, err error) {
hasher := md5.New()
var buf []byte
@@ -543,12 +535,12 @@
// should not be used concurrently by multiple threads. The returned handle
// should be closed with either the Close() or CloseWithoutFinalize() method to
// avoid leaking file handles.
-func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (bw *BlobWriter, err error) {
+func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T) (localblobstore.BlobWriter, error) {
+ var bw *BlobWriter
newName := newBlobName()
- var f *file
fileName := filepath.Join(fscabs.rootName, newName)
os.MkdirAll(filepath.Dir(fileName), dirPermissions)
- f, err = newFile(os.Create(fileName))
+ f, err := newFile(os.Create(fileName))
if err == nil {
bw = new(BlobWriter)
bw.fscabs = fscabs
@@ -568,8 +560,9 @@
// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on an
// old, but unfinalized blob name.
-func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (bw *BlobWriter, err error) {
- bw = new(BlobWriter)
+func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) {
+ var err error
+ bw := new(BlobWriter)
bw.fscabs = fscabs
bw.ctx = ctx
bw.desc, err = bw.fscabs.getBlob(ctx, blobName)
@@ -639,7 +632,7 @@
// AppendFragment() appends a fragment to the blob being written by *bw, where
// the fragment is composed of the byte vectors described by the elements of
// item[]. The fragment is copied into the blob store.
-func (bw *BlobWriter) AppendFragment(item ...BlockOrFile) (err error) {
+func (bw *BlobWriter) AppendFragment(item ...localblobstore.BlockOrFile) (err error) {
if bw.f == nil {
panic("fs_cablobstore.BlobWriter programming error: AppendFragment() after Close()")
}
@@ -782,7 +775,7 @@
// NewBlobReader() returns a pointer to a newly allocated BlobReader on the
// specified blobName. BlobReaders should not be used concurrently by multiple
// threads. Returned handles should be closed with Close().
-func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br *BlobReader, err error) {
+func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br localblobstore.BlobReader, err error) {
var desc *blobDesc
desc, err = fscabs.getBlob(ctx, blobName)
if err == nil {
@@ -954,7 +947,7 @@
// if fscabsi.Err() != nil {
// // The loop terminated early due to an error.
// }
-func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Iter {
stack := make([]dirListing, 1)
stack[0] = dirListing{pos: -1, nameList: []string{blobDir}}
return &FsCasIter{fscabs: fscabs, stack: stack}
@@ -970,7 +963,7 @@
// if fscabsi.Err() != nil {
// // The loop terminated early due to an error.
// }
-func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) (fscabsi *FsCasIter) {
+func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Iter {
stack := make([]dirListing, 1)
stack[0] = dirListing{pos: -1, nameList: []string{casDir}}
return &FsCasIter{fscabs: fscabs, stack: stack}
diff --git a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
index ded5e3c..886f5c4 100644
--- a/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
+++ b/x/ref/services/syncbase/localblobstore/fs_cablobstore/fs_cablobstore_test.go
@@ -5,345 +5,16 @@
// A test for fs_cablobstore
package fs_cablobstore_test
-import "bytes"
-import "crypto/md5"
-import "fmt"
-import "io"
import "io/ioutil"
import "os"
-import "path/filepath"
import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
-import "v.io/v23/context"
-import "v.io/v23/verror"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
import "v.io/x/ref/test"
import _ "v.io/x/ref/runtime/factories/generic"
-func TestCreate(t *testing.T) {
- ctx, shutdown := test.V23Init()
- defer shutdown()
-
- // Make a temporary directory.
- var err error
- var testDirName string
- testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
- if err != nil {
- t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
- }
- defer os.RemoveAll(testDirName)
-
- // Check that we can create an fs_cablobstore.
- var fscabs *fs_cablobstore.FsCaBlobStore
- fscabs, err = fs_cablobstore.Create(ctx, testDirName)
- if err != nil {
- t.Errorf("fs_cablobstore.Create failed: %v", err)
- }
-
- // Check that there are no files in the newly-created tree.
- iterator := fscabs.ListBlobIds(ctx)
- for iterator.Advance() {
- fileName := iterator.Value()
- t.Errorf("unexpected file %q\n", fileName)
- }
- if iterator.Err() != nil {
- t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
- }
-}
-
-// A blobOrBlockOrFile represents some bytes that may be contained in a named
-// blob, a named file, or in an explicit slice of bytes.
-type blobOrBlockOrFile struct {
- blob string // If non-emtpy, the name of the blob containing the bytes.
- file string // If non-empty and blob is empty, the name of the file containing the bytes.
- size int64 // Size of part of file or blob, or -1 for "everything until EOF".
- offset int64 // Offset within file or blob.
- block []byte // If both blob and file are empty, a slice containing the bytes.
-}
-
-// A testBlob records that some specified content has been stored with a given
-// blob name in the blob store.
-type testBlob struct {
- content []byte // content that has been stored.
- blobName string // the name of the blob.
-}
-
-// removeBlobFromBlobVector() removes the entry named blobName from
-// blobVector[], returning the new vector.
-func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
- n := len(blobVector)
- i := 0
- for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
- }
- if i != n {
- blobVector[i] = blobVector[n-1]
- blobVector = blobVector[0 : n-1]
- }
- return blobVector
-}
-
-// writeBlob() writes a new blob to *fscabs, and returns its name. The new
-// blob's content is described by the elements of data[]. Any error messages
-// generated include the index of the blob in blobVector and its content; the
-// latter is assumed to be printable. The expected content of the the blob is
-// "content", so that this routine can check it. If useResume is true, and data[]
-// has length more than 1, the function artificially uses ResumeBlobWriter(),
-// to test it.
-func writeBlob(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob,
- content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
- var bw *fs_cablobstore.BlobWriter
- var err error
- bw, err = fscabs.NewBlobWriter(ctx)
- if err != nil {
- t.Fatalf("fs_cablobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
- }
- blobName := bw.Name()
-
- // Construct the blob from the pieces.
- // There is a loop within the loop to exercise the possibility of
- // passing multiple fragments to AppendFragment().
- for i := 0; i != len(data) && err == nil; {
- if len(data[i].blob) != 0 {
- err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
- if err != nil {
- t.Errorf("fs_cablobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
- }
- i++
- } else {
- var pieces []fs_cablobstore.BlockOrFile
- for ; i != len(data) && len(data[i].blob) == 0; i++ {
- if len(data[i].file) != 0 {
- pieces = append(pieces, fs_cablobstore.BlockOrFile{
- FileName: data[i].file,
- Size: data[i].size,
- Offset: data[i].offset})
- } else {
- pieces = append(pieces, fs_cablobstore.BlockOrFile{Block: data[i].block})
- }
- }
- err = bw.AppendFragment(pieces...)
- if err != nil {
- t.Errorf("fs_cablobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
- }
- }
- if useResume && i < len(data)-1 && err == nil {
- err = bw.CloseWithoutFinalize()
- if err == nil {
- bw, err = fscabs.ResumeBlobWriter(ctx, blobName)
- }
- }
- }
-
- if bw != nil {
- if bw.Size() != int64(len(content)) {
- t.Errorf("fs_cablobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
- }
- if bw.IsFinalized() {
- t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
- }
- err = bw.Close()
- if err != nil {
- t.Errorf("fs_cablobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
- }
- if !bw.IsFinalized() {
- t.Errorf("fs_cablobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
- }
- if bw.Size() != int64(len(content)) {
- t.Errorf("fs_cablobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
- }
- if bw.Name() != blobName {
- t.Errorf("fs_cablobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
- }
- hasher := md5.New()
- hasher.Write(content)
- if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
- t.Errorf("fs_cablobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
- }
- }
-
- return append(blobVector,
- testBlob{
- content: content,
- blobName: blobName,
- })
-}
-
-// readBlob() returns a substring of the content of the blob named blobName in *fscabs.
-// The return values are:
-// - the "size" bytes from the content, starting at the given "offset",
-// measured from "whence" (as defined by io.Seeker.Seek).
-// - the position to which BlobBeader seeks to,
-// - the md5 hash of the bytes read, and
-// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(),
-// - and error.
-func readBlob(ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobName string,
- size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
-
- var br *fs_cablobstore.BlobReader
- hasher := md5.New()
- br, err = fscabs.NewBlobReader(ctx, blobName)
- if err == nil {
- buf := make([]byte, 8192, 8192)
- fullHash = br.Hash()
- pos, err = br.Seek(offset, whence)
- if err == nil {
- var n int
- first := true // Read at least once, to test reading zero bytes.
- for err == nil && (size == -1 || int64(len(content)) < size || first) {
- // Read just what was asked for.
- var toRead []byte = buf
- if size >= 0 && int(size)-len(content) < len(buf) {
- toRead = buf[0 : int(size)-len(content)]
- }
- n, err = br.Read(toRead)
- hasher.Write(toRead[0:n])
- if size >= 0 && int64(len(content)+n) > size {
- n = int(size) - len(content)
- }
- content = append(content, toRead[0:n]...)
- first = false
- }
- }
- br.Close()
- }
- return content, pos, hasher.Sum(nil), fullHash, err
-}
-
-// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
-// read, and that they contain the appropriate data.
-func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob) {
- for i := range blobVector {
- var size int64
- data := blobVector[i].content
- dataLen := int64(len(data))
- blobName := blobVector[i].blobName
- for size = -1; size != dataLen+1; size++ {
- var offset int64
- for offset = -dataLen - 1; offset != dataLen+1; offset++ {
- for whence := -1; whence != 4; whence++ {
- content, pos, hash, fullHash, err := readBlob(ctx, fscabs, blobName, size, offset, whence)
-
- // Compute expected seek position.
- expectedPos := offset
- if whence == 2 {
- expectedPos += dataLen
- }
-
- // Computed expected size.
- expectedSize := size
- if expectedSize == -1 || expectedPos+expectedSize > dataLen {
- expectedSize = dataLen - expectedPos
- }
-
- // Check that reads behave as expected.
- if (whence == -1 || whence == 3) &&
- verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
- // Expected error from bad "whence" value.
- } else if expectedPos < 0 &&
- verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
- // Expected error from negative Seek position.
- } else if expectedPos > dataLen &&
- verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
- // Expected error from too high a Seek position.
- } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
- bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
- pos == expectedPos && expectedPos+expectedSize == dataLen {
- // Expected success with EOF.
- } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
- bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
- pos == expectedPos && expectedPos+expectedSize != dataLen {
- if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
- t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v (blob is %q)",
- string(data), size, offset, whence,
- hash, fullHash, blobName)
- } // Else expected success without EOF.
- } else {
- t.Errorf("fs_cablobstore read test on %q size %d offset %d whence %d yields %q pos %d %v (blob is %q)",
- string(data), size, offset, whence,
- content, pos, err, blobName)
- }
- }
- }
- }
- }
-}
-
-// checkAllBlobs() checks all the blobs in *fscabs to ensure they correspond to
-// those in blobVector[].
-func checkAllBlobs(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, blobVector []testBlob, testDirName string) {
- blobCount := 0
- iterator := fscabs.ListBlobIds(ctx)
- for iterator.Advance() {
- fileName := iterator.Value()
- i := 0
- for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
- }
- if i == len(blobVector) {
- t.Errorf("fs_cablobstore.ListBlobIds found unexpected file %s", fileName)
- } else {
- content, pos, hash, fullHash, err := readBlob(ctx, fscabs, fileName, -1, 0, 0)
- if err != nil && err != io.EOF {
- t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
- } else if bytes.Compare(blobVector[i].content, content) != 0 {
- t.Errorf("fs_cablobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
- filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
- } else if pos != 0 {
- t.Errorf("fs_cablobstore.ListCAIds Seek on %q returned %d instead of 0",
- filepath.Join(testDirName, fileName), pos)
- }
- if bytes.Compare(hash, fullHash) != 0 {
- t.Errorf("fs_cablobstore.ListCAIds read on %q; got hash %v, expected %v",
- fileName, hash, fullHash)
- }
- }
- blobCount++
- }
- if iterator.Err() != nil {
- t.Errorf("fs_cablobstore.ListBlobIds iteration failed: %v", iterator.Err())
- }
- if blobCount != len(blobVector) {
- t.Errorf("fs_cablobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
- }
-}
-
-// checkFragments() checks all the fragments in *fscabs to ensure they
-// correspond to those fragmentMap[].
-func checkFragments(t *testing.T, ctx *context.T, fscabs *fs_cablobstore.FsCaBlobStore, fragmentMap map[string]bool, testDirName string) {
- caCount := 0
- iterator := fscabs.ListCAIds(ctx)
- for iterator.Advance() {
- fileName := iterator.Value()
- content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
- if err != nil && err != io.EOF {
- t.Errorf("fs_cablobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
- } else if !fragmentMap[string(content)] {
- t.Errorf("fs_cablobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
- } else {
- hasher := md5.New()
- hasher.Write(content)
- hash := hasher.Sum(nil)
- nameFromContent := filepath.Join("cas",
- fmt.Sprintf("%02x", hash[0]),
- fmt.Sprintf("%02x", hash[1]),
- fmt.Sprintf("%02x", hash[2]),
- fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
- hash[3],
- hash[4], hash[5], hash[6], hash[7],
- hash[8], hash[9], hash[10], hash[11],
- hash[12], hash[13], hash[14], hash[15]))
- if nameFromContent != fileName {
- t.Errorf("fs_cablobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
- }
- }
- caCount++
- }
- if iterator.Err() != nil {
- t.Errorf("fs_cablobstore.ListCAIds iteration failed: %v", iterator.Err())
- }
- if caCount != len(fragmentMap) {
- t.Errorf("fs_cablobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
- }
-}
-
// This test case tests adding files, retrieving them and deleting them. One
// can't retrieve or delete something that hasn't been created, so it's all one
// test case.
@@ -354,231 +25,19 @@
// Make a temporary directory.
var err error
var testDirName string
- testDirName, err = ioutil.TempDir("", "fs_cablobstore_test")
+ testDirName, err = ioutil.TempDir("", "localblobstore_test")
if err != nil {
- t.Fatalf("fs_cablobstore_test: can't make tmp directory: %v\n", err)
+ t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
}
defer os.RemoveAll(testDirName)
// Create an fs_cablobstore.
- var fscabs *fs_cablobstore.FsCaBlobStore
- fscabs, err = fs_cablobstore.Create(ctx, testDirName)
+ var bs localblobstore.BlobStore
+ bs, err = fs_cablobstore.Create(ctx, testDirName)
if err != nil {
t.Fatalf("fs_cablobstore.Create failed: %v", err)
}
- // Create the strings: "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
- womData := []byte("wom")
- batData := []byte("bat")
- wombatData := []byte("wombat")
- batwomData := []byte("batwom")
- atwoData := []byte("atwo")
- atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
-
- // fragmentMap will have an entry per content-addressed fragment.
- fragmentMap := make(map[string]bool)
-
- // Create the blobs, by various means.
-
- var blobVector []testBlob // Accumulate the blobs we create here.
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- womData, false,
- blobOrBlockOrFile{block: womData})
- womName := blobVector[len(blobVector)-1].blobName
- fragmentMap[string(womData)] = true
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- batData, false,
- blobOrBlockOrFile{block: batData})
- batName := blobVector[len(blobVector)-1].blobName
- fragmentMap[string(batData)] = true
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, false,
- blobOrBlockOrFile{block: wombatData})
- firstWombatName := blobVector[len(blobVector)-1].blobName
- fragmentMap[string(wombatData)] = true
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, true,
- blobOrBlockOrFile{block: womData},
- blobOrBlockOrFile{block: batData})
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, false,
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: -1,
- offset: 0})
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- wombatData, false,
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: 6,
- offset: 0})
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- batwomData, false,
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: 3,
- offset: 3},
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: 3,
- offset: 0})
- batwomName := blobVector[len(blobVector)-1].blobName
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- atwoData, false,
- blobOrBlockOrFile{
- blob: batwomName,
- size: 4,
- offset: 1})
- atwoName := blobVector[len(blobVector)-1].blobName
-
- blobVector = writeBlob(t, ctx, fscabs, blobVector,
- atwoatwoombatatwoData, true,
- blobOrBlockOrFile{
- blob: atwoName,
- size: -1,
- offset: 0},
- blobOrBlockOrFile{
- blob: atwoName,
- size: 4,
- offset: 0},
- blobOrBlockOrFile{
- blob: firstWombatName,
- size: -1,
- offset: 1},
- blobOrBlockOrFile{
- blob: batName,
- size: -1,
- offset: 1},
- blobOrBlockOrFile{
- blob: womName,
- size: 2,
- offset: 0})
- atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Nothing should change if we garbage collect.
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Ensure that deleting non-existent blobs fails.
- err = fscabs.DeleteBlob(ctx, "../../../../etc/passwd")
- if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
- t.Errorf("DeleteBlob attempted to delete a bogus blob name")
- }
- err = fscabs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
- if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
- t.Errorf("DeleteBlob attempted to delete a bogus blob name")
- }
-
- // -------------------------------------------------
- // Delete a blob.
- err = fscabs.DeleteBlob(ctx, batName)
- if err != nil {
- t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
- }
- blobVector = removeBlobFromBlobVector(blobVector, batName)
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Nothing should change if we garbage collect.
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Open a BlobReader on a blob we're about to delete,
- // so its fragments won't be garbage collected.
-
- var br *fs_cablobstore.BlobReader
- br, err = fscabs.NewBlobReader(ctx, atwoatwoombatatwoName)
- if err != nil {
- t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
- }
-
- // -------------------------------------------------
- // Delete a blob. This should be the last on-disc reference to the
- // content-addressed fragment "bat", but the fragment won't be deleted
- // until close the reader and garbage collect.
- err = fscabs.DeleteBlob(ctx, atwoatwoombatatwoName)
- if err != nil {
- t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
- }
- blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Garbage collection should change nothing; the fragment involved
- // is still referenced from the open reader *br.
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
-
- // Close the open BlobReader and garbage collect.
- err = br.Close()
- if err != nil {
- t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
- }
- delete(fragmentMap, string(batData))
-
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // Delete all blobs.
- for len(blobVector) != 0 {
- err = fscabs.DeleteBlob(ctx, blobVector[0].blobName)
- if err != nil {
- t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
- }
- blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
- }
-
- // -------------------------------------------------
- // Check that the state is as we expect.
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
-
- // -------------------------------------------------
- // The remaining fragments should be removed when we garbage collect.
- for frag := range fragmentMap {
- delete(fragmentMap, frag)
- }
- fscabs.GC(ctx)
- checkWrittenBlobsAreReadable(t, ctx, fscabs, blobVector)
- checkAllBlobs(t, ctx, fscabs, blobVector, testDirName)
- checkFragments(t, ctx, fscabs, fragmentMap, testDirName)
+ // Test it.
+ localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_test.go b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
new file mode 100644
index 0000000..973a9c8
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_test.go
@@ -0,0 +1,43 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test for localblobstore
+package localblobstore_test
+
+import "io/ioutil"
+import "os"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/localblobstore_testlib"
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// This test case tests adding files, retrieving them and deleting them. One
+// can't retrieve or delete something that hasn't been created, so it's all one
+// test case.
+func TestAddRetrieveAndDelete(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ // Make a temporary directory.
+ var err error
+ var testDirName string
+ testDirName, err = ioutil.TempDir("", "localblobstore_test")
+ if err != nil {
+ t.Fatalf("localblobstore_test: can't make tmp directory: %v\n", err)
+ }
+ defer os.RemoveAll(testDirName)
+
+ // Create an fs_cablobstore.
+ var bs localblobstore.BlobStore
+ bs, err = fs_cablobstore.Create(ctx, testDirName)
+ if err != nil {
+ t.Fatalf("fs_cablobstore.Create failed: %v", err)
+ }
+
+ // Test it.
+ localblobstore_testlib.AddRetrieveAndDelete(t, ctx, bs, testDirName)
+}
diff --git a/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
new file mode 100644
index 0000000..662e964
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/localblobstore_testlib/localblobstore_testlib.go
@@ -0,0 +1,548 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test library for localblobstores.
+package localblobstore_testlib
+
+import "bytes"
+import "crypto/md5"
+import "fmt"
+import "io"
+import "io/ioutil"
+import "path/filepath"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+// A blobOrBlockOrFile represents some bytes that may be contained in a named
+// blob, a named file, or in an explicit slice of bytes.
+type blobOrBlockOrFile struct {
+ blob string // If non-empty, the name of the blob containing the bytes.
+ file string // If non-empty and blob is empty, the name of the file containing the bytes.
+ size int64 // Size of part of file or blob, or -1 for "everything until EOF".
+ offset int64 // Offset within file or blob.
+ block []byte // If both blob and file are empty, a slice containing the bytes.
+}
+
+// A testBlob records that some specified content has been stored with a given
+// blob name in the blob store.
+type testBlob struct {
+ content []byte // content that has been stored.
+ blobName string // the name of the blob.
+}
+
+// removeBlobFromBlobVector() removes the entry named blobName from
+// blobVector[], returning the new vector.
+func removeBlobFromBlobVector(blobVector []testBlob, blobName string) []testBlob {
+ n := len(blobVector)
+ i := 0
+ for i = 0; i != n && blobName != blobVector[i].blobName; i++ {
+ }
+ if i != n {
+ blobVector[i] = blobVector[n-1]
+ blobVector = blobVector[0 : n-1]
+ }
+ return blobVector
+}
+
+// writeBlob() writes a new blob to bs, and returns its name. The new
+// blob's content is described by the elements of data[]. Any error messages
+// generated include the index of the blob in blobVector and its content; the
+// latter is assumed to be printable. The expected content of the the blob is
+// "content", so that this routine can check it. If useResume is true, and data[]
+// has length more than 1, the function artificially uses ResumeBlobWriter(),
+// to test it.
+func writeBlob(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob,
+ content []byte, useResume bool, data ...blobOrBlockOrFile) []testBlob {
+ var bw localblobstore.BlobWriter
+ var err error
+ bw, err = bs.NewBlobWriter(ctx)
+ if err != nil {
+ t.Errorf("localblobstore.NewBlobWriter blob %d:%s failed: %v", len(blobVector), string(content), err)
+ }
+ blobName := bw.Name()
+
+ // Construct the blob from the pieces.
+ // There is a loop within the loop to exercise the possibility of
+ // passing multiple fragments to AppendFragment().
+ for i := 0; i != len(data) && err == nil; {
+ if len(data[i].blob) != 0 {
+ err = bw.AppendBlob(data[i].blob, data[i].size, data[i].offset)
+ if err != nil {
+ t.Errorf("localblobstore.AppendBlob %d:%s blob %s failed: %v", len(blobVector), string(content), data[i].blob, err)
+ }
+ i++
+ } else {
+ var pieces []localblobstore.BlockOrFile
+ for ; i != len(data) && len(data[i].blob) == 0; i++ {
+ if len(data[i].file) != 0 {
+ pieces = append(pieces, localblobstore.BlockOrFile{
+ FileName: data[i].file,
+ Size: data[i].size,
+ Offset: data[i].offset})
+ } else {
+ pieces = append(pieces, localblobstore.BlockOrFile{Block: data[i].block})
+ }
+ }
+ err = bw.AppendFragment(pieces...)
+ if err != nil {
+ t.Errorf("localblobstore.AppendFragment %d:%s failed on %v: %v", len(blobVector), string(content), pieces, err)
+ }
+ }
+ if useResume && i < len(data)-1 && err == nil {
+ err = bw.CloseWithoutFinalize()
+ if err == nil {
+ bw, err = bs.ResumeBlobWriter(ctx, blobName)
+ }
+ }
+ }
+
+ if bw != nil {
+ if bw.Size() != int64(len(content)) {
+ t.Errorf("localblobstore.Size before finalization %d:%s got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+ }
+ if bw.IsFinalized() {
+ t.Errorf("localblobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+ }
+ err = bw.Close()
+ if err != nil {
+ t.Errorf("localblobstore.Close %d:%s failed: %v", len(blobVector), string(content), err)
+ }
+ if !bw.IsFinalized() {
+ t.Errorf("localblobstore.IsFinalized %d:%s got true, expected false", len(blobVector), string(content))
+ }
+ if bw.Size() != int64(len(content)) {
+ t.Errorf("localblobstore.Size %d:%s after finalization got %d, expected %d", len(blobVector), string(content), bw.Size(), len(content))
+ }
+ if bw.Name() != blobName {
+ t.Errorf("localblobstore %d:%s name changed when finalized was %s now %s", len(blobVector), string(content), blobName, bw.Name())
+ }
+ hasher := md5.New()
+ hasher.Write(content)
+ if bytes.Compare(bw.Hash(), hasher.Sum(nil)) != 0 {
+ t.Errorf("localblobstore %d:%s BlobWriter.Hash got %v, expected %v", len(blobVector), string(content), bw.Hash(), hasher.Sum(nil))
+ }
+ }
+
+ return append(blobVector,
+ testBlob{
+ content: content,
+ blobName: blobName,
+ })
+}
+
+// readBlob() returns a substring of the content of the blob named blobName in bs.
+// The return values are:
+// - the "size" bytes from the content, starting at the given "offset",
+// measured from "whence" (as defined by io.Seeker.Seek).
+// - the position to which BlobBeader seeks to,
+// - the md5 hash of the bytes read, and
+// - the md5 hash of the bytes of the blob, as returned by BlobReader.Hash(),
+// - and error.
+func readBlob(ctx *context.T, bs localblobstore.BlobStore, blobName string,
+ size int64, offset int64, whence int) (content []byte, pos int64, hash []byte, fullHash []byte, err error) {
+
+ var br localblobstore.BlobReader
+ hasher := md5.New()
+ br, err = bs.NewBlobReader(ctx, blobName)
+ if err == nil {
+ buf := make([]byte, 8192, 8192)
+ fullHash = br.Hash()
+ pos, err = br.Seek(offset, whence)
+ if err == nil {
+ var n int
+ first := true // Read at least once, to test reading zero bytes.
+ for err == nil && (size == -1 || int64(len(content)) < size || first) {
+ // Read just what was asked for.
+ var toRead []byte = buf
+ if size >= 0 && int(size)-len(content) < len(buf) {
+ toRead = buf[0 : int(size)-len(content)]
+ }
+ n, err = br.Read(toRead)
+ hasher.Write(toRead[0:n])
+ if size >= 0 && int64(len(content)+n) > size {
+ n = int(size) - len(content)
+ }
+ content = append(content, toRead[0:n]...)
+ first = false
+ }
+ }
+ br.Close()
+ }
+ return content, pos, hasher.Sum(nil), fullHash, err
+}
+
+// checkWrittenBlobsAreReadable() checks that the blobs in blobVector[] can be
+// read, and that they contain the appropriate data.
+func checkWrittenBlobsAreReadable(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob) {
+ for i := range blobVector {
+ var size int64
+ data := blobVector[i].content
+ dataLen := int64(len(data))
+ blobName := blobVector[i].blobName
+ for size = -1; size != dataLen+1; size++ {
+ var offset int64
+ for offset = -dataLen - 1; offset != dataLen+1; offset++ {
+ for whence := -1; whence != 4; whence++ {
+ content, pos, hash, fullHash, err := readBlob(ctx, bs, blobName, size, offset, whence)
+
+ // Compute expected seek position.
+ expectedPos := offset
+ if whence == 2 {
+ expectedPos += dataLen
+ }
+
+ // Computed expected size.
+ expectedSize := size
+ if expectedSize == -1 || expectedPos+expectedSize > dataLen {
+ expectedSize = dataLen - expectedPos
+ }
+
+ // Check that reads behave as expected.
+ if (whence == -1 || whence == 3) &&
+ verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errBadSeekWhence" {
+ // Expected error from bad "whence" value.
+ } else if expectedPos < 0 &&
+ verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errNegativeSeekPosition" {
+ // Expected error from negative Seek position.
+ } else if expectedPos > dataLen &&
+ verror.ErrorID(err) == "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errIllegalPositionForRead" {
+ // Expected error from too high a Seek position.
+ } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+ bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == io.EOF &&
+ pos == expectedPos && expectedPos+expectedSize == dataLen {
+ // Expected success with EOF.
+ } else if 0 <= expectedPos && expectedPos+expectedSize <= int64(len(data)) &&
+ bytes.Compare(data[expectedPos:expectedPos+expectedSize], content) == 0 && err == nil &&
+ pos == expectedPos && expectedPos+expectedSize != dataLen {
+ if pos == 0 && size == -1 && bytes.Compare(hash, fullHash) != 0 {
+ t.Errorf("localblobstore read test on %q size %d offset %d whence %d; got hash %v, expected %v (blob is %q)",
+ string(data), size, offset, whence,
+ hash, fullHash, blobName)
+ } // Else expected success without EOF.
+ } else {
+ t.Errorf("localblobstore read test on %q size %d offset %d whence %d yields %q pos %d %v (blob is %q)",
+ string(data), size, offset, whence,
+ content, pos, err, blobName)
+ }
+ }
+ }
+ }
+ }
+}
+
+// checkAllBlobs() checks all the blobs in bs to ensure they correspond to
+// those in blobVector[].
+func checkAllBlobs(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, blobVector []testBlob, testDirName string) {
+ blobCount := 0
+ iterator := bs.ListBlobIds(ctx)
+ for iterator.Advance() {
+ fileName := iterator.Value()
+ i := 0
+ for ; i != len(blobVector) && fileName != blobVector[i].blobName; i++ {
+ }
+ if i == len(blobVector) {
+ t.Errorf("localblobstore.ListBlobIds found unexpected file %s", fileName)
+ } else {
+ content, pos, hash, fullHash, err := readBlob(ctx, bs, fileName, -1, 0, 0)
+ if err != nil && err != io.EOF {
+ t.Errorf("localblobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+ } else if bytes.Compare(blobVector[i].content, content) != 0 {
+ t.Errorf("localblobstore.ListCAIds found unexpected blob content: %q, contains %q, expected %q",
+ filepath.Join(testDirName, fileName), content, string(blobVector[i].content))
+ } else if pos != 0 {
+ t.Errorf("localblobstore.ListCAIds Seek on %q returned %d instead of 0",
+ filepath.Join(testDirName, fileName), pos)
+ }
+ if bytes.Compare(hash, fullHash) != 0 {
+ t.Errorf("localblobstore.ListCAIds read on %q; got hash %v, expected %v",
+ fileName, hash, fullHash)
+ }
+ }
+ blobCount++
+ }
+ if iterator.Err() != nil {
+ t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+ }
+ if blobCount != len(blobVector) {
+ t.Errorf("localblobstore.ListBlobIds iteration expected 4 files, got %d", blobCount)
+ }
+}
+
+// checkFragments() checks all the fragments in bs to ensure they
+// correspond to those fragmentMap[], iff testDirName is non-empty.
+func checkFragments(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, fragmentMap map[string]bool, testDirName string) {
+ if testDirName != "" {
+ caCount := 0
+ iterator := bs.ListCAIds(ctx)
+ for iterator.Advance() {
+ fileName := iterator.Value()
+ content, err := ioutil.ReadFile(filepath.Join(testDirName, fileName))
+ if err != nil && err != io.EOF {
+ t.Errorf("localblobstore.ListCAIds can't read %q: %v", filepath.Join(testDirName, fileName), err)
+ } else if !fragmentMap[string(content)] {
+ t.Errorf("localblobstore.ListCAIds found unexpected fragment entry: %q, contains %q", filepath.Join(testDirName, fileName), content)
+ } else {
+ hasher := md5.New()
+ hasher.Write(content)
+ hash := hasher.Sum(nil)
+ nameFromContent := filepath.Join("cas",
+ fmt.Sprintf("%02x", hash[0]),
+ fmt.Sprintf("%02x", hash[1]),
+ fmt.Sprintf("%02x", hash[2]),
+ fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+ hash[3],
+ hash[4], hash[5], hash[6], hash[7],
+ hash[8], hash[9], hash[10], hash[11],
+ hash[12], hash[13], hash[14], hash[15]))
+ if nameFromContent != fileName {
+ t.Errorf("localblobstore.ListCAIds hash of fragment: got %q, expected %q (content=%s)", nameFromContent, fileName, string(content))
+ }
+ }
+ caCount++
+ }
+ if iterator.Err() != nil {
+ t.Errorf("localblobstore.ListCAIds iteration failed: %v", iterator.Err())
+ }
+ if caCount != len(fragmentMap) {
+ t.Errorf("localblobstore.ListCAIds iteration expected %d files, got %d", len(fragmentMap), caCount)
+ }
+ }
+}
+
+// AddRetrieveAndDelete() tests adding, retrieving, and deleting blobs from a
+// blobstore bs. One can't retrieve or delete something that hasn't been
+// created, so it's all done in one routine. If testDirName is non-empty,
+// the blobstore is assumed to be accessible in the file system, and its
+// files are checked.
+func AddRetrieveAndDelete(t *testing.T, ctx *context.T, bs localblobstore.BlobStore, testDirName string) {
+ var err error
+
+ // Check that there are no files in the blobstore we were given.
+ iterator := bs.ListBlobIds(ctx)
+ for iterator.Advance() {
+ fileName := iterator.Value()
+ t.Errorf("unexpected file %q\n", fileName)
+ }
+ if iterator.Err() != nil {
+ t.Errorf("localblobstore.ListBlobIds iteration failed: %v", iterator.Err())
+ }
+
+ // Create the strings: "wom", "bat", "wombat", "batwom", "atwo", "atwoatwoombatatwo".
+ womData := []byte("wom")
+ batData := []byte("bat")
+ wombatData := []byte("wombat")
+ batwomData := []byte("batwom")
+ atwoData := []byte("atwo")
+ atwoatwoombatatwoData := []byte("atwoatwoombatatwo")
+
+ // fragmentMap will have an entry per content-addressed fragment.
+ fragmentMap := make(map[string]bool)
+
+ // Create the blobs, by various means.
+
+ var blobVector []testBlob // Accumulate the blobs we create here.
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ womData, false,
+ blobOrBlockOrFile{block: womData})
+ womName := blobVector[len(blobVector)-1].blobName
+ fragmentMap[string(womData)] = true
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ batData, false,
+ blobOrBlockOrFile{block: batData})
+ batName := blobVector[len(blobVector)-1].blobName
+ fragmentMap[string(batData)] = true
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, false,
+ blobOrBlockOrFile{block: wombatData})
+ firstWombatName := blobVector[len(blobVector)-1].blobName
+ fragmentMap[string(wombatData)] = true
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, true,
+ blobOrBlockOrFile{block: womData},
+ blobOrBlockOrFile{block: batData})
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, false,
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: -1,
+ offset: 0})
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ wombatData, false,
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: 6,
+ offset: 0})
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ batwomData, false,
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: 3,
+ offset: 3},
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: 3,
+ offset: 0})
+ batwomName := blobVector[len(blobVector)-1].blobName
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ atwoData, false,
+ blobOrBlockOrFile{
+ blob: batwomName,
+ size: 4,
+ offset: 1})
+ atwoName := blobVector[len(blobVector)-1].blobName
+
+ blobVector = writeBlob(t, ctx, bs, blobVector,
+ atwoatwoombatatwoData, true,
+ blobOrBlockOrFile{
+ blob: atwoName,
+ size: -1,
+ offset: 0},
+ blobOrBlockOrFile{
+ blob: atwoName,
+ size: 4,
+ offset: 0},
+ blobOrBlockOrFile{
+ blob: firstWombatName,
+ size: -1,
+ offset: 1},
+ blobOrBlockOrFile{
+ blob: batName,
+ size: -1,
+ offset: 1},
+ blobOrBlockOrFile{
+ blob: womName,
+ size: 2,
+ offset: 0})
+ atwoatwoombatatwoName := blobVector[len(blobVector)-1].blobName
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Nothing should change if we garbage collect.
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Ensure that deleting non-existent blobs fails.
+ err = bs.DeleteBlob(ctx, "../../../../etc/passwd")
+ if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+ t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+ }
+ err = bs.DeleteBlob(ctx, "foo/00/00/00/00000000000000000000000000")
+ if verror.ErrorID(err) != "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore.errInvalidBlobName" {
+ t.Errorf("DeleteBlob attempted to delete a bogus blob name")
+ }
+
+ // -------------------------------------------------
+ // Delete a blob.
+ err = bs.DeleteBlob(ctx, batName)
+ if err != nil {
+ t.Errorf("DeleteBlob failed to delete blob %q: %v", batName, err)
+ }
+ blobVector = removeBlobFromBlobVector(blobVector, batName)
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Nothing should change if we garbage collect.
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Open a BlobReader on a blob we're about to delete,
+ // so its fragments won't be garbage collected.
+
+ var br localblobstore.BlobReader
+ br, err = bs.NewBlobReader(ctx, atwoatwoombatatwoName)
+ if err != nil {
+ t.Errorf("NewBlobReader failed in blob %q: %v", atwoatwoombatatwoName, err)
+ }
+
+ // -------------------------------------------------
+ // Delete a blob. This should be the last on-disc reference to the
+ // content-addressed fragment "bat", but the fragment won't be deleted
+ // until close the reader and garbage collect.
+ err = bs.DeleteBlob(ctx, atwoatwoombatatwoName)
+ if err != nil {
+ t.Errorf("DeleteBlob failed to delete blob %q: %v", atwoatwoombatatwoName, err)
+ }
+ blobVector = removeBlobFromBlobVector(blobVector, atwoatwoombatatwoName)
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Garbage collection should change nothing; the fragment involved
+ // is still referenced from the open reader *br.
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+
+ // Close the open BlobReader and garbage collect.
+ err = br.Close()
+ if err != nil {
+ t.Errorf("BlobReader.Close failed on blob %q: %v", atwoatwoombatatwoName, err)
+ }
+ delete(fragmentMap, string(batData))
+
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // Delete all blobs.
+ for len(blobVector) != 0 {
+ err = bs.DeleteBlob(ctx, blobVector[0].blobName)
+ if err != nil {
+ t.Errorf("DeleteBlob failed to delete blob %q: %v", blobVector[0].blobName, err)
+ }
+ blobVector = removeBlobFromBlobVector(blobVector, blobVector[0].blobName)
+ }
+
+ // -------------------------------------------------
+ // Check that the state is as we expect.
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+
+ // -------------------------------------------------
+ // The remaining fragments should be removed when we garbage collect.
+ for frag := range fragmentMap {
+ delete(fragmentMap, frag)
+ }
+ bs.GC(ctx)
+ checkWrittenBlobsAreReadable(t, ctx, bs, blobVector)
+ checkAllBlobs(t, ctx, bs, blobVector, testDirName)
+ checkFragments(t, ctx, bs, fragmentMap, testDirName)
+}
diff --git a/x/ref/services/syncbase/localblobstore/model.go b/x/ref/services/syncbase/localblobstore/model.go
new file mode 100644
index 0000000..b1a4565
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/model.go
@@ -0,0 +1,170 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package localblobstore is the interface to a local blob store.
+// Implementations include fs_cablobstore.
+package localblobstore
+
+import "v.io/v23/context"
+
+// A BlobStore represents a simple, content-addressable store.
+type BlobStore interface {
+ // NewBlobReader() returns a pointer to a newly allocated BlobReader on
+ // the specified blobName. BlobReaders should not be used concurrently
+ // by multiple threads. Returned handles should be closed with
+ // Close().
+ NewBlobReader(ctx *context.T, blobName string) (br BlobReader, err error)
+
+ // NewBlobWriter() returns a pointer to a newly allocated BlobWriter on
+ // a newly created blob name, which can be found using the Name()
+ // method. BlobWriters should not be used concurrently by multiple
+ // threads. The returned handle should be closed with either the
+ // Close() or CloseWithoutFinalize() method to avoid leaking file
+ // handles.
+ NewBlobWriter(ctx *context.T) (bw BlobWriter, err error)
+
+ // ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on
+ // an old, but unfinalized blob name.
+ ResumeBlobWriter(ctx *context.T, blobName string) (bw BlobWriter, err error)
+
+ // DeleteBlob() deletes the named blob from the BlobStore.
+ DeleteBlob(ctx *context.T, blobName string) (err error)
+
+ // GC() removes old temp files and content-addressed blocks that are no
+ // longer referenced by any blob. It may be called concurrently with
+ // other calls to GC(), and with uses of BlobReaders and BlobWriters.
+ GC(ctx *context.T) error
+
+ // ListBlobIds() returns an iterator that can be used to enumerate the
+ // blobs in a BlobStore. Expected use is:
+ //
+ // iter := bs.ListBlobIds(ctx)
+ // for iter.Advance() {
+ // // Process iter.Value() here.
+ // }
+ // if iter.Err() != nil {
+ // // The loop terminated early due to an error.
+ // }
+ ListBlobIds(ctx *context.T) (iter Iter)
+
+ // ListCAIds() returns an iterator that can be used to enumerate the
+ // content-addressable fragments in a BlobStore. Expected use is:
+ //
+ // iter := bs.ListCAIds(ctx)
+ // for iter.Advance() {
+ // // Process iter.Value() here.
+ // }
+ // if iter.Err() != nil {
+ // // The loop terminated early due to an error.
+ // }
+ ListCAIds(ctx *context.T) (iter Iter)
+
+ // Root() returns the name of the root directory where the BlobStore is stored.
+ Root() string
+}
+
+// A BlobReader allows a blob to be read using the standard ReadAt(), Read(),
+// and Seek() calls. A BlobReader can be created with NewBlobReader(), and
+// should be closed with the Close() method to avoid leaking file handles.
+type BlobReader interface {
+ // ReadAt() fills b[] with up to len(b) bytes of data starting at
+ // position "at" within the blob that the BlobReader indicates, and
+ // returns the number of bytes read.
+ ReadAt(b []byte, at int64) (n int, err error)
+
+ // Read() fills b[] with up to len(b) bytes of data starting at the
+ // current seek position of the BlobReader within the blob that the
+ // BlobReader indicates, and then both returns the number of bytes read
+ // and advances the BlobReader's seek position by that amount.
+ Read(b []byte) (n int, err error)
+
+ // Seek() sets the seek position of the BlobReader to offset if
+ // whence==0, offset+current_seek_position if whence==1, and
+ // offset+end_of_blob if whence==2, and then returns the current seek
+ // position.
+ Seek(offset int64, whence int) (result int64, err error)
+
+ // Close() indicates that the client will perform no further operations
+ // on the BlobReader. It releases any resources held by the
+ // BlobReader.
+ Close() error
+
+ // Name() returns the BlobReader's name.
+ Name() string
+
+ // Size() returns the BlobReader's size.
+ Size() int64
+
+ // IsFinalized() returns whether the BlobReader has been finalized.
+ IsFinalized() bool
+
+ // Hash() returns the BlobReader's hash. It may be nil if the blob is
+ // not finalized.
+ Hash() []byte
+}
+
+// A BlockOrFile represents a vector of bytes, and contains either a data
+// block (as a []byte), or a (file name, size, offset) triple.
+type BlockOrFile struct {
+ Block []byte // If FileName is empty, the bytes represented.
+ FileName string // If non-empty, the name of the file containing the bytes.
+ Size int64 // If FileName is non-empty, the number of bytes (or -1 for "all")
+ Offset int64 // If FileName is non-empty, the offset of the relevant bytes within the file.
+}
+
+// A BlobWriter allows a blob to be written. If a blob has not yet been
+// finalized, it also allows that blob to be extended. A BlobWriter may be
+// created with NewBlobWriter(), and should be closed with Close() or
+// CloseWithoutFinalize().
+type BlobWriter interface {
+ // AppendBlob() adds a (substring of a) pre-existing blob to the blob
+ // being written by the BlobWriter. The fragments of the pre-existing
+ // blob are not physically copied; they are referenced by both blobs.
+ AppendBlob(blobName string, size int64, offset int64) (err error)
+
+ // AppendFragment() appends a fragment to the blob being written by the
+ // BlobWriter, where the fragment is composed of the byte vectors
+ // described by the elements of item[]. The fragment is copied into
+ // the blob store.
+ AppendFragment(item ...BlockOrFile) (err error)
+
+ // Close() finalizes the BlobWriter, and indicates that the client will
+ // perform no further append operations on the BlobWriter. Any
+ // internal open file handles are closed.
+ Close() (err error)
+
+ // CloseWithoutFinalize() indicates that the client will perform no
+ // further append operations on the BlobWriter, but does not finalize
+ // the blob. Any internal open file handles are closed. Clients are
+ // expected to need this operation infrequently.
+ CloseWithoutFinalize() (err error)
+
+ // Name() returns the BlobWriter's name.
+ Name() string
+
+ // Size() returns the BlobWriter's size.
+ Size() int64
+
+ // IsFinalized() returns whether the BlobWriter has been finalized.
+ IsFinalized() bool
+
+ // Hash() returns the BlobWriter's hash, reflecting the bytes written so far.
+ Hash() []byte
+}
+
+// A Iter represents an iterator that allows the client to enumerate
+// all the blobs of fragments in a BlobStore.
+type Iter interface {
+ // Advance() stages an item so that it may be retrieved via Value.
+ // Returns true iff there is an item to retrieve. Advance must be
+ // called before Value is called.
+ Advance() (advanced bool)
+
+ // Value() returns the item that was staged by Advance. May panic if
+ // Advance returned false or was not called. Never blocks.
+ Value() (name string)
+
+ // Err() returns any error encountered by Advance. Never blocks.
+ Err() error
+}
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index b35afce..827eaf9 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -161,7 +161,7 @@
return err
}
// Check for "database already exists".
- if _, err := a.getDbInfo(ctx, call, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -172,7 +172,7 @@
info := &dbInfo{
Name: dbName,
}
- return a.putDbInfo(ctx, call, st, dbName, info)
+ return a.putDbInfo(ctx, st, dbName, info)
}); err != nil {
return err
}
@@ -192,7 +192,7 @@
// 3. Flip dbInfo.Initialized to true.
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+ return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
info.Initialized = true
return nil
})
@@ -234,7 +234,7 @@
// 2. Flip dbInfo.Deleted to true.
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, call, st, dbName, func(info *dbInfo) error {
+ return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
info.Deleted = true
return nil
})
@@ -251,7 +251,7 @@
}
// 4. Delete dbInfo record.
- if err := a.delDbInfo(ctx, call, a.s.st, dbName); err != nil {
+ if err := a.delDbInfo(ctx, a.s.st, dbName); err != nil {
return err
}
diff --git a/x/ref/services/syncbase/server/db_info.go b/x/ref/services/syncbase/server/db_info.go
index 98216bc..cbfcebd 100644
--- a/x/ref/services/syncbase/server/db_info.go
+++ b/x/ref/services/syncbase/server/db_info.go
@@ -17,7 +17,6 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
- "v.io/v23/rpc"
)
type dbInfoLayer struct {
@@ -49,9 +48,9 @@
// getDbInfo reads data from the storage engine.
// Returns a VDL-compatible error.
-func (a *app) getDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
info := &dbInfo{}
- if err := util.GetWithoutAuth(ctx, call, st, &dbInfoLayer{dbName, a}, info); err != nil {
+ if err := util.GetWithoutAuth(ctx, st, &dbInfoLayer{dbName, a}, info); err != nil {
return nil, err
}
return info, nil
@@ -59,27 +58,27 @@
// putDbInfo writes data to the storage engine.
// Returns a VDL-compatible error.
-func (a *app) putDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string, info *dbInfo) error {
- return util.Put(ctx, call, st, &dbInfoLayer{dbName, a}, info)
+func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
+ return util.Put(ctx, st, &dbInfoLayer{dbName, a}, info)
}
// delDbInfo deletes data from the storage engine.
// Returns a VDL-compatible error.
-func (a *app) delDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreWriter, dbName string) error {
- return util.Delete(ctx, call, st, &dbInfoLayer{dbName, a})
+func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
+ return util.Delete(ctx, st, &dbInfoLayer{dbName, a})
}
// updateDbInfo performs a read-modify-write.
// fn should "modify" v, and should return a VDL-compatible error.
// Returns a VDL-compatible error.
-func (a *app) updateDbInfo(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
+func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
_ = st.(store.Transaction) // panics on failure, as desired
- info, err := a.getDbInfo(ctx, call, st, dbName)
+ info, err := a.getDbInfo(ctx, st, dbName)
if err != nil {
return err
}
if err := fn(info); err != nil {
return err
}
- return a.putDbInfo(ctx, call, st, dbName, info)
+ return a.putDbInfo(ctx, st, dbName, info)
}
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 0971923..d18ab04 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -15,7 +15,6 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase/nosql/query_db"
"v.io/syncbase/v23/syncbase/nosql/query_exec"
- prefixutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -103,7 +102,7 @@
Name: d.name,
Perms: opts.Perms,
}
- if err := util.Put(ctx, call, d.st, d, data); err != nil {
+ if err := util.Put(ctx, d.st, d, data); err != nil {
return nil, err
}
return d, nil
@@ -404,20 +403,20 @@
req *tableReq
}
-func (t *tableDb) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
return &kvs{
- t: t,
- prefixes: prefixes,
- curr: -1,
- validRow: false,
- it: nil,
- err: nil,
+ t: t,
+ keyRanges: keyRanges,
+ curr: -1,
+ validRow: false,
+ it: nil,
+ err: nil,
}, nil
}
type kvs struct {
t *tableDb
- prefixes []string
+ keyRanges query_db.KeyRanges
curr int // current index into prefixes, -1 at start
validRow bool
currKey string
@@ -433,16 +432,22 @@
if s.curr == -1 {
s.curr++
}
- for s.curr < len(s.prefixes) {
+ for s.curr < len(s.keyRanges) {
if s.it == nil {
- start := prefixutil.PrefixRangeStart(s.prefixes[s.curr])
- limit := prefixutil.PrefixRangeLimit(s.prefixes[s.curr])
- s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), string(start), string(limit)))
+ start := s.keyRanges[s.curr].Start
+ limit := s.keyRanges[s.curr].Limit
+ // 0-255 means examine all rows
+ if start == string([]byte{0}) && limit == string([]byte{255}) {
+ start = ""
+ limit = ""
+ }
+ s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), start, limit))
}
if s.it.Advance() {
// key
keyBytes := s.it.Key(nil)
parts := util.SplitKeyParts(string(keyBytes))
+ // TODO(rogulenko): Check access for the key.
s.currKey = parts[len(parts)-1]
// value
valueBytes := s.it.Value(nil)
@@ -489,8 +494,8 @@
s.it.Cancel()
s.it = nil
}
- // set curr to end of prefixes so Advance will return false
- s.curr = len(s.prefixes)
+ // set curr to end of keyRanges so Advance will return false
+ s.curr = len(s.keyRanges)
}
////////////////////////////////////////
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 3397fd1..542224a 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -91,11 +91,10 @@
}
// checkAccess checks that this row's table exists in the database, and performs
-// an authorization check (currently against the table perms).
+// an authorization check.
// Returns a VDL-compatible error.
-// TODO(sadovsky): Use prefix permissions.
func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
- return util.Get(ctx, call, st, r.t, &tableData{})
+ return r.t.checkAccess(ctx, call, st, r.key)
}
// get reads data from the storage engine.
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index 7619fc4..82a0cab 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -5,6 +5,8 @@
package nosql
import (
+ "strings"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -12,6 +14,7 @@
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
+ "v.io/v23/vom"
)
// tableReq is a per-request object that handles Table RPCs.
@@ -39,7 +42,7 @@
return err
}
// Check for "table already exists".
- if err := util.GetWithoutAuth(ctx, call, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.GetWithoutAuth(ctx, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -54,7 +57,7 @@
Name: t.name,
Perms: perms,
}
- return util.Put(ctx, call, st, t, data)
+ return util.Put(ctx, st, t, data)
})
}
@@ -71,20 +74,30 @@
return err
}
// TODO(sadovsky): Delete all rows in this table.
- return util.Delete(ctx, call, st, t)
+ return util.Delete(ctx, st, t)
})
}
func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
impl := func(st store.StoreReadWriter) error {
- // Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ // Check for table-level access before doing a scan.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return err
}
it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
key := []byte{}
for it.Advance() {
key = it.Key(key)
+ // Check perms.
+ parts := util.SplitKeyParts(string(key))
+ externalKey := parts[len(parts)-1]
+ if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ // TODO(rogulenko): Revisit this behavior. Probably we should
+ // delete all rows that we have access to.
+ it.Cancel()
+ return err
+ }
+ // Delete the key-value pair.
if err := st.Delete(key); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -107,8 +120,8 @@
func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
impl := func(st store.StoreReader) error {
- // Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ // Check for table-level access before doing a scan.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return err
}
it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
@@ -116,8 +129,14 @@
key, value := []byte{}, []byte{}
for it.Advance() {
key, value = it.Key(key), it.Value(value)
+ // Check perms.
parts := util.SplitKeyParts(string(key))
- sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+ externalKey := parts[len(parts)-1]
+ if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ it.Cancel()
+ return err
+ }
+ sender.Send(wire.KeyValue{Key: externalKey, Value: value})
}
if err := it.Err(); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
@@ -135,38 +154,27 @@
return impl(st)
}
-func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
- if prefix != "" {
- return verror.NewErrNotImplemented(ctx)
- }
- impl := func(st store.StoreReadWriter) error {
- data := &tableData{}
- return util.Update(ctx, call, st, t, data, func() error {
- data.Perms = perms
- return nil
- })
- }
- if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
- return err
- } else {
- return impl(st)
- }
- } else {
- return store.RunInTransaction(t.d.st, impl)
- }
-}
-
func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
- if key != "" {
- return nil, verror.NewErrNotImplemented(ctx)
- }
impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
- data := &tableData{}
- if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+ // Check permissions only at table level.
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
return nil, err
}
- return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+ // Get the most specific permissions object.
+ prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ if err != nil {
+ return nil, err
+ }
+ result := []wire.PrefixPermissions{{Prefix: prefix, Perms: prefixPerms.Perms}}
+ // Collect all parent permissions objects all the way up to the table level.
+ for prefix != "" {
+ prefix = prefixPerms.Parent
+ if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+ return nil, err
+ }
+ result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
+ }
+ return result, nil
}
var st store.StoreReader
if t.d.batchId != nil {
@@ -179,17 +187,114 @@
return impl(st)
}
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+ impl := func(st store.StoreReadWriter) error {
+ if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ return err
+ }
+ // Concurrent transactions that touch this table should fail with
+ // ErrConcurrentTransaction when this transaction commits.
+ if err := t.lock(ctx, st); err != nil {
+ return err
+ }
+ if prefix == "" {
+ data := &tableData{}
+ return util.Update(ctx, call, st, t, data, func() error {
+ data.Perms = perms
+ return nil
+ })
+ }
+ // Get the most specific permissions object.
+ parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ if err != nil {
+ return err
+ }
+ // In case there is no permissions object for the given prefix, we need
+ // to add a new node to the prefix permissions tree. We do it by updating
+ // parents for all children of the prefix to the node corresponding to
+ // the prefix.
+ if parent != prefix {
+ if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+ return err
+ }
+ } else {
+ parent = prefixPerms.Parent
+ }
+ stPrefix := t.prefixPermsKey(prefix)
+ stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
+ prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
+ // Put the (prefix, perms) pair to the database.
+ if err := util.PutObject(st, stPrefix, prefixPerms); err != nil {
+ return err
+ }
+ return util.PutObject(st, stPrefixLimit, prefixPerms)
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
+}
+
func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
- return verror.NewErrNotImplemented(ctx)
+ if prefix == "" {
+ return verror.New(verror.ErrBadArg, ctx, prefix)
+ }
+ impl := func(st store.StoreReadWriter) error {
+ if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ return err
+ }
+ // Concurrent transactions that touch this table should fail with
+ // ErrConcurrentTransaction when this transaction commits.
+ if err := t.lock(ctx, st); err != nil {
+ return err
+ }
+ // Get the most specific permissions object.
+ parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ if err != nil {
+ return err
+ }
+ if parent != prefix {
+ // This can happen only if there is no permissions object for the
+ // given prefix. Since DeletePermissions is idempotent, return nil.
+ return nil
+ }
+ // We need to delete the node corresponding to the prefix from the prefix
+ // permissions tree. We do it by updating parents for all children of the
+ // prefix to the parent of the node corresponding to the prefix.
+ if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+ return err
+ }
+ stPrefix := []byte(t.prefixPermsKey(prefix))
+ stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+ if err := st.Delete(stPrefix); err != nil {
+ return err
+ }
+ return st.Delete(stPrefixLimit)
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
}
func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
// Check perms.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ if err := t.checkAccess(ctx, call, st, ""); err != nil {
closeStoreReader()
return nil, err
}
+ // TODO(rogulenko): Check prefix permissions for children.
return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
}
var st store.StoreReader
@@ -226,3 +331,145 @@
func (t *tableReq) stKeyPart() string {
return t.name
}
+
+// updateParentRefs updates the parent for all children of the given
+// prefix to newParent.
+func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+ stPrefix := []byte(t.prefixPermsKey(prefix))
+ stPrefixStart := append(stPrefix, 0)
+ stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
+ it := st.Scan(stPrefixStart, stPrefixLimit)
+ var key, value []byte
+ for it.Advance() {
+ key, value = it.Key(key), it.Value(value)
+ var prefixPerms stPrefixPerms
+ if err := vom.Decode(value, &prefixPerms); err != nil {
+ it.Cancel()
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ prefixPerms.Parent = newParent
+ if err := util.PutObject(st, string(key), prefixPerms); err != nil {
+ it.Cancel()
+ return err
+ }
+ }
+ if err := it.Err(); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// lock invalidates all concurrent transactions with ErrConcurrentTransaction
+// that have accessed this table.
+// Returns a VDL-compatible error.
+//
+// It is necessary to call lock() every time prefix permissions are updated,
+// so snapshots inside all transactions reflect up-to-date permissions. Since
+// every public function that touches this table has to read the table-level
+// permissions object, it is enough to add the key of table-level permissions
+// to the write set of the current transaction.
+//
+// TODO(rogulenko): Revisit this behavior to provide more granularity.
+// A possible option would be to add prefix and its parent to the write set
+// of the current transaction when permissions object for a prefix is updated.
+func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+ var data tableData
+ if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ return err
+ }
+ return util.Put(ctx, st, t, data)
+}
+
+// checkAccess checks that this table exists in the database, and performs
+// an authorization check. The access is checked at table level and at the
+// level of the most specific prefix for the given key.
+// Returns a VDL-compatible error.
+// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
+// access check to be a check for "Resolve", i.e. also check access to
+// service, app and database.
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
+ prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ if err != nil {
+ return err
+ }
+ if prefix != "" {
+ if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ return err
+ }
+ }
+ auth, _ := access.PermissionsAuthorizer(prefixPerms.Perms, access.TypicalTagType())
+ if err := auth.Authorize(ctx, call.Security()); err != nil {
+ return verror.New(verror.ErrNoAccess, ctx, prefix)
+ }
+ return nil
+}
+
+// permsForKey returns the longest prefix of the given key that has
+// associated permissions with its permissions object.
+// permsForKey doesn't perform an authorization check.
+// Returns a VDL-compatible error.
+//
+// Virtually we represent all prefixes as a forest T, where each vertex maps to
+// a prefix. A parent for a string is the maximum proper prefix of it that
+// belongs to T. Each prefix P from T is represented as a pair of entries with
+// keys P and P~ with values of type stPrefixPerms (parent + perms).
+// High level of how this function works:
+// 1 iter = db.Scan(K, "")
+// Here last character of iter.Key() is removed automatically if it is '~'
+// 2 if hasPrefix(K, iter.Key()) return iter.Value()
+// 3 return parent(iter.Key())
+// Short proof:
+// iter returned on line 1 points to one of the following:
+// - a string t that is equal to K;
+// - a string t~: if t is not a prefix of K, then K < t < t~ which
+// contradicts with property of returned iterator on line 1 => t is prefix of
+// K; also t is the largest prefix of K, as all larger prefixes of K are
+// less than t~; in this case line 2 returns correct result;
+// - a string t that doesn't end with '~': it can't be a prefix of K, as all
+// proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
+// K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
+// prefix of K; in this case line 3 returns correct result.
+func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
+ it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+ if !it.Advance() {
+ prefixPerms, err := t.permsForPrefix(ctx, st, "")
+ return "", prefixPerms, err
+ }
+ defer it.Cancel()
+ parts := util.SplitKeyParts(string(it.Key(nil)))
+ prefix := strings.TrimSuffix(parts[len(parts)-1], util.PrefixRangeLimitSuffix)
+ value := it.Value(nil)
+ var prefixPerms stPrefixPerms
+ if err := vom.Decode(value, &prefixPerms); err != nil {
+ return "", stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+ }
+ if strings.HasPrefix(key, prefix) {
+ return prefix, prefixPerms, nil
+ }
+ prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+ return prefixPerms.Parent, prefixPerms, err
+}
+
+// permsForPrefix returns the permissions object associated with the
+// provided prefix.
+// Returns a VDL-compatible error.
+func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+ if prefix == "" {
+ var data tableData
+ if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ return stPrefixPerms{}, err
+ }
+ return stPrefixPerms{Perms: data.Perms}, nil
+ }
+ var prefixPerms stPrefixPerms
+ if err := util.GetObject(st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+ return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return prefixPerms, nil
+}
+
+// prefixPermsKey returns the key used for storing permissions for the given
+// prefix in the table.
+func (t *tableReq) prefixPermsKey(prefix string) string {
+ return util.JoinKeyParts(util.PermsPrefix, t.name, prefix)
+}
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl b/x/ref/services/syncbase/server/nosql/types.vdl
index 4a8a328..33d3883 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl
+++ b/x/ref/services/syncbase/server/nosql/types.vdl
@@ -21,3 +21,17 @@
Name string
Perms access.Permissions
}
+
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key" - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+ Parent string
+ Perms access.Permissions
+}
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl.go b/x/ref/services/syncbase/server/nosql/types.vdl.go
index 313d982..021cef6 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl.go
+++ b/x/ref/services/syncbase/server/nosql/types.vdl.go
@@ -39,7 +39,27 @@
}) {
}
+// stPrefixPerms describes internal representation of prefix permissions
+// in the store.
+//
+// Each pair of (key, perms) is stored as two key-value pairs:
+// "$perms:%table:key" - stPrefixPerms{parent, perms}
+// "$perms:%table:key~" - stPrefixPerms{parent, perms}
+// where "~" represents a reserved char that's lexicographically greater than
+// all chars allowed by clients, %table is the name of the table and parent is
+// the longest proper prefix of the key that has associated permissions object.
+type stPrefixPerms struct {
+ Parent string
+ Perms access.Permissions
+}
+
+func (stPrefixPerms) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/nosql.stPrefixPerms"`
+}) {
+}
+
func init() {
vdl.Register((*databaseData)(nil))
vdl.Register((*tableData)(nil))
+ vdl.Register((*stPrefixPerms)(nil))
}
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index e551431..135ee35 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -68,7 +68,7 @@
data := &serviceData{
Perms: opts.Perms,
}
- if err := util.Put(ctx, call, s.st, s, data); err != nil {
+ if err := util.Put(ctx, s.st, s, data); err != nil {
return nil, err
}
if s.sync, err = vsync.New(ctx, call, s); err != nil {
@@ -178,7 +178,7 @@
return err
}
// Check for "app already exists".
- if err := util.GetWithoutAuth(ctx, call, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.GetWithoutAuth(ctx, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -192,7 +192,7 @@
Name: appName,
Perms: perms,
}
- return util.Put(ctx, call, st, a, data)
+ return util.Put(ctx, st, a, data)
}); err != nil {
return err
}
@@ -218,7 +218,7 @@
return err
}
// TODO(sadovsky): Delete all databases in this app.
- return util.Delete(ctx, call, st, a)
+ return util.Delete(ctx, st, a)
}); err != nil {
return err
}
diff --git a/x/ref/services/syncbase/server/util/constants.go b/x/ref/services/syncbase/server/util/constants.go
index 44d0ff6..9251375 100644
--- a/x/ref/services/syncbase/server/util/constants.go
+++ b/x/ref/services/syncbase/server/util/constants.go
@@ -12,6 +12,7 @@
DatabasePrefix = "$database"
DbInfoPrefix = "$dbInfo"
LogPrefix = "$log"
+ PermsPrefix = "$perms"
RowPrefix = "$row"
ServicePrefix = "$service"
SyncPrefix = "$sync"
@@ -27,4 +28,8 @@
BatchSep = ":"
// Separator for parts of storage engine keys.
KeyPartSep = ":"
+ // PrefixRangeLimitSuffix is the suffix of a key which indicates the end of
+ // a prefix range. Should be more than any regular key in the store.
+ // TODO(rogulenko): Change this constant to something out of the UTF8 space.
+ PrefixRangeLimitSuffix = "~"
)
diff --git a/x/ref/services/syncbase/server/util/store_util.go b/x/ref/services/syncbase/server/util/store_util.go
index a495ebd..fb739ca 100644
--- a/x/ref/services/syncbase/server/util/store_util.go
+++ b/x/ref/services/syncbase/server/util/store_util.go
@@ -47,7 +47,7 @@
// GetWithoutAuth does st.Get(l.StKey(), v), populating v.
// Returns a VDL-compatible error.
-func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
+func GetWithoutAuth(ctx *context.T, st store.StoreReader, l Layer, v interface{}) error {
if err := GetObject(st, l.StKey(), v); err != nil {
if verror.ErrorID(err) == store.ErrUnknownKey.ID {
return verror.New(verror.ErrNoExist, ctx, l.Name())
@@ -60,7 +60,7 @@
// Get does GetWithoutAuth followed by an auth check.
// Returns a VDL-compatible error.
func Get(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v Permser) error {
- if err := GetWithoutAuth(ctx, call, st, l, v); err != nil {
+ if err := GetWithoutAuth(ctx, st, l, v); err != nil {
return err
}
auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
@@ -73,7 +73,7 @@
// Put does st.Put(l.StKey(), v).
// Returns a VDL-compatible error.
// If you need to perform an authorization check, use Update().
-func Put(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer, v interface{}) error {
+func Put(ctx *context.T, st store.StoreWriter, l Layer, v interface{}) error {
if err := PutObject(st, l.StKey(), v); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -83,7 +83,7 @@
// Delete does st.Delete(l.StKey()).
// Returns a VDL-compatible error.
// If you need to perform an authorization check, call Get() first.
-func Delete(ctx *context.T, _ rpc.ServerCall, st store.StoreWriter, l Layer) error {
+func Delete(ctx *context.T, st store.StoreWriter, l Layer) error {
if err := st.Delete([]byte(l.StKey())); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
@@ -103,7 +103,7 @@
if err := fn(); err != nil {
return err
}
- return Put(ctx, call, st, l, v)
+ return Put(ctx, st, l, v)
}
////////////////////////////////////////////////////////////
diff --git a/x/ref/services/syncbase/syncbased/main.go b/x/ref/services/syncbase/syncbased/main.go
index df96244..a75b471 100644
--- a/x/ref/services/syncbase/syncbased/main.go
+++ b/x/ref/services/syncbase/syncbased/main.go
@@ -19,6 +19,7 @@
"v.io/syncbase/x/ref/services/syncbase/server"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
+ "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
)
@@ -44,26 +45,16 @@
ctx, shutdown := v23.Init()
defer shutdown()
- s, err := v23.NewServer(ctx)
- if err != nil {
- vlog.Fatal("v23.NewServer() failed: ", err)
- }
- if _, err := s.Listen(v23.GetListenSpec(ctx)); err != nil {
- vlog.Fatal("s.Listen() failed: ", err)
- }
-
perms, err := securityflag.PermissionsFromFlag()
if err != nil {
vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
}
-
if perms != nil {
vlog.Info("Using permissions from command line flag.")
} else {
vlog.Info("No permissions flag provided. Giving local principal all permissions.")
perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
}
-
service, err := server.NewService(nil, nil, server.ServiceOptions{
Perms: perms,
RootDir: *rootDir,
@@ -74,9 +65,8 @@
}
d := server.NewDispatcher(service)
- // Publish the service in the mount table.
- if err := s.ServeDispatcher(*name, d); err != nil {
- vlog.Fatal("s.ServeDispatcher() failed: ", err)
+ if _, err = xrpc.NewDispatchingServer(ctx, *name, d); err != nil {
+ vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
}
vlog.Info("Mounted at: ", *name)