syncbase: BeginBatch() bugfix and restartability TODOs
This CL addresses a few issues uncovered by jlodhia@.
- Makes it so that Database.BeginBatch() fails gracefully if
the database does not exist (and adds a test).
- Restores the general mechanism for tracking existence in server's
app/db structs, to prevent this problem from recurring elsewhere.
- Adds TODOs for supporting restartability by making service.apps
and app.dbs act like lazily-populated caches.
Change-Id: I788d6681c322cd8bc43472b4faa4663235d240c2
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index f16f657..b35afce 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -17,6 +17,7 @@
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
// app is a per-app singleton (i.e. not per-request) that handles App RPCs.
@@ -24,6 +25,7 @@
name string
s *service
// The fields below are initialized iff this app exists.
+ exists bool
// Guards the fields below. Held during database Create, Delete, and
// SetPermissions.
mu sync.Mutex
@@ -42,6 +44,9 @@
// TODO(sadovsky): Require the app name to match the client's blessing name.
// I.e. reserve names at the app level of the hierarchy.
func (a *app) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+ if a.exists {
+ return verror.New(verror.ErrExist, ctx, a.name)
+ }
// This app does not yet exist; a is just an ephemeral handle that holds
// {name string, s *service}. a.s.createApp will create a new app handle and
// store it in a.s.apps[a.name].
@@ -53,10 +58,16 @@
}
func (a *app) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+ if !a.exists {
+ return verror.New(verror.ErrNoExist, ctx, a.name)
+ }
return a.s.setAppPerms(ctx, call, a.name, perms, version)
}
func (a *app) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+ if !a.exists {
+ return nil, "", verror.New(verror.ErrNoExist, ctx, a.name)
+ }
data := &appData{}
if err := util.Get(ctx, call, a.s.st, a, data); err != nil {
return nil, "", err
@@ -65,6 +76,9 @@
}
func (a *app) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+ if !a.exists {
+ return nil, verror.New(verror.ErrNoExist, ctx, a.name)
+ }
// Check perms.
sn := a.s.st.NewSnapshot()
closeSnapshot := func() error {
@@ -84,7 +98,16 @@
return a.s
}
+// TODO(sadovsky): Currently, we return an error (here and elsewhere) if the
+// specified db does not exist in a.dbs. But note, this will always be the case
+// after a Syncbase service restart. The implementation should be updated so
+// that a.dbs acts as an in-memory cache of db handles. If a database exists but
+// is not present in a.dbs, a.NoSQLDatabase() should open the database and add
+// its handle to a.dbs.
func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
+ if !a.exists {
+ vlog.Fatalf("app %q does not exist", a.name)
+ }
// TODO(sadovsky): Record storage engine config (e.g. LevelDB directory) in
// dbInfo, and add API for opening and closing storage engines.
a.mu.Lock()
@@ -97,6 +120,9 @@
}
func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ if !a.exists {
+ vlog.Fatalf("app %q does not exist", a.name)
+ }
// In the future this API will likely be replaced by one that streams the
// database names.
a.mu.Lock()
@@ -109,6 +135,9 @@
}
func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
+ if !a.exists {
+ vlog.Fatalf("app %q does not exist", a.name)
+ }
// TODO(sadovsky): Crash if any step fails, and use WAL to ensure that if we
// crash, upon restart we execute any remaining steps before we start handling
// client requests.
@@ -154,7 +183,7 @@
}
d, err := nosql.NewDatabase(ctx, call, a, dbName, nosql.DatabaseOptions{
Perms: perms,
- RootDir: a.rootDirForDB(dbName),
+ RootDir: a.rootDirForDb(dbName),
Engine: a.s.opts.Engine,
})
if err != nil {
@@ -176,6 +205,9 @@
}
func (a *app) DeleteNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) error {
+ if !a.exists {
+ vlog.Fatalf("app %q does not exist", a.name)
+ }
// TODO(sadovsky): Crash if any step fails, and use WAL to ensure that if we
// crash, upon restart we execute any remaining steps before we start handling
// client requests.
@@ -214,7 +246,7 @@
if err := d.St().Close(); err != nil {
return err
}
- if err := util.DestroyStore(a.s.opts.Engine, a.rootDirForDB(dbName)); err != nil {
+ if err := util.DestroyStore(a.s.opts.Engine, a.rootDirForDb(dbName)); err != nil {
return err
}
@@ -228,6 +260,9 @@
}
func (a *app) SetDatabasePerms(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, version string) error {
+ if !a.exists {
+ vlog.Fatalf("app %q does not exist", a.name)
+ }
a.mu.Lock()
defer a.mu.Unlock()
d, ok := a.dbs[dbName]
@@ -255,6 +290,6 @@
return a.name
}
-func (a *app) rootDirForDB(dbName string) string {
+func (a *app) rootDirForDb(dbName string) string {
return path.Join(a.s.opts.RootDir, "apps", a.name, dbName)
}
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 82a366d..b9f7d7b 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -33,7 +33,8 @@
name string
a interfaces.App
// The fields below are initialized iff this database exists.
- st store.Store // stores all data for a single database
+ exists bool
+ st store.Store // stores all data for a single database
// Active snapshots and transactions corresponding to client batches.
// TODO(sadovsky): Add timeouts and GC.
@@ -86,11 +87,12 @@
return nil, err
}
d := &database{
- name: name,
- a: a,
- st: st,
- sns: make(map[uint64]store.Snapshot),
- txs: make(map[uint64]store.Transaction),
+ name: name,
+ a: a,
+ exists: true,
+ st: st,
+ sns: make(map[uint64]store.Snapshot),
+ txs: make(map[uint64]store.Transaction),
}
data := &databaseData{
Name: d.name,
@@ -106,6 +108,9 @@
// RPC methods
func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+ if d.exists {
+ return verror.New(verror.ErrExist, ctx, d.name)
+ }
if d.batchId != nil {
return wire.NewErrBoundToBatch(ctx)
}
@@ -125,6 +130,9 @@
var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
+ if !d.exists {
+ return "", verror.New(verror.ErrNoExist, ctx, d.name)
+ }
if d.batchId != nil {
return "", wire.NewErrBoundToBatch(ctx)
}
@@ -152,6 +160,9 @@
}
func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
+ if !d.exists {
+ return verror.New(verror.ErrNoExist, ctx, d.name)
+ }
if d.batchId == nil {
return wire.NewErrNotBoundToBatch(ctx)
}
@@ -168,6 +179,9 @@
}
func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
+ if !d.exists {
+ return verror.New(verror.ErrNoExist, ctx, d.name)
+ }
if d.batchId == nil {
return wire.NewErrNotBoundToBatch(ctx)
}
@@ -189,6 +203,9 @@
}
func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+ if !d.exists {
+ return verror.New(verror.ErrNoExist, ctx, d.name)
+ }
if d.batchId != nil {
return wire.NewErrBoundToBatch(ctx)
}
@@ -196,6 +213,9 @@
}
func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+ if !d.exists {
+ return nil, "", verror.New(verror.ErrNoExist, ctx, d.name)
+ }
if d.batchId != nil {
return nil, "", wire.NewErrBoundToBatch(ctx)
}
@@ -207,6 +227,9 @@
}
func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+ if !d.exists {
+ return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+ }
if d.batchId != nil {
return nil, wire.NewErrBoundToBatch(ctx)
}
@@ -226,7 +249,7 @@
// interfaces.Database methods
func (d *database) St() store.Store {
- if d.st == nil {
+ if !d.exists {
vlog.Fatalf("database %q does not exist", d.name)
}
return d.st
@@ -237,11 +260,14 @@
}
func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+ if !d.exists {
+ vlog.Fatalf("database %q does not exist", d.name)
+ }
return util.Get(ctx, call, st, d, &databaseData{})
}
func (d *database) SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
- if d.st == nil {
+ if !d.exists {
vlog.Fatalf("database %q does not exist", d.name)
}
return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 86a2a37..e551431 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -126,6 +126,12 @@
return s.sync
}
+// TODO(sadovsky): Currently, we return an error (here and elsewhere) if the
+// specified app does not exist in s.apps. But note, this will always be the
+// case after a Syncbase service restart. The implementation should be updated
+// so that s.apps acts as an in-memory cache of app handles. If an app exists
+// but is not present in s.apps, s.App() should open the app and add its handle
+// to s.apps.
func (s *service) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -159,9 +165,10 @@
}
a := &app{
- name: appName,
- s: s,
- dbs: map[string]interfaces.Database{},
+ name: appName,
+ s: s,
+ exists: true,
+ dbs: map[string]interfaces.Database{},
}
if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {