internal storage engine: improve compile-time typing

This change does to internal storage engine API changes.

First, renaming Snapshot.Close() -> Snapshot.Abort().
Before this change, the Store interface implemented the
Snapshot interface, now it doesn't.

Second, adding the SnapshotOrTranscation interface.
Only Snapshot and Transaction implement this interface.
Transaction doesn't implement Snapshot.

Third, the StoreReadWriter becomes private. The StoreReadWriter
was replaced by the Transaction. This was done because in lots
of places the StoreReadWriter was checked to be the Transaction
at runtime.

This change also updates the code to the new API and removes
extra runtime type checking.

MultiPart: 1/2

Change-Id: Ic213cc0479a6cd8604bc62c29edaf7adb95b260e
diff --git a/services/syncbase/server/watchable/snapshot.go b/services/syncbase/server/watchable/snapshot.go
index ac3694f..37af4e1 100644
--- a/services/syncbase/server/watchable/snapshot.go
+++ b/services/syncbase/server/watchable/snapshot.go
@@ -9,6 +9,7 @@
 )
 
 type snapshot struct {
+	store.SnapshotSpecImpl
 	isn store.Snapshot
 	st  *wstore
 }
@@ -22,9 +23,9 @@
 	}
 }
 
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
-	return s.isn.Close()
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
+	return s.isn.Abort()
 }
 
 // Get implements the store.StoreReader interface.
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 2b0037a..010643d 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -79,7 +79,7 @@
 		return st.ist.Get(key, valbuf)
 	}
 	sn := newSnapshot(st)
-	defer sn.Close()
+	defer sn.Abort()
 	return sn.Get(key, valbuf)
 }
 
@@ -95,16 +95,16 @@
 // Put implements the store.StoreWriter interface.
 func (st *wstore) Put(key, value []byte) error {
 	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Put(key, value)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Put(key, value)
 	})
 }
 
 // Delete implements the store.StoreWriter interface.
 func (st *wstore) Delete(key []byte) error {
 	// Use watchable.Store transaction so this op gets logged.
-	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
-		return st.Delete(key)
+	return store.RunInTransaction(st, func(tx store.Transaction) error {
+		return tx.Delete(key)
 	})
 }
 
diff --git a/services/syncbase/server/watchable/stream.go b/services/syncbase/server/watchable/stream.go
index 9c8c392..26502e1 100644
--- a/services/syncbase/server/watchable/stream.go
+++ b/services/syncbase/server/watchable/stream.go
@@ -13,7 +13,7 @@
 // stream streams keys and values for versioned records.
 type stream struct {
 	iit      store.Stream
-	st       store.StoreReader
+	sntx     store.SnapshotOrTransaction
 	mu       sync.Mutex
 	err      error
 	hasValue bool
@@ -25,11 +25,10 @@
 
 // newStreamVersioned creates a new stream. It assumes all records in range
 // [start, limit) are managed, i.e. versioned.
-func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
-	checkTransactionOrSnapshot(st)
+func newStreamVersioned(sntx store.SnapshotOrTransaction, start, limit []byte) *stream {
 	return &stream{
-		iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
-		st:  st,
+		iit:  sntx.Scan(makeVersionKey(start), makeVersionKey(limit)),
+		sntx: sntx,
 	}
 }
 
@@ -46,7 +45,7 @@
 	}
 	versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
 	s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
-	s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+	s.value, s.err = s.sntx.Get(makeAtVersionKey(s.key, version), nil)
 	if s.err != nil {
 		return false
 	}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 93b6b33..fb82f72 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -177,7 +177,7 @@
 // operations (create, join, leave, destroy) to notify the sync watcher of the
 // change at its proper position in the timeline (the transaction commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(ctx *context.T, tx store.StoreReadWriter, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -195,7 +195,7 @@
 // current keys and their versions to use when initializing the sync metadata
 // at the point in the timeline when these keys become syncable (at commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncSnapshotOp(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -227,10 +227,10 @@
 // GetVersion returns the current version of a managed key. This method is used
 // by the Sync module when the initiator is attempting to add new versions of
 // objects. Reading the version key is used for optimistic concurrency
-// control. At minimum, an object implementing the StoreReader interface is
+// control. At minimum, an object implementing the Transaction interface is
 // required since this is a Get operation.
-func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
-	switch w := st.(type) {
+func GetVersion(ctx *context.T, tx store.Transaction, key []byte) ([]byte, error) {
+	switch w := tx.(type) {
 	case *transaction:
 		w.mu.Lock()
 		defer w.mu.Unlock()
@@ -238,8 +238,6 @@
 			return nil, convertError(w.err)
 		}
 		return getVersion(w.itx, key)
-	case *wstore:
-		return getVersion(w.ist, key)
 	}
 	return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
 }
@@ -266,8 +264,8 @@
 // PutAtVersion puts a value for the managed key at the requested version. This
 // method is used by the Sync module exclusively when the initiator adds objects
 // with versions created on other Syncbases. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
 	wtx := tx.(*transaction)
 
 	wtx.mu.Lock()
@@ -285,8 +283,8 @@
 // version. This method is used by the Sync module exclusively when the
 // initiator selects which of the already stored versions (via PutAtVersion
 // calls) becomes the current version. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
 	wtx := tx.(*transaction)
 
 	wtx.mu.Lock()
