syncbase/vsync: Scaffolding to start integrating sync into store.
Change-Id: I2d63421cd1726408039fcd03f1cc58b8c9c42ef9
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index c676f97..4a4c9e2 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -10,6 +10,7 @@
wire "v.io/syncbase/v23/services/syncbase"
pubutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
@@ -34,6 +35,10 @@
return wire.ServiceServer(disp.s), nil, nil
}
+ if parts[0] == util.SyncbaseSuffix {
+ return disp.s.sync, nil, nil
+ }
+
// Validate all key atoms up front, so that we can avoid doing so in all our
// method implementations.
appName := parts[0]
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
new file mode 100644
index 0000000..c19c4f4
--- /dev/null
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -0,0 +1,51 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+)
+
+////////////////////////////////////////
+// SyncGroup RPC methods
+
+// GetSyncGroupNames is a stub for the SyncGroup RPC of the same name. It is
+// not implemented yet: it returns a nil slice together with the error built
+// by verror.NewErrNotImplemented.
+func (d *database) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ return nil, verror.NewErrNotImplemented(ctx)
+}
+
+// CreateSyncGroup is a stub for the SyncGroup RPC of the same name. It is not
+// implemented yet: sgName, spec and myInfo are ignored and the error built by
+// verror.NewErrNotImplemented is returned.
+func (d *database) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+// JoinSyncGroup is a stub for the SyncGroup RPC of the same name. It is not
+// implemented yet: it returns a zero-valued wire.SyncGroupSpec together with
+// the error built by verror.NewErrNotImplemented.
+func (d *database) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+ return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
+}
+
+// LeaveSyncGroup is a stub for the SyncGroup RPC of the same name. It is not
+// implemented yet: sgName is ignored and the error built by
+// verror.NewErrNotImplemented is returned.
+func (d *database) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+// DestroySyncGroup is a stub for the SyncGroup RPC of the same name. It is
+// not implemented yet: sgName is ignored and the error built by
+// verror.NewErrNotImplemented is returned.
+func (d *database) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+// EjectFromSyncGroup is a stub for the SyncGroup RPC of the same name. It is
+// not implemented yet: sgName and member are ignored and the error built by
+// verror.NewErrNotImplemented is returned.
+func (d *database) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+// GetSyncGroupSpec is a stub for the SyncGroup RPC of the same name. It is
+// not implemented yet: it returns a zero-valued wire.SyncGroupSpec, an empty
+// string, and the error built by verror.NewErrNotImplemented.
+// NOTE(review): the string result is presumably the spec version that
+// SetSyncGroupSpec accepts -- confirm against the wire definition.
+func (d *database) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+ return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
+}
+
+// SetSyncGroupSpec is a stub for the SyncGroup RPC of the same name. It is
+// not implemented yet: sgName, spec and version are ignored and the error
+// built by verror.NewErrNotImplemented is returned.
+func (d *database) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+// GetSyncGroupMembers is a stub for the SyncGroup RPC of the same name. It is
+// not implemented yet: it returns a nil map together with the error built by
+// verror.NewErrNotImplemented.
+func (d *database) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+ return nil, verror.NewErrNotImplemented(ctx)
+}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index c8ea107..f5dfb31 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -11,6 +11,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
+ "v.io/syncbase/x/ref/services/syncbase/vsync"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
@@ -19,7 +20,8 @@
)
type service struct {
- st store.Store // keeps track of which apps and databases exist, etc.
+ st store.Store // keeps track of which apps and databases exist, etc.
+ sync vsync.SyncServerMethods
// Guards the fields below. Held during app Create, Delete, and
// SetPermissions.
mu sync.Mutex
@@ -42,6 +44,12 @@
st: memstore.New(),
apps: map[string]*app{},
}
+
+ var err error
+ if s.sync, err = vsync.New(ctx, call, s.st); err != nil {
+ return nil, err
+ }
+
data := &serviceData{
Perms: perms,
}
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 5e82ac1..815ef82 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -12,4 +12,10 @@
DatabasePrefix = "$database"
TablePrefix = "$table"
RowPrefix = "$row"
+ SyncPrefix = "$sync"
+)
+
+const (
+ // SyncbaseSuffix is the service object name suffix for Syncbase internal
+ // communication.
+ SyncbaseSuffix = "$internal"
)
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
new file mode 100644
index 0000000..a50b58c
--- /dev/null
+++ b/services/syncbase/vsync/blob.go
@@ -0,0 +1,18 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+)
+
+//////////////////////////////////////////////////
+// Methods for blob fetch between Syncbases.
+
+// FetchBlob is a stub for the blob fetch method between Syncbases. It is not
+// implemented yet and returns the error built by verror.NewErrNotImplemented.
+func (s *syncService) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
+ return verror.NewErrNotImplemented(ctx)
+}
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
new file mode 100644
index 0000000..f836cb2
--- /dev/null
+++ b/services/syncbase/vsync/initiator.go
@@ -0,0 +1,31 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import "time"
+
+var (
+ // peerSyncInterval is the duration between two consecutive
+ // peer contacts. During every peer contact, the initiator
+ // obtains any pending updates from that peer. contactPeers uses
+ // it as the period of its ticker.
+ peerSyncInterval = 50 * time.Millisecond
+)
+
+// contactPeers is the body of the initiator goroutine. It wakes up every
+// peerSyncInterval to select a peer and get updates from it; the per-tick
+// work is still a placeholder. It returns as soon as a receive on s.closed
+// succeeds.
+//
+// Stopping the ticker and calling s.pending.Done() are deferred so that both
+// happen on every exit path, including any added once the per-tick work is
+// filled in. Deferred calls run last-in first-out, so the ticker is stopped
+// before s.pending is marked done.
+func (s *syncService) contactPeers() {
+	defer s.pending.Done()
+
+	ticker := time.NewTicker(peerSyncInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-s.closed:
+			return
+		case <-ticker.C:
+		}
+
+		// Do work.
+	}
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
new file mode 100644
index 0000000..f93967a
--- /dev/null
+++ b/services/syncbase/vsync/sync.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package vsync provides sync functionality for Syncbase. The sync
+// service serves incoming GetDeltas requests and contacts other peers
+// to get deltas from them. When it receives a GetDeltas request, the
+// incoming generation vector is diffed with the local generation
+// vector, and missing generations are sent back. When it receives log
+// records in response to a GetDeltas request, it replays those log
+// records to get in sync with the sender.
+package vsync
+
+import (
+ "math/rand"
+ "sync"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+)
+
+// syncService contains the metadata for the sync module. It is created by
+// New and is asserted in this file to implement util.Layer.
+type syncService struct {
+ // Syncbase id, intended to be globally unique. New picks it at random
+ // with rng.Int63().
+ id int64
+
+ // Store for persisting Sync metadata.
+ st store.Store
+
+ // State to coordinate shutting down all spawned goroutines.
+ // pending counts the goroutines started by New; each one calls Done
+ // when it exits. contactPeers returns once a receive on closed succeeds.
+ pending sync.WaitGroup
+ closed chan struct{}
+}
+
+var (
+ // rng is the package-level random source. It is seeded in init and used
+ // by New to pick the sync service id.
+ rng *rand.Rand
+ // Compile-time check that *syncService implements util.Layer.
+ _ util.Layer = (*syncService)(nil)
+)
+
+// init seeds rng with the current UTC time in nanoseconds.
+func init() {
+ rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+}
+
+// New creates a new sync module backed by the store st.
+//
+// It picks a random id, persists it as a syncData record through util.Put,
+// and only then starts the background goroutines. If util.Put fails, its
+// error is returned and no goroutine is started.
+//
+// Sync concurrency: sync initializes two goroutines at startup. The
+// "watcher" thread is responsible for watching the store for changes
+// to its objects. The "initiator" thread is responsible for
+// periodically contacting a peer to obtain changes from that peer. In
+// addition, the sync module responds to incoming RPCs from remote
+// sync modules.
+func New(ctx *context.T, call rpc.ServerCall, st store.Store) (*syncService, error) {
+ // TODO(hpucha): Add restartability.
+
+ // First invocation of sync: a fresh random id is generated on every call.
+ s := &syncService{
+ id: rng.Int63(),
+ st: st,
+ }
+
+ // Persist sync metadata.
+ data := &syncData{
+ Id: s.id,
+ }
+
+ if err := util.Put(ctx, call, s.st, s, data); err != nil {
+ return nil, err
+ }
+
+ // Channel to propagate close event to all threads.
+ s.closed = make(chan struct{})
+ // One count per goroutine started below; each calls s.pending.Done()
+ // when it exits.
+ s.pending.Add(2)
+
+ // Get deltas every so often.
+ go s.contactPeers()
+
+ // Start a watcher thread that will get updates from local store.
+ go s.watchStore()
+
+ return s, nil
+}
+
+////////////////////////////////////////
+// Core sync method.
+
+// GetDeltas is a stub for the core sync method. It is not implemented yet
+// and returns the error built by verror.NewErrNotImplemented.
+func (s *syncService) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+////////////////////////////////////////
+// util.Layer methods.
+
+// Name returns the fixed string "sync". It is one of the util.Layer methods.
+func (s *syncService) Name() string {
+ return "sync"
+}
+
+// StKey returns util.SyncPrefix. It is one of the util.Layer methods.
+// NOTE(review): presumably this is the store key under which util.Put
+// persists the syncData record in New -- confirm against util.Put.
+func (s *syncService) StKey() string {
+ return util.SyncPrefix
+}
diff --git a/services/syncbase/vsync/sync.vdl b/services/syncbase/vsync/sync.vdl
new file mode 100644
index 0000000..b922376
--- /dev/null
+++ b/services/syncbase/vsync/sync.vdl
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Sync defines methods for data exchange between Syncbases.
+// TODO(hpucha): Flesh this out further.
+type Sync interface {
+ // GetDeltas returns the responder's current generation vector
+ // and all the missing log records when compared to the
+ // initiator's generation vector.
+ GetDeltas() error
+
+ // SyncGroup-related methods.
+ // TODO(hpucha): Think of better names for these methods.
+
+ // CreateSyncGroup is typically invoked on a "central" peer to
+ // request the creation of a SyncGroup.
+ CreateSyncGroup() error
+
+ // JoinSyncGroup is invoked on a SyncGroup Admin and checks if
+ // the requestor can join the SyncGroup.
+ JoinSyncGroup() error
+
+ // BlobSync methods.
+ // FetchBlob returns the requested blob.
+ FetchBlob() error
+}
diff --git a/services/syncbase/vsync/sync.vdl.go b/services/syncbase/vsync/sync.vdl.go
new file mode 100644
index 0000000..e481e98
--- /dev/null
+++ b/services/syncbase/vsync/sync.vdl.go
@@ -0,0 +1,179 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: sync.vdl
+
+package vsync
+
+import (
+ // VDL system imports
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+)
+
+// SyncClientMethods is the client interface
+// containing Sync methods.
+//
+// Sync defines methods for data exchange between Syncbases.
+// TODO(hpucha): Flesh this out further.
+type SyncClientMethods interface {
+ // GetDeltas returns the responder's current generation vector
+ // and all the missing log records when compared to the
+ // initiator's generation vector.
+ GetDeltas(*context.T, ...rpc.CallOpt) error
+ // CreateSyncGroup is typically invoked on a "central" peer to
+ // request the creation of a SyncGroup.
+ CreateSyncGroup(*context.T, ...rpc.CallOpt) error
+ // JoinSyncGroup is invoked on a SyncGroup Admin and checks if
+ // the requestor can join the SyncGroup.
+ JoinSyncGroup(*context.T, ...rpc.CallOpt) error
+ // BlobSync methods.
+ // FetchBlob returns the requested blob.
+ FetchBlob(*context.T, ...rpc.CallOpt) error
+}
+
+// SyncClientStub adds universal methods to SyncClientMethods.
+type SyncClientStub interface {
+ SyncClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// SyncClient returns a client stub for Sync.
+func SyncClient(name string) SyncClientStub {
+ return implSyncClientStub{name}
+}
+
+type implSyncClientStub struct {
+ name string
+}
+
+func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "GetDeltas", nil, nil, opts...)
+ return
+}
+
+func (c implSyncClientStub) CreateSyncGroup(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "CreateSyncGroup", nil, nil, opts...)
+ return
+}
+
+func (c implSyncClientStub) JoinSyncGroup(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroup", nil, nil, opts...)
+ return
+}
+
+func (c implSyncClientStub) FetchBlob(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "FetchBlob", nil, nil, opts...)
+ return
+}
+
+// SyncServerMethods is the interface a server writer
+// implements for Sync.
+//
+// Sync defines methods for data exchange between Syncbases.
+// TODO(hpucha): Flesh this out further.
+type SyncServerMethods interface {
+ // GetDeltas returns the responder's current generation vector
+ // and all the missing log records when compared to the
+ // initiator's generation vector.
+ GetDeltas(*context.T, rpc.ServerCall) error
+ // CreateSyncGroup is typically invoked on a "central" peer to
+ // request the creation of a SyncGroup.
+ CreateSyncGroup(*context.T, rpc.ServerCall) error
+ // JoinSyncGroup is invoked on a SyncGroup Admin and checks if
+ // the requestor can join the SyncGroup.
+ JoinSyncGroup(*context.T, rpc.ServerCall) error
+ // BlobSync methods.
+ // FetchBlob returns the requested blob.
+ FetchBlob(*context.T, rpc.ServerCall) error
+}
+
+// SyncServerStubMethods is the server interface containing
+// Sync methods, as expected by rpc.Server.
+// There is no difference between this interface and SyncServerMethods
+// since there are no streaming methods.
+type SyncServerStubMethods SyncServerMethods
+
+// SyncServerStub adds universal methods to SyncServerStubMethods.
+type SyncServerStub interface {
+ SyncServerStubMethods
+ // Describe the Sync interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// SyncServer returns a server stub for Sync.
+// It converts an implementation of SyncServerMethods into
+// an object that may be used by rpc.Server.
+func SyncServer(impl SyncServerMethods) SyncServerStub {
+ stub := implSyncServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implSyncServerStub struct {
+ impl SyncServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+ return s.impl.GetDeltas(ctx, call)
+}
+
+func (s implSyncServerStub) CreateSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+ return s.impl.CreateSyncGroup(ctx, call)
+}
+
+func (s implSyncServerStub) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+ return s.impl.JoinSyncGroup(ctx, call)
+}
+
+func (s implSyncServerStub) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
+ return s.impl.FetchBlob(ctx, call)
+}
+
+func (s implSyncServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implSyncServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{SyncDesc}
+}
+
+// SyncDesc describes the Sync interface.
+var SyncDesc rpc.InterfaceDesc = descSync
+
+// descSync hides the desc to keep godoc clean.
+var descSync = rpc.InterfaceDesc{
+ Name: "Sync",
+ PkgPath: "v.io/syncbase/x/ref/services/syncbase/vsync",
+ Doc: "// Sync defines methods for data exchange between Syncbases.\n// TODO(hpucha): Flesh this out further.",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "GetDeltas",
+ Doc: "// GetDeltas returns the responder's current generation vector\n// and all the missing log records when compared to the\n// initiator's generation vector.",
+ },
+ {
+ Name: "CreateSyncGroup",
+ Doc: "// CreateSyncGroup is typically invoked on a \"central\" peer to\n// request the creation of a SyncGroup.",
+ },
+ {
+ Name: "JoinSyncGroup",
+ Doc: "// JoinSyncGroup is invoked on a SyncGroup Admin and checks if\n// the requestor can join the SyncGroup.",
+ },
+ {
+ Name: "FetchBlob",
+ Doc: "// BlobSync methods.\n// FetchBlob returns the requested blob.",
+ },
+ },
+}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
new file mode 100644
index 0000000..c3ab3d1
--- /dev/null
+++ b/services/syncbase/vsync/syncgroup.go
@@ -0,0 +1,25 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+)
+
+////////////////////////////////////////////////////////////
+// SyncGroup methods between Client and Syncbase.
+
+////////////////////////////////////////////////////////////
+// Methods for SyncGroup create/join between Syncbases.
+
+// CreateSyncGroup is a stub for the Syncbase-to-Syncbase SyncGroup create
+// method. It is not implemented yet and returns the error built by
+// verror.NewErrNotImplemented.
+func (s *syncService) CreateSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+ return verror.NewErrNotImplemented(ctx)
+}
+
+// JoinSyncGroup is a stub for the Syncbase-to-Syncbase SyncGroup join
+// method. It is not implemented yet and returns the error built by
+// verror.NewErrNotImplemented.
+func (s *syncService) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+ return verror.NewErrNotImplemented(ctx)
+}
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
new file mode 100644
index 0000000..2434200
--- /dev/null
+++ b/services/syncbase/vsync/types.vdl
@@ -0,0 +1,10 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// syncData represents the persistent state of the sync module.
+type syncData struct {
+ Id int64
+}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
new file mode 100644
index 0000000..cc15abe
--- /dev/null
+++ b/services/syncbase/vsync/types.vdl.go
@@ -0,0 +1,27 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package vsync
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// syncData represents the persistent state of the sync module.
+type syncData struct {
+ Id int64
+}
+
+func (syncData) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.syncData"`
+}) {
+}
+
+func init() {
+ vdl.Register((*syncData)(nil))
+}
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
new file mode 100644
index 0000000..a711078
--- /dev/null
+++ b/services/syncbase/vsync/watcher.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// When applications update objects in the local Store, the sync
+// watcher thread learns about them asynchronously via the "watch"
+// stream of object mutations. In turn, this sync watcher thread
+// updates the DAG and ILog records to track the object change
+// histories.
+
+// watchStore is meant to process updates obtained by watching the store.
+// It is currently a stub: the only thing it does is call s.pending.Done()
+// as it returns.
+func (s *syncService) watchStore() {
+ defer s.pending.Done()
+}