Add clock sync capability to syncbase server.
1) Adds a syncservice in clock package that deals with
   clock data response from peer
2) Clock sync logic to vsync package with makes
   rpc call to peer right before initiating getDeltas
3) Deamon to service to run clockservice and
   ntpservice in the background
4) Unit tests for syncservice

Change-Id: I6434fb940a2b8c0360d6b71c213b0b0d5220960f
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 80b46a0..4e11e3f 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -12,6 +12,10 @@
 // Sync defines methods for data exchange between Syncbases.
 // TODO(hpucha): Flesh this out further.
 type Sync interface {
+	// GetTime returns metadata related to syncbase clock like syncbase clock
+	// timestamps, last NTP timestamp, num reboots, etc.
+	GetTime(req TimeReq, initiator string) (TimeResp | error)
+
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
 	// vector for one Database for either SyncGroup metadata or data.
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 102699b..ebaa912 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -41,6 +41,9 @@
 // Sync defines methods for data exchange between Syncbases.
 // TODO(hpucha): Flesh this out further.
 type SyncClientMethods interface {
+	// GetTime returns metadata related to syncbase clock like syncbase clock
+	// timestamps, last NTP timestamp, num reboots, etc.
+	GetTime(ctx *context.T, req TimeReq, initiator string, opts ...rpc.CallOpt) (TimeResp, error)
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
 	// vector for one Database for either SyncGroup metadata or data.
@@ -107,6 +110,11 @@
 	name string
 }
 
+func (c implSyncClientStub) GetTime(ctx *context.T, i0 TimeReq, i1 string, opts ...rpc.CallOpt) (o0 TimeResp, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "GetTime", []interface{}{i0, i1}, []interface{}{&o0}, opts...)
+	return
+}
+
 func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 DeltaReq, i1 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
 	var call rpc.ClientCall
 	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0, i1}, opts...); err != nil {
@@ -472,6 +480,9 @@
 // Sync defines methods for data exchange between Syncbases.
 // TODO(hpucha): Flesh this out further.
 type SyncServerMethods interface {
+	// GetTime returns metadata related to syncbase clock like syncbase clock
+	// timestamps, last NTP timestamp, num reboots, etc.
+	GetTime(ctx *context.T, call rpc.ServerCall, req TimeReq, initiator string) (TimeResp, error)
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
 	// vector for one Database for either SyncGroup metadata or data.
@@ -528,6 +539,9 @@
 // The only difference between this interface and SyncServerMethods
 // is the streaming methods.
 type SyncServerStubMethods interface {
+	// GetTime returns metadata related to syncbase clock like syncbase clock
+	// timestamps, last NTP timestamp, num reboots, etc.
+	GetTime(ctx *context.T, call rpc.ServerCall, req TimeReq, initiator string) (TimeResp, error)
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
 	// vector for one Database for either SyncGroup metadata or data.
@@ -608,6 +622,10 @@
 	gs   *rpc.GlobState
 }
 
+func (s implSyncServerStub) GetTime(ctx *context.T, call rpc.ServerCall, i0 TimeReq, i1 string) (TimeResp, error) {
+	return s.impl.GetTime(ctx, call, i0, i1)
+}
+
 func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 DeltaReq, i1 string) error {
 	return s.impl.GetDeltas(ctx, call, i0, i1)
 }
@@ -654,6 +672,17 @@
 	Doc:     "// Sync defines methods for data exchange between Syncbases.\n// TODO(hpucha): Flesh this out further.",
 	Methods: []rpc.MethodDesc{
 		{
+			Name: "GetTime",
+			Doc:  "// GetTime returns metadata related to syncbase clock like syncbase clock\n// timestamps, last NTP timestamp, num reboots, etc.",
+			InArgs: []rpc.ArgDesc{
+				{"req", ``},       // TimeReq
+				{"initiator", ``}, // string
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // TimeResp
+			},
+		},
+		{
 			Name: "GetDeltas",
 			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector for one Database for either SyncGroup metadata or data.",
 			InArgs: []rpc.ArgDesc{
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 849e8ec..acd58f6 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -146,3 +146,34 @@
 type ChunkData struct {
 	Data []byte
 }
+
+// TimeReq contains the send timestamp from the requester.
+type TimeReq struct {
+	SendTs time.Time
+}
+
+// TimeResp contains timestamps needed by the requester to correctly estimate
+// the difference between the two clocks along with clock metadata required
+// by the requester to decide which clock is more accurate.
+type TimeResp struct {
+	// The send timestamp received in TimeReq from the originator.
+	OrigTs time.Time
+
+	// Time when the request was received.
+	RecvTs time.Time
+
+	// Time when the response was sent.
+	SendTs time.Time
+
+	// Timestamp received from NTP during last NTP sync. The last NTP sync could
+	// be done either by this device or some other device that this device
+	// synced its clock with.
+	LastNtpTs ?time.Time
+
+	// Number of reboots since last NTP sync.
+	NumReboots uint16
+
+	// Number of hops between this device and the device that did the last
+	// NTP sync.
+	NumHops uint16
+}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index a00591c..d63f32f 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -291,6 +291,42 @@
 }) {
 }
 
+// TimeReq contains the send timestamp from the requester.
+type TimeReq struct {
+	SendTs time.Time
+}
+
+func (TimeReq) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.TimeReq"`
+}) {
+}
+
+// TimeResp contains timestamps needed by the requester to correctly estimate
+// the difference between the two clocks along with clock metadata required
+// by the requester to decide which clock is more accurate.
+type TimeResp struct {
+	// The send timestamp received in TimeReq from the originator.
+	OrigTs time.Time
+	// Time when the request was received.
+	RecvTs time.Time
+	// Time when the response was sent.
+	SendTs time.Time
+	// Timestamp received from NTP during last NTP sync. The last NTP sync could
+	// be done either by this device or some other device that this device
+	// synced its clock with.
+	LastNtpTs *time.Time
+	// Number of reboots since last NTP sync.
+	NumReboots uint16
+	// Number of hops between this device and the device that did the last
+	// NTP sync.
+	NumHops uint16
+}
+
+func (TimeResp) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.TimeResp"`
+}) {
+}
+
 func init() {
 	vdl.Register((*PrefixGenVector)(nil))
 	vdl.Register((*GenVector)(nil))
@@ -305,6 +341,8 @@
 	vdl.Register((*DeltaResp)(nil))
 	vdl.Register((*ChunkHash)(nil))
 	vdl.Register((*ChunkData)(nil))
+	vdl.Register((*TimeReq)(nil))
+	vdl.Register((*TimeResp)(nil))
 }
 
 const NoGroupId = GroupId(0)
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 64d7547..e6e3787 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -19,6 +19,7 @@
 	wire "v.io/v23/services/syncbase"
 	"v.io/v23/verror"
 	"v.io/v23/vom"