diff --git a/services/syncbase/server/watchable/transaction_test.go b/services/syncbase/server/watchable/transaction_test.go
index 43b7b94..8b95536 100644
--- a/services/syncbase/server/watchable/transaction_test.go
+++ b/services/syncbase/server/watchable/transaction_test.go
@@ -33,17 +33,17 @@
 	updateVal: "val-b2",
 }
 
-func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+func checkAndUpdate(tx store.Transaction, data testData) error {
 	// check and update data1
 	keyBytes := []byte(data.key)
-	val, err := st.Get(keyBytes, nil)
+	val, err := tx.Get(keyBytes, nil)
 	if err != nil {
 		return fmt.Errorf("can't get key %q: %v", data.key, err)
 	}
 	if !bytes.Equal(val, []byte(data.createVal)) {
 		return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
 	}
-	if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+	if err := tx.Put(keyBytes, []byte(data.updateVal)); err != nil {
 		return fmt.Errorf("can't put {%q: %v}: %v", data.key, data.updateVal, err)
 	}
 	return nil
@@ -79,13 +79,13 @@
 	setMockSystemClock(wst1, mockClock)
 
 	// Create data in store
-	if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(wst1, func(tx store.Transaction) error {
 		// add data1
-		if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
+		if err := tx.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
 			return fmt.Errorf("can't put {%q: %v}: %v", data1.key, data1.createVal, err)
 		}
 		// add data2
-		if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
+		if err := tx.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
 			return fmt.Errorf("can't put {%q: %v}: %v", data2.key, data2.createVal, err)
 		}
 		return nil
@@ -110,11 +110,11 @@
 		t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
 	}
 
-	if err := store.RunInTransaction(wst2, func(st store.StoreReadWriter) error {
-		if err := checkAndUpdate(st, data1); err != nil {
+	if err := store.RunInTransaction(wst2, func(tx store.Transaction) error {
+		if err := checkAndUpdate(tx, data1); err != nil {
 			return err
 		}
-		if err := checkAndUpdate(st, data2); err != nil {
+		if err := checkAndUpdate(tx, data2); err != nil {
 			return err
 		}
 		return nil
@@ -144,33 +144,33 @@
 		t.Fatalf("Wrap failed: %v", err)
 	}
 
-	if err := store.RunInTransaction(wst, func(st store.StoreReadWriter) error {
+	if err := store.RunInTransaction(wst, func(tx store.Transaction) error {
 		putKey, putVal := []byte("foo"), []byte("bar")
-		if err := st.Put(putKey, putVal); err != nil {
+		if err := tx.Put(putKey, putVal); err != nil {
 			return err
 		}
 		getKey := []byte("foo")
-		if getVal, err := st.Get(getKey, nil); err != nil {
+		if getVal, err := tx.Get(getKey, nil); err != nil {
 			return err
 		} else {
 			eq(t, getVal, putVal)
 		}
 		start, limit := []byte("aaa"), []byte("bbb")
-		st.Scan(start, limit)
+		tx.Scan(start, limit)
 		delKey := []byte("foo")
-		if err := st.Delete(delKey); err != nil {
+		if err := tx.Delete(delKey); err != nil {
 			return err
 		}
 		sgPrefixes := []string{"sga", "sgb"}
-		if err := AddSyncGroupOp(nil, st, sgPrefixes, false); err != nil {
+		if err := AddSyncGroupOp(nil, tx, sgPrefixes, false); err != nil {
 			return err
 		}
 		snKey, snVersion := []byte("aa"), []byte("123")
-		if err := AddSyncSnapshotOp(nil, st, snKey, snVersion); err != nil {
+		if err := AddSyncSnapshotOp(nil, tx, snKey, snVersion); err != nil {
 			return err
 		}
 		pvKey, pvVersion := []byte("pv"), []byte("456")
-		if err := PutVersion(nil, st, pvKey, pvVersion); err != nil {
+		if err := PutVersion(nil, tx, pvKey, pvVersion); err != nil {
 			return err
 		}
 		for _, buf := range [][]byte{putKey, putVal, getKey, start, limit, delKey, snKey, snVersion, pvKey, pvVersion} {
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index 3d08129..64f8eeb 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -50,21 +50,20 @@
 	return []byte(join(string(key), string(version)))
 }
 
-func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
-	return st.Get(makeVersionKey(key), nil)
+func getVersion(sntx store.SnapshotOrTransaction, key []byte) ([]byte, error) {
+	return sntx.Get(makeVersionKey(key), nil)
 }
 
 func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
 	return st.Get(makeAtVersionKey(key, version), valbuf)
 }
 
-func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
-	checkTransactionOrSnapshot(st)
-	version, err := getVersion(st, key)
+func getVersioned(sntx store.SnapshotOrTransaction, key, valbuf []byte) ([]byte, error) {
+	version, err := getVersion(sntx, key)
 	if err != nil {
 		return valbuf, err
 	}
-	return getAtVersion(st, key, valbuf, version)
+	return getAtVersion(sntx, key, valbuf, version)
 }
 
 func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
@@ -82,14 +81,6 @@
 	return tx.Delete(makeVersionKey(key))
 }
 
-func checkTransactionOrSnapshot(st store.StoreReader) {
-	_, isTransaction := st.(store.Transaction)
-	_, isSnapshot := st.(store.Snapshot)
-	if !isTransaction && !isSnapshot {
-		panic("neither a Transaction nor a Snapshot")
-	}
-}
-
 func getLogEntryKey(seq uint64) string {
 	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.