syncbase: BeginBatch() bugfix and restartability TODOs

This CL addresses a few issues uncovered by jlodhia@.

- Makes it so that Database.BeginBatch() fails gracefully if
  the database does not exist (and adds a test).
- Restores the general mechanism for tracking existence in server's
  app/db structs, to prevent this problem from recurring elsewhere.
- Adds TODOs for supporting restartability by making service.apps
  and app.dbs act like lazily-populated caches.

Change-Id: I788d6681c322cd8bc43472b4faa4663235d240c2
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index f16f657..b35afce 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -17,6 +17,7 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 )
 
 // app is a per-app singleton (i.e. not per-request) that handles App RPCs.
@@ -24,6 +25,7 @@
 	name string
 	s    *service
 	// The fields below are initialized iff this app exists.
+	exists bool
 	// Guards the fields below. Held during database Create, Delete, and
 	// SetPermissions.
 	mu  sync.Mutex
@@ -42,6 +44,9 @@
 // TODO(sadovsky): Require the app name to match the client's blessing name.
 // I.e. reserve names at the app level of the hierarchy.
 func (a *app) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+	if a.exists {
+		return verror.New(verror.ErrExist, ctx, a.name)
+	}
 	// This app does not yet exist; a is just an ephemeral handle that holds
 	// {name string, s *service}. a.s.createApp will create a new app handle and
 	// store it in a.s.apps[a.name].
@@ -53,10 +58,16 @@
 }
 
 func (a *app) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+	if !a.exists {
+		return verror.New(verror.ErrNoExist, ctx, a.name)
+	}
 	return a.s.setAppPerms(ctx, call, a.name, perms, version)
 }
 
 func (a *app) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+	if !a.exists {
+		return nil, "", verror.New(verror.ErrNoExist, ctx, a.name)
+	}
 	data := &appData{}
 	if err := util.Get(ctx, call, a.s.st, a, data); err != nil {
 		return nil, "", err
@@ -65,6 +76,9 @@
 }
 
 func (a *app) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+	if !a.exists {
+		return nil, verror.New(verror.ErrNoExist, ctx, a.name)
+	}
 	// Check perms.
 	sn := a.s.st.NewSnapshot()
 	closeSnapshot := func() error {
@@ -84,7 +98,16 @@
 	return a.s
 }
 
+// TODO(sadovsky): Currently, we return an error (here and elsewhere) if the
+// specified db does not exist in a.dbs. But note, this will always be the case
+// after a Syncbase service restart. The implementation should be updated so
+// that a.dbs acts as an in-memory cache of db handles. If a database exists but
+// is not present in a.dbs, a.NoSQLDatabase() should open the database and add
+// its handle to a.dbs.
 func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
+	if !a.exists {
+		vlog.Fatalf("app %q does not exist", a.name)
+	}
 	// TODO(sadovsky): Record storage engine config (e.g. LevelDB directory) in
 	// dbInfo, and add API for opening and closing storage engines.
 	a.mu.Lock()
@@ -97,6 +120,9 @@
 }
 
 func (a *app) NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	if !a.exists {
+		vlog.Fatalf("app %q does not exist", a.name)
+	}
 	// In the future this API will likely be replaced by one that streams the
 	// database names.
 	a.mu.Lock()
@@ -109,6 +135,9 @@
 }
 
 func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
+	if !a.exists {
+		vlog.Fatalf("app %q does not exist", a.name)
+	}
 	// TODO(sadovsky): Crash if any step fails, and use WAL to ensure that if we
 	// crash, upon restart we execute any remaining steps before we start handling
 	// client requests.
