syncbase: implement client batches
Change-Id: I9556f926c32b6e37772d276975285c6bffb3a9fc
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index a906176..bee1209 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -188,4 +188,5 @@
error (
BoundToBatch() {"en": "bound to batch"}
NotBoundToBatch() {"en": "not bound to batch"}
+ ReadOnlyBatch() {"en": "batch is read-only"}
)
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index ed109d8..15f6af7 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -26,11 +26,13 @@
var (
ErrBoundToBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.BoundToBatch", verror.NoRetry, "{1:}{2:} bound to batch")
ErrNotBoundToBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.NotBoundToBatch", verror.NoRetry, "{1:}{2:} not bound to batch")
+ ErrReadOnlyBatch = verror.Register("v.io/syncbase/v23/services/syncbase/nosql.ReadOnlyBatch", verror.NoRetry, "{1:}{2:} batch is read-only")
)
func init() {
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBoundToBatch.ID), "{1:}{2:} bound to batch")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNotBoundToBatch.ID), "{1:}{2:} not bound to batch")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrReadOnlyBatch.ID), "{1:}{2:} batch is read-only")
}
// NewErrBoundToBatch returns an error with the ErrBoundToBatch ID.
@@ -43,6 +45,11 @@
return verror.New(ErrNotBoundToBatch, ctx)
}
+// NewErrReadOnlyBatch returns an error with the ErrReadOnlyBatch ID.
+func NewErrReadOnlyBatch(ctx *context.T) error {
+ return verror.New(ErrReadOnlyBatch, ctx)
+}
+
// SyncGroupManagerClientMethods is the client interface
// containing SyncGroupManager methods.
//
diff --git a/v23/syncbase/client_test.go b/v23/syncbase/client_test.go
index 43dbfc6..eee2a29 100644
--- a/v23/syncbase/client_test.go
+++ b/v23/syncbase/client_test.go
@@ -9,18 +9,24 @@
"v.io/syncbase/v23/syncbase"
tu "v.io/syncbase/v23/syncbase/testutil"
+ "v.io/v23/naming"
_ "v.io/x/ref/runtime/factories/generic"
)
-////////////////////////////////////////
-// Test cases
+// Tests various Name and FullName methods.
+func TestName(t *testing.T) {
+ s := syncbase.NewService("s")
+ a := s.App("a")
-func TestNameAndKey(t *testing.T) {
- a := syncbase.NewService("s").App("a")
-
+ if s.FullName() != "s" {
+ t.Errorf("Wrong full name: %q", s.FullName())
+ }
if a.Name() != "a" {
t.Errorf("Wrong name: %q", a.Name())
}
+ if a.FullName() != naming.Join("s", "a") {
+ t.Errorf("Wrong name: %q", a.FullName())
+ }
}
// Tests that Service.ListApps works as expected.
diff --git a/v23/syncbase/nosql/batch.go b/v23/syncbase/nosql/batch.go
index a49d726..7655a6c 100644
--- a/v23/syncbase/nosql/batch.go
+++ b/v23/syncbase/nosql/batch.go
@@ -5,11 +5,28 @@
package nosql
import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/v23/context"
)
+type batch struct {
+ database
+}
+
+var _ BatchDatabase = (*batch)(nil)
+
+// Commit implements BatchDatabase.Commit.
+func (b *batch) Commit(ctx *context.T) error {
+ return b.c.Commit(ctx)
+}
+
+// Abort implements BatchDatabase.Abort.
+func (b *batch) Abort(ctx *context.T) {
+ b.c.Abort(ctx)
+}
+
// TODO(sadovsky): Add retry loop.
-func RunInBatch(ctx *context.T, d Database, opts BatchOptions, fn func(b BatchDatabase) error) error {
+func RunInBatch(ctx *context.T, d Database, opts wire.BatchOptions, fn func(b BatchDatabase) error) error {
b, err := d.BeginBatch(ctx, opts)
if err != nil {
return err
diff --git a/v23/syncbase/nosql/batch_test.go b/v23/syncbase/nosql/batch_test.go
new file mode 100644
index 0000000..291ddd6
--- /dev/null
+++ b/v23/syncbase/nosql/batch_test.go
@@ -0,0 +1,275 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// TODO(sadovsky): Once the storage engine layer is made public, implement a
+// Syncbase-based storage engine so that we can run all the storage engine tests
+// against Syncbase itself.
+
+package nosql_test
+
+import (
+ "testing"
+
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/v23/syncbase"
+ "v.io/syncbase/v23/syncbase/nosql"
+ tu "v.io/syncbase/v23/syncbase/testutil"
+ "v.io/v23/naming"
+ "v.io/v23/verror"
+ _ "v.io/x/ref/runtime/factories/generic"
+)
+
+// Tests various Name and FullName methods.
+func TestName(t *testing.T) {
+ ctx, sName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+ a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+
+ b, err := d.BeginBatch(ctx, wire.BatchOptions{})
+ if err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+
+ if d.Name() != "d" {
+ t.Errorf("Wrong name: %q", d.Name())
+ }
+ if d.FullName() != naming.Join(sName, "a", "d") {
+ t.Errorf("Wrong full name: %q", d.FullName())
+ }
+ if b.Name() == d.Name() {
+ t.Errorf("Names should not match: %q", b.Name())
+ }
+ if b.FullName() == d.FullName() {
+ t.Errorf("Full names should not match: %q", b.FullName())
+ }
+}
+
+// Tests basic functionality of BatchDatabase.
+func TestBatchBasics(t *testing.T) {
+ ctx, sName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+ a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+ tb := tu.CreateTable(t, ctx, d, "tb")
+
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+
+ var b1, b2 nosql.BatchDatabase
+ var b1tb, b2tb nosql.Table
+ var err error
+
+ // Test that the effects of a transaction are not visible until commit.
+ b1, err = d.BeginBatch(ctx, wire.BatchOptions{})
+ if err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+ b1tb = b1.Table("tb")
+
+ if err := b1tb.Put(ctx, "fooKey", "fooValue"); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+
+ // Check that foo is not yet visible.
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+
+ if err := b1.Commit(ctx); err != nil {
+ t.Fatalf("b1.Commit() failed: %v", err)
+ }
+
+ // Check that foo is now visible.
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"fooKey"}, []interface{}{"fooValue"})
+
+ // Test that concurrent transactions are isolated.
+ if b1, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+ if b2, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+ b1tb, b2tb = b1.Table("tb"), b2.Table("tb")
+
+ if err := b1tb.Put(ctx, "barKey", "barValue"); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+ if err := b1tb.Put(ctx, "bazKey", "bazValue"); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+
+ var got string
+ if err := b2tb.Get(ctx, "barKey", &got); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ t.Fatalf("Get() should have failed: %v", err)
+ }
+ if err := b2tb.Put(ctx, "rabKey", "rabValue"); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+
+ if err := b1.Commit(ctx); err != nil {
+ t.Fatalf("b1.Commit() failed: %v", err)
+ }
+ if err := b2.Commit(ctx); err == nil {
+ t.Fatalf("b2.Commit() should have failed: %v", err)
+ }
+
+ // Check that foo, bar, and baz (but not rab) are now visible.
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"barKey", "bazKey", "fooKey"}, []interface{}{"barValue", "bazValue", "fooValue"})
+}
+
+// Tests enforcement of BatchOptions.ReadOnly.
+func TestReadOnlyBatch(t *testing.T) {
+ ctx, sName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+ a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+ tb := tu.CreateTable(t, ctx, d, "tb")
+
+ if err := tb.Put(ctx, "fooKey", "fooValue"); err != nil {
+ t.Fatalf("tb.Put() failed: %v", err)
+ }
+
+ b1, err := d.BeginBatch(ctx, wire.BatchOptions{ReadOnly: true})
+ if err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+ b1tb := b1.Table("tb")
+
+ if err := b1tb.Put(ctx, "barKey", "barValue"); verror.ErrorID(err) != wire.ErrReadOnlyBatch.ID {
+ t.Fatalf("Put() should have failed: %v", err)
+ }
+ if err := b1tb.Delete(ctx, nosql.Prefix("fooKey")); verror.ErrorID(err) != wire.ErrReadOnlyBatch.ID {
+ t.Fatalf("Table.Delete() should have failed: %v", err)
+ }
+ if err := b1tb.Row("fooKey").Delete(ctx); verror.ErrorID(err) != wire.ErrReadOnlyBatch.ID {
+ t.Fatalf("Row.Delete() should have failed: %v", err)
+ }
+}
+
+// Tests that all ops fail after attempted commit or abort.
+func TestOpAfterFinalize(t *testing.T) {
+ ctx, sName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+ a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+ tb := tu.CreateTable(t, ctx, d, "tb")
+
+ // TODO(sadovsky): Add some sort of "op after finalize" error type and check
+ // for it specifically below.
+ checkOpsFail := func(b nosql.BatchDatabase) {
+ btb := b.Table("tb")
+ var got string
+ if err := btb.Get(ctx, "fooKey", &got); err == nil {
+ tu.Fatal(t, "Get() should have failed")
+ }
+ it := btb.Scan(ctx, nosql.Prefix(""))
+ it.Advance()
+ if it.Err() == nil {
+ tu.Fatal(t, "Scan() should have failed")
+ }
+ if err := btb.Put(ctx, "barKey", "barValue"); err == nil {
+ tu.Fatal(t, "Put() should have failed")
+ }
+ if err := btb.Delete(ctx, nosql.Prefix("fooKey")); err == nil {
+ tu.Fatal(t, "Table.Delete() should have failed: %v", err)
+ }
+ if err := btb.Row("fooKey").Delete(ctx); err == nil {
+ tu.Fatal(t, "Row.Delete() should have failed: %v", err)
+ }
+ if err := b.Commit(ctx); err == nil {
+ tu.Fatal(t, "Commit() should have failed")
+ }
+ }
+
+ // Commit a transaction, check that subsequent ops fail.
+ b1, err := d.BeginBatch(ctx, wire.BatchOptions{})
+ if err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+ b1tb := b1.Table("tb")
+
+ if err := b1tb.Put(ctx, "fooKey", "fooValue"); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+ if err := b1.Commit(ctx); err != nil {
+ t.Fatalf("b1.Commit() failed: %v", err)
+ }
+ checkOpsFail(b1)
+
+ // Create a transaction with a conflict, check that the commit fails, then
+ // check that subsequent ops fail.
+ if b1, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+ b1tb = b1.Table("tb")
+
+ // Conflicts with future b1tb.Get().
+ if err := tb.Put(ctx, "fooKey", "v2"); err != nil {
+ t.Fatalf("tb.Put() failed: %v", err)
+ }
+
+ var got string
+ if err := b1tb.Get(ctx, "fooKey", &got); err != nil {
+ t.Fatalf("Get() failed: %v", err)
+ }
+ want := "fooValue"
+ if got != want {
+ t.Fatalf("Get() returned unexpected value: got %q, want %q", got, want)
+ }
+ if err := b1.Commit(ctx); err == nil {
+ t.Fatalf("b1.Commit() should have failed: %v", err)
+ }
+ checkOpsFail(b1)
+
+ // Create a transaction and immediately abort it, then check that subsequent
+ // ops fail.
+ if b1, err = d.BeginBatch(ctx, wire.BatchOptions{}); err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+ b1tb = b1.Table("tb")
+ b1.Abort(ctx)
+ checkOpsFail(b1)
+}
+
+// Tests that batch methods called on non-batch return ErrNotBoundToBatch and
+// that non-batch methods called on batch return ErrBoundToBatch.
+func TestDisallowedMethods(t *testing.T) {
+ ctx, sName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+ a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+ d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
+ b, err := d.BeginBatch(ctx, wire.BatchOptions{})
+ if err != nil {
+ t.Fatalf("d.BeginBatch() failed: %v", err)
+ }
+
+ // Batch methods on non-batch.
+ dc := wire.DatabaseClient(d.FullName())
+ if err := dc.Commit(ctx); verror.ErrorID(err) != wire.ErrNotBoundToBatch.ID {
+ t.Fatalf("dc.Commit() should have failed: %v", err)
+ }
+ if err := dc.Abort(ctx); verror.ErrorID(err) != wire.ErrNotBoundToBatch.ID {
+ t.Fatalf("dc.Abort() should have failed: %v", err)
+ }
+
+ // Non-batch methods on batch.
+ bc := wire.DatabaseClient(b.FullName())
+ if err := bc.Create(ctx, nil); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+ t.Fatalf("bc.Create() should have failed: %v", err)
+ }
+ if err := bc.Delete(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+ t.Fatalf("bc.Delete() should have failed: %v", err)
+ }
+ if _, err := bc.BeginBatch(ctx, wire.BatchOptions{}); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+ t.Fatalf("bc.BeginBatch() should have failed: %v", err)
+ }
+ if _, _, err := bc.GetPermissions(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+ t.Fatalf("bc.GetPermissions() should have failed: %v", err)
+ }
+ if err := bc.SetPermissions(ctx, nil, ""); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+ t.Fatalf("bc.SetPermissions() should have failed: %v", err)
+ }
+ // TODO(sadovsky): Test all other SyncGroupManager methods.
+ if _, err := bc.GetSyncGroupNames(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
+ t.Fatalf("bc.GetSyncGroupNames() should have failed: %v", err)
+ }
+}
diff --git a/v23/syncbase/nosql/client_test.go b/v23/syncbase/nosql/client_test.go
index 77d565e..87d0581 100644
--- a/v23/syncbase/nosql/client_test.go
+++ b/v23/syncbase/nosql/client_test.go
@@ -11,31 +11,37 @@
"v.io/syncbase/v23/syncbase"
"v.io/syncbase/v23/syncbase/nosql"
tu "v.io/syncbase/v23/syncbase/testutil"
- "v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/verror"
_ "v.io/x/ref/runtime/factories/generic"
)
-////////////////////////////////////////
-// Test cases
-
// TODO(sadovsky): Finish writing tests.
+// Tests various Name, FullName, and Key methods.
func TestNameAndKey(t *testing.T) {
- a := syncbase.NewService("s").App("a")
- d := a.NoSQLDatabase("d")
+ d := syncbase.NewService("s").App("a").NoSQLDatabase("d")
tb := d.Table("tb")
r := tb.Row("r")
if d.Name() != "d" {
t.Errorf("Wrong name: %q", d.Name())
}
+ if d.FullName() != naming.Join("s", "a", "d") {
+ t.Errorf("Wrong full name: %q", d.FullName())
+ }
if tb.Name() != "tb" {
t.Errorf("Wrong name: %q", tb.Name())
}
+ if tb.FullName() != naming.Join("s", "a", "d", "tb") {
+ t.Errorf("Wrong full name: %q", tb.FullName())
+ }
if r.Key() != "r" {
t.Errorf("Wrong key: %q", r.Key())
}
+ if r.FullName() != naming.Join("s", "a", "d", "tb", "r") {
+ t.Errorf("Wrong full name: %q", r.FullName())
+ }
}
// Tests that Database.Create works as expected.
@@ -110,42 +116,6 @@
S string
}
-func checkScan(t *testing.T, ctx *context.T, tb nosql.Table, r nosql.RowRange, wantKeys []string, wantValues []interface{}) {
- if len(wantKeys) != len(wantValues) {
- panic("bad input args")
- }
- it := tb.Scan(ctx, r)
- gotKeys := []string{}
- for it.Advance() {
- gotKey := it.Key()
- gotKeys = append(gotKeys, gotKey)
- i := len(gotKeys) - 1
- if i >= len(wantKeys) {
- continue
- }
- // Check key.
- wantKey := wantKeys[i]
- if gotKey != wantKey {
- tu.Fatalf(t, "Keys do not match: got %q, want %q", gotKey, wantKey)
- }
- // Check value.
- wantValue := wantValues[i]
- gotValue := reflect.Zero(reflect.TypeOf(wantValue)).Interface()
- if err := it.Value(&gotValue); err != nil {
- tu.Fatalf(t, "it.Value() failed: %v", err)
- }
- if !reflect.DeepEqual(gotValue, wantValue) {
- tu.Fatalf(t, "Values do not match: got %v, want %v", gotValue, wantValue)
- }
- }
- if err := it.Err(); err != nil {
- tu.Fatalf(t, "tb.Scan() failed: %v", err)
- }
- if len(gotKeys) != len(wantKeys) {
- tu.Fatalf(t, "Unmatched keys: got %v, want %v", gotKeys, wantKeys)
- }
-}
-
// Tests that Table.Scan works as expected.
func TestTableScan(t *testing.T) {
ctx, sName, cleanup := tu.SetupOrDie(nil)
@@ -154,7 +124,7 @@
d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
tb := tu.CreateTable(t, ctx, d, "tb")
- checkScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
fooWant := Foo{I: 4, S: "f"}
if err := tb.Put(ctx, "foo", &fooWant); err != nil {
@@ -166,29 +136,29 @@
}
// Match all keys.
- checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
- checkScan(t, ctx, tb, nosql.Range("", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
- checkScan(t, ctx, tb, nosql.Range("", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
- checkScan(t, ctx, tb, nosql.Range("a", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
- checkScan(t, ctx, tb, nosql.Range("a", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("a", ""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("a", "z"), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
// Match "bar" only.
- checkScan(t, ctx, tb, nosql.Prefix("b"), []string{"bar"}, []interface{}{&barWant})
- checkScan(t, ctx, tb, nosql.Prefix("bar"), []string{"bar"}, []interface{}{&barWant})
- checkScan(t, ctx, tb, nosql.Range("bar", "baz"), []string{"bar"}, []interface{}{&barWant})
- checkScan(t, ctx, tb, nosql.Range("bar", "foo"), []string{"bar"}, []interface{}{&barWant})
- checkScan(t, ctx, tb, nosql.Range("", "foo"), []string{"bar"}, []interface{}{&barWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix("b"), []string{"bar"}, []interface{}{&barWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix("bar"), []string{"bar"}, []interface{}{&barWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("bar", "baz"), []string{"bar"}, []interface{}{&barWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("bar", "foo"), []string{"bar"}, []interface{}{&barWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("", "foo"), []string{"bar"}, []interface{}{&barWant})
// Match "foo" only.
- checkScan(t, ctx, tb, nosql.Prefix("f"), []string{"foo"}, []interface{}{&fooWant})
- checkScan(t, ctx, tb, nosql.Prefix("foo"), []string{"foo"}, []interface{}{&fooWant})
- checkScan(t, ctx, tb, nosql.Range("foo", "fox"), []string{"foo"}, []interface{}{&fooWant})
- checkScan(t, ctx, tb, nosql.Range("foo", ""), []string{"foo"}, []interface{}{&fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix("f"), []string{"foo"}, []interface{}{&fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix("foo"), []string{"foo"}, []interface{}{&fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("foo", "fox"), []string{"foo"}, []interface{}{&fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Range("foo", ""), []string{"foo"}, []interface{}{&fooWant})
// Match nothing.
- checkScan(t, ctx, tb, nosql.Range("a", "bar"), []string{}, []interface{}{})
- checkScan(t, ctx, tb, nosql.Range("bar", "bar"), []string{}, []interface{}{})
- checkScan(t, ctx, tb, nosql.Prefix("z"), []string{}, []interface{}{})
+ tu.CheckScan(t, ctx, tb, nosql.Range("a", "bar"), []string{}, []interface{}{})
+ tu.CheckScan(t, ctx, tb, nosql.Range("bar", "bar"), []string{}, []interface{}{})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix("z"), []string{}, []interface{}{})
}
// Tests that Table.Delete works as expected.
@@ -199,7 +169,7 @@
d := tu.CreateNoSQLDatabase(t, ctx, a, "d")
tb := tu.CreateTable(t, ctx, d, "tb")
- checkScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
// Put foo and bar.
fooWant := Foo{I: 4, S: "f"}
@@ -210,25 +180,25 @@
if err := tb.Put(ctx, "bar", &barWant); err != nil {
t.Fatalf("tb.Put() failed: %v", err)
}
- checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
// Delete foo.
if err := tb.Delete(ctx, nosql.Prefix("f")); err != nil {
t.Fatalf("tb.Delete() failed: %v", err)
}
- checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar"}, []interface{}{&barWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar"}, []interface{}{&barWant})
// Restore foo.
if err := tb.Put(ctx, "foo", &fooWant); err != nil {
t.Fatalf("tb.Put() failed: %v", err)
}
- checkScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{&barWant, &fooWant})
// Delete everything.
if err := tb.Delete(ctx, nosql.Prefix("")); err != nil {
t.Fatalf("tb.Delete() failed: %v", err)
}
- checkScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
+ tu.CheckScan(t, ctx, tb, nosql.Prefix(""), []string{}, []interface{}{})
}
// Tests that Table.{Get,Put,Delete} work as expected.
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index 1a95499..d778702 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -10,22 +10,23 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/security/access"
- "v.io/v23/verror"
)
-func NewDatabase(parentFullName, relativeName string) Database {
+func NewDatabase(parentFullName, relativeName string) *database {
fullName := naming.Join(parentFullName, relativeName)
return &database{
- c: wire.DatabaseClient(fullName),
- fullName: fullName,
- name: relativeName,
+ c: wire.DatabaseClient(fullName),
+ parentFullName: parentFullName,
+ fullName: fullName,
+ name: relativeName,
}
}
type database struct {
- c wire.DatabaseClientMethods
- fullName string
- name string
+ c wire.DatabaseClientMethods
+ parentFullName string
+ fullName string
+ name string
}
var _ Database = (*database)(nil)
@@ -73,8 +74,12 @@
}
// BeginBatch implements Database.BeginBatch.
-func (d *database) BeginBatch(ctx *context.T, opts BatchOptions) (BatchDatabase, error) {
- return nil, verror.NewErrNotImplemented(ctx)
+func (d *database) BeginBatch(ctx *context.T, opts wire.BatchOptions) (BatchDatabase, error) {
+ relativeName, err := d.c.BeginBatch(ctx, opts)
+ if err != nil {
+ return nil, err
+ }
+ return &batch{database: *NewDatabase(d.parentFullName, relativeName)}, nil
}
// SetPermissions implements Database.SetPermissions.
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index 8b473d8..3faa52e 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -19,6 +19,12 @@
// It allows clients to pass the handle to helper methods that are
// batch-agnostic.
type DatabaseHandle interface {
+ // Name returns the relative name of this DatabaseHandle.
+ Name() string
+
+ // FullName returns the full name (object name) of this DatabaseHandle.
+ FullName() string
+
// Table returns the Table with the given name.
// relativeName must not contain slashes.
Table(relativeName string) Table
@@ -27,8 +33,6 @@
ListTables(ctx *context.T) ([]string, error)
}
-type BatchOptions wire.BatchOptions
-
// Database represents a collection of Tables. Batches, queries, sync, watch,
// etc. all operate at the Database level.
//
@@ -36,12 +40,6 @@
type Database interface {
DatabaseHandle
- // Name returns the relative name of this Database.
- Name() string
-
- // FullName returns the full name (object name) of this Database.
- FullName() string
-
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
Create(ctx *context.T, perms access.Permissions) error
@@ -73,7 +71,7 @@
//
// Concurrency semantics can be configured using BatchOptions.
// TODO(sadovsky): Maybe use varargs for options.
- BeginBatch(ctx *context.T, opts BatchOptions) (BatchDatabase, error)
+ BeginBatch(ctx *context.T, opts wire.BatchOptions) (BatchDatabase, error)
// SetPermissions and GetPermissions are included from the AccessController
// interface.
diff --git a/v23/syncbase/nosql/stream.go b/v23/syncbase/nosql/stream.go
index fa75e60..9cd2870 100644
--- a/v23/syncbase/nosql/stream.go
+++ b/v23/syncbase/nosql/stream.go
@@ -86,11 +86,7 @@
if s.err == nil {
return nil
}
- idAction := verror.ErrInternal
- if verror.ErrorID(s.err) == verror.ErrCanceled.ID {
- idAction = verror.ErrCanceled
- }
- return verror.New(idAction, nil, s.err)
+ return verror.Convert(verror.IDAction{}, nil, s.err)
}
// Cancel implements Stream.Cancel.
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index 7445f6b..fbcd391 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -8,6 +8,7 @@
import (
"io/ioutil"
"os"
+ "reflect"
"runtime/debug"
"testing"
@@ -102,6 +103,42 @@
return perms
}
+func CheckScan(t *testing.T, ctx *context.T, tb nosql.Table, r nosql.RowRange, wantKeys []string, wantValues []interface{}) {
+ if len(wantKeys) != len(wantValues) {
+ panic("bad input args")
+ }
+ it := tb.Scan(ctx, r)
+ gotKeys := []string{}
+ for it.Advance() {
+ gotKey := it.Key()
+ gotKeys = append(gotKeys, gotKey)
+ i := len(gotKeys) - 1
+ if i >= len(wantKeys) {
+ continue
+ }
+ // Check key.
+ wantKey := wantKeys[i]
+ if gotKey != wantKey {
+ Fatalf(t, "Keys do not match: got %q, want %q", gotKey, wantKey)
+ }
+ // Check value.
+ wantValue := wantValues[i]
+ gotValue := reflect.Zero(reflect.TypeOf(wantValue)).Interface()
+ if err := it.Value(&gotValue); err != nil {
+ Fatalf(t, "it.Value() failed: %v", err)
+ }
+ if !reflect.DeepEqual(gotValue, wantValue) {
+ Fatalf(t, "Values do not match: got %v, want %v", gotValue, wantValue)
+ }
+ }
+ if err := it.Err(); err != nil {
+ Fatalf(t, "tb.Scan() failed: %v", err)
+ }
+ if len(gotKeys) != len(wantKeys) {
+ Fatalf(t, "Unmatched keys: got %v, want %v", gotKeys, wantKeys)
+ }
+}
+
////////////////////////////////////////
// Internal helpers
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index aa4bfb3..1e1fe1e 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -19,6 +19,7 @@
"v.io/v23/verror"
)
+// app is a per-app singleton (i.e. not per-request) that handles App RPCs.
type app struct {
name string
s *service
@@ -66,12 +67,14 @@
func (a *app) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
// Check perms.
sn := a.s.st.NewSnapshot()
+ closeSnapshot := func() error {
+ return sn.Close()
+ }
if err := util.Get(ctx, call, sn, a, &appData{}); err != nil {
- sn.Close()
+ closeSnapshot()
return nil, err
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+ return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
}
////////////////////////////////////////
diff --git a/x/ref/services/syncbase/server/dispatcher.go b/x/ref/services/syncbase/server/dispatcher.go
index f6a4dde..17fb042 100644
--- a/x/ref/services/syncbase/server/dispatcher.go
+++ b/x/ref/services/syncbase/server/dispatcher.go
@@ -48,8 +48,8 @@
aExists := false
var a *app
- if aint, err := disp.s.App(nil, nil, appName); err == nil {
- a = aint.(*app) // panics on failure, as desired
+ if aInt, err := disp.s.App(nil, nil, appName); err == nil {
+ a = aInt.(*app) // panics on failure, as desired
aExists = true
} else {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 1f3cbd3..82a366d 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -5,7 +5,12 @@
package nosql
import (
+ "math/rand"
"path"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
@@ -19,19 +24,41 @@
"v.io/x/lib/vlog"
)
+// database is a per-database singleton (i.e. not per-request). It does not
+// directly handle RPCs.
+// Note: If a database does not exist at the time of a database RPC, the
+// dispatcher creates a short-lived database object to service that particular
+// request.
type database struct {
name string
a interfaces.App
// The fields below are initialized iff this database exists.
st store.Store // stores all data for a single database
+
+ // Active snapshots and transactions corresponding to client batches.
+ // TODO(sadovsky): Add timeouts and GC.
+ mu sync.Mutex // protects the fields below
+ sns map[uint64]store.Snapshot
+ txs map[uint64]store.Transaction
+}
+
+// databaseReq is a per-request object that handles Database RPCs.
+// It embeds database and tracks request-specific batch state.
+type databaseReq struct {
+ *database
+ // If non-nil, sn or tx will be non-nil.
+ batchId *uint64
+ sn store.Snapshot
+ tx store.Transaction
}
var (
- _ wire.DatabaseServerMethods = (*database)(nil)
+ _ wire.DatabaseServerMethods = (*databaseReq)(nil)
_ interfaces.Database = (*database)(nil)
_ util.Layer = (*database)(nil)
)
+// DatabaseOptions configures a database.
type DatabaseOptions struct {
// Database-level permissions.
Perms access.Permissions
@@ -62,6 +89,8 @@
name: name,
a: a,
st: st,
+ sns: make(map[uint64]store.Snapshot),
+ txs: make(map[uint64]store.Transaction),
}
data := &databaseData{
Name: d.name,
@@ -76,34 +105,100 @@
////////////////////////////////////////
// RPC methods
-func (d *database) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
// This database does not yet exist; d is just an ephemeral handle that holds
// {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
// handle and store it in d.a.dbs[d.name].
return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms)
}
-func (d *database) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
}
-func (d *database) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
- return "", verror.NewErrNotImplemented(ctx)
+var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
+ if d.batchId != nil {
+ return "", wire.NewErrBoundToBatch(ctx)
+ }
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ var id uint64
+ var batchType string
+ for {
+ id = uint64(rng.Int63())
+ if bo.ReadOnly {
+ if _, ok := d.sns[id]; !ok {
+ d.sns[id] = d.st.NewSnapshot()
+ batchType = "sn"
+ break
+ }
+ } else {
+ if _, ok := d.txs[id]; !ok {
+ d.txs[id] = d.st.NewTransaction()
+ batchType = "tx"
+ break
+ }
+ }
+ }
+ return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
}
-func (d *database) Commit(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId == nil {
+ return wire.NewErrNotBoundToBatch(ctx)
+ }
+ if d.tx == nil {
+ return wire.NewErrReadOnlyBatch(ctx)
+ }
+ var err error
+ if err = d.tx.Commit(); err == nil {
+ d.mu.Lock()
+ delete(d.txs, *d.batchId)
+ d.mu.Unlock()
+ }
+ return err
}
-func (d *database) Abort(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId == nil {
+ return wire.NewErrNotBoundToBatch(ctx)
+ }
+ var err error
+ if d.tx != nil {
+ if err = d.tx.Abort(); err == nil {
+ d.mu.Lock()
+ delete(d.txs, *d.batchId)
+ d.mu.Unlock()
+ }
+ } else {
+ if err = d.sn.Close(); err == nil {
+ d.mu.Lock()
+ delete(d.sns, *d.batchId)
+ d.mu.Unlock()
+ }
+ }
+ return err
}
-func (d *database) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
}
-func (d *database) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+ if d.batchId != nil {
+ return nil, "", wire.NewErrBoundToBatch(ctx)
+ }
data := &databaseData{}
if err := util.Get(ctx, call, d.st, d, data); err != nil {
return nil, "", err
@@ -111,15 +206,20 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (d *database) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+ if d.batchId != nil {
+ return nil, wire.NewErrBoundToBatch(ctx)
+ }
// Check perms.
sn := d.st.NewSnapshot()
+ closeSnapshot := func() error {
+ return sn.Close()
+ }
if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
- sn.Close()
+ closeSnapshot()
return nil, err
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.TablePrefix)
+ return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
}
////////////////////////////////////////
@@ -167,3 +267,26 @@
func (d *database) StKey() string {
return util.DatabasePrefix
}
+
+////////////////////////////////////////
+// Internal helpers
+
+func (d *databaseReq) batchReader() store.StoreReader {
+ if d.batchId == nil {
+ return nil
+ } else if d.sn != nil {
+ return d.sn
+ } else {
+ return d.tx
+ }
+}
+
+func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+ if d.batchId == nil {
+ return nil, nil
+ } else if d.tx != nil {
+ return d.tx, nil
+ } else {
+ return nil, wire.NewErrReadOnlyBatch(nil)
+ }
+}
diff --git a/x/ref/services/syncbase/server/nosql/database_sgm.go b/x/ref/services/syncbase/server/nosql/database_sgm.go
index 768b478..b5797b0 100644
--- a/x/ref/services/syncbase/server/nosql/database_sgm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sgm.go
@@ -16,39 +16,66 @@
////////////////////////////////////////
// SyncGroup RPC methods
-func (d *database) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+func (d *databaseReq) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ if d.batchId != nil {
+ return nil, wire.NewErrBoundToBatch(ctx)
+ }
return nil, verror.NewErrNotImplemented(ctx)
}
-func (d *database) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
sd := vsync.NewSyncDatabase(d)
return sd.CreateSyncGroup(ctx, call, sgName, spec, myInfo)
}
-func (d *database) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+func (d *databaseReq) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+ if d.batchId != nil {
+ return wire.SyncGroupSpec{}, wire.NewErrBoundToBatch(ctx)
+ }
return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
}
-func (d *database) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+func (d *databaseReq) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+func (d *databaseReq) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+ if d.batchId != nil {
+ return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
+ }
return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
}
-func (d *database) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+ if d.batchId != nil {
+ return nil, wire.NewErrBoundToBatch(ctx)
+ }
return nil, verror.NewErrNotImplemented(ctx)
}
diff --git a/x/ref/services/syncbase/server/nosql/dispatcher.go b/x/ref/services/syncbase/server/nosql/dispatcher.go
index d8fe2f0..3bbea26 100644
--- a/x/ref/services/syncbase/server/nosql/dispatcher.go
+++ b/x/ref/services/syncbase/server/nosql/dispatcher.go
@@ -5,12 +5,14 @@
package nosql
import (
+ "strconv"
"strings"
wire "v.io/syncbase/v23/services/syncbase"
nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
pubutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
@@ -35,9 +37,15 @@
vlog.Fatal("invalid nosql.dispatcher Lookup")
}
+ dParts := strings.Split(parts[0], util.BatchSep)
+ dName := dParts[0]
+
// Validate all key atoms up front, so that we can avoid doing so in all our
// method implementations.
- for _, s := range parts {
+ if !pubutil.ValidName(dName) {
+ return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ }
+ for _, s := range parts[1:] {
if !pubutil.ValidName(s) {
return nil, nil, wire.NewErrInvalidName(nil, suffix)
}
@@ -45,22 +53,28 @@
dExists := false
var d *database
- if dint, err := disp.a.NoSQLDatabase(nil, nil, parts[0]); err == nil {
- d = dint.(*database) // panics on failure, as desired
+ if dInt, err := disp.a.NoSQLDatabase(nil, nil, dName); err == nil {
+ d = dInt.(*database) // panics on failure, as desired
dExists = true
} else {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
return nil, nil, err
} else {
+ // Database does not exist. Create a short-lived database object to
+ // service this request.
d = &database{
- name: parts[0],
+ name: dName,
a: disp.a,
}
}
}
+ dReq := &databaseReq{database: d}
+ if !setBatchFields(dReq, dParts) {
+ return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ }
if len(parts) == 1 {
- return nosqlWire.DatabaseServer(d), nil, nil
+ return nosqlWire.DatabaseServer(dReq), nil, nil
}
// All table and row methods require the database to exist. If it doesn't,
@@ -73,21 +87,50 @@
// downstream handling of this request. Depending on the order in which things
// execute, the client may not get an error, but in any case ultimately the
// store will end up in a consistent state.
- t := &table{
+ tReq := &tableReq{
name: parts[1],
- d: d,
+ d: dReq,
}
if len(parts) == 2 {
- return nosqlWire.TableServer(t), nil, nil
+ return nosqlWire.TableServer(tReq), nil, nil
}
- r := &row{
+ rReq := &rowReq{
key: parts[2],
- t: t,
+ t: tReq,
}
if len(parts) == 3 {
- return nosqlWire.RowServer(r), nil, nil
+ return nosqlWire.RowServer(rReq), nil, nil
}
return nil, nil, verror.NewErrNoExist(nil)
}
+
+// setBatchFields sets the batch-related fields in databaseReq based on the
+// value of dParts, the parts of the database name component. It returns false
+// if dParts is malformed.
+func setBatchFields(d *databaseReq, dParts []string) bool {
+ if len(dParts) == 1 {
+ return true
+ }
+ if len(dParts) != 3 {
+ return false
+ }
+ batchId, err := strconv.ParseUint(dParts[2], 0, 64)
+ if err != nil {
+ return false
+ }
+ d.batchId = &batchId
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ var ok bool
+ switch dParts[1] {
+ case "sn":
+ d.sn, ok = d.sns[batchId]
+ case "tx":
+ d.tx, ok = d.txs[batchId]
+ default:
+ return false
+ }
+ return ok
+}
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 057dc10..3397fd1 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -13,67 +13,95 @@
"v.io/v23/verror"
)
-// TODO(sadovsky): Extend data layout to support version tracking for sync.
-// See go/vanadium-local-structured-store.
-
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type row struct {
+// rowReq is a per-request object that handles Row RPCs.
+type rowReq struct {
key string
- t *table
+ t *tableReq
}
var (
- _ wire.RowServerMethods = (*row)(nil)
- _ util.Layer = (*row)(nil)
+ _ wire.RowServerMethods = (*rowReq)(nil)
+ _ util.Layer = (*rowReq)(nil)
)
////////////////////////////////////////
// RPC methods
-func (r *row) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
- return r.get(ctx, call, r.t.d.st)
+func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
+ impl := func(st store.StoreReader) ([]byte, error) {
+ return r.get(ctx, call, st)
+ }
+ var st store.StoreReader
+ if r.t.d.batchId != nil {
+ st = r.t.d.batchReader()
+ } else {
+ sn := r.t.d.st.NewSnapshot()
+ st = sn
+ defer sn.Close()
+ }
+ return impl(st)
}
-func (r *row) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
- return r.put(ctx, call, r.t.d.st, value)
+func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
+ impl := func(st store.StoreReadWriter) error {
+ return r.put(ctx, call, st, value)
+ }
+ if r.t.d.batchId != nil {
+ if st, err := r.t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(r.t.d.st, impl)
+ }
}
-func (r *row) Delete(ctx *context.T, call rpc.ServerCall) error {
- return r.del(ctx, call, r.t.d.st)
+func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+ impl := func(st store.StoreReadWriter) error {
+ return r.delete(ctx, call, st)
+ }
+ if r.t.d.batchId != nil {
+ if st, err := r.t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(r.t.d.st, impl)
+ }
}
////////////////////////////////////////
// util.Layer methods
-func (r *row) Name() string {
+func (r *rowReq) Name() string {
return r.key
}
-func (r *row) StKey() string {
+func (r *rowReq) StKey() string {
return util.JoinKeyParts(util.RowPrefix, r.stKeyPart())
}
////////////////////////////////////////
// Internal helpers
-func (r *row) stKeyPart() string {
+func (r *rowReq) stKeyPart() string {
return util.JoinKeyParts(r.t.stKeyPart(), r.key)
}
-// TODO(sadovsky): Update access checks to use prefix permissions.
-
-// checkAccess checks that this row's table exists in the database and performs
+// checkAccess checks that this row's table exists in the database, and performs
// an authorization check (currently against the table perms).
// Returns a VDL-compatible error.
-func (r *row) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
+// TODO(sadovsky): Use prefix permissions.
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
return util.Get(ctx, call, st, r.t, &tableData{})
}
// get reads data from the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
if err := r.checkAccess(ctx, call, st); err != nil {
return nil, err
}
@@ -90,7 +118,7 @@
// put writes data to the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
@@ -100,10 +128,10 @@
return nil
}
-// del deletes data from the storage engine.
+// delete deletes data from the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) del(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index e73afd2..7619fc4 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -14,22 +14,24 @@
"v.io/v23/verror"
)
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type table struct {
+// tableReq is a per-request object that handles Table RPCs.
+type tableReq struct {
name string
- d *database
+ d *databaseReq
}
var (
- _ wire.TableServerMethods = (*table)(nil)
- _ util.Layer = (*table)(nil)
+ _ wire.TableServerMethods = (*tableReq)(nil)
+ _ util.Layer = (*tableReq)(nil)
)
////////////////////////////////////////
// RPC methods
-func (t *table) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (t *tableReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+ if t.d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
// Check databaseData perms.
dData := &databaseData{}
@@ -56,7 +58,10 @@
})
}
-func (t *table) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (t *tableReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+ if t.d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
// Read-check-delete tableData.
if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
@@ -70,8 +75,8 @@
})
}
-func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
+ impl := func(st store.StoreReadWriter) error {
// Check perms.
if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
return err
@@ -88,83 +93,136 @@
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
- })
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
}
-func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
- sn := t.d.st.NewSnapshot()
- defer sn.Close()
- // Check perms.
- if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
- return err
+func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
+ impl := func(st store.StoreReader) error {
+ // Check perms.
+ if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ return err
+ }
+ it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+ sender := call.SendStream()
+ key, value := []byte{}, []byte{}
+ for it.Advance() {
+ key, value = it.Key(key), it.Value(value)
+ parts := util.SplitKeyParts(string(key))
+ sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+ }
+ if err := it.Err(); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
- it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
- sender := call.SendStream()
- key, value := []byte{}, []byte{}
- for it.Advance() {
- key, value = it.Key(key), it.Value(value)
- parts := util.SplitKeyParts(string(key))
- sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+ var st store.StoreReader
+ if t.d.batchId != nil {
+ st = t.d.batchReader()
+ } else {
+ sn := t.d.st.NewSnapshot()
+ st = sn
+ defer sn.Close()
}
- if err := it.Err(); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return impl(st)
}
-func (t *table) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
if prefix != "" {
return verror.NewErrNotImplemented(ctx)
}
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+ impl := func(st store.StoreReadWriter) error {
data := &tableData{}
return util.Update(ctx, call, st, t, data, func() error {
data.Perms = perms
return nil
})
- })
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
}
-func (t *table) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
+func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
if key != "" {
return nil, verror.NewErrNotImplemented(ctx)
}
- data := &tableData{}
- if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
- return nil, err
+ impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+ data := &tableData{}
+ if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
+ return nil, err
+ }
+ return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
}
- return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+ var st store.StoreReader
+ if t.d.batchId != nil {
+ st = t.d.batchReader()
+ } else {
+ sn := t.d.st.NewSnapshot()
+ st = sn
+ defer sn.Close()
+ }
+ return impl(st)
}
-func (t *table) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
+func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
return verror.NewErrNotImplemented(ctx)
}
-func (t *table) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
- sn := t.d.st.NewSnapshot()
- // Check perms.
- if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
- sn.Close()
- return nil, err
+func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+ impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+ // Check perms.
+ if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ closeStoreReader()
+ return nil, err
+ }
+ return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.RowPrefix, t.name))
+ var st store.StoreReader
+ var closeStoreReader func() error
+ if t.d.batchId != nil {
+ st = t.d.batchReader()
+ closeStoreReader = func() error {
+ return nil
+ }
+ } else {
+ sn := t.d.st.NewSnapshot()
+ st = sn
+ closeStoreReader = func() error {
+ return sn.Close()
+ }
+ }
+ return impl(st, closeStoreReader)
}
////////////////////////////////////////
// util.Layer methods
-func (t *table) Name() string {
+func (t *tableReq) Name() string {
return t.name
}
-func (t *table) StKey() string {
+func (t *tableReq) StKey() string {
return util.JoinKeyParts(util.TablePrefix, t.stKeyPart())
}
////////////////////////////////////////
// Internal helpers
-func (t *table) stKeyPart() string {
+func (t *tableReq) stKeyPart() string {
return t.name
}
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index fd3e3b8..86a2a37 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -23,15 +23,7 @@
"v.io/v23/verror"
)
-type ServiceOptions struct {
- // Service-level permissions.
- Perms access.Permissions
- // Root dir for data storage.
- RootDir string
- // Storage engine to use (for service and per-database engines).
- Engine string
-}
-
+// service is a singleton (i.e. not per-request) that handles Service RPCs.
type service struct {
st store.Store // keeps track of which apps and databases exist, etc.
sync interfaces.SyncServerMethods
@@ -48,6 +40,16 @@
_ util.Layer = (*service)(nil)
)
+// ServiceOptions configures a service.
+type ServiceOptions struct {
+ // Service-level permissions.
+ Perms access.Permissions
+ // Root dir for data storage.
+ RootDir string
+ // Storage engine to use (for service and per-database engines).
+ Engine string
+}
+
// NewService creates a new service instance and returns it.
// Returns a VDL-compatible error.
func NewService(ctx *context.T, call rpc.ServerCall, opts ServiceOptions) (*service, error) {
@@ -103,12 +105,14 @@
func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
// Check perms.
sn := s.st.NewSnapshot()
+ closeSnapshot := func() error {
+ return sn.Close()
+ }
if err := util.Get(ctx, call, sn, s, &serviceData{}); err != nil {
- sn.Close()
+ closeSnapshot()
return nil, err
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.AppPrefix)
+ return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
}
////////////////////////////////////////
diff --git a/x/ref/services/syncbase/server/util/constants.go b/x/ref/services/syncbase/server/util/constants.go
index 88e1115..44d0ff6 100644
--- a/x/ref/services/syncbase/server/util/constants.go
+++ b/x/ref/services/syncbase/server/util/constants.go
@@ -23,4 +23,8 @@
const (
// Service object name suffix for Syncbase-to-Syncbase RPCs.
SyncbaseSuffix = "$internal"
+ // Separator for batch info in database names.
+ BatchSep = ":"
+ // Separator for parts of storage engine keys.
+ KeyPartSep = ":"
)
diff --git a/x/ref/services/syncbase/server/util/glob.go b/x/ref/services/syncbase/server/util/glob.go
index 87a204e..acdd74f 100644
--- a/x/ref/services/syncbase/server/util/glob.go
+++ b/x/ref/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
return res, nil
}
-// Takes ownership of sn.
+// Glob performs a glob. It calls closeStoreReader to close st.
// TODO(sadovsky): Why do we make developers implement Glob differently from
// other streaming RPCs? It's confusing that Glob must return immediately and
// write its results to a channel, while other streaming RPC handlers must block
// and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
// TODO(sadovsky): Support glob with non-prefix pattern.
if _, err := glob.Parse(pattern); err != nil {
- sn.Close()
+ closeStoreReader()
return nil, verror.New(verror.ErrBadArg, ctx, err)
}
prefix, err := globPatternToPrefix(pattern)
if err != nil {
- sn.Close()
+ closeStoreReader()
if verror.ErrorID(err) == verror.ErrBadArg.ID {
return nil, verror.NewErrNotImplemented(ctx)
}
return nil, verror.New(verror.ErrInternal, ctx, err)
}
- it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+ it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
ch := make(chan string)
go func() {
- defer sn.Close()
+ defer closeStoreReader()
defer close(ch)
key := []byte{}
for it.Advance() {
diff --git a/x/ref/services/syncbase/server/util/key_util.go b/x/ref/services/syncbase/server/util/key_util.go
index b469f9e..80a8a6d 100644
--- a/x/ref/services/syncbase/server/util/key_util.go
+++ b/x/ref/services/syncbase/server/util/key_util.go
@@ -13,12 +13,12 @@
// JoinKeyParts builds keys for accessing data in the storage engine.
func JoinKeyParts(parts ...string) string {
// TODO(sadovsky): Figure out which delimiter makes the most sense.
- return strings.Join(parts, ":")
+ return strings.Join(parts, KeyPartSep)
}
// SplitKeyParts is the inverse of JoinKeyParts.
func SplitKeyParts(key string) []string {
- return strings.Split(key, ":")
+ return strings.Split(key, KeyPartSep)
}
// ScanPrefixArgs returns args for sn.Scan() for the specified prefix.