TBR syncbase: fix mem ownership bug in watchable txn

(Submitting TBR because Ivan +1'ed but didn't +2 before his vacation.)

watchable.Transaction should've made defensive copies of mutable inputs since it
holds (and dereferences) references to them after the various methods (get, put,
etc.) return.

Change-Id: Icd025a2ae4a4121b919ba8b1ae14c431b9683ef7
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 963121e..12f5e1f 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -30,6 +30,22 @@
 
 var _ store.Transaction = (*transaction)(nil)
 
+func cp(src []byte) []byte {
+	dst := make([]byte, len(src))
+	for i := 0; i < len(src); i++ {
+		dst[i] = src[i]
+	}
+	return dst
+}
+
+func cpStrings(src []string) []string {
+	dst := make([]string, len(src))
+	for i := 0; i < len(src); i++ {
+		dst[i] = src[i]
+	}
+	return dst
+}
+
 func newTransaction(st *wstore) *transaction {
 	return &transaction{
 		itx: st.ist.NewTransaction(),
@@ -49,7 +65,7 @@
 		valbuf, err = tx.itx.Get(key, valbuf)
 	} else {
 		valbuf, err = getVersioned(tx.itx, key, valbuf)
-		tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+		tx.ops = append(tx.ops, &OpGet{GetOp{Key: cp(key)}})
 	}
 	return valbuf, err
 }
@@ -66,7 +82,7 @@
 		it = tx.itx.Scan(start, limit)
 	} else {
 		it = newStreamVersioned(tx.itx, start, limit)
-		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: cp(start), Limit: cp(limit)}})
 	}
 	return it
 }
@@ -85,7 +101,7 @@
 	if err != nil {
 		return err
 	}
-	tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	tx.ops = append(tx.ops, &OpPut{PutOp{Key: cp(key), Version: version}})
 	return nil
 }
 
@@ -104,7 +120,7 @@
 	if err != nil {
 		return err
 	}
-	tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+	tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: cp(key)}})
 	return nil
 }
 
@@ -168,7 +184,8 @@
 	if wtx.err != nil {
 		return convertError(wtx.err)
 	}
-	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+	// Make a defensive copy of prefixes slice.
+	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: cpStrings(prefixes), Remove: remove}})
 	return nil
 }
 
@@ -186,10 +203,9 @@
 		return convertError(wtx.err)
 	}
 	if !wtx.st.managesKey(key) {
-		return verror.New(verror.ErrInternal, ctx,
-			fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
+		return verror.New(verror.ErrInternal, ctx, fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
 	}
-	wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: key, Version: version}})
+	wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: cp(key), Version: cp(version)}})
 	return nil
 }
 
@@ -282,7 +298,7 @@
 	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
 		return err
 	}
-	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: cp(key), Version: cp(version)}})
 	return nil
 }