+	"v.io/x/ref/services/syncbase/clock"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/server/nosql"
 	"v.io/x/ref/services/syncbase/server/util"
@@ -28,9 +29,10 @@
 
 // service is a singleton (i.e. not per-request) that handles Service RPCs.
 type service struct {
-	st   store.Store // keeps track of which apps and databases exist, etc.
-	sync interfaces.SyncServerMethods
-	opts ServiceOptions
+	st     store.Store // keeps track of which apps and databases exist, etc.
+	sync   interfaces.SyncServerMethods
+	clockD *clock.ClockD
+	opts   ServiceOptions
 	// Guards the fields below. Held during app Create, Delete, and
 	// SetPermissions.
 	mu   sync.Mutex
@@ -125,11 +127,16 @@
 			return nil, err
 		}
 	}
+	vclock := clock.NewVClock(st)
 	// Note, vsync.New internally handles both first-time and subsequent
 	// invocations.
-	if s.sync, err = vsync.New(ctx, call, s, opts.Engine, opts.RootDir); err != nil {
+	if s.sync, err = vsync.New(ctx, call, s, opts.Engine, opts.RootDir, vclock); err != nil {
 		return nil, err
 	}
+
+	// Create a new clock deamon and run clock check and ntp check in a
+	// scheduled loop.
+	s.clockD = clock.StartClockD(ctx, vclock)
 	return s, nil
 }
 
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 5841aa9..e38377e 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -55,7 +55,6 @@
 	// Object name component for Syncbase-to-Syncbase (sync) RPCs.
 	// Sync object names have the form:
 	//     <syncbase>/%%sync/...
