syncbase: add hook to get service and DB stores (needed by sync)

Also moves internal interfaces to a separate package, hopefully
making the code structure easier to grok.

Change-Id: I449bfa9ed184334118dd28b2695d9792c7f43f56
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 188ef16..1b5217d 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -8,6 +8,7 @@
 	"sync"
 
 	wire "v.io/syncbase/v23/services/syncbase"
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -25,12 +26,12 @@
 	// Guards the fields below. Held during database Create, Delete, and
 	// SetPermissions.
 	mu  sync.Mutex
-	dbs map[string]util.Database
+	dbs map[string]interfaces.Database
 }
 
 var (
 	_ wire.AppServerMethods = (*app)(nil)
-	_ util.App              = (*app)(nil)
+	_ interfaces.App        = (*app)(nil)
 	_ util.Layer            = (*app)(nil)
 )
 
@@ -73,9 +74,9 @@
 }
 
 ////////////////////////////////////////
-// util.App methods
+// interfaces.App methods
 
-func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (util.Database, error) {
+func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
 	// TODO(sadovsky): Record storage engine config (e.g. LevelDB directory) in
 	// dbInfo, and add API for opening and closing storage engines.
 	a.mu.Lock()
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index 4a4c9e2..7f2fd1f 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -47,8 +47,9 @@
 	}
 
 	aExists := false
