Timestamp, clock api and bugfix.

1) Add timestamp to values committed via watchable store.
2) Introduce a vanadium clock interface which provides
   an estimated UTC timestamp.
3) Fix bug related to sequence number read by Wrap() method
   while creating a watchable store. The bug would end up
   reassigning the sequence number of the last commited tx
   to a new tx to be committed by a newly created wstore
   wrapper object.

This change adds a timestamp field to the LogEntry struct
which is stored for each operation that is committed as
part of a transaction using watchable store. Note that keys
that are not managed by store do not have a LogEntry
associated with them and hence will not have a timestamp.

The clock interface is introduced in this CL but for now
the implementation just uses the system time directly. This
will be addressed in the next CL to come.

Change-Id: I4e0e30ace5efc4388bbc9611d59964ac8ad59cc3
diff --git a/services/syncbase/clock/types.go b/services/syncbase/clock/types.go
new file mode 100644
index 0000000..2084eca
--- /dev/null
+++ b/services/syncbase/clock/types.go
@@ -0,0 +1,15 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+// This interface provides a wrapper over system clock to allow easy testing
+// of VClock and other code that uses timestamps. Tests can implement a mock
+// SystemClock and set it on VClock using SetSystemClock() method.
+type SystemClock interface {
+	// Now returns the current UTC time in nanoseconds as known by the system.
+	// This may not reflect the real UTC time if the system clock is out of
+	// sync with UTC.
+	Now() int64
+}
diff --git a/services/syncbase/clock/vclock.go b/services/syncbase/clock/vclock.go
new file mode 100644
index 0000000..1e9acf2
--- /dev/null
+++ b/services/syncbase/clock/vclock.go
@@ -0,0 +1,57 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+	"time"
+)
+
+// VClock holds data required to provide an estimate of the UTC time at any
+// given point. The fields contained in here are
+// - systemTimeAtBoot : the time shown by the system clock at boot
+// - utcTimeAtBoot    : the estimated UTC time when the system booted
+// - skew             : the difference between the system clock and UTC time
+// - clock            : Instance of clock.SystemClock interface providing access
+//                      to the system time.
+type VClock struct {
+	systemTimeAtBoot int64
+	utcTimeAtBoot    int64
+	skew             int64
+	clock            SystemClock
+}
+
+func NewVClock() *VClock {
+	return &VClock{
+		clock: NewSystemClock(),
+	}
+}
+
+// Now returns current UTC time based on the estimation of skew that
+// the system clock has with respect to NTP.
+func (c *VClock) Now() int64 {
+	// This method returns just the current system time for now.
+	// TODO(jlodhia): implement estimation of UTC time.
+	return c.clock.Now()
+}
+
+// This method allows tests to set a mock clock instance for testability
+func (c *VClock) SetSystemClock(sysClock SystemClock) {
+	c.clock = sysClock
+}
+
+///////////////////////////////////////////////////
+// Implementation for SystemClock
+
+type systemClockImpl struct{}
+
+func (sc *systemClockImpl) Now() int64 {
+	return time.Now().UTC().UnixNano()
+}
+
+var _ SystemClock = (*systemClockImpl)(nil)
+
+func NewSystemClock() SystemClock {
+	return &systemClockImpl{}
+}
diff --git a/services/syncbase/clock/vclock_test.go b/services/syncbase/clock/vclock_test.go
new file mode 100644
index 0000000..477d142
--- /dev/null
+++ b/services/syncbase/clock/vclock_test.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package clock
+
+import (
+	"testing"
+)
+
+type systemClockMockImpl struct {
+	now int64
+}
+
+func (sc *systemClockMockImpl) Now() int64 {
+	return sc.now
+}
+
+func (sc *systemClockMockImpl) setNow(now int64) {
+	sc.now = now
+}
+
+var (
+	_ SystemClock = (*systemClockImpl)(nil)
+)
+
+func TestVClock(t *testing.T) {
+	clock := NewVClock()
+	sysClock := &systemClockMockImpl{}
+	writeTs := int64(4)
+	sysClock.setNow(writeTs)
+	clock.SetSystemClock(sysClock)
+
+	ts := clock.Now()
+	if ts != writeTs {
+		t.Errorf("timestamp expected to be %q but found to be %q", writeTs, ts)
+	}
+}
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 44faa54..23fc48f 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -29,6 +29,7 @@
 	"sync"
 
 	pubutil "v.io/syncbase/v23/syncbase/util"
+	"v.io/syncbase/x/ref/services/syncbase/clock"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 )
@@ -71,18 +72,23 @@
 		}
 		var err error
 		seq, err = strconv.ParseUint(parts[1], 10, 64)
+		// Current value of seq points to the last transaction committed
+		// increment the value by 1.
+		seq++
 		if err != nil {
 			panic("failed to parse seq: " + key)
 		}
 	}
-	return &wstore{ist: st, opts: opts, seq: seq}, nil
+	vclock := clock.NewVClock()
+	return &wstore{ist: st, opts: opts, seq: seq, clock: vclock}, nil
 }
 
 type wstore struct {
-	ist  store.Store
-	opts *Options
-	mu   sync.Mutex // held during transaction commits; protects seq
-	seq  uint64     // sequence number, for commits
+	ist   store.Store
+	opts  *Options
+	mu    sync.Mutex    // held during transaction commits; protects seq
+	seq   uint64        // sequence number, for commits
+	clock *clock.VClock // used to provide write timestamps
 }
 
 var _ Store = (*wstore)(nil)
diff --git a/services/syncbase/server/watchable/store_test.go b/services/syncbase/server/watchable/store_test.go
index 18b8e4d..8338342 100644
--- a/services/syncbase/server/watchable/store_test.go
+++ b/services/syncbase/server/watchable/store_test.go
@@ -5,13 +5,10 @@
 package watchable
 
 import (
-	"fmt"
-	"io/ioutil"
 	"runtime"
 	"testing"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-	"v.io/syncbase/x/ref/services/syncbase/store/leveldb"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
 	"v.io/syncbase/x/ref/services/syncbase/store/test"
 )
@@ -76,8 +73,8 @@
 	if useMemstore {
 		st = memstore.New()
 	} else {
-		var dbPath string
-		st, dbPath = newLevelDB()
+		var dbPath string = getPath()
+		st = createLevelDB(dbPath)
 		defer destroyLevelDB(st, dbPath)
 	}
 	st, err := Wrap(st, &Options{ManagedPrefixes: mp})
@@ -87,22 +84,3 @@
 	defer st.Close()
 	f(t, st)
 }
