syncbase/vsync: Scaffolding to start integrating sync into store.

Change-Id: I2d63421cd1726408039fcd03f1cc58b8c9c42ef9
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index c676f97..4a4c9e2 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -10,6 +10,7 @@
 	wire "v.io/syncbase/v23/services/syncbase"
 	pubutil "v.io/syncbase/v23/syncbase/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/nosql"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 	"v.io/v23/verror"
@@ -34,6 +35,10 @@
 		return wire.ServiceServer(disp.s), nil, nil
 	}
 
+	if parts[0] == util.SyncbaseSuffix {
+		return disp.s.sync, nil, nil
+	}
+
 	// Validate all key atoms up front, so that we can avoid doing so in all our
 	// method implementations.
 	appName := parts[0]
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
new file mode 100644
index 0000000..c19c4f4
--- /dev/null
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -0,0 +1,51 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+)
+
+////////////////////////////////////////
+// SyncGroup RPC methods (stubs: each currently returns ErrNotImplemented)
+
+func (d *database) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+	return nil, verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+	return verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+	return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+	return verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+	return verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+	return verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+	return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+	return verror.NewErrNotImplemented(ctx)
+}
+
+func (d *database) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+	return nil, verror.NewErrNotImplemented(ctx)
+}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index c8ea107..f5dfb31 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -11,6 +11,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
+	"v.io/syncbase/x/ref/services/syncbase/vsync"
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
@@ -19,7 +20,8 @@
 )
 
 type service struct {
-	st store.Store // keeps track of which apps and databases exist, etc.
+	st   store.Store // keeps track of which apps and databases exist, etc.
+	sync vsync.SyncServerMethods
 	// Guards the fields below. Held during app Create, Delete, and
 	// SetPermissions.
 	mu   sync.Mutex
@@ -42,6 +44,12 @@
 		st:   memstore.New(),
 		apps: map[string]*app{},
 	}
+
+	var err error
+	if s.sync, err = vsync.New(ctx, call, s.st); err != nil {
+		return nil, err
+	}
+
 	data := &serviceData{
 		Perms: perms,
 	}
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 5e82ac1..815ef82 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -12,4 +12,10 @@
 	DatabasePrefix = "$database"
 	TablePrefix    = "$table"
 	RowPrefix      = "$row"
