syncbase: support restarts
MultiPart: 1/2
Implementation notes:
- Currently, I reinit in-memory data structures
(apps and dbs maps) at syncbase startup.
- Testing strategy: v23 integration test starts a
syncbase instance, writes to it, verifies what's
written, restarts it, and verifies what's written
a second time.
- Eventually we'll need to avoid having too many DBs
open and running out of file descriptors or some
other resource. The plan (discussed a bit with Bindu
and Raja) is to do this by replacing DB's store.Store
with a wrapper that handles store paging. I've added
a TODO for this.
Change-Id: Iec64eee3971162104127d4732595d796fc17fcbe
diff --git a/v23/syncbase/client_v23_test.go b/v23/syncbase/client_v23_test.go
index ab147bd..eaadc2c 100644
--- a/v23/syncbase/client_v23_test.go
+++ b/v23/syncbase/client_v23_test.go
@@ -5,9 +5,15 @@
package syncbase_test
import (
+ "bytes"
"fmt"
+ "io/ioutil"
+ "os"
+ "reflect"
+ "time"
"v.io/syncbase/v23/syncbase"
+ "v.io/syncbase/v23/syncbase/nosql"
tu "v.io/syncbase/v23/syncbase/testutil"
"v.io/v23"
_ "v.io/x/ref/runtime/factories/generic"
@@ -18,21 +24,26 @@
//go:generate v23 test generate
const (
- syncbaseName = "sync" // Name which syncbase mounts itself at
+ syncbaseName = "syncbase" // Name that syncbase mounts itself at.
)
+// TODO(sadovsky): All tests in this file should be updated so that the client
+// carries blessing "root/client", so that access is not granted anywhere just
+// because the server blessing name is a prefix of the client blessing name or
+// vice versa.
+
func V23TestSyncbasedPutGet(t *v23tests.T) {
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
clientCreds, _ := t.Shell().NewChildCredentials("server/client")
serverCreds, _ := t.Shell().NewChildCredentials("server")
- cleanup := tu.StartSyncbased(t, serverCreds, syncbaseName,
+ cleanup := tu.StartSyncbased(t, serverCreds, syncbaseName, "",
`{"Read": {"In":["root/server/client"]}, "Write": {"In":["root/server/client"]}}`)
defer cleanup()
- tu.RunClient(t, clientCreds, runClient)
+ tu.RunClient(t, clientCreds, runTestSyncbasedPutGet)
}
-var runClient = modules.Register(func(env *modules.Env, args ...string) error {
+var runTestSyncbasedPutGet = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
@@ -49,7 +60,8 @@
return fmt.Errorf("unable to create a table: %v", err)
}
tb := d.Table("tb")
- // Do Put, Get on a row.
+
+ // Do Put followed by Get on a row.
r := tb.Row("r")
if err := r.Put(ctx, "testkey"); err != nil {
return fmt.Errorf("r.Put() failed: %v", err)
@@ -62,4 +74,113 @@
return fmt.Errorf("unexpected value: got %q, want %q", got, want)
}
return nil
-}, "runClient")
+}, "runTestSyncbasedPutGet")
+
+func V23TestServiceRestart(t *v23tests.T) {
+ v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+ clientCreds, _ := t.Shell().NewChildCredentials("server/client")
+ serverCreds, _ := t.Shell().NewChildCredentials("server")
+
+ rootDir, err := ioutil.TempDir("", "syncbase_leveldb")
+ if err != nil {
+ tu.V23Fatalf(t, "can't create temp dir: %v", err)
+ }
+
+ perms := tu.DefaultPerms("root/server/client")
+ buf := new(bytes.Buffer)
+ perms.WriteTo(buf)
+ permsLiteral := buf.String()
+
+ cleanup := tu.StartSyncbased(t, serverCreds, syncbaseName, rootDir, permsLiteral)
+ tu.RunClient(t, clientCreds, runCreateHierarchy)
+ tu.RunClient(t, clientCreds, runCheckHierarchy)
+ cleanup()
+
+ cleanup = tu.StartSyncbased(t, serverCreds, syncbaseName, rootDir, permsLiteral)
+ // TODO(sadovsky): This time.Sleep() is needed so that we wait for the
+ // syncbased server to initialize before sending it RPCs from
+ // runCheckHierarchy. Without this sleep, we get errors like "Apps do not
+ // match: got [], want [a1 a2]". It'd be nice if tu.StartSyncbased would wait
+ // until the server reports that it's ready, and/or if Glob wouldn't return an
+ // empty result set before the server is ready. (Perhaps the latter happens
+ // because the mount table doesn't care that the glob receiver itself doesn't
+ // exist?)
+ time.Sleep(2 * time.Second)
+ tu.RunClient(t, clientCreds, runCheckHierarchy)
+ cleanup()
+
+ if err := os.RemoveAll(rootDir); err != nil {
+ tu.V23Fatalf(t, "can't remove dir %v: %v", rootDir, err)
+ }
+}
+
+// Creates apps, dbs, tables, and rows.
+var runCreateHierarchy = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ s := syncbase.NewService(syncbaseName)
+ for _, a := range []syncbase.App{s.App("a1"), s.App("a2")} {
+ if err := a.Create(ctx, nil); err != nil {
+ return fmt.Errorf("a.Create() failed: %v", err)
+ }
+ for _, d := range []nosql.Database{a.NoSQLDatabase("d1"), a.NoSQLDatabase("d2")} {
+ if err := d.Create(ctx, nil); err != nil {
+ return fmt.Errorf("d.Create() failed: %v", err)
+ }
+ for _, tb := range []nosql.Table{d.Table("tb1"), d.Table("tb2")} {
+ if err := d.CreateTable(ctx, tb.Name(), nil); err != nil {
+ return fmt.Errorf("d.CreateTable() failed: %v", err)
+ }
+ for _, k := range []string{"foo", "bar"} {
+ if err := tb.Put(ctx, k, k); err != nil {
+ return fmt.Errorf("tb.Put() failed: %v", err)
+ }
+ }
+ }
+ }
+ }
+ return nil
+}, "runCreateHierarchy")
+
+// Checks for the apps, dbs, tables, and rows created by runCreateHierarchy.
+var runCheckHierarchy = modules.Register(func(env *modules.Env, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+ s := syncbase.NewService(syncbaseName)
+ var got, want []string
+ var err error
+ if got, err = s.ListApps(ctx); err != nil {
+ return fmt.Errorf("s.ListApps() failed: %v", err)
+ }
+ want = []string{"a1", "a2"}
+ if !reflect.DeepEqual(got, want) {
+ return fmt.Errorf("Apps do not match: got %v, want %v", got, want)
+ }
+ for _, aName := range want {
+ a := s.App(aName)
+ if got, err = a.ListDatabases(ctx); err != nil {
+ return fmt.Errorf("a.ListDatabases() failed: %v", err)
+ }
+ want = []string{"d1", "d2"}
+ if !reflect.DeepEqual(got, want) {
+ return fmt.Errorf("Databases do not match: got %v, want %v", got, want)
+ }
+ for _, dName := range want {
+ d := a.NoSQLDatabase(dName)
+ if got, err = d.ListTables(ctx); err != nil {
+ return fmt.Errorf("d.ListTables() failed: %v", err)
+ }
+ want = []string{"tb1", "tb2"}
+ if !reflect.DeepEqual(got, want) {
+ return fmt.Errorf("Tables do not match: got %v, want %v", got, want)
+ }
+ for _, tbName := range want {
+ tb := d.Table(tbName)
+ if err := tu.ScanMatches(ctx, tb, nosql.Prefix(""), []string{"bar", "foo"}, []interface{}{"bar", "foo"}); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ return nil
+}, "runCheckHierarchy")
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index e45cd65..ff4b5cc 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -24,13 +24,13 @@
v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
server0Creds, _ := t.Shell().NewChildCredentials("s0")
client0Creds, _ := t.Shell().NewChildCredentials("s0/c0")
- cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0",
+ cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
`{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
defer cleanSync0()
server1Creds, _ := t.Shell().NewChildCredentials("s0/s1")
client1Creds, _ := t.Shell().NewChildCredentials("s0/s1/c1")
- cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1",
+ cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
`{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
defer cleanSync1()
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index ceae1e6..9509c4c 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -62,6 +62,8 @@
return d.Table(name)
}
+// TODO(sadovsky): Drop the 'perms' argument. The only client that passes
+// non-nil, syncgroup_test.go, should use SetupOrDieCustom instead.
func SetupOrDie(perms access.Permissions) (clientCtx *context.T, serverName string, cleanup func()) {
_, clientCtx, serverName, _, cleanup = SetupOrDieCustom("client", "server", perms)
return
@@ -100,9 +102,9 @@
return perms
}
-func CheckScan(t *testing.T, ctx *context.T, tb nosql.Table, r nosql.RowRange, wantKeys []string, wantValues []interface{}) {
+func ScanMatches(ctx *context.T, tb nosql.Table, r nosql.RowRange, wantKeys []string, wantValues []interface{}) error {
if len(wantKeys) != len(wantValues) {
- panic("bad input args")
+ return fmt.Errorf("bad input args")
}
it := tb.Scan(ctx, r)
gotKeys := []string{}
@@ -116,23 +118,30 @@
// Check key.
wantKey := wantKeys[i]
if gotKey != wantKey {
- Fatalf(t, "Keys do not match: got %q, want %q", gotKey, wantKey)
+ return fmt.Errorf("Keys do not match: got %q, want %q", gotKey, wantKey)
}
// Check value.
wantValue := wantValues[i]
gotValue := reflect.Zero(reflect.TypeOf(wantValue)).Interface()
if err := it.Value(&gotValue); err != nil {
- Fatalf(t, "it.Value() failed: %v", err)
+ return fmt.Errorf("it.Value() failed: %v", err)
}
if !reflect.DeepEqual(gotValue, wantValue) {
- Fatalf(t, "Values do not match: got %v, want %v", gotValue, wantValue)
+ return fmt.Errorf("Values do not match: got %v, want %v", gotValue, wantValue)
}
}
if err := it.Err(); err != nil {
- Fatalf(t, "tb.Scan() failed: %v", err)
+ return fmt.Errorf("tb.Scan() failed: %v", err)
}
if len(gotKeys) != len(wantKeys) {
- Fatalf(t, "Unmatched keys: got %v, want %v", gotKeys, wantKeys)
+ return fmt.Errorf("Unmatched keys: got %v, want %v", gotKeys, wantKeys)
+ }
+ return nil
+}
+
+func CheckScan(t *testing.T, ctx *context.T, tb nosql.Table, r nosql.RowRange, wantKeys []string, wantValues []interface{}) {
+ if err := ScanMatches(ctx, tb, r, wantKeys, wantValues); err != nil {
+ Fatalf(t, err.Error())
}
}
diff --git a/v23/syncbase/testutil/v23util.go b/v23/syncbase/testutil/v23util.go
index 3a946d8..9a12377 100644
--- a/v23/syncbase/testutil/v23util.go
+++ b/v23/syncbase/testutil/v23util.go
@@ -16,30 +16,41 @@
"v.io/x/ref/test/v23tests"
)
-// StartSyncbased starts the syncbased binary. The syncbased invocation is
-// intended to be used from an integration test (v23.test). The returned
-// cleanup function should be called once the invokation is no longer used.
-func StartSyncbased(t *v23tests.T, creds *modules.CustomCredentials, name, permsLiteral string) (cleanup func()) {
+// StartSyncbased starts a syncbased process, intended to be accessed from an
+// integration test (run using --v23.tests). The returned cleanup function
+// should be called once the syncbased process is no longer needed.
+func StartSyncbased(t *v23tests.T, creds *modules.CustomCredentials, name, rootDir, permsLiteral string) (cleanup func()) {
syncbased := t.BuildV23Pkg("v.io/syncbase/x/ref/services/syncbase/syncbased")
- // Create dir for the store.
- path, err := ioutil.TempDir("", "syncbase_leveldb")
- if err != nil {
- V23Fatalf(t, "can't create temp dir: %v", err)
+ // Create root dir for the store.
+ rmRootDir := false
+ if rootDir == "" {
+ var err error
+ rootDir, err = ioutil.TempDir("", "syncbase_leveldb")
+ if err != nil {
+ V23Fatalf(t, "can't create temp dir: %v", err)
+ }
+ rmRootDir = true
}
- // Start syncbased
+ // Start syncbased.
invocation := syncbased.WithStartOpts(syncbased.StartOpts().WithCustomCredentials(creds)).Start(
"--v23.tcp.address=127.0.0.1:0",
"--v23.permissions.literal", permsLiteral,
"--name="+name,
- "--root-dir="+path)
+ "--root-dir="+rootDir)
return func() {
+ // TODO(sadovsky): Something's broken here. If the syncbased invocation
+ // fails (e.g. if NewService returns an error), currently it's possible for
+ // the test to fail without the crash error getting logged. This makes
+ // debugging a challenge.
go invocation.Kill(syscall.SIGINT)
stdout, stderr := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := invocation.Shutdown(stdout, stderr); err != nil {
- log.Printf("syncbase terminated with an error: %v\nstdout: %v\nstderr: %v\n", err, stdout, stderr)
+ log.Printf("syncbased terminated with an error: %v\nstdout: %v\nstderr: %v\n", err, stdout, stderr)
}
- if err := os.RemoveAll(path); err != nil {
- V23Fatalf(t, "can't destroy db at %v: %v", path, err)
+ if rmRootDir {
+ if err := os.RemoveAll(rootDir); err != nil {
+ V23Fatalf(t, "can't remove dir %v: %v", rootDir, err)
+ }
}
}
}
diff --git a/v23/syncbase/v23_test.go b/v23/syncbase/v23_test.go
index f067ee1..914c0a2 100644
--- a/v23/syncbase/v23_test.go
+++ b/v23/syncbase/v23_test.go
@@ -28,3 +28,7 @@
func TestV23SyncbasedPutGet(t *testing.T) {
v23tests.RunTest(t, V23TestSyncbasedPutGet)
}
+
+func TestV23ServiceRestart(t *testing.T) {
+ v23tests.RunTest(t, V23TestServiceRestart)
+}
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
index 830a1c9..f08b796 100644
--- a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
@@ -90,7 +90,7 @@
func New(ctx *context.T, dir string) (cm *ChunkMap, err error) {
cm = new(ChunkMap)
cm.dir = dir
- cm.st, err = leveldb.Open(dir)
+ cm.st, err = leveldb.Open(dir, leveldb.OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
return cm, err
}
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index 827eaf9..a2b081a 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -98,18 +98,10 @@
return a.s
}
-// TODO(sadovsky): Currently, we return an error (here and elsewhere) if the
-// specified db does not exist in a.dbs. But note, this will always be the case
-// after a Syncbase service restart. The implementation should be updated so
-// that a.dbs acts as an in-memory cache of db handles. If a database exists but
-// is not present in a.dbs, a.NoSQLDatabase() should open the database and add
-// its handle to a.dbs.
func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
if !a.exists {
vlog.Fatalf("app %q does not exist", a.name)
}
- // TODO(sadovsky): Record storage engine config (e.g. LevelDB directory) in
- // dbInfo, and add API for opening and closing storage engines.
a.mu.Lock()
defer a.mu.Unlock()
d, ok := a.dbs[dbName]
@@ -154,6 +146,7 @@
}
// 1. Check appData perms, create dbInfo record.
+ rootDir, engine := a.rootDirForDb(dbName), a.s.opts.Engine
aData := &appData{}
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
// Check appData perms.
@@ -170,7 +163,9 @@
}
// Write new dbInfo.
info := &dbInfo{
- Name: dbName,
+ Name: dbName,
+ RootDir: rootDir,
+ Engine: engine,
}
return a.putDbInfo(ctx, st, dbName, info)
}); err != nil {
@@ -181,10 +176,10 @@
if perms == nil {
perms = aData.Perms
}
- d, err := nosql.NewDatabase(ctx, call, a, dbName, nosql.DatabaseOptions{
+ d, err := nosql.NewDatabase(ctx, a, dbName, nosql.DatabaseOptions{
Perms: perms,
- RootDir: a.rootDirForDb(dbName),
- Engine: a.s.opts.Engine,
+ RootDir: rootDir,
+ Engine: engine,
})
if err != nil {
return err
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index c78761a..f12d437 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -38,7 +38,9 @@
a interfaces.App
// The fields below are initialized iff this database exists.
exists bool
- st store.Store // stores all data for a single database
+ // TODO(sadovsky): Make st point to a store.Store wrapper that handles paging,
+ // and do not actually open the store in NewDatabase.
+ st store.Store // stores all data for a single database
// Active snapshots and transactions corresponding to client batches.
// TODO(sadovsky): Add timeouts and GC.
@@ -73,14 +75,10 @@
Engine string
}
-// NewDatabase creates a new database instance and returns it.
-// Returns a VDL-compatible error.
-// Designed for use from within App.CreateNoSQLDatabase.
-func NewDatabase(ctx *context.T, call rpc.ServerCall, a interfaces.App, name string, opts DatabaseOptions) (*database, error) {
- if opts.Perms == nil {
- return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
- }
- st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine))
+// OpenDatabase opens a database and returns a *database for it. Designed for
+// use from within NewDatabase and server.NewService.
+func OpenDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions, openOpts util.OpenOptions) (*database, error) {
+ st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine), openOpts)
if err != nil {
return nil, err
}
@@ -90,13 +88,26 @@
if err != nil {
return nil, err
}
- d := &database{
+ return &database{
name: name,
a: a,
exists: true,
st: st,
sns: make(map[uint64]store.Snapshot),
txs: make(map[uint64]store.Transaction),
+ }, nil
+}
+
+// NewDatabase creates a new database instance and returns it.
+// Returns a VDL-compatible error.
+// Designed for use from within App.CreateNoSQLDatabase.
+func NewDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions) (*database, error) {
+ if opts.Perms == nil {
+ return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
+ }
+ d, err := OpenDatabase(ctx, a, name, opts, util.OpenOptions{CreateIfMissing: true, ErrorIfExists: true})
+ if err != nil {
+ return nil, err
}
data := &databaseData{
Name: d.name,
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index 3447fcb..9f9e5b3 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -14,6 +14,7 @@
wire "v.io/syncbase/v23/services/syncbase"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/vsync"
@@ -21,6 +22,7 @@
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
+ "v.io/v23/vom"
)
// service is a singleton (i.e. not per-request) that handles Service RPCs.
@@ -55,11 +57,12 @@
// NewService creates a new service instance and returns it.
// Returns a VDL-compatible error.
+// TODO(sadovsky): If possible, close all stores when the server is stopped.
func NewService(ctx *context.T, call rpc.ServerCall, opts ServiceOptions) (*service, error) {
if opts.Perms == nil {
return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
- st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine))
+ st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine), util.OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
if err != nil {
return nil, err
}
@@ -71,9 +74,63 @@
data := &serviceData{
Perms: opts.Perms,
}
- if err := util.Put(ctx, s.st, s, data); err != nil {
- return nil, err
+ if err := util.GetWithoutAuth(ctx, st, s, &serviceData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err != nil {
+ return nil, err
+ }
+ // Service exists. Initialize in-memory data structures.
+ // Read all apps, populate apps map.
+ aIt := st.Scan(util.ScanPrefixArgs(util.AppPrefix, ""))
+ aBytes := []byte{}
+ for aIt.Advance() {
+ aBytes = aIt.Value(aBytes)
+ aData := &appData{}
+ if err := vom.Decode(aBytes, aData); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ a := &app{
+ name: aData.Name,
+ s: s,
+ exists: true,
+ dbs: make(map[string]interfaces.Database),
+ }
+ s.apps[a.name] = a
+ // Read all dbs for this app, populate dbs map.
+ dIt := st.Scan(util.ScanPrefixArgs(util.JoinKeyParts(util.DbInfoPrefix, aData.Name), ""))
+ dBytes := []byte{}
+ for dIt.Advance() {
+ dBytes = dIt.Value(dBytes)
+ info := &dbInfo{}
+ if err := vom.Decode(dBytes, info); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ d, err := nosql.OpenDatabase(ctx, a, info.Name, nosql.DatabaseOptions{
+ RootDir: info.RootDir,
+ Engine: info.Engine,
+ }, util.OpenOptions{
+ CreateIfMissing: false,
+ ErrorIfExists: false,
+ })
+ if err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ a.dbs[info.Name] = d
+ }
+ if err := dIt.Err(); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ }
+ if err := aIt.Err(); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ } else {
+ // Service does not exist.
+ if err := util.Put(ctx, st, s, data); err != nil {
+ return nil, err
+ }
}
+ // Note, vsync.New internally handles both first-time and subsequent
+ // invocations.
if s.sync, err = vsync.New(ctx, call, s, opts.Server); err != nil {
return nil, err
}
@@ -129,15 +186,11 @@
return s.sync
}
-// TODO(sadovsky): Currently, we return an error (here and elsewhere) if the
-// specified app does not exist in s.apps. But note, this will always be the
-// case after a Syncbase service restart. The implementation should be updated
-// so that s.apps acts as an in-memory cache of app handles. If an app exists
-// but is not present in s.apps, s.App() should open the app and add its handle
-// to s.apps.
func (s *service) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
s.mu.Lock()
defer s.mu.Unlock()
+ // Note, currently the service's apps map as well as per-app dbs maps are
+ // populated at startup.
a, ok := s.apps[appName]
if !ok {
return nil, verror.New(verror.ErrNoExist, ctx, appName)
@@ -171,7 +224,7 @@
name: appName,
s: s,
exists: true,
- dbs: map[string]interfaces.Database{},
+ dbs: make(map[string]interfaces.Database),
}
if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
diff --git a/x/ref/services/syncbase/server/types.vdl b/x/ref/services/syncbase/server/types.vdl
index 4a48134..4999f77 100644
--- a/x/ref/services/syncbase/server/types.vdl
+++ b/x/ref/services/syncbase/server/types.vdl
@@ -22,9 +22,13 @@
}
// dbInfo contains information about one database for an App.
-// TODO(sadovsky): Track NoSQL vs. SQL, storage engine config, etc.
+// TODO(sadovsky): Track NoSQL vs. SQL.
type dbInfo struct {
Name string
Initialized bool
Deleted bool
+ // Select fields from nosql.DatabaseOptions, needed in order to open storage
+ // engine on restart.
+ RootDir string // interpreted by storage engine
+ Engine string // name of storage engine, e.g. "leveldb"
}
diff --git a/x/ref/services/syncbase/server/types.vdl.go b/x/ref/services/syncbase/server/types.vdl.go
index a820631..aec38cd 100644
--- a/x/ref/services/syncbase/server/types.vdl.go
+++ b/x/ref/services/syncbase/server/types.vdl.go
@@ -39,11 +39,15 @@
}
// dbInfo contains information about one database for an App.
-// TODO(sadovsky): Track NoSQL vs. SQL, storage engine config, etc.
+// TODO(sadovsky): Track NoSQL vs. SQL.
type dbInfo struct {
Name string
Initialized bool
Deleted bool
+ // Select fields from nosql.DatabaseOptions, needed in order to open storage
+ // engine on restart.
+ RootDir string // interpreted by storage engine
+ Engine string // name of storage engine, e.g. "leveldb"
}
func (dbInfo) __VDLReflect(struct {
diff --git a/x/ref/services/syncbase/server/util/store_util.go b/x/ref/services/syncbase/server/util/store_util.go
index fb739ca..a30f487 100644
--- a/x/ref/services/syncbase/server/util/store_util.go
+++ b/x/ref/services/syncbase/server/util/store_util.go
@@ -125,15 +125,34 @@
return st.Put([]byte(k), bytes)
}
-func OpenStore(engine, path string) (store.Store, error) {
+type OpenOptions struct {
+ CreateIfMissing bool
+ ErrorIfExists bool
+}
+
+// OpenStore opens the given store.Store. OpenOptions are respected to the
+// degree possible for the specified engine.
+func OpenStore(engine, path string, opts OpenOptions) (store.Store, error) {
switch engine {
case "memstore":
+ if !opts.CreateIfMissing {
+ return nil, verror.New(verror.ErrInternal, nil, "cannot open memstore")
+ }
+ // By definition, the memstore does not already exist.
return memstore.New(), nil
case "leveldb":
- if err := os.MkdirAll(path, 0700); err != nil {
- return nil, verror.New(verror.ErrInternal, nil, err)
+ leveldbOpts := leveldb.OpenOptions{
+ CreateIfMissing: opts.CreateIfMissing,
+ ErrorIfExists: opts.ErrorIfExists,
}
- return leveldb.Open(path)
+ if opts.CreateIfMissing {
+ // Note, os.MkdirAll is a noop if the path already exists. We rely on
+ // leveldb to enforce ErrorIfExists.
+ if err := os.MkdirAll(path, 0700); err != nil {
+ return nil, verror.New(verror.ErrInternal, nil, err)
+ }
+ }
+ return leveldb.Open(path, leveldbOpts)
default:
return nil, verror.New(verror.ErrBadArg, nil, engine)
}
diff --git a/x/ref/services/syncbase/server/watchable/test_util.go b/x/ref/services/syncbase/server/watchable/test_util.go
index d81d79e..713b4e1 100644
--- a/x/ref/services/syncbase/server/watchable/test_util.go
+++ b/x/ref/services/syncbase/server/watchable/test_util.go
@@ -47,7 +47,7 @@
}
func createLevelDB(path string) store.Store {
- st, err := leveldb.Open(path)
+ st, err := leveldb.Open(path, leveldb.OpenOptions{CreateIfMissing: true, ErrorIfExists: true})
if err != nil {
panic(fmt.Sprintf("can't open db at %v: %v", path, err))
}
diff --git a/x/ref/services/syncbase/store/leveldb/db.go b/x/ref/services/syncbase/store/leveldb/db.go
index 67f253c..765103f 100644
--- a/x/ref/services/syncbase/store/leveldb/db.go
+++ b/x/ref/services/syncbase/store/leveldb/db.go
@@ -45,15 +45,28 @@
var _ store.Store = (*db)(nil)
-// Open opens the database located at the given path, creating it if it doesn't
-// exist.
-func Open(path string) (store.Store, error) {
+type OpenOptions struct {
+ CreateIfMissing bool
+ ErrorIfExists bool
+}
+
+// Open opens the database located at the given path.
+func Open(path string, opts OpenOptions) (store.Store, error) {
var cError *C.char
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
+ var cOptsCreateIfMissing, cOptsErrorIfExists C.uchar
+ if opts.CreateIfMissing {
+ cOptsCreateIfMissing = 1
+ }
+ if opts.ErrorIfExists {
+ cOptsErrorIfExists = 1
+ }
+
cOpts := C.leveldb_options_create()
- C.leveldb_options_set_create_if_missing(cOpts, 1)
+ C.leveldb_options_set_create_if_missing(cOpts, cOptsCreateIfMissing)
+ C.leveldb_options_set_error_if_exists(cOpts, cOptsErrorIfExists)
C.leveldb_options_set_paranoid_checks(cOpts, 1)
defer C.leveldb_options_destroy(cOpts)
diff --git a/x/ref/services/syncbase/store/leveldb/db_test.go b/x/ref/services/syncbase/store/leveldb/db_test.go
index cf91b0d..88cddc2 100644
--- a/x/ref/services/syncbase/store/leveldb/db_test.go
+++ b/x/ref/services/syncbase/store/leveldb/db_test.go
@@ -54,6 +54,49 @@
runTest(t, test.RunTransactionsWithGetTest)
}
+func TestOpenOptions(t *testing.T) {
+ path, err := ioutil.TempDir("", "syncbase_leveldb")
+ if err != nil {
+ t.Fatalf("can't create temp dir: %v", err)
+ }
+ // DB is missing => call should fail.
+ st, err := Open(path, OpenOptions{CreateIfMissing: false, ErrorIfExists: false})
+ if err == nil {
+ t.Fatalf("open should've failed")
+ }
+ // DB is missing => call should succeed.
+ st, err = Open(path, OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
+ if err != nil {
+ t.Fatalf("open failed: %v", err)
+ }
+ st.Close()
+ // DB exists => call should succeed.
+ st, err = Open(path, OpenOptions{CreateIfMissing: false, ErrorIfExists: false})
+ if err != nil {
+ t.Fatalf("open failed: %v", err)
+ }
+ st.Close()
+ // DB exists => call should fail.
+ st, err = Open(path, OpenOptions{CreateIfMissing: false, ErrorIfExists: true})
+ if err == nil {
+ t.Fatalf("open should've failed")
+ }
+ // DB exists => call should fail.
+ st, err = Open(path, OpenOptions{CreateIfMissing: true, ErrorIfExists: true})
+ if err == nil {
+ t.Fatalf("open should've failed")
+ }
+ // DB exists => call should succeed.
+ st, err = Open(path, OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
+ if err != nil {
+ t.Fatalf("open failed: %v", err)
+ }
+ st.Close()
+ if err := Destroy(path); err != nil {
+ t.Fatalf("destroy failed: %v", err)
+ }
+}
+
func runTest(t *testing.T, f func(t *testing.T, st store.Store)) {
st, dbPath := newDB()
defer destroyDB(st, dbPath)
@@ -65,7 +108,7 @@
if err != nil {
panic(fmt.Sprintf("can't create temp dir: %v", err))
}
- st, err := Open(path)
+ st, err := Open(path, OpenOptions{CreateIfMissing: true, ErrorIfExists: true})
if err != nil {
panic(fmt.Sprintf("can't open db at %v: %v", path, err))
}
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index 92540f1..556b2d7 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -121,7 +121,7 @@
engine := "leveldb"
path := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
- st, err := util.OpenStore(engine, path)
+ st, err := util.OpenStore(engine, path, util.OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
if err != nil {
t.Fatalf("cannot create store %s (%s): %v", engine, path, err)
}