-
-func newLevelDB() (store.Store, string) {
-	path, err := ioutil.TempDir("", "syncbase_leveldb")
-	if err != nil {
-		panic(fmt.Sprintf("can't create temp dir: %v", err))
-	}
-	st, err := leveldb.Open(path)
-	if err != nil {
-		panic(fmt.Sprintf("can't open db at %v: %v", path, err))
-	}
-	return st, path
-}
-
-func destroyLevelDB(st store.Store, path string) {
-	st.Close()
-	if err := leveldb.Destroy(path); err != nil {
-		panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
-	}
-}
diff --git a/services/syncbase/server/watchable/test_util.go b/services/syncbase/server/watchable/test_util.go
new file mode 100644
index 0000000..c1877e5
--- /dev/null
+++ b/services/syncbase/server/watchable/test_util.go
@@ -0,0 +1,115 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"fmt"
+	"io/ioutil"
+
+	"v.io/syncbase/x/ref/services/syncbase/clock"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/syncbase/x/ref/services/syncbase/store/leveldb"
+	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
+	"v.io/v23/vom"
+)
+
+// This file provides utility methods for tests related to watchable store.
+
+//****  Functions related to creation/cleanup of store instances  **//
+
+func createStore(useMemstore bool) (store.Store, func()) {
+	var st store.Store
+	if useMemstore {
+		st = memstore.New()
+		return st, func() {
+			st.Close()
+		}
+	} else {
+		st = createLevelDB(getPath())
+		return st, func() {
+			destroyLevelDB(st, getPath())
+		}
+	}
+}
+
+func getPath() string {
+	path, err := ioutil.TempDir("", "syncbase_leveldb")
+	if err != nil {
+		panic(fmt.Sprintf("can't create temp dir: %v", err))
+	}
+	return path
+}
+
+func createLevelDB(path string) store.Store {
+	st, err := leveldb.Open(path)
+	if err != nil {
+		panic(fmt.Sprintf("can't open db at %v: %v", path, err))
+	}
+	return st
+}
+
+func destroyLevelDB(st store.Store, path string) {
+	st.Close()
+	if err := leveldb.Destroy(path); err != nil {
+		panic(fmt.Sprintf("can't destroy db at %v: %v", path, err))
+	}
+}
+
+//****  Functions related to watchable store  **//
+
+func getSeq(st Store) uint64 {
+	wst := st.(*wstore)
+	return wst.seq
+}
+
+func setMockSystemClock(st Store, mockClock clock.SystemClock) {
+	wst := st.(*wstore)
+	wst.clock.SetSystemClock(mockClock)
+}
+
+type LogEntryReader struct {
+	stream store.Stream
+}
+
+func NewLogEntryReader(st store.Store, seq uint64) *LogEntryReader {
+	stream := st.Scan([]byte(getLogEntryKeyPrefix(seq)), []byte(getLogEntryKeyPrefix(seq+1)))
+	return &LogEntryReader{stream: stream}
+}
+
+func (ler *LogEntryReader) Advance() bool {
+	return ler.stream.Advance()
+}
+
+func (ler *LogEntryReader) GetEntry() (string, LogEntry) {
+	key := string(ler.stream.Key(nil))
+	var entry LogEntry = LogEntry{}
+	if err := vom.Decode(ler.stream.Value(nil), &entry); err != nil {
+		panic(fmt.Errorf("Failed to decode LogEntry for key: %q", key))
+	}
+	return key, entry
+}
+
+//***  Clock related utility code ********//
+
+// Mock Implementation for SystemClock
+type MockSystemClock struct {
+	time      int64 // current time returned by call to Now()
+	increment int64 // how much to increment the clock by for subsequent calls to Now()
+}
+
+func NewMockSystemClock(firstTimestamp, increment int64) *MockSystemClock {
+	return &MockSystemClock{
+		time:      firstTimestamp,
+		increment: increment,
+	}
+}
+
+func (sc *MockSystemClock) Now() int64 {
+	now := sc.time
+	sc.time += sc.increment
+	return now
+}
+
+var _ clock.SystemClock = (*MockSystemClock)(nil)
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index ace48de..e3d8422 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -117,13 +117,13 @@
 		return verror.New(verror.ErrInternal, nil, "seq maxed out")
 	}
 	// Write LogEntry records.