-	a, err := disp.s.app(nil, nil, appName)
-	if err == nil {
+	var a *app
+	if aint, err := disp.s.App(nil, nil, appName); err == nil {
+		a = aint.(*app) // panics on failure, as desired
 		aExists = true
 	} else {
 		if verror.ErrorID(err) != verror.ErrNoExistOrNoAccess.ID {
diff --git a/services/syncbase/server/util/app.go b/services/syncbase/server/interfaces/app.go
similarity index 87%
rename from services/syncbase/server/util/app.go
rename to services/syncbase/server/interfaces/app.go
index 26c3e91..a893e5a 100644
--- a/services/syncbase/server/util/app.go
+++ b/services/syncbase/server/interfaces/app.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package util
+package interfaces
 
 import (
 	"v.io/v23/context"
@@ -10,8 +10,7 @@
 	"v.io/v23/security/access"
 )
 
-// App is an internal interface that enables nosql.database to invoke methods on
-// server.app while avoiding an import cycle.
+// App is an internal interface to the app layer.
 // All methods return VDL-compatible errors.
 type App interface {
 	// NoSQLDatabase returns the Database for the specified NoSQL database.
diff --git a/services/syncbase/server/util/database.go b/services/syncbase/server/interfaces/database.go
similarity index 78%
rename from services/syncbase/server/util/database.go
rename to services/syncbase/server/interfaces/database.go
index dd79cee..4f8bda8 100644
--- a/services/syncbase/server/util/database.go
+++ b/services/syncbase/server/interfaces/database.go
@@ -2,18 +2,21 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package util
+package interfaces
 
 import (
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 )
 
-// Database is an internal interface that enables server.app to invoke methods
-// on nosql.database while avoiding an import cycle.
+// Database is an internal interface to the database layer.
 // All methods return VDL-compatible errors.
 type Database interface {
+	// St returns the storage engine instance for this database.
+	St() store.Store
+
 	// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
 	// the database perms.
 	// Designed for use from within App.DeleteNoSQLDatabase.
diff --git a/services/syncbase/server/interfaces/doc.go b/services/syncbase/server/interfaces/doc.go
new file mode 100644
index 0000000..384f2f7
--- /dev/null
+++ b/services/syncbase/server/interfaces/doc.go
@@ -0,0 +1,10 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package interfaces defines internal interfaces for various objects in the
+// Syncbase server implementation. Defining these interfaces in a separate
+// package helps prevent import cycles: all other packages can import the
+// interfaces package, and individual modules can pass each other interfaces to
+// enable bidirectional cross-package communication.
+package interfaces
diff --git a/services/syncbase/server/interfaces/service.go b/services/syncbase/server/interfaces/service.go
new file mode 100644
index 0000000..3cbbda4
--- /dev/null
+++ b/services/syncbase/server/interfaces/service.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+import (
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+)
+
+// Service is an internal interface to the service layer.
+// All methods return VDL-compatible errors.
+type Service interface {
+	// St returns the storage engine instance for this service.
+	St() store.Store
+
+	// App returns the App with the specified name.
+	App(ctx *context.T, call rpc.ServerCall, appName string) (App, error)
+}
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 7ca9271..fcc2817 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -6,6 +6,7 @@
 
 import (
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
@@ -19,21 +20,21 @@
 
 type database struct {
 	name string
-	a    util.App
+	a    interfaces.App
 	// The fields below are initialized iff this database exists.
 	st store.Store // stores all data for a single database
 }
 
 var (
 	_ wire.DatabaseServerMethods = (*database)(nil)
-	_ util.Database              = (*database)(nil)
+	_ interfaces.Database        = (*database)(nil)
 	_ util.Layer                 = (*database)(nil)
 )
 
 // NewDatabase creates a new database instance and returns it.
 // Returns a VDL-compatible error.
 // Designed for use from within App.CreateNoSQLDatabase.
-func NewDatabase(ctx *context.T, call rpc.ServerCall, a util.App, name string, perms access.Permissions) (*database, error) {
+func NewDatabase(ctx *context.T, call rpc.ServerCall, a interfaces.App, name string, perms access.Permissions) (*database, error) {
 	if perms == nil {
 		return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
 	}
@@ -102,7 +103,14 @@
 }
 
 ////////////////////////////////////////
-// util.Database methods
+// interfaces.Database methods
+
+func (d *database) St() store.Store {
+	if d.st == nil {
+		vlog.Fatalf("database %q does not exist", d.name)
+	}
+	return d.st
+}
 
 func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall) error {
 	if d.st == nil {
diff --git a/services/syncbase/server/nosql/dispatcher.go b/services/syncbase/server/nosql/dispatcher.go
index db18382..6d572ae 100644
--- a/services/syncbase/server/nosql/dispatcher.go
+++ b/services/syncbase/server/nosql/dispatcher.go
@@ -10,7 +10,7 @@
 	wire "v.io/syncbase/v23/services/syncbase"
 	nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
 	pubutil "v.io/syncbase/v23/syncbase/util"
-	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/verror"
@@ -18,12 +18,12 @@
 )
 
 type dispatcher struct {
-	a util.App
+	a interfaces.App
 }
 
 var _ rpc.Dispatcher = (*dispatcher)(nil)
 
-func NewDispatcher(a util.App) *dispatcher {
+func NewDispatcher(a interfaces.App) *dispatcher {
 	return &dispatcher{a: a}
 }
 
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index c26eb39..f91af3b 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -79,7 +79,7 @@
 	}
 	value, err := st.Get([]byte(r.StKey()), nil)
 	if err != nil {
-		if _, ok := err.(*store.ErrUnknownKey); ok {
+		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
 			// We've already done an auth check, so here we can safely return NoExist
 			// rather than NoExistOrNoAccess.
 			return nil, verror.New(verror.ErrNoExist, ctx, r.Name())
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index f5dfb31..094b60f 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -8,6 +8,7 @@
 	"sync"
 
 	wire "v.io/syncbase/v23/services/syncbase"
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
@@ -30,6 +31,7 @@
 
 var (
 	_ wire.ServiceServerMethods = (*service)(nil)
+	_ interfaces.Service        = (*service)(nil)
 	_ util.Layer                = (*service)(nil)
 )
 
@@ -45,17 +47,18 @@
 		apps: map[string]*app{},
 	}
 
-	var err error
-	if s.sync, err = vsync.New(ctx, call, s.st); err != nil {
-		return nil, err
-	}
-
 	data := &serviceData{
 		Perms: perms,
 	}
 	if err := util.Put(ctx, call, s.st, s, data); err != nil {
 		return nil, err
 	}
+
+	var err error
+	if s.sync, err = vsync.New(ctx, call, s); err != nil {
+		return nil, err
+	}
+
 	return s, nil
 }
 
@@ -95,9 +98,13 @@
 }
 
 ////////////////////////////////////////
-// App management methods
+// interfaces.Service methods
 
-func (s *service) app(ctx *context.T, call rpc.ServerCall, appName string) (*app, error) {
+func (s *service) St() store.Store {
+	return s.st
+}
+
+func (s *service) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	a, ok := s.apps[appName]
@@ -107,6 +114,9 @@
 	return a, nil
 }
 
+////////////////////////////////////////
+// App management methods
+
 func (s *service) createApp(ctx *context.T, call rpc.ServerCall, appName string, perms access.Permissions) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -118,7 +128,7 @@
 	a := &app{
 		name: appName,
 		s:    s,
-		dbs:  map[string]util.Database{},
+		dbs:  map[string]interfaces.Database{},
 	}
 
 	if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index f2c5fc9..2201870 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -46,7 +46,7 @@
 // Returns a VDL-compatible error.
 func GetWithoutAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v interface{}) error {
 	if err := GetObject(st, l.StKey(), v); err != nil {
-		if _, ok := err.(*store.ErrUnknownKey); ok {
+		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
 			// TODO(sadovsky): Return ErrNoExist if appropriate.
 			return verror.New(verror.ErrNoExistOrNoAccess, ctx, l.Name())
 		}
diff --git a/services/syncbase/store/model.vdl b/services/syncbase/store/model.vdl
index b8dbb79..6a56e66 100644
--- a/services/syncbase/store/model.vdl
+++ b/services/syncbase/store/model.vdl
@@ -5,7 +5,7 @@
 package store
 
 error (
-	// ConcurrentTransaction means that the current transaction was not committed
+	// ConcurrentTransaction means that the current transaction failed to commit
 	// because its read set was invalidated by some other transaction.
 	ConcurrentTransaction() {"en":"Concurrent transaction{:_}"}
 
diff --git a/services/syncbase/store/model.vdl.go b/services/syncbase/store/model.vdl.go
index 12c235c..eec8747 100644
--- a/services/syncbase/store/model.vdl.go
+++ b/services/syncbase/store/model.vdl.go
@@ -15,7 +15,7 @@
 )
 
 var (
-	// ConcurrentTransaction means that the current transaction was not committed
+	// ConcurrentTransaction means that the current transaction failed to commit
 	// because its read set was invalidated by some other transaction.
 	ErrConcurrentTransaction = verror.Register("v.io/syncbase/x/ref/services/syncbase/store.ConcurrentTransaction", verror.NoRetry, "{1:}{2:} Concurrent transaction{:_}")
 	// UnknownKey means the given key does not exist in the store.
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index f93967a..361b5a0 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -16,6 +16,7 @@
 	"sync"
 	"time"
 
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 
@@ -26,13 +27,9 @@
 
 // syncService contains the metadata for the sync module.
 type syncService struct {
-	// Globally unique Syncbase id.
-	id int64
-
-	// Store for persisting Sync metadata.
-	st store.Store
-
-	// State to coordinate shutting down all spawned goroutines.
+	id int64 // globally unique id for this instance of Syncbase
+	sv interfaces.Service
+	// State to coordinate shutdown of spawned goroutines.
 	pending sync.WaitGroup
 	closed  chan struct{}
 }
@@ -48,40 +45,40 @@
 
 // New creates a new sync module.
 //
-// Sync concurrency: sync initializes two goroutines at startup. The
-// "watcher" thread is responsible for watching the store for changes
-// to its objects. The "initiator" thread is responsible for
-// periodically contacting a peer to obtain changes from that peer. In
-// addition, the sync module responds to incoming RPCs from remote
-// sync modules.
-func New(ctx *context.T, call rpc.ServerCall, st store.Store) (*syncService, error) {
-	// TODO(hpucha): Add restartability.
+// Concurrency: sync initializes two goroutines at startup: a "watcher" and an
+// "initiator". The "watcher" thread is responsible for watching the store for
+// changes to its objects. The "initiator" thread is responsible for
+// periodically contacting peers to fetch changes from them. In addition, the
+// sync module responds to incoming RPCs from remote sync modules.
+func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service) (*syncService, error) {
+	s := &syncService{sv: sv}
 
-	// First invocation of sync.
-	s := &syncService{
-		id: rng.Int63(),
-		st: st,
+	data := &syncData{}
+	if err := util.GetObject(sv.St(), s.StKey(), data); err != nil {
+		if verror.ErrorID(err) != store.ErrUnknownKey.ID {
+			return nil, verror.New(verror.ErrInternal, ctx, err)
+		}
+		// First invocation of vsync.New().
+		// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
+		data.Id = rng.Int63()
+		if err := util.PutObject(sv.St(), s.StKey(), data); err != nil {
+			return nil, verror.New(verror.ErrInternal, ctx, err)
+		}
 	}
 
-	// Persist sync metadata.
-	data := &syncData{
-		Id: s.id,
-	}
-
-	if err := util.Put(ctx, call, s.st, s, data); err != nil {
-		return nil, err
-	}
+	// data.Id is now guaranteed to be initialized.
+	s.id = data.Id
 
 	// Channel to propagate close event to all threads.
 	s.closed = make(chan struct{})
 	s.pending.Add(2)
 
-	// Get deltas every so often.
-	go s.contactPeers()
-
-	// Start a watcher thread that will get updates from local store.
+	// Start watcher thread to watch for updates to local store.
 	go s.watchStore()
 
+	// Start initiator thread to periodically get deltas from peers.
+	go s.contactPeers()
+
 	return s, nil
 }