veyron/services/store: Streaming put mutation api.

Previously, PutMutations took an array of mutations and applied them
within a transaction. Unfortunately, this API was memory-bound: the
entire array of mutations had to be held in memory at once.
PutMutations now takes a stream of mutations and applies them within a
transaction.
Specifics:
 - Forbids duplicate mutations on the same id.
 - Supports cancellation. If the request is cancelled in a timely
   fashion, any mutations received will not be committed.

Additionally, the estore API, which extended
veyron2/services/store.Store, has been replaced with a raw access API.
Specifics:
 - veyron/services/store/raw.Store supports Watch and PutMutations.
 - The raw store is mounted at the extension ".store.raw". This prevents
   non-sync clients from inadvertently discovering the raw API.

Change-Id: I6575a0ca43c220239ea90c84e6c10329c625dfbe
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index 8f8f826..feb92a8 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -42,7 +42,7 @@
 	"strconv"
 	"strings"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/storage"
 )
@@ -419,7 +419,7 @@
 }
 
 // fillFakeWatchRecords fills fake log and dag state (testing only).
-// TODO(hpucha): remove and clean up estore import.
+// TODO(hpucha): remove and clean up raw import.
 func (l *iLog) fillFakeWatchRecords() {
 	const num = 10
 	var parvers []storage.Version
@@ -427,7 +427,7 @@
 	for i := int(0); i < num; i++ {
 		// Create a local log record.
 		curvers := storage.Version(i)
-		if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: estore.Mutation{Version: curvers}}); err != nil {
+		if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: raw.Mutation{Version: curvers}}); err != nil {
 			return
 		}
 		parvers = []storage.Version{curvers}
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 462a83c..9b84904 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -8,7 +8,7 @@
 	"strings"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/naming"
 	"veyron2/storage"
@@ -67,7 +67,7 @@
 	// Next steps are to tie this up into mount table and auto-discover neighbors.
 	neighbors   []string
 	neighborIDs []string
-	estore      estore.Store
+	store       raw.Store
 
 	updObjects map[storage.ID]*objConflictState
 }
@@ -100,7 +100,7 @@
 	// TODO(hpucha): Merge this with vstore handler in syncd.
 	if syncd.vstoreEndpoint != "" {
 		var err error
-		i.estore, err = estore.BindStore(syncd.vstoreEndpoint + "/.store/sync")
+		i.store, err = raw.BindStore(naming.JoinAddressName(syncd.vstoreEndpoint, raw.RawStoreSuffix))
 		if err != nil {
 			vlog.Fatalf("newInitiator: cannot connect to Veyron store endpoint (%s): %s",
 				syncd.vstoreEndpoint, err)
@@ -420,7 +420,7 @@
 }
 
 // resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]estore.Mutation, error) {
+func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
 	switch conflictResolutionPolicy {
 	case useVersion:
 		if err := i.resolveConflictsByVersion(); err != nil {
@@ -430,7 +430,7 @@
 		return nil, fmt.Errorf("unknown conflict resolution policy")
 	}
 
-	var m []estore.Mutation
+	var m []raw.Mutation
 	for _, st := range i.updObjects {
 		// Append to mutations.
 		st.resolvVal.Mutation.PriorVersion = st.oldHead
@@ -460,7 +460,7 @@
 		if err != nil {
 			return err
 		}
-		var m estore.Mutation
+		var m raw.Mutation
 		if lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version {
 			m = lrecs[0].Value.Mutation
 		} else {
@@ -494,7 +494,7 @@
 
 // updateStoreAndSync updates the store, and if that is successful,
 // updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(m []estore.Mutation) error {
+func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation) error {
 	// TODO(hpucha): Eliminate reaching into syncd's lock.
 	i.syncd.lock.Lock()
 	defer i.syncd.lock.Unlock()
@@ -504,8 +504,17 @@
 	// will also hold the lock across PutMutations rpc to prevent
 	// a race with watcher. The next iteration will clean up this
 	// coordination.
-	if i.estore != nil {
-		if err := i.estore.PutMutations(m); err != nil {
+	if i.store != nil {
+		stream, err := i.store.PutMutations()
+		if err != nil {
+			return err
+		}
+		for i := range m {
+			if err := stream.Send(m[i]); err != nil {
+				return err
+			}
+		}
+		if err := stream.Finish(); err != nil {
 			return err
 		}
 	}
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index a524d57..c73e0f1 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -7,7 +7,7 @@
 	"testing"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/storage"
 )
@@ -36,7 +36,7 @@
 		LSN:     100,
 		ObjID:   objID,
 		CurVers: 20,
-		Value:   LogValue{Mutation: estore.Mutation{Version: 20}},
+		Value:   LogValue{Mutation: raw.Mutation{Version: 20}},
 	}
 	if _, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers); err == nil {
 		t.Errorf("GetLogRec didn't fail")
@@ -91,7 +91,7 @@
 			LSN:     LSN(100 + v),
 			ObjID:   objID,
 			CurVers: v,
-			Value:   LogValue{Mutation: estore.Mutation{Version: v, PriorVersion: 500 + v}},
+			Value:   LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}},
 		}
 		logKey, err := s.log.putLogRec(expRec)
 		if err != nil {
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 2df3fc5..783d23f 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -8,7 +8,7 @@
 	"os"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 )
 
 // getFileName generates a filename for a temporary (per unit test) kvdb file.
@@ -135,7 +135,7 @@
 				CurVers: cmd.version,
 				Parents: cmd.parents,
 				Value: LogValue{
-					Mutation: estore.Mutation{Version: cmd.version},
+					Mutation: raw.Mutation{Version: cmd.version},
 				},
 			}
 			stream.add(rec)