@@ -154,7 +183,7 @@
 	}
 	d, err := nosql.NewDatabase(ctx, call, a, dbName, nosql.DatabaseOptions{
 		Perms:   perms,
-		RootDir: a.rootDirForDB(dbName),
+		RootDir: a.rootDirForDb(dbName),
 		Engine:  a.s.opts.Engine,
 	})
 	if err != nil {
@@ -176,6 +205,9 @@
 }
 
 func (a *app) DeleteNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) error {
+	if !a.exists {
+		vlog.Fatalf("app %q does not exist", a.name)
+	}
 	// TODO(sadovsky): Crash if any step fails, and use WAL to ensure that if we
 	// crash, upon restart we execute any remaining steps before we start handling
 	// client requests.
@@ -214,7 +246,7 @@
 	if err := d.St().Close(); err != nil {
 		return err
 	}
-	if err := util.DestroyStore(a.s.opts.Engine, a.rootDirForDB(dbName)); err != nil {
+	if err := util.DestroyStore(a.s.opts.Engine, a.rootDirForDb(dbName)); err != nil {
 		return err
 	}
 
@@ -228,6 +260,9 @@
 }
 
 func (a *app) SetDatabasePerms(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, version string) error {
+	if !a.exists {
+		vlog.Fatalf("app %q does not exist", a.name)
+	}
 	a.mu.Lock()
 	defer a.mu.Unlock()
 	d, ok := a.dbs[dbName]
@@ -255,6 +290,6 @@
 	return a.name
 }
 
-func (a *app) rootDirForDB(dbName string) string {
+func (a *app) rootDirForDb(dbName string) string {
 	return path.Join(a.s.opts.RootDir, "apps", a.name, dbName)
 }
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 82a366d..b9f7d7b 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -33,7 +33,8 @@
 	name string
 	a    interfaces.App
 	// The fields below are initialized iff this database exists.
-	st store.Store // stores all data for a single database
+	exists bool
+	st     store.Store // stores all data for a single database
 
 	// Active snapshots and transactions corresponding to client batches.
 	// TODO(sadovsky): Add timeouts and GC.
@@ -86,11 +87,12 @@
 		return nil, err
 	}
 	d := &database{
-		name: name,
-		a:    a,
-		st:   st,
-		sns:  make(map[uint64]store.Snapshot),
-		txs:  make(map[uint64]store.Transaction),
+		name:   name,
+		a:      a,
+		exists: true,
+		st:     st,
+		sns:    make(map[uint64]store.Snapshot),
+		txs:    make(map[uint64]store.Transaction),
 	}
 	data := &databaseData{
 		Name:  d.name,
@@ -106,6 +108,9 @@
 // RPC methods
 
 func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+	if d.exists {
+		return verror.New(verror.ErrExist, ctx, d.name)
+	}
 	if d.batchId != nil {
 		return wire.NewErrBoundToBatch(ctx)
 	}
@@ -125,6 +130,9 @@
 var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
 
 func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
+	if !d.exists {
+		return "", verror.New(verror.ErrNoExist, ctx, d.name)
+	}
 	if d.batchId != nil {
 		return "", wire.NewErrBoundToBatch(ctx)
 	}
@@ -152,6 +160,9 @@
 }
 
 func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
+	if !d.exists {
+		return verror.New(verror.ErrNoExist, ctx, d.name)
+	}
 	if d.batchId == nil {
 		return wire.NewErrNotBoundToBatch(ctx)
 	}
@@ -168,6 +179,9 @@
 }
 
 func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
+	if !d.exists {
+		return verror.New(verror.ErrNoExist, ctx, d.name)
+	}
 	if d.batchId == nil {
 		return wire.NewErrNotBoundToBatch(ctx)
 	}
@@ -189,6 +203,9 @@
 }
 
 func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+	if !d.exists {
+		return verror.New(verror.ErrNoExist, ctx, d.name)
+	}
 	if d.batchId != nil {
 		return wire.NewErrBoundToBatch(ctx)
 	}
@@ -196,6 +213,9 @@
 }
 
 func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+	if !d.exists {
+		return nil, "", verror.New(verror.ErrNoExist, ctx, d.name)
+	}
 	if d.batchId != nil {
 		return nil, "", wire.NewErrBoundToBatch(ctx)
 	}
@@ -207,6 +227,9 @@
 }
 
 func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+	if !d.exists {
+		return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+	}
 	if d.batchId != nil {
 		return nil, wire.NewErrBoundToBatch(ctx)
 	}
@@ -226,7 +249,7 @@
 // interfaces.Database methods
 
 func (d *database) St() store.Store {
-	if d.st == nil {
+	if !d.exists {
 		vlog.Fatalf("database %q does not exist", d.name)
 	}
 	return d.st
@@ -237,11 +260,14 @@
 }
 
 func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+	if !d.exists {
+		vlog.Fatalf("database %q does not exist", d.name)
+	}
 	return util.Get(ctx, call, st, d, &databaseData{})
 }
 
 func (d *database) SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
-	if d.st == nil {
+	if !d.exists {
 		vlog.Fatalf("database %q does not exist", d.name)
 	}
 	return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 86a2a37..e551431 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -126,6 +126,12 @@
 	return s.sync
 }
 
+// TODO(sadovsky): Currently, we return an error (here and elsewhere) if the
+// specified app does not exist in s.apps. But note, this will always be the
+// case after a Syncbase service restart. The implementation should be updated
+// so that s.apps acts as an in-memory cache of app handles. If an app exists
+// but is not present in s.apps, s.App() should open the app and add its handle
+// to s.apps.
 func (s *service) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -159,9 +165,10 @@
 	}
 
 	a := &app{
-		name: appName,
-		s:    s,
-		dbs:  map[string]interfaces.Database{},
+		name:   appName,
+		s:      s,
+		exists: true,
+		dbs:    map[string]interfaces.Database{},
 	}
 
 	if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {