Timestamp, clock api and bugfix.
1) Add timestamp to values committed via watchable store.
2) Introduce a vanadium clock interface which provides
an estimated UTC timestamp.
3) Fix bug related to sequence number read by Wrap() method
while creating a watchable store. The bug would end up
reassigning the sequence number of the last commited tx
to a new tx to be committed by a newly created wstore
wrapper object.
This change adds a timestamp field to the LogEntry struct
which is stored for each operation that is committed as
part of a transaction using watchable store. Note that keys
that are not managed by store do not have a LogEntry
associated with them and hence will not have a timestamp.
The clock interface is introduced in this CL but for now
the implementation just uses the system time directly. This
will be addressed in the next CL to come.
Change-Id: I4e0e30ace5efc4388bbc9611d59964ac8ad59cc3
diff --git a/services/syncbase/clock/types.go b/services/syncbase/clock/types.go
new file mode 100644
index 0000000..2084eca
--- /dev/null
+++ b/services/syncbase/clock/types.go
@@ -0,0 +1,15 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+// This interface provides a wrapper over system clock to allow easy testing
+// of VClock and other code that uses timestamps. Tests can implement a mock
+// SystemClock and set it on VClock using SetSystemClock() method.
+type SystemClock interface {
+ // Now returns the current UTC time in nanoseconds as known by the system.
+ // This may not reflect the real UTC time if the system clock is out of
+ // sync with UTC.
+ Now() int64
+}
diff --git a/services/syncbase/clock/vclock.go b/services/syncbase/clock/vclock.go
new file mode 100644
index 0000000..1e9acf2
--- /dev/null
+++ b/services/syncbase/clock/vclock.go
@@ -0,0 +1,57 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "time"
+)
+
+// VClock holds data required to provide an estimate of the UTC time at any
+// given point. The fields contained in here are
+// - systemTimeAtBoot : the time shown by the system clock at boot
+// - utcTimeAtBoot : the estimated UTC time when the system booted
+// - skew : the difference between the system clock and UTC time
+// - clock : Instance of clock.SystemClock interface providing access
+// to the system time.
+type VClock struct {
+ systemTimeAtBoot int64
+ utcTimeAtBoot int64
+ skew int64
+ clock SystemClock
+}
+
+func NewVClock() *VClock {
+ return &VClock{
+ clock: NewSystemClock(),
+ }
+}
+
+// Now returns current UTC time based on the estimation of skew that
+// the system clock has with respect to NTP.
+func (c *VClock) Now() int64 {
+ // This method returns just the current system time for now.
+ // TODO(jlodhia): implement estimation of UTC time.
+ return c.clock.Now()
+}
+
+// This method allows tests to set a mock clock instance for testability
+func (c *VClock) SetSystemClock(sysClock SystemClock) {
+ c.clock = sysClock
+}
+
+///////////////////////////////////////////////////
+// Implementation for SystemClock
+
+type systemClockImpl struct{}
+
+func (sc *systemClockImpl) Now() int64 {
+ return time.Now().UTC().UnixNano()
+}
+
+var _ SystemClock = (*systemClockImpl)(nil)
+
+func NewSystemClock() SystemClock {
+ return &systemClockImpl{}
+}
diff --git a/services/syncbase/clock/vclock_test.go b/services/syncbase/clock/vclock_test.go
new file mode 100644
index 0000000..477d142
--- /dev/null
+++ b/services/syncbase/clock/vclock_test.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+ "testing"
+)
+
+type systemClockMockImpl struct {
+ now int64
+}
+
+func (sc *systemClockMockImpl) Now() int64 {
+ return sc.now
+}
+
+func (sc *systemClockMockImpl) setNow(now int64) {
+ sc.now = now
+}
+
+var (
+ _ SystemClock = (*systemClockImpl)(nil)
+)
+
+func TestVClock(t *testing.T) {
+ clock := NewVClock()
+ sysClock := &systemClockMockImpl{}
+ writeTs := int64(4)
+ sysClock.setNow(writeTs)
+ clock.SetSystemClock(sysClock)
+
+ ts := clock.Now()
+ if ts != writeTs {
+ t.Errorf("timestamp expected to be %q but found to be %q", writeTs, ts)
+ }
+}
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 44faa54..23fc48f 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -29,6 +29,7 @@
"sync"
pubutil "v.io/syncbase/v23/syncbase/util"
+ "v.io/syncbase/x/ref/services/syncbase/clock"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
)
@@ -71,18 +72,23 @@
}
var err error
seq, err = strconv.ParseUint(parts[1], 10, 64)
+ // Current value of seq points to the last transaction committed
+ // increment the value by 1.
+ seq++
if err != nil {
panic("failed to parse seq: " + key)
}
}
- return &wstore{ist: st, opts: opts, seq: seq}, nil
+ vclock := clock.NewVClock()
+ return &wstore{ist: st, opts: opts, seq: seq, clock: vclock}, nil
}
type wstore struct {
- ist store.Store
- opts *Options
- mu sync.Mutex // held during transaction commits; protects seq
- seq uint64 // sequence number, for commits
+ ist store.Store
+ opts *Options
+ mu sync.Mutex // held during transaction commits; protects seq
+ seq uint64 // sequence number, for commits
+ clock *clock.VClock // used to provide write timestamps
}
var _ Store = (*wstore)(nil)
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
index 18b8e4d..8338342 100644
--- a/services/syncbase/server/watchable/store_test.go
+++ b/services/syncbase/server/watchable/store_test.go
@@ -5,13 +5,10 @@
package watchable
import (
- "fmt"
- "io/ioutil"
"runtime"
"testing"
"v.io/syncbase/x/ref/services/syncbase/store"
- "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
"v.io/syncbase/x/ref/services/syncbase/store/test"
)
@@ -76,8 +73,8 @@
if useMemstore {
st = memstore.New()
} else {
- var dbPath string
- st, dbPath = newLevelDB()
+ var dbPath string = getPath()
+ st = createLevelDB(dbPath)
defer destroyLevelDB(st, dbPath)
}
st, err := Wrap(st, &Options{ManagedPrefixes: mp})
@@ -87,22 +84,3 @@
defer st.Close()
f(t, st)
}
-
-func newLevelDB() (store.Store, string) {
- path, err := ioutil.TempDir("", "syncbase_leveldb")
- if err != nil {
- panic(fmt.Sprintf("can't create temp dir: %v", err))
- }
- st, err := leveldb.Open(path)
- if err != nil {
- panic(fmt.Sprintf("can't open db at %v: %v", path, err))
- }
- return st, path
-}
-
-func destroyLevelDB(st store.Store, path string) {
- st.Close()
- if err := leveldb.Destroy(path); err != nil {
- panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
- }
-}
diff --git a/services/syncbase/server/watchable/test_util.go b/services/syncbase/server/watchable/test_util.go
new file mode 100644
index 0000000..c1877e5
--- /dev/null
+++ b/services/syncbase/server/watchable/test_util.go
@@ -0,0 +1,115 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "fmt"
+ "io/ioutil"
+
+ "v.io/syncbase/x/ref/services/syncbase/clock"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
+ "v.io/syncbase/x/ref/services/syncbase/store/memstore"
+ "v.io/v23/vom"
+)
+
+// This file provides utility methods for tests related to watchable store.
+
+//**** Functions related to creation/cleanup of store instances **//
+
+func createStore(useMemstore bool) (store.Store, func()) {
+ var st store.Store
+ if useMemstore {
+ st = memstore.New()
+ return st, func() {
+ st.Close()
+ }
+ } else {
+ st = createLevelDB(getPath())
+ return st, func() {
+ destroyLevelDB(st, getPath())
+ }
+ }
+}
+
+func getPath() string {
+ path, err := ioutil.TempDir("", "syncbase_leveldb")
+ if err != nil {
+ panic(fmt.Sprintf("can't create temp dir: %v", err))
+ }
+ return path
+}
+
+func createLevelDB(path string) store.Store {
+ st, err := leveldb.Open(path)
+ if err != nil {
+ panic(fmt.Sprintf("can't open db at %v: %v", path, err))
+ }
+ return st
+}
+
+func destroyLevelDB(st store.Store, path string) {
+ st.Close()
+ if err := leveldb.Destroy(path); err != nil {
+ panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
+ }
+}
+
+//**** Functions related to watchable store **//
+
+func getSeq(st Store) uint64 {
+ wst := st.(*wstore)
+ return wst.seq
+}
+
+func setMockSystemClock(st Store, mockClock clock.SystemClock) {
+ wst := st.(*wstore)
+ wst.clock.SetSystemClock(mockClock)
+}
+
+type LogEntryReader struct {
+ stream store.Stream
+}
+
+func NewLogEntryReader(st store.Store, seq uint64) *LogEntryReader {
+ stream := st.Scan([]byte(getLogEntryKeyPrefix(seq)), []byte(getLogEntryKeyPrefix(seq+1)))
+ return &LogEntryReader{stream: stream}
+}
+
+func (ler *LogEntryReader) Advance() bool {
+ return ler.stream.Advance()
+}
+
+func (ler *LogEntryReader) GetEntry() (string, LogEntry) {
+ key := string(ler.stream.Key(nil))
+ var entry LogEntry = LogEntry{}
+ if err := vom.Decode(ler.stream.Value(nil), &entry); err != nil {
+ panic(fmt.Errorf("Failed to decode LogEntry for key: %q", key))
+ }
+ return key, entry
+}
+
+//*** Clock related utility code ********//
+
+// Mock Implementation for SystemClock
+type MockSystemClock struct {
+ time int64 // current time returned by call to Now()
+ increment int64 // how much to increment the clock by for subsequent calls to Now()
+}
+
+func NewMockSystemClock(firstTimestamp, increment int64) *MockSystemClock {
+ return &MockSystemClock{
+ time: firstTimestamp,
+ increment: increment,
+ }
+}
+
+func (sc *MockSystemClock) Now() int64 {
+ now := sc.time
+ sc.time += sc.increment
+ return now
+}
+
+var _ clock.SystemClock = (*MockSystemClock)(nil)
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index ace48de..e3d8422 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -117,13 +117,13 @@
return verror.New(verror.ErrInternal, nil, "seq maxed out")
}
// Write LogEntry records.
- // Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
- // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
- keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", tx.st.seq))
+ timestamp := tx.st.clock.Now()
+ keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
for txSeq, op := range tx.ops {
key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
value := &LogEntry{
- Op: op,
+ Op: op,
+ CommitTimestamp: timestamp,
// TODO(sadovsky): This information is also captured in LogEntry keys.
// Optimize to avoid redundancy.
Continued: txSeq < len(tx.ops)-1,
@@ -149,3 +149,10 @@
tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
return tx.itx.Abort()
}
+
+// Used for testing
+func getLogEntryKeyPrefix(seq uint64) string {
+ // Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
+ // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+ return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
+}
diff --git a/services/syncbase/server/watchable/transaction_test.go b/services/syncbase/server/watchable/transaction_test.go
new file mode 100644
index 0000000..6da6422
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction_test.go
@@ -0,0 +1,134 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
+// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
+// relatively expensive since the entire data map is copied. LevelDB snapshots
+// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
+const useMemstoreForTest = false
+
+type testData struct {
+ key string
+ createVal string
+ updateVal string
+}
+
+var data1 testData = testData{
+ key: "key-a",
+ createVal: "val-a1",
+ updateVal: "val-a2",
+}
+
+var data2 testData = testData{
+ key: "key-b",
+ createVal: "val-b1",
+ updateVal: "val-b2",
+}
+
+func TestLogEntityTimestamps(t *testing.T) {
+ stImpl, destroy := createStore(useMemstoreForTest)
+ defer destroy()
+ var mockClock *MockSystemClock = NewMockSystemClock(3, 1)
+
+ wst1, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
+ if err != nil {
+ t.Errorf("Failed to wrap store for create")
+ }
+ seqForCreate := getSeq(wst1)
+ setMockSystemClock(wst1, mockClock)
+
+ // Create data in store
+ if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
+ // add data1
+ if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
+ return fmt.Errorf("can't put {%q: %v}: %v",
+ data1.key, data1.createVal, err)
+ }
+ // add data2
+ if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
+ return fmt.Errorf("can't put {%q: %v}: %v",
+ data2.key, data2.createVal, err)
+ }
+ return nil
+ }); err != nil {
+ panic(fmt.Errorf("can't commit transaction: %v", err))
+ }
+
+ // read and verify LogEntities written as part of above transaction
+ // We expect 2 entires in the log for the two puts.
+ // Timestamp from mockclock for the commit shoud be 3
+ verifyCommitLog(t, stImpl, seqForCreate, 2, 3)
+
+ // Update data already present in store with a new watchable store
+ wst2, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
+ setMockSystemClock(wst2, mockClock)
+ if err != nil {
+ t.Errorf("Failed to wrap store for update")
+ }
+ seqForUpdate := getSeq(wst2)
+ if seqForUpdate != (seqForCreate + 1) {
+ t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
+ }
+
+ if err := store.RunInTransaction(wst2, func(st store.StoreReadWriter) error {
+ if err := checkAndUpdate(st, data1); err != nil {
+ return err
+ }
+ if err := checkAndUpdate(st, data2); err != nil {
+ return err
+ }
+ return nil
+ }); err != nil {
+ panic(fmt.Errorf("can't commit transaction: %v", err))
+ }
+
+ // read and verify LogEntities written as part of above transaction
+ // We expect 4 entires in the log for the two gets and two puts.
+ // Timestamp from mockclock for the commit shoud be 4
+ verifyCommitLog(t, stImpl, seqForUpdate, 4, 4)
+}
+
+func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+ // check and update data1
+ keyBytes := []byte(data.key)
+ val, err := st.Get(keyBytes, nil)
+ if err != nil {
+ return fmt.Errorf("can't get key %q: %v", data.key, err)
+ }
+ if !bytes.Equal(val, []byte(data.createVal)) {
+ return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
+ }
+ if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+ return fmt.Errorf("can't put {%q: %v}: %v",
+ data.key, data.updateVal, err)
+ }
+ return nil
+}
+
+func verifyCommitLog(t *testing.T, st store.Store, seq uint64, expectedEntries int, expectedTimestamp int64) {
+ var ler *LogEntryReader = NewLogEntryReader(st, seq)
+ var entryCount int = 0
+ for ler.Advance() {
+ _, entry := ler.GetEntry()
+ entryCount++
+ if entry.CommitTimestamp != expectedTimestamp {
+ errStr := "Unexpected timestamp found for entity." +
+ " Expected: %d, found: %d"
+ t.Errorf(errStr, expectedTimestamp, entry.CommitTimestamp)
+ }
+ }
+ if entryCount != expectedEntries {
+ t.Errorf("Unexpected number of log entries found. Expected: %d, found: %d", expectedEntries, entryCount)
+ }
+}
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
index 7a8f629..07d1ce1 100644
--- a/services/syncbase/server/watchable/types.vdl
+++ b/services/syncbase/server/watchable/types.vdl
@@ -37,11 +37,13 @@
// LogEntry represents a single store operation. This operation may have been
// part of a transaction, as signified by the Continued boolean. Read-only
// operations (and read-only transactions) are not logged.
-// TODO(sadovsky): Log commit time and maybe some other things.
type LogEntry struct {
// The store operation that was performed.
Op Op
+ // Time when the operation was committed.
+ CommitTimestamp int64
+
// If true, this entry is followed by more entries that belong to the same
// commit as this entry.
Continued bool
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
index 12b8e0b..e94587a 100644
--- a/services/syncbase/server/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -112,10 +112,11 @@
// LogEntry represents a single store operation. This operation may have been
// part of a transaction, as signified by the Continued boolean. Read-only
// operations (and read-only transactions) are not logged.
-// TODO(sadovsky): Log commit time and maybe some other things.
type LogEntry struct {
// The store operation that was performed.
Op Op
+ // Time when the operation was committed.
+ CommitTimestamp int64
// If true, this entry is followed by more entries that belong to the same
// commit as this entry.
Continued bool