blob: f93967a128bae5a98a2b7ec9923b112177d0031a [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// Package vsync provides sync functionality for Syncbase. Sync
// service serves incoming GetDeltas requests and contacts other peers
// to get deltas from them. When it receives a GetDeltas request, the
// incoming generation vector is diffed with the local generation
// vector, and missing generations are sent back. When it receives log
// records in response to a GetDeltas request, it replays those log
// records to get in sync with the sender.
import (
"math/rand"
"sync"
"time"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
)
// syncService contains the metadata for the sync module.
type syncService struct {
// Globally unique Syncbase id.
id int64
// Store for persisting Sync metadata.
st store.Store
// State to coordinate shutting down all spawned goroutines.
pending sync.WaitGroup
closed chan struct{}
}
var (
rng *rand.Rand
_ util.Layer = (*syncService)(nil)
)
func init() {
rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
}
// New creates a new sync module.
//
// Sync concurrency: sync initializes two goroutines at startup. The
// "watcher" thread is responsible for watching the store for changes
// to its objects. The "initiator" thread is responsible for
// periodically contacting a peer to obtain changes from that peer. In
// addition, the sync module responds to incoming RPCs from remote
// sync modules.
func New(ctx *context.T, call rpc.ServerCall, st store.Store) (*syncService, error) {
// TODO(hpucha): Add restartability.
// First invocation of sync.
s := &syncService{
id: rng.Int63(),
st: st,
}
// Persist sync metadata.
data := &syncData{
Id: s.id,
}
if err := util.Put(ctx, call, s.st, s, data); err != nil {
return nil, err
}
// Channel to propagate close event to all threads.
s.closed = make(chan struct{})
s.pending.Add(2)
// Get deltas every so often.
go s.contactPeers()
// Start a watcher thread that will get updates from local store.
go s.watchStore()
return s, nil
}
////////////////////////////////////////
// Core sync method.
func (s *syncService) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
return verror.NewErrNotImplemented(ctx)
}
////////////////////////////////////////
// util.Layer methods.
func (s *syncService) Name() string {
return "sync"
}
func (s *syncService) StKey() string {
return util.SyncPrefix
}