diff --git a/runtimes/google/vsync/vsync.idl b/runtimes/google/vsync/vsync.idl
index d470a3a..19f206e 100644
--- a/runtimes/google/vsync/vsync.idl
+++ b/runtimes/google/vsync/vsync.idl
@@ -1,7 +1,7 @@
 package vsync
 
 import (
-  "veyron/services/store/estore"
+  "veyron/services/store/raw"
 
   "veyron2/storage"
 )
@@ -44,7 +44,7 @@
 // LogValue represents an object mutation within a transaction.
 type LogValue struct {
   // Mutation is the store mutation representing the change in the object.
-  Mutation estore.Mutation
+  Mutation raw.Mutation
   // Delete indicates whether the mutation resulted in the object being
   // deleted from the store.
   Delete   bool
diff --git a/runtimes/google/vsync/vsync.idl.go b/runtimes/google/vsync/vsync.idl.go
index e01c0e9..a1958a5 100644
--- a/runtimes/google/vsync/vsync.idl.go
+++ b/runtimes/google/vsync/vsync.idl.go
@@ -4,7 +4,7 @@
 package vsync
 
 import (
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/storage"
 
@@ -32,7 +32,7 @@
 // LogValue represents an object mutation within a transaction.
 type LogValue struct {
 	// Mutation is the store mutation representing the change in the object.
-	Mutation estore.Mutation
+	Mutation raw.Mutation
 	// Delete indicates whether the mutation resulted in the object being
 	// deleted from the store.
 	Delete bool
@@ -266,7 +266,7 @@
 				_gen_wiretype.FieldType{Type: 0x4e, Name: "Tags"},
 				_gen_wiretype.FieldType{Type: 0x50, Name: "Dir"},
 			},
-			"estore.Mutation", []string(nil)},
+			"raw.Mutation", []string(nil)},
 		_gen_wiretype.StructType{
 			[]_gen_wiretype.FieldType{
 				_gen_wiretype.FieldType{Type: 0x51, Name: "Mutation"},
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 654fd10..375581b 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -10,7 +10,7 @@
 import (
 	"sync"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/ipc"
 	"veyron2/storage"
@@ -112,7 +112,7 @@
 
 	// Register these Watch data types with VOM.
 	// TODO(tilaks): why aren't they auto-retrieved from the IDL?
-	vom.Register(&estore.Mutation{})
+	vom.Register(&raw.Mutation{})
 	vom.Register(&storage.DEntry{})
 
 	// Channel to propagate close event to all threads.
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index ea48129..0617640 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -9,7 +9,7 @@
 	"fmt"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/query"
 	"veyron2/services/watch"
@@ -154,7 +154,7 @@
 	var lastResmark []byte
 	for i := range changes.Changes {
 		ch := &changes.Changes[i]
-		mu, ok := ch.Value.(*estore.Mutation)
+		mu, ok := ch.Value.(*raw.Mutation)
 		if !ok {
 			return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
 		}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index ab0aa8d..01ae25a 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -11,7 +11,7 @@
 	"testing"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/query"
 	"veyron2/services/watch"
@@ -146,7 +146,7 @@
 		watch.Change{
 			Name:  "/uid/4c6db51aa740d8c62b90df874503e285",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x4c, 0x6d, 0xb5, 0x1a, 0xa7, 0x40, 0xd8, 0xc6,
 					0x2b, 0x90, 0xdf, 0x87, 0x45, 0x3, 0xe2, 0x85},
 				PriorVersion: 0x0,
@@ -167,7 +167,7 @@
 		watch.Change{
 			Name:  "/uid/082bc42e15af4fcf611d7f19a8d7831f",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
 					0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
 				PriorVersion: 0x0,
@@ -188,7 +188,7 @@
 		watch.Change{
 			Name:  "/uid/6e4a327c297d76fb5142b1b1d95b2d07",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
 					0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
 				PriorVersion: 0x0,
@@ -205,7 +205,7 @@
 		watch.Change{
 			Name:  "/uid/082bc42e15af4fcf611d7f19a8d7831f",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
 					0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
 				PriorVersion: 0x57e9d1860d1d68d8,
@@ -231,7 +231,7 @@
 		watch.Change{
 			Name:  "/uid/70ff65ec0f825f44b69f895eea759d71",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44,
 					0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71},
 				PriorVersion: 0x0,