+	SyncPrefix     = "$sync"
+)
+
+const (
+	// SyncbaseSuffix is the object name suffix for Syncbase-internal communication; the dispatcher routes it to the sync service.
+	SyncbaseSuffix = "$internal"
 )
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
new file mode 100644
index 0000000..a50b58c
--- /dev/null
+++ b/services/syncbase/vsync/blob.go
@@ -0,0 +1,18 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+)
+
+//////////////////////////////////////////////////
+// Methods for blob fetch between Syncbases.
+
+func (s *syncService) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
+	return verror.NewErrNotImplemented(ctx) // Stub: blob fetch is not implemented yet.
+}
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
new file mode 100644
index 0000000..f836cb2
--- /dev/null
+++ b/services/syncbase/vsync/initiator.go
@@ -0,0 +1,31 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import "time"
+
+var (
+	// peerSyncInterval is the duration between two consecutive
+	// peer contacts. During every peer contact, the initiator
+	// obtains any pending updates from that peer.
+	peerSyncInterval = 50 * time.Millisecond // ticker period used by contactPeers
+)
+
+// contactPeers wakes up every peerSyncInterval to select a peer, and
+// get updates from it. It returns once a receive on s.closed succeeds.
+func (s *syncService) contactPeers() {
+	defer s.pending.Done() // runs last: releases the WaitGroup on every exit path
+	ticker := time.NewTicker(peerSyncInterval)
+	defer ticker.Stop() // runs first, keeping the Stop-then-Done order
+	for {
+		select {
+		case <-s.closed:
+			return
+		case <-ticker.C:
+		}
+
+		// Do work.
+	}
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
new file mode 100644
index 0000000..f93967a
--- /dev/null
+++ b/services/syncbase/vsync/sync.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package vsync provides sync functionality for Syncbase. Sync
+// service serves incoming GetDeltas requests and contacts other peers
+// to get deltas from them. When it receives a GetDeltas request, the
+// incoming generation vector is diffed with the local generation
+// vector, and missing generations are sent back. When it receives log
+// records in response to a GetDeltas request, it replays those log
+// records to get in sync with the sender.
+package vsync
+
+import (
+	"math/rand"
+	"sync"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+)
+
+// syncService contains the metadata for the sync module; it implements util.Layer.
+type syncService struct {
+	// Syncbase id, intended to be globally unique; drawn from rng.Int63() in New.
+	id int64
+
+	// Store for persisting Sync metadata.
+	st store.Store
+
+	// State to coordinate shutting down all spawned goroutines.
+	pending sync.WaitGroup // one count per goroutine started by New
+	closed  chan struct{}  // contactPeers exits when a receive on this succeeds; NOTE(review): nothing visible here closes it yet
+}
+
+var (
+	rng *rand.Rand                       // seeded in init; New draws the Syncbase id from it
+	_   util.Layer = (*syncService)(nil) // compile-time check that *syncService implements util.Layer
+)
+
+func init() {
+	rng = rand.New(rand.NewSource(time.Now().UnixNano())) // UnixNano is location-independent, so no UTC() is needed
+}
+
+// New creates a new sync module, stores its metadata via util.Put,
+// and starts its background goroutines.
+//
+// Sync concurrency: sync starts two goroutines at startup. The
+// "watcher" goroutine (watchStore) watches the store for changes to
+// its objects. The "initiator" goroutine (contactPeers) periodically
+// contacts a peer to obtain changes from that peer. In addition, the
+// sync module responds to incoming RPCs from remote sync modules.
+func New(ctx *context.T, call rpc.ServerCall, st store.Store) (*syncService, error) {
+	// TODO(hpucha): Add restartability.
+
+	// First invocation of sync: pick a fresh random id.
+	s := &syncService{
+		id: rng.Int63(),
+		st: st,
+	}
+
+	// Persist sync metadata.
+	data := &syncData{
+		Id: s.id,
+	}
+
+	if err := util.Put(ctx, call, s.st, s, data); err != nil {
+		return nil, err // no goroutines have been started yet, so nothing to stop
+	}
+
+	// Channel to propagate close event to all goroutines.
+	s.closed = make(chan struct{})
+	s.pending.Add(2) // one count per goroutine started below
+
+	// Get deltas every so often.
+	go s.contactPeers()
+
+	// Start a watcher goroutine that will get updates from local store.
+	go s.watchStore()
+
+	return s, nil
+}
+
+////////////////////////////////////////
+// Core sync method.
+
+func (s *syncService) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+	return verror.NewErrNotImplemented(ctx) // Stub: delta exchange is not implemented yet.
+}
+
+////////////////////////////////////////
+// util.Layer methods.
+
+func (s *syncService) Name() string {
+	return "sync" // fixed name for the sync layer
+}
+
+func (s *syncService) StKey() string {
+	return util.SyncPrefix // "$sync", declared in server/util/constants.go
+}
diff --git a/services/syncbase/vsync/sync.vdl b/services/syncbase/vsync/sync.vdl
new file mode 100644
index 0000000..b922376
--- /dev/null
+++ b/services/syncbase/vsync/sync.vdl
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Sync defines methods for data exchange between Syncbases.
+// TODO(hpucha): Flesh this out further.
+type Sync interface {
+	// GetDeltas returns the responder's current generation vector
+	// and all the missing log records when compared to the
+	// initiator's generation vector.
+	GetDeltas() error
+
+	// SyncGroup-related methods.
+	// TODO(hpucha): Think of better names for these methods.
+
+	// CreateSyncGroup is typically invoked on a "central" peer to
+	// request the creation of a SyncGroup.
+	CreateSyncGroup() error
+
+	// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
+	// the requestor can join the SyncGroup.
+	JoinSyncGroup() error
+
+	// BlobSync methods.
+	// FetchBlob returns the requested blob.
+	FetchBlob() error
+}
diff --git a/services/syncbase/vsync/sync.vdl.go b/services/syncbase/vsync/sync.vdl.go
new file mode 100644
index 0000000..e481e98
--- /dev/null
+++ b/services/syncbase/vsync/sync.vdl.go
@@ -0,0 +1,179 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: sync.vdl
+
+package vsync
+
+import (
+	// VDL system imports
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+)
+
+// SyncClientMethods is the client interface
+// containing Sync methods.
+//
+// Sync defines methods for data exchange between Syncbases.
+// TODO(hpucha): Flesh this out further.
+type SyncClientMethods interface {
+	// GetDeltas returns the responder's current generation vector
+	// and all the missing log records when compared to the
+	// initiator's generation vector.
+	GetDeltas(*context.T, ...rpc.CallOpt) error
+	// CreateSyncGroup is typically invoked on a "central" peer to
+	// request the creation of a SyncGroup.
+	CreateSyncGroup(*context.T, ...rpc.CallOpt) error
+	// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
+	// the requestor can join the SyncGroup.
+	JoinSyncGroup(*context.T, ...rpc.CallOpt) error
+	// BlobSync methods.
+	// FetchBlob returns the requested blob.
+	FetchBlob(*context.T, ...rpc.CallOpt) error
+}
+
+// SyncClientStub adds universal methods to SyncClientMethods.
+type SyncClientStub interface {
+	SyncClientMethods
+	rpc.UniversalServiceMethods
+}
+
+// SyncClient returns a client stub for Sync.
+func SyncClient(name string) SyncClientStub {
+	return implSyncClientStub{name}
+}
+
+type implSyncClientStub struct {
+	name string
+}
+
+func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "GetDeltas", nil, nil, opts...)
+	return
+}
+
+func (c implSyncClientStub) CreateSyncGroup(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "CreateSyncGroup", nil, nil, opts...)
+	return
+}
+
+func (c implSyncClientStub) JoinSyncGroup(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroup", nil, nil, opts...)
+	return
+}
+
+func (c implSyncClientStub) FetchBlob(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "FetchBlob", nil, nil, opts...)
+	return
+}
+
+// SyncServerMethods is the interface a server writer
+// implements for Sync.
+//
+// Sync defines methods for data exchange between Syncbases.
+// TODO(hpucha): Flesh this out further.
+type SyncServerMethods interface {
+	// GetDeltas returns the responder's current generation vector
+	// and all the missing log records when compared to the
+	// initiator's generation vector.
+	GetDeltas(*context.T, rpc.ServerCall) error
+	// CreateSyncGroup is typically invoked on a "central" peer to
+	// request the creation of a SyncGroup.
+	CreateSyncGroup(*context.T, rpc.ServerCall) error
+	// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
+	// the requestor can join the SyncGroup.
+	JoinSyncGroup(*context.T, rpc.ServerCall) error
+	// BlobSync methods.
+	// FetchBlob returns the requested blob.
+	FetchBlob(*context.T, rpc.ServerCall) error
+}
+
+// SyncServerStubMethods is the server interface containing
+// Sync methods, as expected by rpc.Server.
+// There is no difference between this interface and SyncServerMethods
+// since there are no streaming methods.
+type SyncServerStubMethods SyncServerMethods
+
+// SyncServerStub adds universal methods to SyncServerStubMethods.
+type SyncServerStub interface {
+	SyncServerStubMethods
+	// Describe the Sync interfaces.
+	Describe__() []rpc.InterfaceDesc
+}
+
+// SyncServer returns a server stub for Sync.
+// It converts an implementation of SyncServerMethods into
+// an object that may be used by rpc.Server.
+func SyncServer(impl SyncServerMethods) SyncServerStub {
+	stub := implSyncServerStub{
+		impl: impl,
+	}
+	// Initialize GlobState; always check the stub itself first, to handle the
+	// case where the user has the Glob method defined in their VDL source.
+	if gs := rpc.NewGlobState(stub); gs != nil {
+		stub.gs = gs
+	} else if gs := rpc.NewGlobState(impl); gs != nil {
+		stub.gs = gs
+	}
+	return stub
+}
+
+type implSyncServerStub struct {
+	impl SyncServerMethods
+	gs   *rpc.GlobState
+}
+
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+	return s.impl.GetDeltas(ctx, call)
+}
+
+func (s implSyncServerStub) CreateSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+	return s.impl.CreateSyncGroup(ctx, call)
+}
+
+func (s implSyncServerStub) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+	return s.impl.JoinSyncGroup(ctx, call)
+}
+
+func (s implSyncServerStub) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
+	return s.impl.FetchBlob(ctx, call)
+}
+
+func (s implSyncServerStub) Globber() *rpc.GlobState {
+	return s.gs
+}
+
+func (s implSyncServerStub) Describe__() []rpc.InterfaceDesc {
+	return []rpc.InterfaceDesc{SyncDesc}
+}
+
+// SyncDesc describes the Sync interface.
+var SyncDesc rpc.InterfaceDesc = descSync
+
+// descSync hides the desc to keep godoc clean.
+var descSync = rpc.InterfaceDesc{
+	Name:    "Sync",
+	PkgPath: "v.io/syncbase/x/ref/services/syncbase/vsync",
+	Doc:     "// Sync defines methods for data exchange between Syncbases.\n// TODO(hpucha): Flesh this out further.",
+	Methods: []rpc.MethodDesc{
+		{
+			Name: "GetDeltas",
+			Doc:  "// GetDeltas returns the responder's current generation vector\n// and all the missing log records when compared to the\n// initiator's generation vector.",
+		},
+		{
+			Name: "CreateSyncGroup",
+			Doc:  "// CreateSyncGroup is typically invoked on a \"central\" peer to\n// request the creation of a SyncGroup.",
+		},
+		{
+			Name: "JoinSyncGroup",
+			Doc:  "// JoinSyncGroup is invoked on a SyncGroup Admin and checks if\n// the requestor can join the SyncGroup.",
+		},
+		{
+			Name: "FetchBlob",
+			Doc:  "// BlobSync methods.\n// FetchBlob returns the requested blob.",
+		},
+	},
+}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
new file mode 100644
index 0000000..c3ab3d1
--- /dev/null
+++ b/services/syncbase/vsync/syncgroup.go
@@ -0,0 +1,25 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+)
+
+////////////////////////////////////////////////////////////
+// SyncGroup methods between Client and Syncbase.
+
+////////////////////////////////////////////////////////////
+// Methods for SyncGroup create/join between Syncbases.
+
+func (s *syncService) CreateSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+	return verror.NewErrNotImplemented(ctx) // Stub: SyncGroup creation between Syncbases is not implemented yet.
+}
+
+func (s *syncService) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
+	return verror.NewErrNotImplemented(ctx) // Stub: SyncGroup join between Syncbases is not implemented yet.
+}
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
new file mode 100644
index 0000000..2434200
--- /dev/null
+++ b/services/syncbase/vsync/types.vdl
@@ -0,0 +1,10 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// syncData represents the persistent state of the sync module.
+type syncData struct {
+	Id int64
+}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
new file mode 100644
index 0000000..cc15abe
--- /dev/null
+++ b/services/syncbase/vsync/types.vdl.go
@@ -0,0 +1,27 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package vsync
+
+import (
+	// VDL system imports
+	"v.io/v23/vdl"
+)
+
+// syncData represents the persistent state of the sync module.
+type syncData struct {
+	Id int64
+}
+
+func (syncData) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.syncData"`
+}) {
+}
+
+func init() {
+	vdl.Register((*syncData)(nil))
+}
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
new file mode 100644
index 0000000..a711078
--- /dev/null
+++ b/services/syncbase/vsync/watcher.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// When applications update objects in the local Store, the sync
+// watcher thread learns about them asynchronously via the "watch"
+// stream of object mutations. In turn, this sync watcher thread
+// updates the DAG and ILog records to track the object change
+// histories.
+
+// watchStore is meant to process updates obtained by watching the store; for now it is a placeholder.
+func (s *syncService) watchStore() {
+	defer s.pending.Done() // the body is empty, so this returns at once and releases its s.pending count
+}