syncbase: add hook to get service and DB stores (needed by sync)

Also moves internal interfaces to a separate package, hopefully
making the code structure easier to grok.

Change-Id: I449bfa9ed184334118dd28b2695d9792c7f43f56
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 188ef16..1b5217d 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -8,6 +8,7 @@
"sync"
wire "v.io/syncbase/v23/services/syncbase"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -25,12 +26,12 @@
// Guards the fields below. Held during database Create, Delete, and
// SetPermissions.
mu sync.Mutex
- dbs map[string]util.Database
+ dbs map[string]interfaces.Database
}
var (
_ wire.AppServerMethods = (*app)(nil)
- _ util.App = (*app)(nil)
+ _ interfaces.App = (*app)(nil)
_ util.Layer = (*app)(nil)
)
@@ -73,9 +74,9 @@
}
////////////////////////////////////////
-// util.App methods
+// interfaces.App methods
-func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (util.Database, error) {
+func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
// TODO(sadovsky): Record storage engine config (e.g. LevelDB directory) in
// dbInfo, and add API for opening and closing storage engines.
a.mu.Lock()
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index 4a4c9e2..7f2fd1f 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -47,8 +47,9 @@
}
aExists := false
- a, err := disp.s.app(nil, nil, appName)
- if err == nil {
+ var a *app
+ if aint, err := disp.s.App(nil, nil, appName); err == nil {
+ a = aint.(*app) // panics on failure, as desired
aExists = true
} else {
if verror.ErrorID(err) != verror.ErrNoExistOrNoAccess.ID {
diff --git a/services/syncbase/server/util/app.go b/services/syncbase/server/interfaces/app.go
similarity index 87%
rename from services/syncbase/server/util/app.go
rename to services/syncbase/server/interfaces/app.go
index 26c3e91..a893e5a 100644
--- a/services/syncbase/server/util/app.go
+++ b/services/syncbase/server/interfaces/app.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package util
+package interfaces
import (
"v.io/v23/context"
@@ -10,8 +10,7 @@
"v.io/v23/security/access"
)
-// App is an internal interface that enables nosql.database to invoke methods on
-// server.app while avoiding an import cycle.
+// App is an internal interface to the app layer.
// All methods return VDL-compatible errors.
type App interface {
// NoSQLDatabase returns the Database for the specified NoSQL database.
diff --git a/services/syncbase/server/util/database.go b/services/syncbase/server/interfaces/database.go
similarity index 78%
rename from services/syncbase/server/util/database.go
rename to services/syncbase/server/interfaces/database.go
index dd79cee..4f8bda8 100644
--- a/services/syncbase/server/util/database.go
+++ b/services/syncbase/server/interfaces/database.go
@@ -2,18 +2,21 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package util
+package interfaces
import (
+ "v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
)
-// Database is an internal interface that enables server.app to invoke methods
-// on nosql.database while avoiding an import cycle.
+// Database is an internal interface to the database layer.
// All methods return VDL-compatible errors.
type Database interface {
+ // St returns the storage engine instance for this database.
+ St() store.Store
+
// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
// the database perms.
// Designed for use from within App.DeleteNoSQLDatabase.
diff --git a/services/syncbase/server/interfaces/doc.go b/services/syncbase/server/interfaces/doc.go
new file mode 100644
index 0000000..384f2f7
--- /dev/null
+++ b/services/syncbase/server/interfaces/doc.go
@@ -0,0 +1,10 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package interfaces defines internal interfaces for various objects in the
+// Syncbase server implementation. Defining these interfaces in a separate
+// package helps prevent import cycles: all other packages can import the
+// interfaces package, and individual modules can pass each other interfaces to
+// enable bidirectional cross-package communication.
+package interfaces
diff --git a/services/syncbase/server/interfaces/service.go b/services/syncbase/server/interfaces/service.go
new file mode 100644
index 0000000..3cbbda4
--- /dev/null
+++ b/services/syncbase/server/interfaces/service.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+import (
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+)
+
+// Service is an internal interface to the service layer.
+// All methods return VDL-compatible errors.
+type Service interface {
+ // St returns the storage engine instance for this service.
+ St() store.Store
+
+ // App returns the App with the specified name.
+ App(ctx *context.T, call rpc.ServerCall, appName string) (App, error)
+}
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 7ca9271..fcc2817 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -6,6 +6,7 @@
import (
wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
@@ -19,21 +20,21 @@
type database struct {
name string
- a util.App
+ a interfaces.App
// The fields below are initialized iff this database exists.
st store.Store // stores all data for a single database
}
var (
_ wire.DatabaseServerMethods = (*database)(nil)
- _ util.Database = (*database)(nil)
+ _ interfaces.Database = (*database)(nil)
_ util.Layer = (*database)(nil)
)
// NewDatabase creates a new database instance and returns it.
// Returns a VDL-compatible error.
// Designed for use from within App.CreateNoSQLDatabase.
-func NewDatabase(ctx *context.T, call rpc.ServerCall, a util.App, name string, perms access.Permissions) (*database, error) {
+func NewDatabase(ctx *context.T, call rpc.ServerCall, a interfaces.App, name string, perms access.Permissions) (*database, error) {
if perms == nil {
return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
@@ -102,7 +103,14 @@
}
////////////////////////////////////////
-// util.Database methods
+// interfaces.Database methods
+
+func (d *database) St() store.Store {
+ if d.st == nil {
+ vlog.Fatalf("database %q does not exist", d.name)
+ }
+ return d.st
+}
func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall) error {
if d.st == nil {
diff --git a/services/syncbase/server/nosql/dispatcher.go b/services/syncbase/server/nosql/dispatcher.go
index db18382..6d572ae 100644
--- a/services/syncbase/server/nosql/dispatcher.go
+++ b/services/syncbase/server/nosql/dispatcher.go
@@ -10,7 +10,7 @@
wire "v.io/syncbase/v23/services/syncbase"
nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
pubutil "v.io/syncbase/v23/syncbase/util"
- "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
@@ -18,12 +18,12 @@
)
type dispatcher struct {
- a util.App
+ a interfaces.App
}
var _ rpc.Dispatcher = (*dispatcher)(nil)
-func NewDispatcher(a util.App) *dispatcher {
+func NewDispatcher(a interfaces.App) *dispatcher {
return &dispatcher{a: a}
}
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index c26eb39..f91af3b 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -79,7 +79,7 @@
}
value, err := st.Get([]byte(r.StKey()), nil)
if err != nil {
- if _, ok := err.(*store.ErrUnknownKey); ok {
+ if verror.ErrorID(err) == store.ErrUnknownKey.ID {
// We've already done an auth check, so here we can safely return NoExist
// rather than NoExistOrNoAccess.
return nil, verror.New(verror.ErrNoExist, ctx, r.Name())
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index f5dfb31..094b60f 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -8,6 +8,7 @@
"sync"
wire "v.io/syncbase/v23/services/syncbase"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
@@ -30,6 +31,7 @@
var (
_ wire.ServiceServerMethods = (*service)(nil)
+ _ interfaces.Service = (*service)(nil)
_ util.Layer = (*service)(nil)
)
@@ -45,17 +47,18 @@
apps: map[string]*app{},
}
- var err error
- if s.sync, err = vsync.New(ctx, call, s.st); err != nil {
- return nil, err
- }
-
data := &serviceData{
Perms: perms,
}
if err := util.Put(ctx, call, s.st, s, data); err != nil {
return nil, err
}
+
+ var err error
+ if s.sync, err = vsync.New(ctx, call, s); err != nil {
+ return nil, err
+ }
+
return s, nil
}
@@ -95,9 +98,13 @@
}
////////////////////////////////////////
-// App management methods
+// interfaces.Service methods
-func (s *service) app(ctx *context.T, call rpc.ServerCall, appName string) (*app, error) {
+func (s *service) St() store.Store {
+ return s.st
+}
+
+func (s *service) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
s.mu.Lock()
defer s.mu.Unlock()
a, ok := s.apps[appName]
@@ -107,6 +114,9 @@
return a, nil
}
+////////////////////////////////////////
+// App management methods
+
func (s *service) createApp(ctx *context.T, call rpc.ServerCall, appName string, perms access.Permissions) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -118,7 +128,7 @@
a := &app{
name: appName,
s: s,
- dbs: map[string]util.Database{},
+ dbs: map[string]interfaces.Database{},
}
if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index f2c5fc9..2201870 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -46,7 +46,7 @@
// Returns a VDL-compatible error.
func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
if err := GetObject(st, l.StKey(), v); err != nil {
- if _, ok := err.(*store.ErrUnknownKey); ok {
+ if verror.ErrorID(err) == store.ErrUnknownKey.ID {
// TODO(sadovsky): Return ErrNoExist if appropriate.
return verror.New(verror.ErrNoExistOrNoAccess, ctx, l.Name())
}
diff --git a/services/syncbase/store/model.vdl b/services/syncbase/store/model.vdl
index b8dbb79..6a56e66 100644
--- a/services/syncbase/store/model.vdl
+++ b/services/syncbase/store/model.vdl
@@ -5,7 +5,7 @@
package store
error (
- // ConcurrentTransaction means that the current transaction was not committed
+ // ConcurrentTransaction means that the current transaction failed to commit
// because its read set was invalidated by some other transaction.
ConcurrentTransaction() {"en":"Concurrent transaction{:_}"}
diff --git a/services/syncbase/store/model.vdl.go b/services/syncbase/store/model.vdl.go
index 12c235c..eec8747 100644
--- a/services/syncbase/store/model.vdl.go
+++ b/services/syncbase/store/model.vdl.go
@@ -15,7 +15,7 @@
)
var (
- // ConcurrentTransaction means that the current transaction was not committed
+ // ConcurrentTransaction means that the current transaction failed to commit
// because its read set was invalidated by some other transaction.
ErrConcurrentTransaction = verror.Register("v.io/syncbase/x/ref/services/syncbase/store.ConcurrentTransaction", verror.NoRetry, "{1:}{2:} Concurrent transaction{:_}")
// UnknownKey means the given key does not exist in the store.
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index f93967a..361b5a0 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -16,6 +16,7 @@
"sync"
"time"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -26,13 +27,9 @@
// syncService contains the metadata for the sync module.
type syncService struct {
- // Globally unique Syncbase id.
- id int64
-
- // Store for persisting Sync metadata.
- st store.Store
-
- // State to coordinate shutting down all spawned goroutines.
+ id int64 // globally unique id for this instance of Syncbase
+ sv interfaces.Service
+ // State to coordinate shutdown of spawned goroutines.
pending sync.WaitGroup
closed chan struct{}
}
@@ -48,40 +45,40 @@
// New creates a new sync module.
//
-// Sync concurrency: sync initializes two goroutines at startup. The
-// "watcher" thread is responsible for watching the store for changes
-// to its objects. The "initiator" thread is responsible for
-// periodically contacting a peer to obtain changes from that peer. In
-// addition, the sync module responds to incoming RPCs from remote
-// sync modules.
-func New(ctx *context.T, call rpc.ServerCall, st store.Store) (*syncService, error) {
- // TODO(hpucha): Add restartability.
+// Concurrency: New starts two goroutines: a "watcher" and an "initiator".
+// The watcher goroutine is responsible for watching the store for changes
+// to its objects. The initiator goroutine is responsible for periodically
+// contacting peers to fetch changes from them. In addition, the sync module
+// responds to incoming RPCs from remote sync modules.
+func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service) (*syncService, error) {
+ s := &syncService{sv: sv}
- // First invocation of sync.
- s := &syncService{
- id: rng.Int63(),
- st: st,
+ data := &syncData{}
+ if err := util.GetObject(sv.St(), s.StKey(), data); err != nil {
+ if verror.ErrorID(err) != store.ErrUnknownKey.ID {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ // First invocation of vsync.New().
+ // TODO(sadovsky): Maybe move guid generation and storage to serviceData.
+ data.Id = rng.Int63()
+ if err := util.PutObject(sv.St(), s.StKey(), data); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
}
- // Persist sync metadata.
- data := &syncData{
- Id: s.id,
- }
-
- if err := util.Put(ctx, call, s.st, s, data); err != nil {
- return nil, err
- }
+ // data.Id is now guaranteed to be initialized.
+ s.id = data.Id
// Channel to propagate close event to all threads.
s.closed = make(chan struct{})
s.pending.Add(2)
- // Get deltas every so often.
- go s.contactPeers()
-
- // Start a watcher thread that will get updates from local store.
+ // Start watcher thread to watch for updates to local store.
go s.watchStore()
+ // Start initiator thread to periodically get deltas from peers.
+ go s.contactPeers()
+
return s, nil
}