Flesh out syncbase clock
1) Add storage adapter for clock data
2) Add clock service to keep track of changes
to local clock
3) Add ntp service to sample ntp time and update
clock skew for syncbase clock
4) Tests for above
Note: This CL does not create a daemon to run
the above two services as a cron.
Change-Id: I73881f7d4fcc319933028222e53cb7e0fccd78bb
diff --git a/x/ref/services/syncbase/clock/clock_darwin.go b/x/ref/services/syncbase/clock/clock_darwin.go
new file mode 100644
index 0000000..4c801cb
--- /dev/null
+++ b/x/ref/services/syncbase/clock/clock_darwin.go
@@ -0,0 +1,51 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "bytes"
+ "encoding/binary"
+ "syscall"
+ "time"
+ "unsafe"
+)
+
+// This file contains darwin specific implementations of functions for clock
+// package.
+
+// ElapsedTime returns the time elapsed since last boot.
+// Darwin provides a system call "kern.boottime" which returns a Timeval32
+// object containing the boot time for the system. Darwin calculates this
+// boottime based on the current clock and the internal tracking of elapsed
+// time since boot. Hence if the clock is changed, the boot time changes along
+// with it. So the difference between the current time and boot time will always
+// give us the correct elapsed time since boot.
+func (sc *systemClockImpl) ElapsedTime() (time.Duration, error) {
+ tv := syscall.Timeval32{}
+
+ if err := sysctlbyname("kern.boottime", &tv); err != nil {
+ return 0, err
+ }
+ return time.Since(time.Unix(int64(tv.Sec), int64(tv.Usec)*1000)), nil
+}
+
+// Generic Sysctl buffer unmarshalling.
+func sysctlbyname(name string, data interface{}) (err error) {
+ val, err := syscall.Sysctl(name)
+ if err != nil {
+ return err
+ }
+
+ buf := []byte(val)
+
+ switch v := data.(type) {
+ case *uint64:
+ *v = *(*uint64)(unsafe.Pointer(&buf[0]))
+ return
+ }
+
+ bbuf := bytes.NewBuffer([]byte(val))
+ return binary.Read(bbuf, binary.LittleEndian, data)
+}
diff --git a/x/ref/services/syncbase/clock/clock_linux.go b/x/ref/services/syncbase/clock/clock_linux.go
new file mode 100644
index 0000000..dabaab4
--- /dev/null
+++ b/x/ref/services/syncbase/clock/clock_linux.go
@@ -0,0 +1,26 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "syscall"
+ "time"
+)
+
+// This file contains linux specific implementations of functions for clock
+// package.
+
+// Linux System stores this information in /proc/uptime as seconds
+// since boot with a precision up to 2 decimal points.
+// NOTE: Go system call returns elapsed time in seconds and removes the decimal
+// points by rounding to the closest second. Be careful in using this value as
+// it can introduce a compounding error.
+func (sc *systemClockImpl) ElapsedTime() (time.Duration, error) {
+ var sysInfo syscall.Sysinfo_t
+ if err := syscall.Sysinfo(&sysInfo); err != nil {
+ return 0, err
+ }
+ return time.Duration(sysInfo.Uptime) * time.Second, nil
+}
diff --git a/x/ref/services/syncbase/clock/clockservice.go b/x/ref/services/syncbase/clock/clockservice.go
new file mode 100644
index 0000000..fed9a02
--- /dev/null
+++ b/x/ref/services/syncbase/clock/clockservice.go
@@ -0,0 +1,127 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "math"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/v23/context"
+ "v.io/v23/verror"
+ "v.io/x/lib/vlog"
+)
+
+// This file contains code related to checking current system clock to see
+// if it has been changed by any external action.
+
+// runClockCheck estimates the current system time based on saved boottime
+// and elapsed time since boot and checks if the system clock shows the same
+// time. This involves the following steps:
+// 1) Check if system was rebooted since last run. If so update the saved
+// ClockData.
+// 2) Fetch stored ClockData. If none exists, this is the first time
+// runClockCheck has been run. Write new ClockData.
+// 3) Estimate current system clock time and check if the actual system clock
+// agrees with the estimation. If not update the skew value appropriately.
+// 4) Update saved elapsed time since boot. This is used to check if the system
+// was rebooted or not. TODO(jlodhia): work with device manager to provide a
+// way to notify syncbase if the system was just rebooted.
+func (c *VClock) runClockCheck(ctx *context.T) {
+ checkSystemRebooted(ctx, c)
+
+ clockData := &ClockData{}
+ if err := c.sa.GetClockData(ctx, clockData); err != nil {
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ // VClock's cron job to setup UTC time at boot is being run for the
+ // first time. Skew is not known, hence assigning 0.
+ writeNewClockData(ctx, c, 0)
+ } else {
+ vlog.Errorf("Error while fetching clock data: %v", err)
+ }
+ return
+ }
+
+ systemTime := c.clock.Now()
+ elapsedTime, err := c.clock.ElapsedTime()
+ if err != nil {
+ vlog.Errorf("Error while fetching elapsed time: %v", err)
+ return
+ }
+
+ newClockData := &ClockData{
+ SystemTimeAtBoot: clockData.SystemTimeAtBoot,
+ Skew: clockData.Skew,
+ ElapsedTimeSinceBoot: elapsedTime.Nanoseconds(),
+ }
+
+ estimatedClockTime := clockData.SystemBootTime().Add(elapsedTime)
+ diff := estimatedClockTime.Sub(systemTime)
+ if math.Abs(float64(diff.Nanoseconds())) > util.LocalClockDriftThreshold {
+ newClockData.Skew = newClockData.Skew + diff.Nanoseconds()
+ newSystemTimeAtBoot := systemTime.Add(-elapsedTime)
+ newClockData.SystemTimeAtBoot = newSystemTimeAtBoot.UnixNano()
+ }
+
+ if err := c.sa.SetClockData(ctx, newClockData); err != nil {
+ vlog.Errorf("Error while setting clock data: %v", err)
+ }
+}
+
+func writeNewClockData(ctx *context.T, c *VClock, skew time.Duration) {
+ systemTime := c.clock.Now()
+ elapsedTime, err := c.clock.ElapsedTime()
+ if err != nil {
+ vlog.Errorf("Error while fetching elapsed time: %v", err)
+ return
+ }
+ systemTimeAtBoot := systemTime.Add(-elapsedTime)
+ clockData := &ClockData{
+ SystemTimeAtBoot: systemTimeAtBoot.UnixNano(),
+ Skew: skew.Nanoseconds(),
+ ElapsedTimeSinceBoot: elapsedTime.Nanoseconds(),
+ }
+ if err := c.sa.SetClockData(ctx, clockData); err != nil {
+ vlog.Errorf("Error while setting clock data: %v", err)
+ }
+}
+
+// checkSystemRebooted compares the elapsed time stored during the last
+// run of runClockCheck() to the current elapsed time since boot provided
+// by system clock. Since elapsed time is monotonically increasing and cannot
+// be changed unless a reboot happens, if the current value is lower than the
+// previous value then a reboot has happened since last run. If so, update
+// the boot time and elapsed time since boot appropriately.
+func checkSystemRebooted(ctx *context.T, c *VClock) bool {
+ currentSysTime := c.clock.Now()
+ elapsedTime, err := c.clock.ElapsedTime()
+ if err != nil {
+ vlog.Errorf("Error while fetching elapsed time: %v", err)
+ return false
+ }
+
+ clockData := &ClockData{}
+ if err := c.sa.GetClockData(ctx, clockData); err != nil {
+ if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ vlog.Errorf("Error while fetching clock delta: %v", err)
+ }
+ // In case of verror.ErrNoExist no clock data present. Nothing needed to
+ // be done. writeNewClockData() will write new clock data to storage.
+ return false
+ }
+
+ if elapsedTime.Nanoseconds() < clockData.ElapsedTimeSinceBoot {
+ // Since the elapsed time since last boot provided by the system is
+ // less than the elapsed time since boot seen the last time clockservice
+ // ran, the system must have rebooted in between.
+ clockData.SystemTimeAtBoot = currentSysTime.Add(-elapsedTime).UnixNano()
+ clockData.ElapsedTimeSinceBoot = elapsedTime.Nanoseconds()
+ if err := c.sa.SetClockData(ctx, clockData); err != nil {
+ vlog.Errorf("Error while setting clock data: %v", err)
+ }
+ return true
+ }
+ return false
+}
diff --git a/x/ref/services/syncbase/clock/clockservice_test.go b/x/ref/services/syncbase/clock/clockservice_test.go
new file mode 100644
index 0000000..86650c1
--- /dev/null
+++ b/x/ref/services/syncbase/clock/clockservice_test.go
@@ -0,0 +1,191 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "testing"
+ "time"
+)
+
+const (
+ constElapsedTime int64 = 50
+)
+
+func TestWriteNewClockData(t *testing.T) {
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, time.Duration(constElapsedTime))
+ stAdapter := MockStorageAdapter()
+
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
+
+ writeNewClockData(nil, clock, 0)
+
+ expectedSystemTimeAtBoot := sysTs.UnixNano() - constElapsedTime
+ verifyClockData(t, stAdapter, 0, expectedSystemTimeAtBoot, constElapsedTime)
+}
+
+// This test runs the following scenarios
+// 1) Run checkSystemRebooted() with no ClockData stored
+// Result: no op.
+// 2) Run checkSystemRebooted() with ClockData that has SystemTimeAtBoot higher
+// than the current elapsed time.
+// Result: A new ClockData is written with updated SystemTimeAtBoot and
+// elapsed time.
+// 3) Run checkSystemRebooted() again after moving the sysClock forward
+// Result: no op.
+func TestCheckSystemRebooted(t *testing.T) {
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, time.Duration(constElapsedTime))
+ stAdapter := MockStorageAdapter()
+
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
+
+ // stAdapter will return ErrNoExist while fetching ClockData
+ // checkSystemRebooted should return false.
+ if checkSystemRebooted(nil, clock) {
+ t.Error("Unexpected return value")
+ }
+
+ // Set clock data with elapsed time greater than constElapsedTime
+ clockData := &ClockData{25003, 25, 34569}
+ stAdapter.SetClockData(nil, clockData)
+
+ if !checkSystemRebooted(nil, clock) {
+ t.Error("Unexpected return value")
+ }
+ expectedSystemTimeAtBoot := sysTs.UnixNano() - constElapsedTime
+ verifyClockData(t, stAdapter, 25, expectedSystemTimeAtBoot, constElapsedTime)
+
+ // move clock forward without reboot and run checkSystemRebooted again
+ var timePassed int64 = 200
+ newSysTs := sysTs.Add(time.Duration(timePassed))
+ sysClock.SetNow(newSysTs)
+ sysClock.SetElapsedTime(time.Duration(constElapsedTime + timePassed))
+
+ if checkSystemRebooted(nil, clock) {
+ t.Error("Unexpected return value")
+ }
+ expectedSystemTimeAtBoot = sysTs.UnixNano() - constElapsedTime
+ verifyClockData(t, stAdapter, 25, expectedSystemTimeAtBoot, constElapsedTime)
+}
+
+// Setup: No prior ClockData present.
+// Result: A new ClockData value gets set.
+func TestRunClockCheck1(t *testing.T) {
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, time.Duration(constElapsedTime))
+ stAdapter := MockStorageAdapter()
+
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
+
+ clock.runClockCheck(nil)
+ expectedSystemTimeAtBoot := sysTs.UnixNano() - constElapsedTime
+ verifyClockData(t, stAdapter, 0, expectedSystemTimeAtBoot, constElapsedTime)
+}
+
+// Setup: ClockData present, system clock elapsed time is lower than whats
+// stored in clock data.
+// Result: A new ClockData value gets set with new system boot time and elapsed
+// time, skew remains the same.
+func TestRunClockCheck2(t *testing.T) {
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, time.Duration(constElapsedTime))
+ stAdapter := MockStorageAdapter()
+ // Set clock data with elapsed time greater than constElapsedTime
+ clockData := &ClockData{25003, 25, 34569}
+ stAdapter.SetClockData(nil, clockData)
+
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
+
+ clock.runClockCheck(nil)
+ expectedSystemTimeAtBoot := sysTs.UnixNano() - constElapsedTime
+ verifyClockData(t, stAdapter, 25, expectedSystemTimeAtBoot, constElapsedTime)
+}
+
+// Setup: ClockData present, system clock gets a skew of 10 seconds
+// Result: A new ClockData value gets set with new elapsed time and skew,
+// system boot time remains the same.
+func TestRunClockCheck3(t *testing.T) {
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, time.Duration(constElapsedTime))
+ stAdapter := MockStorageAdapter()
+
+ bootTs := sysTs.Add(time.Duration(-constElapsedTime))
+ oldSkew := 25 * time.Second
+ clockData := &ClockData{bootTs.UnixNano(), oldSkew.Nanoseconds(), 40}
+ stAdapter.SetClockData(nil, clockData)
+
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
+
+ // introduce a change in sys clock
+ extraSkew := 10 * time.Second // moves clock closer to UTC
+ changedSysTs := sysTs.Add(extraSkew)
+ sysClock.SetNow(changedSysTs)
+ newSkew := 15 * time.Second
+
+ clock.runClockCheck(nil)
+ expectedSystemTimeAtBoot := bootTs.UnixNano() + extraSkew.Nanoseconds()
+ verifyClockData(t, stAdapter, newSkew.Nanoseconds(), expectedSystemTimeAtBoot, constElapsedTime)
+}
+
+func TestWithRealSysClock(t *testing.T) {
+ stAdapter := MockStorageAdapter()
+ clock := NewVClockWithMockServices(stAdapter, nil, nil)
+
+ writeNewClockData(nil, clock, 0)
+
+ // Verify if clock data was written to StorageAdapter
+ clockData := &ClockData{}
+ if err := stAdapter.GetClockData(nil, clockData); err != nil {
+ t.Errorf("Expected to find clockData, received error: %v", err)
+ }
+
+ // Verify that calling checkSystemRebooted() does nothing
+ if checkSystemRebooted(nil, clock) {
+ t.Error("Unexpected return value")
+ }
+
+ // sleep for 1 second more than the skew threshold
+ time.Sleep(1800 * time.Millisecond)
+
+ // Verify that calling runClockCheck() only updates elapsed time
+ clock.runClockCheck(nil)
+ newClockData := &ClockData{}
+ if err := stAdapter.GetClockData(nil, newClockData); err != nil {
+ t.Errorf("Expected to find clockData, received error: %v", err)
+ }
+ if newClockData.Skew != clockData.Skew {
+ t.Errorf("Unexpected value for skew: %d", newClockData.Skew)
+ }
+ if newClockData.ElapsedTimeSinceBoot <= clockData.ElapsedTimeSinceBoot {
+ t.Errorf("Unexpected value for elapsed time: %d",
+ newClockData.ElapsedTimeSinceBoot)
+ }
+ if newClockData.SystemTimeAtBoot != clockData.SystemTimeAtBoot {
+ t.Errorf("SystemTimeAtBoot expected: %d, found: %d",
+ clockData.SystemTimeAtBoot, newClockData.SystemTimeAtBoot)
+ }
+}
+
+func verifyClockData(t *testing.T, stAdapter StorageAdapter, skew int64,
+ sysTimeAtBoot int64, elapsedTime int64) {
+ // verify ClockData
+ clockData := &ClockData{}
+ if err := stAdapter.GetClockData(nil, clockData); err != nil {
+ t.Errorf("Expected to find clockData, found error: %v", err)
+ }
+
+ if clockData.Skew != skew {
+ t.Errorf("Expected value for skew: %d, found: %d", skew, clockData.Skew)
+ }
+ if clockData.ElapsedTimeSinceBoot != elapsedTime {
+ t.Errorf("Expected value for elapsed time: %d, found: %d", elapsedTime,
+ clockData.ElapsedTimeSinceBoot)
+ }
+ if clockData.SystemTimeAtBoot != sysTimeAtBoot {
+ t.Errorf("Expected value for SystemTimeAtBoot: %d, found: %d",
+ sysTimeAtBoot, clockData.SystemTimeAtBoot)
+ }
+}
diff --git a/x/ref/services/syncbase/clock/ntp.go b/x/ref/services/syncbase/clock/ntp.go
new file mode 100644
index 0000000..ba55322
--- /dev/null
+++ b/x/ref/services/syncbase/clock/ntp.go
@@ -0,0 +1,151 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+)
+
+const (
+ udp = "udp"
+ port = "123"
+)
+
+var _ NtpSource = (*ntpSourceImpl)(nil)
+
+func NewNtpSource(clock SystemClock) NtpSource {
+ return &ntpSourceImpl{util.NtpServerPool, clock}
+}
+
+type ntpSourceImpl struct {
+ ntpHost string
+ sc SystemClock
+}
+
+// NtpSync samples data from NTP server and returns the one which has the lowest
+// network delay. The sample with lowest network delay will have the least error
+// in computation of the offset.
+// Param sampleCount is the number of samples this method will fetch.
+func (ns *ntpSourceImpl) NtpSync(sampleCount int) (*NtpData, error) {
+ var canonicalSample *NtpData = nil
+ for i := 0; i < sampleCount; i++ {
+ if sample, err := ns.sample(); err == nil {
+ if (canonicalSample == nil) || (sample.delay < canonicalSample.delay) {
+ canonicalSample = sample
+ }
+ }
+ }
+ if canonicalSample == nil {
+ err := fmt.Errorf("Failed to get any sample from NTP server: %s", ns.ntpHost)
+ return nil, err
+ }
+ return canonicalSample, nil
+}
+
+// Sample connects to an NTP server and returns NtpData containing the clock
+// offset and the network delay experienced while talking to the server.
+//
+// NTP protocol involves sending a request of size 48 bytes with the first
+// byte containing protocol version and mode and the last 8 bytes containing
+// transmit timestamp. The current NTP version is 4. A response from NTP server
+// contains original timestamp (client's transmit timestamp from request) from
+// bytes 24 to 31, server's receive timestamp from byte 32 to 39 and server's
+// transmit time from byte 40 to 47. Client can register the response receive
+// time as soon it receives a response from server.
+// Based on the 4 timestamps the client can compute the offset between the
+// two clocks and the roundtrip network delay for the request.
+func (ns *ntpSourceImpl) sample() (*NtpData, error) {
+ raddr, err := net.ResolveUDPAddr(udp, ns.ntpHost+":"+port)
+ if err != nil {
+ return nil, err
+ }
+
+ con, err := net.DialUDP("udp", nil, raddr)
+ if err != nil {
+ return nil, err
+ }
+ defer con.Close()
+
+ msg := ns.createRequest()
+ _, err = con.Write(msg)
+ if err != nil {
+ return nil, err
+ }
+
+ con.SetDeadline(time.Now().Add(5 * time.Second))
+ _, err = con.Read(msg)
+ if err != nil {
+ return nil, err
+ }
+
+ clientReceiveTs := ns.sc.Now()
+ clientTransmitTs := extractTime(msg[24:32])
+ serverReceiveTs := extractTime(msg[32:40])
+ serverTransmitTs := extractTime(msg[40:48])
+
+ // Following code extracts the clock offset and network delay based on the
+ // transmit and receive timestamps on the client and the server as per
+ // the formula explained at http://www.eecis.udel.edu/~mills/time.html
+ data := NtpData{}
+ data.offset = (serverReceiveTs.Sub(clientTransmitTs) + serverTransmitTs.Sub(clientReceiveTs)) / 2
+ data.delay = clientReceiveTs.Sub(clientTransmitTs) - serverTransmitTs.Sub(serverReceiveTs)
+
+ return &data, nil
+}
+
+func (ns *ntpSourceImpl) createRequest() []byte {
+ data := make([]byte, 48)
+ data[0] = 0x23 // protocol version = 4, mode = 3 (Client)
+
+ // For NTP the prime epoch, or base date of era 0, is 0 h 1 January 1900 UTC
+ t0 := time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC)
+ tnow := ns.sc.Now()
+ d := tnow.Sub(t0)
+ nsec := d.Nanoseconds()
+
+ // The encoding of timestamp below is an exact opposite of the decoding
+ // being done in extractTime(). Refer extractTime() for more explaination.
+ sec := nsec / 1e9 // Integer part of seconds since epoch
+ frac := ((nsec % 1e9) << 32) / 1e9 // fractional part of seconds since epoch
+
+ // write the timestamp to Transmit Timestamp section of request.
+ data[43] = byte(sec)
+ data[42] = byte(sec >> 8)
+ data[41] = byte(sec >> 16)
+ data[40] = byte(sec >> 24)
+
+ data[47] = byte(frac)
+ data[46] = byte(frac >> 8)
+ data[45] = byte(frac >> 16)
+ data[44] = byte(frac >> 24)
+ return data
+}
+
+// ExtractTime takes a byte array which contains encoded timestamp from NTP
+// server starting at the 0th byte and is 8 bytes long. The encoded timestamp is
+// in seconds since 1900. The first 4 bytes contain the integer part of of the
+// seconds while the last 4 bytes contain the fractional part of the seconds
+// where (FFFFFFFF + 1) represents 1 second while 00000001 represents 2^(-32) of
+// a second.
+func extractTime(data []byte) time.Time {
+ var sec, frac uint64
+ sec = uint64(data[3]) | uint64(data[2])<<8 | uint64(data[1])<<16 | uint64(data[0])<<24
+ frac = uint64(data[7]) | uint64(data[6])<<8 | uint64(data[5])<<16 | uint64(data[4])<<24
+
+ // multiply the integral second part with 1Billion to convert to nanoseconds
+ nsec := sec * 1e9
+ // multiply frac part with 2^(-32) to get the correct value in seconds and
+ // then multiply with 1Billion to convert to nanoseconds. The multiply by
+ // Billion is done first to make sure that we dont loose precision.
+ nsec += (frac * 1e9) >> 32
+
+ t := time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(nsec)).Local()
+
+ return t
+}
diff --git a/x/ref/services/syncbase/clock/ntpservice.go b/x/ref/services/syncbase/clock/ntpservice.go
new file mode 100644
index 0000000..79045f3
--- /dev/null
+++ b/x/ref/services/syncbase/clock/ntpservice.go
@@ -0,0 +1,46 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "math"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/v23/context"
+ "v.io/v23/verror"
+ "v.io/x/lib/vlog"
+)
+
+// runNtpCheck talks to an NTP server, fetches the current UTC time from it
+// and corrects VClock time.
+func (c *VClock) runNtpCheck(ctx *context.T) error {
+ ntpData, err := c.ntpSource.NtpSync(util.NtpSampleCount)
+ if err != nil {
+ vlog.Errorf("Error while fetching ntp time: %v", err)
+ return err
+ }
+ offset := ntpData.offset
+
+ data := &ClockData{}
+ if err := c.sa.GetClockData(ctx, data); err != nil {
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ // No ClockData found, write a new one.
+ writeNewClockData(ctx, c, offset)
+ return nil
+ }
+ vlog.Info("Error while fetching clock data: %v", err)
+ vlog.Info("Overwriting clock data with NTP")
+ writeNewClockData(ctx, c, offset)
+ return nil
+ }
+
+ // Update clock skew if the difference between offset and skew is larger
+ // than NtpDiffThreshold. NtpDiffThreshold helps avoid constant tweaking of
+ // the syncbase clock.
+ if math.Abs(float64(offset.Nanoseconds() - data.Skew)) > util.NtpDiffThreshold {
+ writeNewClockData(ctx, c, offset)
+ }
+ return nil
+}
diff --git a/x/ref/services/syncbase/clock/ntpservice_test.go b/x/ref/services/syncbase/clock/ntpservice_test.go
new file mode 100644
index 0000000..e505e01
--- /dev/null
+++ b/x/ref/services/syncbase/clock/ntpservice_test.go
@@ -0,0 +1,192 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "net"
+ "testing"
+ "time"
+)
+
+func TestWithMockNtpForErr(t *testing.T) {
+ sysClock := MockSystemClock(time.Now(), 0)
+ stAdapter := MockStorageAdapter()
+ ntpSource := MockNtpSource()
+ ntpSource.Err = net.UnknownNetworkError("network err")
+
+ vclock := NewVClockWithMockServices(stAdapter, sysClock, ntpSource)
+
+ if err := vclock.runNtpCheck(nil); err == nil {
+ t.Error("Network error expected but not found")
+ }
+
+ if stAdapter.clockData != nil {
+ t.Error("Non-nil clock data found.")
+ }
+}
+
+func TestWithMockNtpForDiffBelowThreshold(t *testing.T) {
+ sysClock := MockSystemClock(time.Now(), 0) // not used
+ stAdapter := MockStorageAdapter()
+ originalData := NewClockData(0)
+ stAdapter.SetClockData(nil, &originalData)
+
+ ntpSource := MockNtpSource()
+ offset := 1800 * time.Millisecond // error threshold is 2 seconds
+ ntpSource.Data = &NtpData{offset: offset, delay: 5 * time.Millisecond}
+
+ vclock := NewVClockWithMockServices(stAdapter, sysClock, ntpSource)
+ if err := vclock.runNtpCheck(nil); err != nil {
+ t.Errorf("Unexpected err: %v", err)
+ }
+ if isClockDataChanged(stAdapter, &originalData) {
+ t.Error("ClockData expected to be unchanged but found updated")
+ }
+}
+
+func TestWithMockNtpForDiffAboveThreshold(t *testing.T) {
+ sysTs := time.Now()
+ elapsedTime := 10 * time.Minute
+ sysClock := MockSystemClock(sysTs, elapsedTime)
+
+ stAdapter := MockStorageAdapter()
+ originalData := NewClockData(0)
+ stAdapter.SetClockData(nil, &originalData)
+
+ ntpSource := MockNtpSource()
+ skew := 2100 * time.Millisecond // error threshold is 2 seconds
+ ntpSource.Data = &NtpData{offset: skew, delay: 5 * time.Millisecond}
+
+ vclock := NewVClockWithMockServices(stAdapter, sysClock, ntpSource)
+ if err := vclock.runNtpCheck(nil); err != nil {
+ t.Errorf("Unexpected err: %v", err)
+ }
+ if !isClockDataChanged(stAdapter, &originalData) {
+ t.Error("ClockData expected to be updated but found unchanged")
+ }
+ expectedBootTime := sysTs.Add(-elapsedTime).UnixNano()
+ if stAdapter.clockData.Skew != skew.Nanoseconds() {
+ t.Errorf("Skew expected to be %d but found %d",
+ skew.Nanoseconds(), stAdapter.clockData.Skew)
+ }
+ if stAdapter.clockData.ElapsedTimeSinceBoot != elapsedTime.Nanoseconds() {
+ t.Errorf("ElapsedTime expected to be %d but found %d",
+ elapsedTime.Nanoseconds(), stAdapter.clockData.ElapsedTimeSinceBoot)
+ }
+ if stAdapter.clockData.SystemTimeAtBoot != expectedBootTime {
+ t.Errorf("Skew expected to be %d but found %d",
+ expectedBootTime, stAdapter.clockData.SystemTimeAtBoot)
+ }
+}
+
+func TestWithMockNtpForDiffBelowThresholdAndExistingLargeSkew(t *testing.T) {
+ sysTs := time.Now()
+ elapsedTime := 10 * time.Minute
+ sysClock := MockSystemClock(sysTs, elapsedTime)
+
+ stAdapter := MockStorageAdapter()
+ originalData := NewClockData(2300 * time.Millisecond.Nanoseconds()) // large skew
+ stAdapter.SetClockData(nil, &originalData)
+
+ ntpSource := MockNtpSource()
+ skew := 200 * time.Millisecond // error threshold is 2 seconds
+ ntpSource.Data = &NtpData{offset: skew, delay: 5 * time.Millisecond}
+
+ vclock := NewVClockWithMockServices(stAdapter, sysClock, ntpSource)
+ if err := vclock.runNtpCheck(nil); err != nil {
+ t.Errorf("Unexpected err: %v", err)
+ }
+ if !isClockDataChanged(stAdapter, &originalData) {
+ t.Error("ClockData expected to be updated but found unchanged")
+ }
+ expectedBootTime := sysTs.Add(-elapsedTime).UnixNano()
+ if stAdapter.clockData.Skew != skew.Nanoseconds() {
+ t.Errorf("Skew expected to be %d but found %d",
+ skew.Nanoseconds(), stAdapter.clockData.Skew)
+ }
+ if stAdapter.clockData.ElapsedTimeSinceBoot != elapsedTime.Nanoseconds() {
+ t.Errorf("ElapsedTime expected to be %d but found %d",
+ elapsedTime.Nanoseconds(), stAdapter.clockData.ElapsedTimeSinceBoot)
+ }
+ if stAdapter.clockData.SystemTimeAtBoot != expectedBootTime {
+ t.Errorf("Skew expected to be %d but found %d",
+ expectedBootTime, stAdapter.clockData.SystemTimeAtBoot)
+ }
+}
+
+func TestWithMockNtpForDiffBelowThresholdWithNoStoredClockData(t *testing.T) {
+ sysTs := time.Now()
+ elapsedTime := 10 * time.Minute
+ sysClock := MockSystemClock(sysTs, elapsedTime)
+
+ stAdapter := MockStorageAdapter() // no skew data stored
+
+ ntpSource := MockNtpSource()
+ skew := 200 * time.Millisecond // error threshold is 2 seconds
+ ntpSource.Data = &NtpData{offset: skew, delay: 5 * time.Millisecond}
+
+ vclock := NewVClockWithMockServices(stAdapter, sysClock, ntpSource)
+ if err := vclock.runNtpCheck(nil); err != nil {
+ t.Errorf("Unexpected err: %v", err)
+ }
+ if !isClockDataChanged(stAdapter, nil) {
+ t.Error("ClockData expected to be updated but found unchanged")
+ }
+ expectedBootTime := sysTs.Add(-elapsedTime).UnixNano()
+ if stAdapter.clockData.Skew != skew.Nanoseconds() {
+ t.Errorf("Skew expected to be %d but found %d",
+ skew.Nanoseconds(), stAdapter.clockData.Skew)
+ }
+ if stAdapter.clockData.ElapsedTimeSinceBoot != elapsedTime.Nanoseconds() {
+ t.Errorf("ElapsedTime expected to be %d but found %d",
+ elapsedTime.Nanoseconds(), stAdapter.clockData.ElapsedTimeSinceBoot)
+ }
+ if stAdapter.clockData.SystemTimeAtBoot != expectedBootTime {
+ t.Errorf("Skew expected to be %d but found %d",
+ expectedBootTime, stAdapter.clockData.SystemTimeAtBoot)
+ }
+}
+
+/*
+Following two tests are commented out as they hit the real NTP servers
+and can resut into being flaky if the clock of the machine running continuous
+test has a skew more than 2 seconds.
+
+func TestWithRealNtp(t *testing.T) {
+ stAdapter := MockStorageAdapter()
+ originalData := NewClockData(100 * time.Millisecond.Nanoseconds()) // small skew
+ stAdapter.SetClockData(nil, &originalData)
+ vclock := NewVClockWithMockServices(stAdapter, nil, nil)
+ if err := vclock.runNtpCheck(nil); err != nil {
+ t.Errorf("Unexpected err: %v", err)
+ }
+ if isClockDataChanged(stAdapter, &originalData) {
+ t.Error("ClockData expected to be unchanged but found updated")
+ }
+}
+
+func TestWithRealNtpForNoClockData(t *testing.T) {
+ stAdapter := MockStorageAdapter()
+ vclock := NewVClockWithMockServices(stAdapter, nil, nil)
+ if err := vclock.runNtpCheck(nil); err != nil {
+ t.Errorf("Unexpected err: %v", err)
+ }
+ if !isClockDataChanged(stAdapter, nil) {
+ t.Error("ClockData expected to be updated but found unchanged")
+ }
+}
+*/
+
+func NewClockData(skew int64) ClockData {
+ return ClockData{
+ SystemTimeAtBoot: 0,
+ Skew: skew,
+ ElapsedTimeSinceBoot: 0,
+ }
+}
+
+func isClockDataChanged(stAdapter *storageAdapterMockImpl, originalData *ClockData) bool {
+ return stAdapter.clockData != originalData // check for same pointer
+}
diff --git a/x/ref/services/syncbase/clock/storage_adapter.go b/x/ref/services/syncbase/clock/storage_adapter.go
new file mode 100644
index 0000000..33b4cda
--- /dev/null
+++ b/x/ref/services/syncbase/clock/storage_adapter.go
@@ -0,0 +1,33 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
+)
+
+var _ StorageAdapter = (*storageAdapterImpl)(nil)
+
+func NewStorageAdapter(st store.Store) StorageAdapter {
+ return &storageAdapterImpl{st}
+}
+
+type storageAdapterImpl struct {
+ st store.Store
+}
+
+func (sa *storageAdapterImpl) GetClockData(ctx *context.T, data *ClockData) error {
+ return util.Get(ctx, sa.st, clockDataKey(), data)
+}
+
+func (sa *storageAdapterImpl) SetClockData(ctx *context.T, data *ClockData) error {
+ return util.Put(ctx, sa.st, clockDataKey(), data)
+}
+
+func clockDataKey() string {
+ return util.ClockPrefix
+}
diff --git a/x/ref/services/syncbase/clock/test_util.go b/x/ref/services/syncbase/clock/test_util.go
new file mode 100644
index 0000000..6027f49
--- /dev/null
+++ b/x/ref/services/syncbase/clock/test_util.go
@@ -0,0 +1,119 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+// Utilities for testing clock.
+
+import (
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+/////////////////////////////////////////////////
+// Mock for StorageAdapter
+
+var _ StorageAdapter = (*storageAdapterMockImpl)(nil)
+
+func MockStorageAdapter() *storageAdapterMockImpl {
+ return &storageAdapterMockImpl{}
+}
+
+type storageAdapterMockImpl struct {
+ clockData *ClockData
+ err error
+}
+
+func (sa *storageAdapterMockImpl) GetClockData(ctx *context.T, data *ClockData) error {
+ if sa.err != nil {
+ return sa.err
+ }
+ if sa.clockData == nil {
+ return verror.NewErrNoExist(ctx)
+ }
+ *data = *sa.clockData
+ return nil
+}
+
+func (sa *storageAdapterMockImpl) SetClockData(ctx *context.T, data *ClockData) error {
+ if sa.err != nil {
+ return sa.err
+ }
+ sa.clockData = data
+ return nil
+}
+
+func (sa *storageAdapterMockImpl) SetError(err error) {
+ sa.err = err
+}
+
+/////////////////////////////////////////////////
+// Mock for SystemClock
+
+var _ SystemClock = (*systemClockMockImpl)(nil)
+
+func MockSystemClock(now time.Time, elapsedTime time.Duration) *systemClockMockImpl {
+ return &systemClockMockImpl{
+ now: now,
+ elapsedTime: elapsedTime,
+ }
+}
+
+type systemClockMockImpl struct {
+ now time.Time
+ elapsedTime time.Duration
+}
+
+func (sc *systemClockMockImpl) Now() time.Time {
+ return sc.now
+}
+
+func (sc *systemClockMockImpl) SetNow(now time.Time) {
+ sc.now = now
+}
+
+func (sc *systemClockMockImpl) ElapsedTime() (time.Duration, error) {
+ return sc.elapsedTime, nil
+}
+
+func (sc *systemClockMockImpl) SetElapsedTime(elapsed time.Duration) {
+ sc.elapsedTime = elapsed
+}
+
+/////////////////////////////////////////////////
+// Mock for NtpSource
+
+var _ NtpSource = (*ntpSourceMockImpl)(nil)
+
+func MockNtpSource() *ntpSourceMockImpl {
+ return &ntpSourceMockImpl{}
+}
+
+type ntpSourceMockImpl struct {
+ Err error
+ Data *NtpData
+}
+
+func (ns *ntpSourceMockImpl) NtpSync(sampleCount int) (*NtpData, error) {
+ if ns.Err != nil {
+ return nil, ns.Err
+ }
+ return ns.Data, nil
+}
+
+func NewVClockWithMockServices(sa StorageAdapter, sc SystemClock, ns NtpSource) *VClock {
+ if sc == nil {
+ sc = newSystemClock()
+ }
+ if ns == nil {
+ ns = NewNtpSource(sc)
+ }
+ return &VClock{
+ clock: sc,
+ sa: sa,
+ ntpSource: ns,
+ }
+}
diff --git a/x/ref/services/syncbase/clock/types.go b/x/ref/services/syncbase/clock/types.go
index 652909b..6934f11 100644
--- a/x/ref/services/syncbase/clock/types.go
+++ b/x/ref/services/syncbase/clock/types.go
@@ -6,14 +6,48 @@
import (
"time"
+
+ "v.io/v23/context"
)
// This interface provides a wrapper over system clock to allow easy testing
-// of VClock and other code that uses timestamps. Tests can implement a mock
-// SystemClock and set it on VClock using SetSystemClock() method.
+// of VClock and other code that uses timestamps.
type SystemClock interface {
// Now returns the current UTC time as known by the system.
- // This may not reflect the real UTC time if the system clock is out of
- // sync with UTC.
+ // This may not reflect the NTP time if the system clock is out of
+ // sync with NTP.
Now() time.Time
+
+ // ElapsedTime returns a duration representing the time elapsed since the device
+ // rebooted.
+ ElapsedTime() (time.Duration, error)
+}
+
+type StorageAdapter interface {
+ GetClockData(ctx *context.T, data *ClockData) error
+ SetClockData(ctx *context.T, data *ClockData) error
+}
+
+type NtpSource interface {
+ // NtpSync obtains NtpData samples from an NTP server and returns the one
+ // which has the lowest network delay.
+ // Param sampleCount is the number of samples this method will fetch.
+ // NtpData contains the clock offset and the network delay experienced while
+ // talking to the server.
+ NtpSync(sampleCount int) (*NtpData, error)
+}
+
+type NtpData struct {
+ // Offset is the difference between the NTP time and the system clock.
+ // Adding offset to system clock will give estimated NTP time.
+ offset time.Duration
+
+ // Delay is the round trip network delay experienced while talking to NTP
+ // server. The smaller the delay, the more accurate the offset is.
+ delay time.Duration
+}
+
+func (cd *ClockData) SystemBootTime() time.Time {
+ ns := time.Second.Nanoseconds()
+ return time.Unix(cd.SystemTimeAtBoot/ns, cd.SystemTimeAtBoot%ns)
}
diff --git a/x/ref/services/syncbase/clock/types.vdl b/x/ref/services/syncbase/clock/types.vdl
new file mode 100644
index 0000000..b12f8f9
--- /dev/null
+++ b/x/ref/services/syncbase/clock/types.vdl
@@ -0,0 +1,20 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+// ClockData is the persistent state of syncbase clock used to estimate current
+// NTP time and catch any unexpected changes to system clock.
+type ClockData struct {
+ // UTC time in unix nano seconds obtained from system clock at boot.
+ SystemTimeAtBoot int64
+
+ // Skew between the system clock and NTP time.
+ Skew int64
+
+ // The elapsed time since boot as last seen during a run of clockservice.
+ // This is used to determine if the device rebooted since the last run of
+ // clockservice.
+ ElapsedTimeSinceBoot int64
+}
\ No newline at end of file
diff --git a/x/ref/services/syncbase/clock/types.vdl.go b/x/ref/services/syncbase/clock/types.vdl.go
new file mode 100644
index 0000000..4749d42
--- /dev/null
+++ b/x/ref/services/syncbase/clock/types.vdl.go
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package clock
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// ClockData is the persistent state of syncbase clock used to estimate current
+// NTP time and catch any unexpected changes to system clock.
+type ClockData struct {
+ // UTC time in unix nano seconds obtained from system clock at boot.
+ SystemTimeAtBoot int64
+ // Skew between the system clock and NTP time.
+ Skew int64
+ // The elapsed time since boot as last seen during a run of clockservice.
+ // This is used to determine if the device rebooted since the last run of
+ // clockservice.
+ ElapsedTimeSinceBoot int64
+}
+
+func (ClockData) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/clock.ClockData"`
+}) {
+}
+
+func init() {
+ vdl.Register((*ClockData)(nil))
+}
diff --git a/x/ref/services/syncbase/clock/vclock.go b/x/ref/services/syncbase/clock/vclock.go
index 6da96e3..95719f4 100644
--- a/x/ref/services/syncbase/clock/vclock.go
+++ b/x/ref/services/syncbase/clock/vclock.go
@@ -6,52 +6,68 @@
import (
"time"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
+ "v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
// VClock holds data required to provide an estimate of the UTC time at any
// given point. The fields contained in here are
-// - systemTimeAtBoot : the time shown by the system clock at boot
-// - utcTimeAtBoot : the estimated UTC time when the system booted
-// - skew : the difference between the system clock and UTC time
+// - systemTimeAtBoot : the time shown by the system clock at boot.
+// - skew : the difference between the system clock and UTC time.
// - clock : Instance of clock.SystemClock interface providing access
// to the system time.
+// - sa : adapter for storage of clock data.
+// - ntpSource : source for fetching NTP data.
type VClock struct {
systemTimeAtBoot time.Time
- utcTimeAtBoot time.Time
skew time.Duration
clock SystemClock
+ sa StorageAdapter
+ ntpSource NtpSource
}
-func NewVClock() *VClock {
+func NewVClock(st store.Store) *VClock {
+ sysClock := newSystemClock()
return &VClock{
- clock: NewSystemClock(),
+ clock: sysClock,
+ sa: NewStorageAdapter(st),
+ ntpSource: NewNtpSource(sysClock),
}
}
// Now returns current UTC time based on the estimation of skew that
-// the system clock has with respect to NTP.
-func (c *VClock) Now() time.Time {
- // This method returns just the current system time for now.
- // TODO(jlodhia): implement estimation of UTC time.
- return c.clock.Now()
-}
-
-// This method allows tests to set a mock clock instance for testability.
-func (c *VClock) SetSystemClock(sysClock SystemClock) {
- c.clock = sysClock
+// the system clock has with respect to NTP time.
+func (c *VClock) Now(ctx *context.T) time.Time {
+ clockData := &ClockData{}
+ if err := c.sa.GetClockData(ctx, clockData); err != nil {
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ // VClock's cron job to setup UTC time at boot has not been run yet.
+ vlog.Error("No ClockData found while creating a timestamp")
+ } else {
+ vlog.Errorf("Error while fetching clock data: %v", err)
+ }
+ vlog.Error("Returning current system clock time")
+ return c.clock.Now()
+ }
+ skew := time.Duration(clockData.Skew)
+ return c.clock.Now().Add(skew)
}
///////////////////////////////////////////////////
-// Implementation for SystemClock
+// Implementation for SystemClock.
type systemClockImpl struct{}
+// Returns system time in UTC.
func (sc *systemClockImpl) Now() time.Time {
return time.Now().UTC()
}
var _ SystemClock = (*systemClockImpl)(nil)
-func NewSystemClock() SystemClock {
+func newSystemClock() SystemClock {
return &systemClockImpl{}
}
diff --git a/x/ref/services/syncbase/clock/vclock_test.go b/x/ref/services/syncbase/clock/vclock_test.go
index 4221feb..bf92f5c 100644
--- a/x/ref/services/syncbase/clock/vclock_test.go
+++ b/x/ref/services/syncbase/clock/vclock_test.go
@@ -7,33 +7,63 @@
import (
"testing"
"time"
-)
-type systemClockMockImpl struct {
- now time.Time
-}
-
-func (sc *systemClockMockImpl) Now() time.Time {
- return sc.now
-}
-
-func (sc *systemClockMockImpl) setNow(now time.Time) {
- sc.now = now
-}
-
-var (
- _ SystemClock = (*systemClockImpl)(nil)
+ "v.io/v23/verror"
)
func TestVClock(t *testing.T) {
- clock := NewVClock()
- sysClock := &systemClockMockImpl{}
- writeTs := time.Now()
- sysClock.setNow(writeTs)
- clock.SetSystemClock(sysClock)
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, 0)
+ stAdapter := MockStorageAdapter()
+ stAdapter.SetClockData(nil, &ClockData{0, 0, 0})
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
- ts := clock.Now()
- if ts != writeTs {
- t.Errorf("timestamp expected to be %q but found to be %q", writeTs, ts)
+ ts := clock.Now(nil)
+ if ts != sysTs {
+ t.Errorf("timestamp expected to be %q but found to be %q", sysTs, ts)
+ }
+}
+
+func TestVClockWithSkew(t *testing.T) {
+ // test with positive skew
+ checkSkew(t, 5)
+ // test with negative skew
+ checkSkew(t, -5)
+}
+
+func checkSkew(t *testing.T, skew int64) {
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, 0)
+
+ var elapsedTime int64 = 100
+ stAdapter := MockStorageAdapter()
+ bootTime := sysTs.UnixNano() - elapsedTime
+ clockData := ClockData{bootTime, skew, elapsedTime}
+ stAdapter.SetClockData(nil, &clockData)
+
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
+
+ ts := clock.Now(nil)
+ if ts == sysTs {
+ t.Errorf("timestamp expected to be %q but found to be %q", sysTs, ts)
+ }
+ if ts.UnixNano() != (sysTs.UnixNano() + skew) {
+ t.Errorf("Unexpected vclock timestamp. vclock: %v, sysclock: %v, skew: %v", ts, sysTs, skew)
+ }
+}
+
+func TestVClockWithInternalErr(t *testing.T) {
+ sysTs := time.Now()
+ sysClock := MockSystemClock(sysTs, 0)
+
+ stAdapter := MockStorageAdapter()
+ stAdapter.SetError(verror.NewErrInternal(nil))
+
+ clock := NewVClockWithMockServices(stAdapter, sysClock, nil)
+
+ // Internal err should result in vclock falling back to the system clock.
+ ts := clock.Now(nil)
+ if ts != sysTs {
+ t.Errorf("timestamp expected to be %q but found to be %q", sysTs, ts)
}
}
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 94b37d8..2818a8c 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -15,6 +15,7 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase/nosql/query_db"
"v.io/syncbase/v23/syncbase/nosql/query_exec"
+ "v.io/syncbase/x/ref/services/syncbase/clock"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -82,7 +83,8 @@
if err != nil {
return nil, err
}
- st, err = watchable.Wrap(st, &watchable.Options{
+ vclock := clock.NewVClock(a.Service().St())
+ st, err = watchable.Wrap(st, vclock, &watchable.Options{
ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
})
if err != nil {
diff --git a/x/ref/services/syncbase/server/util/constants.go b/x/ref/services/syncbase/server/util/constants.go
index 7d25d3b..ab2e401 100644
--- a/x/ref/services/syncbase/server/util/constants.go
+++ b/x/ref/services/syncbase/server/util/constants.go
@@ -4,11 +4,16 @@
package util
+import (
+ "time"
+)
+
// TODO(sadovsky): Consider using shorter strings.
// Constants related to storage engine keys.
const (
AppPrefix = "$app"
+ ClockPrefix = "$clock"
DatabasePrefix = "$database"
DbInfoPrefix = "$dbInfo"
LogPrefix = "$log"
@@ -33,3 +38,14 @@
// TODO(rogulenko): Change this constant to something out of the UTF8 space.
PrefixRangeLimitSuffix = "~"
)
+
+// Constants related to syncbase clock.
+const (
+ // The pool.ntp.org project is a big virtual cluster of timeservers
+ // providing reliable easy to use NTP service for millions of clients.
+ // See more at http://www.pool.ntp.org/en/
+ NtpServerPool = "pool.ntp.org"
+ NtpSampleCount = 15
+ LocalClockDriftThreshold = float64(time.Second)
+ NtpDiffThreshold = float64(2 * time.Second)
+)
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 72eebf2..0b19678 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -39,7 +39,7 @@
}
// Wrap returns a watchable.Store that wraps the given store.Store.
-func Wrap(st store.Store, opts *Options) (Store, error) {
+func Wrap(st store.Store, vclock *clock.VClock, opts *Options) (Store, error) {
seq, err := getNextLogSeq(st)
if err != nil {
return nil, err
@@ -49,7 +49,7 @@
watcher: newWatcher(),
opts: opts,
seq: seq,
- clock: clock.NewVClock(),
+ clock: vclock,
}, nil
}
diff --git a/x/ref/services/syncbase/server/watchable/store_test.go b/x/ref/services/syncbase/server/watchable/store_test.go
index 07b73c4..8c1c370 100644
--- a/x/ref/services/syncbase/server/watchable/store_test.go
+++ b/x/ref/services/syncbase/server/watchable/store_test.go
@@ -8,6 +8,7 @@
"runtime"
"testing"
+ "v.io/syncbase/x/ref/services/syncbase/clock"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/test"
)
@@ -64,7 +65,8 @@
func runTest(t *testing.T, mp []string, f func(t *testing.T, st store.Store)) {
st, destroy := createStore()
defer destroy()
- st, err := Wrap(st, &Options{ManagedPrefixes: mp})
+ vClock := clock.NewVClockWithMockServices(clock.MockStorageAdapter(), nil, nil)
+ st, err := Wrap(st, vClock, &Options{ManagedPrefixes: mp})
if err != nil {
t.Fatal(err)
}
diff --git a/x/ref/services/syncbase/server/watchable/test_util.go b/x/ref/services/syncbase/server/watchable/test_util.go
index cfb254c..e14854a 100644
--- a/x/ref/services/syncbase/server/watchable/test_util.go
+++ b/x/ref/services/syncbase/server/watchable/test_util.go
@@ -75,11 +75,6 @@
return wst.seq
}
-func setMockSystemClock(st Store, mockClock clock.SystemClock) {
- wst := st.(*wstore)
- wst.clock.SetSystemClock(mockClock)
-}
-
// logEntryReader provides a stream-like interface to scan over the log entries
// of a single batch, starting for a given sequence number. It opens a stream
// that scans the log from the sequence number given. It stops after reading
@@ -142,4 +137,8 @@
return now
}
+func (sc *mockSystemClock) ElapsedTime() (time.Duration, error) {
+ return sc.increment, nil
+}
+
var _ clock.SystemClock = (*mockSystemClock)(nil)
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 0419e7c..8a67f8f 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -139,7 +139,7 @@
return verror.New(verror.ErrInternal, nil, "seq maxed out")
}
// Write LogEntry records.
- timestamp := tx.st.clock.Now().UnixNano()
+ timestamp := tx.st.clock.Now(nil).UnixNano()
seq := tx.st.seq
for i, op := range tx.ops {
key := logEntryKey(seq)
diff --git a/x/ref/services/syncbase/server/watchable/transaction_test.go b/x/ref/services/syncbase/server/watchable/transaction_test.go
index 8b95536..5fcdf94 100644
--- a/x/ref/services/syncbase/server/watchable/transaction_test.go
+++ b/x/ref/services/syncbase/server/watchable/transaction_test.go
@@ -12,6 +12,7 @@
"testing"
"time"
+ "v.io/syncbase/x/ref/services/syncbase/clock"
"v.io/syncbase/x/ref/services/syncbase/store"
)
@@ -70,13 +71,14 @@
t1 := time.Now()
inc := time.Duration(1) * time.Second
mockClock := newMockSystemClock(t1, inc)
+ var mockAdapter clock.StorageAdapter = clock.MockStorageAdapter()
- wst1, err := Wrap(ist, &Options{ManagedPrefixes: nil})
+ vclock := clock.NewVClockWithMockServices(mockAdapter, mockClock, nil)
+ wst1, err := Wrap(ist, vclock, &Options{ManagedPrefixes: nil})
if err != nil {
t.Fatalf("Wrap failed: %v", err)
}
seqForCreate := getSeq(wst1)
- setMockSystemClock(wst1, mockClock)
// Create data in store
if err := store.RunInTransaction(wst1, func(tx store.Transaction) error {
@@ -99,11 +101,10 @@
verifyCommitLog(t, ist, seqForCreate, 2, t1)
// Update data already present in store with a new watchable store
- wst2, err := Wrap(ist, &Options{ManagedPrefixes: nil})
+ wst2, err := Wrap(ist, vclock, &Options{ManagedPrefixes: nil})
if err != nil {
t.Fatalf("Wrap failed: %v", err)
}
- setMockSystemClock(wst2, mockClock)
seqForUpdate := getSeq(wst2)
// We expect the sequence number to have moved by +2 for the two puts.
if seqForUpdate != (seqForCreate + 2) {
@@ -139,7 +140,13 @@
func TestOpLogConsistency(t *testing.T) {
ist, destroy := createStore()
defer destroy()
- wst, err := Wrap(ist, &Options{ManagedPrefixes: nil})
+ t1 := time.Now()
+ inc := time.Duration(1) * time.Second
+ mockClock := newMockSystemClock(t1, inc)
+ var mockAdapter clock.StorageAdapter = clock.MockStorageAdapter()
+
+ vclock := clock.NewVClockWithMockServices(mockAdapter, mockClock, nil)
+ wst, err := Wrap(ist, vclock, &Options{ManagedPrefixes: nil})
if err != nil {
t.Fatalf("Wrap failed: %v", err)
}
diff --git a/x/ref/services/syncbase/server/watchable/util_test.go b/x/ref/services/syncbase/server/watchable/util_test.go
index 99c440a..193c06d 100644
--- a/x/ref/services/syncbase/server/watchable/util_test.go
+++ b/x/ref/services/syncbase/server/watchable/util_test.go
@@ -6,13 +6,17 @@
import (
"testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/clock"
)
// TestGetNextLogSeq tests that the getNextLogSeq helper works on range 0..10.
func TestGetNextLogSeq(t *testing.T) {
st, destroy := createStore()
defer destroy()
- st, err := Wrap(st, &Options{})
+ var mockAdapter clock.StorageAdapter = clock.MockStorageAdapter()
+ vclock := clock.NewVClockWithMockServices(mockAdapter, nil, nil)
+ st, err := Wrap(st, vclock, &Options{})
if err != nil {
t.Fatal(err)
}
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index a323f19..8328fda 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -14,6 +14,7 @@
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/clock"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -120,7 +121,8 @@
if err != nil {
t.Fatalf("cannot create store %s (%s): %v", engine, dir, err)
}
- st, err = watchable.Wrap(st, &watchable.Options{
+ vclock := clock.NewVClock(st)
+ st, err = watchable.Wrap(st, vclock, &watchable.Options{
ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
})