-	// FIXME: update all apps, again...
 	SyncbaseSuffix = "%%sync"
 	// Separator for batch info in database names.
 	// Batch object names have the form:
@@ -72,4 +71,7 @@
 	NtpSampleCount           = 15
 	LocalClockDriftThreshold = float64(time.Second)
 	NtpDiffThreshold         = float64(2 * time.Second)
+	PeerSyncDiffThreshold    = NtpDiffThreshold
+	RebootTolerance          = float64(time.Minute)
+	HopTolerance             = 2
 )
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
index d44cc08..53bbec1 100644
--- a/services/syncbase/server/watchable/store_test.go
+++ b/services/syncbase/server/watchable/store_test.go
@@ -65,7 +65,7 @@
 func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
 	st, destroy := createStore()
 	defer destroy()
-	vClock := clock.NewVClockWithMockServices(clock.MockStorageAdapter(), nil, nil)
+	vClock := clock.NewVClockWithMockServices(st, nil, nil)
 	st, err := Wrap(st, vClock, &Options{ManagedPrefixes: mp})
 	if err != nil {
 		t.Fatal(err)
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3178cb7..b4556c3 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -140,7 +140,7 @@
 		return verror.New(verror.ErrInternal, nil, "seq maxed out")
 	}
 	// Write LogEntry records.
-	timestamp := tx.st.clock.Now(nil).UnixNano()
+	timestamp := tx.st.clock.Now().UnixNano()
 	seq := tx.st.seq
 	for i, op := range tx.ops {
 		key := logEntryKey(seq)
@@ -177,7 +177,7 @@
 // GetStoreTime returns the current time from the given transaction store.
 func GetStoreTime(ctx *context.T, tx store.Transaction) time.Time {
 	wtx := tx.(*transaction)
-	return wtx.st.clock.Now(ctx)
+	return wtx.st.clock.Now()
 }
 
 // AddSyncGroupOp injects a SyncGroup operation notification in the log entries
diff --git a/services/syncbase/server/watchable/transaction_test.go b/services/syncbase/server/watchable/transaction_test.go
index 681b3f7..518f7b3 100644
--- a/services/syncbase/server/watchable/transaction_test.go
+++ b/services/syncbase/server/watchable/transaction_test.go
@@ -71,9 +71,7 @@
 	t1 := time.Now()
 	inc := time.Duration(1) * time.Second
 	mockClock := newMockSystemClock(t1, inc)
-	var mockAdapter clock.StorageAdapter = clock.MockStorageAdapter()
-
-	vclock := clock.NewVClockWithMockServices(mockAdapter, mockClock, nil)
+	vclock := clock.NewVClockWithMockServices(ist, mockClock, nil)
 	wst1, err := Wrap(ist, vclock, &Options{ManagedPrefixes: nil})
 	if err != nil {
 		t.Fatalf("Wrap failed: %v", err)
@@ -143,9 +141,7 @@
 	t1 := time.Now()
 	inc := time.Duration(1) * time.Second
 	mockClock := newMockSystemClock(t1, inc)
-	var mockAdapter clock.StorageAdapter = clock.MockStorageAdapter()
-
-	vclock := clock.NewVClockWithMockServices(mockAdapter, mockClock, nil)
+	vclock := clock.NewVClockWithMockServices(ist, mockClock, nil)
 	wst, err := Wrap(ist, vclock, &Options{ManagedPrefixes: nil})
 	if err != nil {
 		t.Fatalf("Wrap failed: %v", err)
diff --git a/services/syncbase/server/watchable/util_test.go b/services/syncbase/server/watchable/util_test.go
index e0db4ac..b7b4412 100644
--- a/services/syncbase/server/watchable/util_test.go
+++ b/services/syncbase/server/watchable/util_test.go
@@ -14,8 +14,7 @@
 func TestGetNextLogSeq(t *testing.T) {
 	st, destroy := createStore()
 	defer destroy()
-	var mockAdapter clock.StorageAdapter = clock.MockStorageAdapter()
-	vclock := clock.NewVClockWithMockServices(mockAdapter, nil, nil)
+	vclock := clock.NewVClockWithMockServices(st, nil, nil)
 	st, err := Wrap(st, vclock, &Options{})
 	if err != nil {
 		t.Fatal(err)