-	// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
-	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
-	keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", tx.st.seq))
+	timestamp := tx.st.clock.Now()
+	keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
 	for txSeq, op := range tx.ops {
 		key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
 		value := &LogEntry{
-			Op: op,
+			Op:              op,
+			CommitTimestamp: timestamp,
 			// TODO(sadovsky): This information is also captured in LogEntry keys.
 			// Optimize to avoid redundancy.
 			Continued: txSeq < len(tx.ops)-1,
@@ -149,3 +149,10 @@
 	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
 	return tx.itx.Abort()
 }
+
+// Used for testing
+func getLogEntryKeyPrefix(seq uint64) string {
+	// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
+	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
+	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
+}
diff --git a/services/syncbase/server/watchable/transaction_test.go b/services/syncbase/server/watchable/transaction_test.go
new file mode 100644
index 0000000..6da6422
--- /dev/null
+++ b/services/syncbase/server/watchable/transaction_test.go
@@ -0,0 +1,134 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// With Memstore, TestReadWriteRandom is slow with ManagedPrefixes=nil since
+// every watchable.Store.Get() takes a snapshot, and memstore snapshots are
+// relatively expensive since the entire data map is copied. LevelDB snapshots
+// are cheap, so with LevelDB ManagedPrefixes=nil is still reasonably fast.
+const useMemstoreForTest = false
+
+type testData struct {
+	key       string
+	createVal string
+	updateVal string
+}
+
+var data1 testData = testData{
+	key:       "key-a",
+	createVal: "val-a1",
+	updateVal: "val-a2",
+}
+
+var data2 testData = testData{
+	key:       "key-b",
+	createVal: "val-b1",
+	updateVal: "val-b2",
+}
+
+func TestLogEntityTimestamps(t *testing.T) {
+	stImpl, destroy := createStore(useMemstoreForTest)
+	defer destroy()
+	var mockClock *MockSystemClock = NewMockSystemClock(3, 1)
+
+	wst1, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
+	if err != nil {
+		t.Errorf("Failed to wrap store for create")
+	}
+	seqForCreate := getSeq(wst1)
+	setMockSystemClock(wst1, mockClock)
+
+	// Create data in store
+	if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
+		// add data1
+		if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
+			return fmt.Errorf("can't put {%q: %v}: %v",
+				data1.key, data1.createVal, err)
+		}
+		// add data2
+		if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
+			return fmt.Errorf("can't put {%q: %v}: %v",
+				data2.key, data2.createVal, err)
+		}
+		return nil
+	}); err != nil {
+		panic(fmt.Errorf("can't commit transaction: %v", err))
+	}
+
+	// read and verify LogEntities written as part of above transaction
+	// We expect 2 entires in the log for the two puts.
+	// Timestamp from mockclock for the commit shoud be 3
+	verifyCommitLog(t, stImpl, seqForCreate, 2, 3)
+
+	// Update data already present in store with a new watchable store
+	wst2, err := Wrap(stImpl, &Options{ManagedPrefixes: nil})
+	setMockSystemClock(wst2, mockClock)
+	if err != nil {
+		t.Errorf("Failed to wrap store for update")
+	}
+	seqForUpdate := getSeq(wst2)
+	if seqForUpdate != (seqForCreate + 1) {
+		t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
+	}
+
+	if err := store.RunInTransaction(wst2, func(st store.StoreReadWriter) error {
+		if err := checkAndUpdate(st, data1); err != nil {
+			return err
+		}
+		if err := checkAndUpdate(st, data2); err != nil {
+			return err
+		}
+		return nil
+	}); err != nil {
+		panic(fmt.Errorf("can't commit transaction: %v", err))
+	}
+
+	// read and verify LogEntities written as part of above transaction
+	// We expect 4 entires in the log for the two gets and two puts.
+	// Timestamp from mockclock for the commit shoud be 4
+	verifyCommitLog(t, stImpl, seqForUpdate, 4, 4)
+}
+
+func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+	// check and update data1
+	keyBytes := []byte(data.key)
+	val, err := st.Get(keyBytes, nil)
+	if err != nil {
+		return fmt.Errorf("can't get key %q: %v", data.key, err)
+	}
+	if !bytes.Equal(val, []byte(data.createVal)) {
+		return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
+	}
+	if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+		return fmt.Errorf("can't put {%q: %v}: %v",
+			data.key, data.updateVal, err)
+	}
+	return nil
+}
+
+func verifyCommitLog(t *testing.T, st store.Store, seq uint64, expectedEntries int, expectedTimestamp int64) {
+	var ler *LogEntryReader = NewLogEntryReader(st, seq)
+	var entryCount int = 0
+	for ler.Advance() {
+		_, entry := ler.GetEntry()
+		entryCount++
+		if entry.CommitTimestamp != expectedTimestamp {
+			errStr := "Unexpected timestamp found for entity." +
+				" Expected: %d, found: %d"
+			t.Errorf(errStr, expectedTimestamp, entry.CommitTimestamp)
+		}
+	}
+	if entryCount != expectedEntries {
+		t.Errorf("Unexpected number of log entries found. Expected: %d, found: %d", expectedEntries, entryCount)
+	}
+}
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
index 7a8f629..07d1ce1 100644
--- a/services/syncbase/server/watchable/types.vdl
+++ b/services/syncbase/server/watchable/types.vdl
@@ -37,11 +37,13 @@
 // LogEntry represents a single store operation. This operation may have been
 // part of a transaction, as signified by the Continued boolean. Read-only
 // operations (and read-only transactions) are not logged.
-// TODO(sadovsky): Log commit time and maybe some other things.
 type LogEntry struct {
 	// The store operation that was performed.
 	Op Op
 
+	// Time when the operation was committed.
+	CommitTimestamp int64
+
 	// If true, this entry is followed by more entries that belong to the same
 	// commit as this entry.
 	Continued bool
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
index 12b8e0b..e94587a 100644
--- a/services/syncbase/server/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -112,10 +112,11 @@
 // LogEntry represents a single store operation. This operation may have been
 // part of a transaction, as signified by the Continued boolean. Read-only
 // operations (and read-only transactions) are not logged.
-// TODO(sadovsky): Log commit time and maybe some other things.
 type LogEntry struct {
 	// The store operation that was performed.
 	Op Op
+	// Time when the operation was committed.
+	CommitTimestamp int64
 	// If true, this entry is followed by more entries that belong to the same
 	// commit as this entry.
 	Continued bool