veyron/services/store: Streaming put mutation api.

Previously, PutMutations took an array of mutations and applied them
within a transaction. Unfortunately, this api was memory-bound.
PutMutations now takes a stream of mutations, and applies them within a
transaction.
Specifics:
 - Forbids duplicate mutations on the same id.
 - Supports cancellation. If the request is cancelled in a timely
   fashion, any mutations received will not be committed.

Additionally, the estore api which extends veyron2/services/store.Store
has been replaced with a raw access api.
Specifics:
 - veyron/services/store/raw.Store supports Watch and PutMutations.
 - The raw store is mounted at extension (".store.raw"). This prevents
   non-sync clients from inadvertently discovering the raw api.

Change-Id: I6575a0ca43c220239ea90c84e6c10329c625dfbe
diff --git a/examples/storage/mdb/mdb_stored/main.go b/examples/storage/mdb/mdb_stored/main.go
index dbbd76f..3cf0403 100644
--- a/examples/storage/mdb/mdb_stored/main.go
+++ b/examples/storage/mdb/mdb_stored/main.go
@@ -27,6 +27,7 @@
 
 	"veyron2/rt"
 	"veyron2/security"
+	"veyron2/services/store"
 )
 
 var (
@@ -75,7 +76,7 @@
 	// Register the services.
 	storeDisp := server.NewStoreDispatcher(storeService, auth)
 	objectDisp := server.NewObjectDispatcher(storeService, auth)
-	if err := s.Register(".store", storeDisp); err != nil {
+	if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
 		log.Fatal("s.Register(storeDisp) failed: ", err)
 	}
 	if err := s.Register("", objectDisp); err != nil {
diff --git a/examples/todos/test/util.go b/examples/todos/test/util.go
index ba1225a..0bc3930 100644
--- a/examples/todos/test/util.go
+++ b/examples/todos/test/util.go
@@ -14,6 +14,7 @@
 	"veyron2/naming"
 	"veyron2/rt"
 	"veyron2/security"
+	"veyron2/services/store"
 	"veyron2/storage"
 	"veyron2/storage/vstore"
 )
@@ -56,7 +57,7 @@
 	// Register the services.
 	storeDisp := server.NewStoreDispatcher(storeService, nil)
 	objectDisp := server.NewObjectDispatcher(storeService, nil)
-	if err := s.Register(".store", storeDisp); err != nil {
+	if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
 		log.Fatal("s.Register(storeDisp) failed: ", err)
 	}
 	if err := s.Register("", objectDisp); err != nil {
diff --git a/examples/todos/todos_stored/main.go b/examples/todos/todos_stored/main.go
index 6083edf..12f52b1 100644
--- a/examples/todos/todos_stored/main.go
+++ b/examples/todos/todos_stored/main.go
@@ -30,6 +30,7 @@
 
 	"veyron2/rt"
 	"veyron2/security"
+	"veyron2/services/store"
 )
 
 var (
@@ -77,7 +78,7 @@
 	// Register the services.
 	storeDisp := server.NewStoreDispatcher(storeService, auth)
 	objectDisp := server.NewObjectDispatcher(storeService, auth)
-	if err := s.Register(".store", storeDisp); err != nil {
+	if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
 		log.Fatal("s.Register(storeDisp) failed: ", err)
 	}
 	if err := s.Register("", objectDisp); err != nil {
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index 8f8f826..feb92a8 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -42,7 +42,7 @@
 	"strconv"
 	"strings"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/storage"
 )
@@ -419,7 +419,7 @@
 }
 
 // fillFakeWatchRecords fills fake log and dag state (testing only).
-// TODO(hpucha): remove and clean up estore import.
+// TODO(hpucha): remove and clean up raw import.
 func (l *iLog) fillFakeWatchRecords() {
 	const num = 10
 	var parvers []storage.Version
@@ -427,7 +427,7 @@
 	for i := int(0); i < num; i++ {
 		// Create a local log record.
 		curvers := storage.Version(i)
-		if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: estore.Mutation{Version: curvers}}); err != nil {
+		if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: raw.Mutation{Version: curvers}}); err != nil {
 			return
 		}
 		parvers = []storage.Version{curvers}
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 462a83c..9b84904 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -8,7 +8,7 @@
 	"strings"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/naming"
 	"veyron2/storage"
@@ -67,7 +67,7 @@
 	// Next steps are to tie this up into mount table and auto-discover neighbors.
 	neighbors   []string
 	neighborIDs []string
-	estore      estore.Store
+	store       raw.Store
 
 	updObjects map[storage.ID]*objConflictState
 }
@@ -100,7 +100,7 @@
 	// TODO(hpucha): Merge this with vstore handler in syncd.
 	if syncd.vstoreEndpoint != "" {
 		var err error
-		i.estore, err = estore.BindStore(syncd.vstoreEndpoint + "/.store/sync")
+		i.store, err = raw.BindStore(naming.JoinAddressName(syncd.vstoreEndpoint, raw.RawStoreSuffix))
 		if err != nil {
 			vlog.Fatalf("newInitiator: cannot connect to Veyron store endpoint (%s): %s",
 				syncd.vstoreEndpoint, err)
@@ -420,7 +420,7 @@
 }
 
 // resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]estore.Mutation, error) {
+func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
 	switch conflictResolutionPolicy {
 	case useVersion:
 		if err := i.resolveConflictsByVersion(); err != nil {
@@ -430,7 +430,7 @@
 		return nil, fmt.Errorf("unknown conflict resolution policy")
 	}
 
-	var m []estore.Mutation
+	var m []raw.Mutation
 	for _, st := range i.updObjects {
 		// Append to mutations.
 		st.resolvVal.Mutation.PriorVersion = st.oldHead
@@ -460,7 +460,7 @@
 		if err != nil {
 			return err
 		}
-		var m estore.Mutation
+		var m raw.Mutation
 		if lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version {
 			m = lrecs[0].Value.Mutation
 		} else {
@@ -494,7 +494,7 @@
 
 // updateStoreAndSync updates the store, and if that is successful,
 // updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(m []estore.Mutation) error {
+func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation) error {
 	// TODO(hpucha): Eliminate reaching into syncd's lock.
 	i.syncd.lock.Lock()
 	defer i.syncd.lock.Unlock()
@@ -504,8 +504,17 @@
 	// will also hold the lock across PutMutations rpc to prevent
 	// a race with watcher. The next iteration will clean up this
 	// coordination.
-	if i.estore != nil {
-		if err := i.estore.PutMutations(m); err != nil {
+	if i.store != nil {
+		stream, err := i.store.PutMutations()
+		if err != nil {
+			return err
+		}
+		for i := range m {
+			if err := stream.Send(m[i]); err != nil {
+				return err
+			}
+		}
+		if err := stream.Finish(); err != nil {
 			return err
 		}
 	}
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index a524d57..c73e0f1 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -7,7 +7,7 @@
 	"testing"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/storage"
 )
@@ -36,7 +36,7 @@
 		LSN:     100,
 		ObjID:   objID,
 		CurVers: 20,
-		Value:   LogValue{Mutation: estore.Mutation{Version: 20}},
+		Value:   LogValue{Mutation: raw.Mutation{Version: 20}},
 	}
 	if _, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers); err == nil {
 		t.Errorf("GetLogRec didn't fail")
@@ -91,7 +91,7 @@
 			LSN:     LSN(100 + v),
 			ObjID:   objID,
 			CurVers: v,
-			Value:   LogValue{Mutation: estore.Mutation{Version: v, PriorVersion: 500 + v}},
+			Value:   LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}},
 		}
 		logKey, err := s.log.putLogRec(expRec)
 		if err != nil {
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 2df3fc5..783d23f 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -8,7 +8,7 @@
 	"os"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 )
 
 // getFileName generates a filename for a temporary (per unit test) kvdb file.
@@ -135,7 +135,7 @@
 				CurVers: cmd.version,
 				Parents: cmd.parents,
 				Value: LogValue{
-					Mutation: estore.Mutation{Version: cmd.version},
+					Mutation: raw.Mutation{Version: cmd.version},
 				},
 			}
 			stream.add(rec)
diff --git a/runtimes/google/vsync/vsync.idl b/runtimes/google/vsync/vsync.idl
index d470a3a..19f206e 100644
--- a/runtimes/google/vsync/vsync.idl
+++ b/runtimes/google/vsync/vsync.idl
@@ -1,7 +1,7 @@
 package vsync
 
 import (
-  "veyron/services/store/estore"
+  "veyron/services/store/raw"
 
   "veyron2/storage"
 )
@@ -44,7 +44,7 @@
 // LogValue represents an object mutation within a transaction.
 type LogValue struct {
   // Mutation is the store mutation representing the change in the object.
-  Mutation estore.Mutation
+  Mutation raw.Mutation
   // Delete indicates whether the mutation resulted in the object being
   // deleted from the store.
   Delete   bool
diff --git a/runtimes/google/vsync/vsync.idl.go b/runtimes/google/vsync/vsync.idl.go
index e01c0e9..a1958a5 100644
--- a/runtimes/google/vsync/vsync.idl.go
+++ b/runtimes/google/vsync/vsync.idl.go
@@ -4,7 +4,7 @@
 package vsync
 
 import (
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/storage"
 
@@ -32,7 +32,7 @@
 // LogValue represents an object mutation within a transaction.
 type LogValue struct {
 	// Mutation is the store mutation representing the change in the object.
-	Mutation estore.Mutation
+	Mutation raw.Mutation
 	// Delete indicates whether the mutation resulted in the object being
 	// deleted from the store.
 	Delete bool
@@ -266,7 +266,7 @@
 				_gen_wiretype.FieldType{Type: 0x4e, Name: "Tags"},
 				_gen_wiretype.FieldType{Type: 0x50, Name: "Dir"},
 			},
-			"estore.Mutation", []string(nil)},
+			"raw.Mutation", []string(nil)},
 		_gen_wiretype.StructType{
 			[]_gen_wiretype.FieldType{
 				_gen_wiretype.FieldType{Type: 0x51, Name: "Mutation"},
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 654fd10..375581b 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -10,7 +10,7 @@
 import (
 	"sync"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/ipc"
 	"veyron2/storage"
@@ -112,7 +112,7 @@
 
 	// Register these Watch data types with VOM.
 	// TODO(tilaks): why aren't they auto-retrieved from the IDL?
-	vom.Register(&estore.Mutation{})
+	vom.Register(&raw.Mutation{})
 	vom.Register(&storage.DEntry{})
 
 	// Channel to propagate close event to all threads.
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index ea48129..0617640 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -9,7 +9,7 @@
 	"fmt"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/query"
 	"veyron2/services/watch"
@@ -154,7 +154,7 @@
 	var lastResmark []byte
 	for i := range changes.Changes {
 		ch := &changes.Changes[i]
-		mu, ok := ch.Value.(*estore.Mutation)
+		mu, ok := ch.Value.(*raw.Mutation)
 		if !ok {
 			return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
 		}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index ab0aa8d..01ae25a 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -11,7 +11,7 @@
 	"testing"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/query"
 	"veyron2/services/watch"
@@ -146,7 +146,7 @@
 		watch.Change{
 			Name:  "/uid/4c6db51aa740d8c62b90df874503e285",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x4c, 0x6d, 0xb5, 0x1a, 0xa7, 0x40, 0xd8, 0xc6,
 					0x2b, 0x90, 0xdf, 0x87, 0x45, 0x3, 0xe2, 0x85},
 				PriorVersion: 0x0,
@@ -167,7 +167,7 @@
 		watch.Change{
 			Name:  "/uid/082bc42e15af4fcf611d7f19a8d7831f",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
 					0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
 				PriorVersion: 0x0,
@@ -188,7 +188,7 @@
 		watch.Change{
 			Name:  "/uid/6e4a327c297d76fb5142b1b1d95b2d07",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
 					0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
 				PriorVersion: 0x0,
@@ -205,7 +205,7 @@
 		watch.Change{
 			Name:  "/uid/082bc42e15af4fcf611d7f19a8d7831f",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
 					0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
 				PriorVersion: 0x57e9d1860d1d68d8,
@@ -231,7 +231,7 @@
 		watch.Change{
 			Name:  "/uid/70ff65ec0f825f44b69f895eea759d71",
 			State: 0,
-			Value: &estore.Mutation{
+			Value: &raw.Mutation{
 				ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44,
 					0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71},
 				PriorVersion: 0x0,
diff --git a/services/store/estore/service.idl b/services/store/estore/service.idl
deleted file mode 100644
index 4db0f1b..0000000
--- a/services/store/estore/service.idl
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
-Package estore defines an extended interface for the Veyron store.
-
-The extended interface supports synchronizing with remote stores by transporting Mutations.
-*/
-package estore
-
-import (
-  "veyron2/storage"
-  "veyron2/services/store"
-)
-
-// Mutation represents an update to an entry in the store, and contains enough
-// information for a privileged service to replicate the update elsewhere.
-type Mutation struct {
-  // ID is the key that identifies the entry.
-  ID storage.ID
-
-  // The version of the entry immediately before the update. For new entries,
-  // the PriorVersion is NoVersion.
-  PriorVersion storage.Version
-
-  // The version of the entry immediately after the update. For deleted entries,
-  // the Version is NoVersion.
-  Version storage.Version
-
-  // IsRoot is true if
-  // 1) The entry was the store root immediately before being deleted, or
-  // 2) The entry is the store root immediately after the update.
-  IsRoot bool
-
-  // Value is value stored at this entry.
-  Value anydata
-
-  // Tags specify permissions on this entry.
-  Tags storage.TagList
-
-  // Dir is the implicit directory of this entry, and may contain references
-  // to other entries in the store.
-  Dir []storage.DEntry
-}
-
-// Store defines an extended interface for the Veyron store. Mutations can be
-// received via the Watcher interface, and committed via PutMutations().
-type Store interface {
-  store.Store
-
-  // PutMutations puts external mutations in the store, within a transaction.
-  PutMutations(Mutations []Mutation) (Err error)
-}
diff --git a/services/store/estore/service.idl.go b/services/store/estore/service.idl.go
deleted file mode 100644
index 65339b0..0000000
--- a/services/store/estore/service.idl.go
+++ /dev/null
@@ -1,291 +0,0 @@
-// This file was auto-generated by the veyron idl tool.
-// Source: service.idl
-
-/*
-Package estore defines an extended interface for the Veyron store.
-
-The extended interface supports synchronizing with remote stores by transporting Mutations.
-*/
-package estore
-
-import (
-	"veyron2/services/store"
-
-	"veyron2/storage"
-
-	// The non-user imports are prefixed with "_gen_" to prevent collisions.
-	_gen_veyron2 "veyron2"
-	_gen_idl "veyron2/idl"
-	_gen_ipc "veyron2/ipc"
-	_gen_naming "veyron2/naming"
-	_gen_rt "veyron2/rt"
-	_gen_wiretype "veyron2/wiretype"
-)
-
-// Mutation represents an update to an entry in the store, and contains enough
-// information for a privileged service to replicate the update elsewhere.
-type Mutation struct {
-	// ID is the key that identifies the entry.
-	ID storage.ID
-	// The version of the entry immediately before the update. For new entries,
-	// the PriorVersion is NoVersion.
-	PriorVersion storage.Version
-	// The version of the entry immediately after the update. For deleted entries,
-	// the Version is NoVersion.
-	Version storage.Version
-	// IsRoot is true if
-	// 1) The entry was the store root immediately before being deleted, or
-	// 2) The entry is the store root immediately after the update.
-	IsRoot bool
-	// Value is value stored at this entry.
-	Value _gen_idl.AnyData
-	// Tags specify permissions on this entry.
-	Tags storage.TagList
-	// Dir is the implicit directory of this entry, and may contain references
-	// to other entries in the store.
-	Dir []storage.DEntry
-}
-
-// Store defines an extended interface for the Veyron store. Mutations can be
-// received via the Watcher interface, and committed via PutMutations().
-// Store is the interface the client binds and uses.
-// Store_InternalNoTagGetter is the interface without the TagGetter
-// and UnresolveStep methods (both framework-added, rathern than user-defined),
-// to enable embedding without method collisions.  Not to be used directly by
-// clients.
-type Store_InternalNoTagGetter interface {
-	store.Store_InternalNoTagGetter
-
-	// PutMutations puts external mutations in the store, within a transaction.
-	PutMutations(Mutations []Mutation, opts ..._gen_ipc.ClientCallOpt) (err error)
-}
-type Store interface {
-	_gen_idl.TagGetter
-	// UnresolveStep returns the names for the remote service, rooted at the
-	// service's immediate namespace ancestor.
-	UnresolveStep(opts ..._gen_ipc.ClientCallOpt) ([]string, error)
-	Store_InternalNoTagGetter
-}
-
-// StoreService is the interface the server implements.
-type StoreService interface {
-	store.StoreService
-
-	// PutMutations puts external mutations in the store, within a transaction.
-	PutMutations(context _gen_ipc.Context, Mutations []Mutation) (err error)
-}
-
-// BindStore returns the client stub implementing the Store
-// interface.
-//
-// If no _gen_ipc.Client is specified, the default _gen_ipc.Client in the
-// global Runtime is used.
-func BindStore(name string, opts ..._gen_ipc.BindOpt) (Store, error) {
-	var client _gen_ipc.Client
-	switch len(opts) {
-	case 0:
-		client = _gen_rt.R().Client()
-	case 1:
-		switch o := opts[0].(type) {
-		case _gen_veyron2.Runtime:
-			client = o.Client()
-		case _gen_ipc.Client:
-			client = o
-		default:
-			return nil, _gen_idl.ErrUnrecognizedOption
-		}
-	default:
-		return nil, _gen_idl.ErrTooManyOptionsToBind
-	}
-	stub := &clientStubStore{client: client, name: name}
-	stub.Store_InternalNoTagGetter, _ = store.BindStore(name, client)
-
-	return stub, nil
-}
-
-// NewServerStore creates a new server stub.
-//
-// It takes a regular server implementing the StoreService
-// interface, and returns a new server stub.
-func NewServerStore(server StoreService) interface{} {
-	return &ServerStubStore{
-		ServerStubStore: *store.NewServerStore(server).(*store.ServerStubStore),
-		service:         server,
-	}
-}
-
-// clientStubStore implements Store.
-type clientStubStore struct {
-	store.Store_InternalNoTagGetter
-
-	client _gen_ipc.Client
-	name   string
-}
-
-func (c *clientStubStore) GetMethodTags(method string) []interface{} {
-	return GetStoreMethodTags(method)
-}
-
-func (__gen_c *clientStubStore) PutMutations(Mutations []Mutation, opts ..._gen_ipc.ClientCallOpt) (err error) {
-	var call _gen_ipc.ClientCall
-	if call, err = __gen_c.client.StartCall(__gen_c.name, "PutMutations", []interface{}{Mutations}, opts...); err != nil {
-		return
-	}
-	if ierr := call.Finish(&err); ierr != nil {
-		err = ierr
-	}
-	return
-}
-
-func (c *clientStubStore) UnresolveStep(opts ..._gen_ipc.ClientCallOpt) (reply []string, err error) {
-	var call _gen_ipc.ClientCall
-	if call, err = c.client.StartCall(c.name, "UnresolveStep", nil, opts...); err != nil {
-		return
-	}
-	if ierr := call.Finish(&reply, &err); ierr != nil {
-		err = ierr
-	}
-	return
-}
-
-// ServerStubStore wraps a server that implements
-// StoreService and provides an object that satisfies
-// the requirements of veyron2/ipc.ReflectInvoker.
-type ServerStubStore struct {
-	store.ServerStubStore
-
-	service StoreService
-}
-
-func (s *ServerStubStore) GetMethodTags(method string) []interface{} {
-	return GetStoreMethodTags(method)
-}
-
-func (s *ServerStubStore) Signature(call _gen_ipc.ServerCall) (_gen_ipc.ServiceSignature, error) {
-	result := _gen_ipc.ServiceSignature{Methods: make(map[string]_gen_ipc.MethodSignature)}
-	result.Methods["PutMutations"] = _gen_ipc.MethodSignature{
-		InArgs: []_gen_ipc.MethodArgument{
-			{Name: "Mutations", Type: 76},
-		},
-		OutArgs: []_gen_ipc.MethodArgument{
-			{Name: "Err", Type: 77},
-		},
-	}
-
-	result.TypeDefs = []_gen_idl.AnyData{
-		_gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x41, Len: 0x10, Name: "storage.ID", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x35, Name: "storage.Version", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "anydata", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "TagOp", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x41, Len: 0x10, Name: "ID", Tags: []string(nil)}, _gen_wiretype.StructType{
-			[]_gen_wiretype.FieldType{
-				_gen_wiretype.FieldType{Type: 0x45, Name: "Op"},
-				_gen_wiretype.FieldType{Type: 0x46, Name: "ACL"},
-			},
-			"Tag", []string(nil)},
-		_gen_wiretype.SliceType{Elem: 0x47, Name: "storage.TagList", Tags: []string(nil)}, _gen_wiretype.StructType{
-			[]_gen_wiretype.FieldType{
-				_gen_wiretype.FieldType{Type: 0x3, Name: "Name"},
-				_gen_wiretype.FieldType{Type: 0x46, Name: "ID"},
-			},
-			"storage.DEntry", []string(nil)},
-		_gen_wiretype.SliceType{Elem: 0x49, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
-			[]_gen_wiretype.FieldType{
-				_gen_wiretype.FieldType{Type: 0x42, Name: "ID"},
-				_gen_wiretype.FieldType{Type: 0x43, Name: "PriorVersion"},
-				_gen_wiretype.FieldType{Type: 0x43, Name: "Version"},
-				_gen_wiretype.FieldType{Type: 0x2, Name: "IsRoot"},
-				_gen_wiretype.FieldType{Type: 0x44, Name: "Value"},
-				_gen_wiretype.FieldType{Type: 0x48, Name: "Tags"},
-				_gen_wiretype.FieldType{Type: 0x4a, Name: "Dir"},
-			},
-			"Mutation", []string(nil)},
-		_gen_wiretype.SliceType{Elem: 0x4b, Name: "", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}}
-	var ss _gen_ipc.ServiceSignature
-	var firstAdded int
-	ss, _ = s.ServerStubStore.Signature(call)
-	firstAdded = len(result.TypeDefs)
-	for k, v := range ss.Methods {
-		for i, _ := range v.InArgs {
-			if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
-				v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
-			}
-		}
-		for i, _ := range v.OutArgs {
-			if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
-				v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
-			}
-		}
-		if v.InStream >= _gen_wiretype.TypeIDFirst {
-			v.InStream += _gen_wiretype.TypeID(firstAdded)
-		}
-		if v.OutStream >= _gen_wiretype.TypeIDFirst {
-			v.OutStream += _gen_wiretype.TypeID(firstAdded)
-		}
-		result.Methods[k] = v
-	}
-	//TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
-	for _, d := range ss.TypeDefs {
-		switch wt := d.(type) {
-		case _gen_wiretype.SliceType:
-			if wt.Elem >= _gen_wiretype.TypeIDFirst {
-				wt.Elem += _gen_wiretype.TypeID(firstAdded)
-			}
-			d = wt
-		case _gen_wiretype.ArrayType:
-			if wt.Elem >= _gen_wiretype.TypeIDFirst {
-				wt.Elem += _gen_wiretype.TypeID(firstAdded)
-			}
-			d = wt
-		case _gen_wiretype.MapType:
-			if wt.Key >= _gen_wiretype.TypeIDFirst {
-				wt.Key += _gen_wiretype.TypeID(firstAdded)
-			}
-			if wt.Elem >= _gen_wiretype.TypeIDFirst {
-				wt.Elem += _gen_wiretype.TypeID(firstAdded)
-			}
-			d = wt
-		case _gen_wiretype.StructType:
-			for _, fld := range wt.Fields {
-				if fld.Type >= _gen_wiretype.TypeIDFirst {
-					fld.Type += _gen_wiretype.TypeID(firstAdded)
-				}
-			}
-			d = wt
-		}
-		result.TypeDefs = append(result.TypeDefs, d)
-	}
-
-	return result, nil
-}
-
-func (s *ServerStubStore) UnresolveStep(call _gen_ipc.ServerCall) (reply []string, err error) {
-	if unresolver, ok := s.service.(_gen_ipc.Unresolver); ok {
-		return unresolver.UnresolveStep(call)
-	}
-	if call.Server() == nil {
-		return
-	}
-	var published []string
-	if published, err = call.Server().Published(); err != nil || published == nil {
-		return
-	}
-	reply = make([]string, len(published))
-	for i, p := range published {
-		reply[i] = _gen_naming.Join(p, call.Name())
-	}
-	return
-}
-
-func (__gen_s *ServerStubStore) PutMutations(call _gen_ipc.ServerCall, Mutations []Mutation) (err error) {
-	err = __gen_s.service.PutMutations(call, Mutations)
-	return
-}
-
-func GetStoreMethodTags(method string) []interface{} {
-	if resp := store.GetStoreMethodTags(method); resp != nil {
-		return resp
-	}
-	switch method {
-	case "PutMutations":
-		return []interface{}{}
-	default:
-		return nil
-	}
-}
diff --git a/services/store/memstore/blackbox/sync_integration_test.go b/services/store/memstore/blackbox/sync_integration_test.go
index 6002bd0..906f701 100644
--- a/services/store/memstore/blackbox/sync_integration_test.go
+++ b/services/store/memstore/blackbox/sync_integration_test.go
@@ -50,7 +50,7 @@
 		t.Fatalf("Recv() failed: %v", err)
 	}
 	// Update target
-	target.PutMutations(Mutations(cb.Changes))
+	PutMutations(t, target, Mutations(cb.Changes))
 	GC(t, target)
 
 	// Expect that the target contains id1 and id2
@@ -83,7 +83,7 @@
 		t.Fatalf("Recv() failed: %v", err)
 	}
 	// Update target
-	target.PutMutations(Mutations(cb.Changes))
+	PutMutations(t, target, Mutations(cb.Changes))
 	GC(t, target)
 
 	// Expect that the target contains id1, id2, id3
@@ -101,7 +101,7 @@
 		t.Fatalf("Recv() failed: %v", err)
 	}
 	// Update target
-	target.PutMutations(Mutations(cb.Changes))
+	PutMutations(t, target, Mutations(cb.Changes))
 	GC(t, target)
 
 	// Expect that the target contains id1, id2, but not id3
diff --git a/services/store/memstore/blackbox/util.go b/services/store/memstore/blackbox/util.go
index 3ae6f13..d5e5636 100644
--- a/services/store/memstore/blackbox/util.go
+++ b/services/store/memstore/blackbox/util.go
@@ -10,9 +10,9 @@
 	"testing"
 	"time"
 
-	"veyron/services/store/estore"
 	"veyron/services/store/memstore"
 	memwatch "veyron/services/store/memstore/watch"
+	"veyron/services/store/raw"
 	"veyron/services/store/service"
 
 	"veyron2/ipc"
@@ -24,6 +24,7 @@
 var (
 	rootPublicID security.PublicID = security.FakePublicID("root")
 	rootCtx      ipc.Context       = rootContext{}
+	nullMutation                   = raw.Mutation{}
 )
 
 type rootContext struct{}
@@ -226,10 +227,39 @@
 	return &testWatcherWatchStream{c, done}
 }
 
-func Mutations(changes []watch.Change) []estore.Mutation {
-	mutations := make([]estore.Mutation, len(changes))
+// putMutationsStream implements raw.StoreServicePutMutationsStream.
+type putMutationsStream struct {
+	mus   []raw.Mutation
+	index int
+}
+
+func newPutMutationsStream(mus []raw.Mutation) raw.StoreServicePutMutationsStream {
+	return &putMutationsStream{
+		mus: mus,
+	}
+}
+
+func (s *putMutationsStream) Recv() (raw.Mutation, error) {
+	if s.index < len(s.mus) {
+		index := s.index
+		s.index++
+		return s.mus[index], nil
+	}
+	return nullMutation, io.EOF
+}
+
+func PutMutations(t *testing.T, st *memstore.Store, mus []raw.Mutation) {
+	stream := newPutMutationsStream(mus)
+	if err := st.PutMutations(rootCtx, stream); err != nil {
+		_, file, line, _ := runtime.Caller(1)
+		t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
+	}
+}
+
+func Mutations(changes []watch.Change) []raw.Mutation {
+	mutations := make([]raw.Mutation, len(changes))
 	for i, change := range changes {
-		mutations[i] = *(change.Value.(*estore.Mutation))
+		mutations[i] = *(change.Value.(*raw.Mutation))
 	}
 	return mutations
 }
diff --git a/services/store/memstore/state/mutable_snapshot.go b/services/store/memstore/state/mutable_snapshot.go
index eb361ef..c95d9cd 100644
--- a/services/store/memstore/state/mutable_snapshot.go
+++ b/services/store/memstore/state/mutable_snapshot.go
@@ -4,10 +4,10 @@
 	"errors"
 	"fmt"
 
-	"veyron/services/store/estore"
 	"veyron/services/store/memstore/acl"
 	"veyron/services/store/memstore/field"
 	"veyron/services/store/memstore/refs"
+	"veyron/services/store/raw"
 
 	"veyron/runtimes/google/lib/functional"
 	"veyron2/security"
@@ -104,15 +104,16 @@
 
 var (
 	// TODO(tilaks): don't expose errors, use verror instead.
-	ErrBadPath            = errors.New("malformed path")
-	ErrTypeMismatch       = errors.New("type mismatch")
-	ErrNotFound           = errors.New("not found")
-	ErrBadRef             = errors.New("value has dangling references")
-	ErrCantUnlinkByID     = errors.New("can't unlink entries by ID")
-	ErrPreconditionFailed = errors.New("precondition failed")
-	ErrIDsDoNotMatch      = errors.New("IDs do not match")
-	ErrPermissionDenied   = errors.New("permission denied") // TODO(tilaks): can permission denied leak store structure?
-	ErrNotTagList         = errors.New("not a TagList")
+	ErrBadPath              = errors.New("malformed path")
+	ErrTypeMismatch         = errors.New("type mismatch")
+	ErrNotFound             = errors.New("not found")
+	ErrBadRef               = errors.New("value has dangling references")
+	ErrCantUnlinkByID       = errors.New("can't unlink entries by ID")
+	ErrPreconditionFailed   = errors.New("precondition failed")
+	ErrIDsDoNotMatch        = errors.New("IDs do not match")
+	ErrPermissionDenied     = errors.New("permission denied") // TODO(tilaks): can permission denied leak store structure?
+	ErrNotTagList           = errors.New("not a TagList")
+	ErrDuplicatePutMutation = errors.New("duplicate calls to PutMutation for the same ID")
 
 	nullID storage.ID
 )
@@ -587,34 +588,37 @@
 	return err
 }
 
-// PutMutations puts some externally constructed mutations. Does not update
-// cells or refs, so regular Puts, Gets and Removes may be inconsistent.
-func (sn *MutableSnapshot) PutMutations(extmu []estore.Mutation) {
-	mu := sn.mutations
-	for _, extm := range extmu {
-		id := extm.ID
-		// If the object has no version, it was deleted.
-		if extm.Version == storage.NoVersion {
-			mu.Deletions[id] = extm.PriorVersion
-			if extm.IsRoot {
-				mu.SetRootID = true
-				mu.RootID = nullID
-			}
-			continue
-		}
-		if extm.IsRoot {
-			mu.SetRootID = true
-			mu.RootID = id
-		}
-		mu.Preconditions[id] = extm.PriorVersion
-		m := &Mutation{
-			Postcondition: extm.Version,
-			Value:         extm.Value,
-			Dir:           unflattenDir(extm.Dir),
-		}
-		m.UpdateRefs()
-		mu.Delta[id] = m
+// PutMutation puts an externally constructed mutation. Does not update cells
+// or refs, so regular Puts, Gets and Removes may be inconsistent.
+func (sn *MutableSnapshot) PutMutation(extmu raw.Mutation) error {
+	mus := sn.mutations
+	id := extmu.ID
+	// Check that a mutation has not already been put for this id.
+	if _, ok := mus.Delta[id]; ok {
+		return ErrDuplicatePutMutation
 	}
+	// If the object has no version, it was deleted.
+	if extmu.Version == storage.NoVersion {
+		mus.Deletions[id] = extmu.PriorVersion
+		if extmu.IsRoot {
+			mus.SetRootID = true
+			mus.RootID = nullID
+		}
+		return nil
+	}
+	if extmu.IsRoot {
+		mus.SetRootID = true
+		mus.RootID = id
+	}
+	mus.Preconditions[id] = extmu.PriorVersion
+	mu := &Mutation{
+		Postcondition: extmu.Version,
+		Value:         extmu.Value,
+		Dir:           unflattenDir(extmu.Dir),
+	}
+	mu.UpdateRefs()
+	mus.Delta[id] = mu
+	return nil
 }
 
 // TODO(tilaks): revisit when vsync.Mutation.Dir is of type []*storage.DEntry
diff --git a/services/store/memstore/store.go b/services/store/memstore/store.go
index 5730236..432ef44 100644
--- a/services/store/memstore/store.go
+++ b/services/store/memstore/store.go
@@ -1,13 +1,17 @@
 package memstore
 
 import (
+	"errors"
+	"io"
+
 	"veyron/runtimes/google/lib/sync"
 
-	"veyron/services/store/estore"
 	iquery "veyron/services/store/memstore/query"
 	"veyron/services/store/memstore/state"
+	"veyron/services/store/raw"
 	"veyron/services/store/service"
 
+	"veyron2/ipc"
 	"veyron2/query"
 	"veyron2/security"
 	"veyron2/storage"
@@ -28,6 +32,10 @@
 // Store implements the service.Store interface.
 var _ service.Store = (*Store)(nil)
 
+var (
+	ErrRequestCancelled = errors.New("request cancelled")
+)
+
 // New creates a new store.  admin is the public ID of the administrator, dbName
 // is the path of the database directory, to which logs are written.
 func New(admin security.PublicID, dbName string) (*Store, error) {
@@ -141,6 +149,32 @@
 	return nil
 }
 
+// PutMutations atomically commits a stream of Mutations when the stream is
+// closed. Mutations are not committed if the request is cancelled before the
+// stream has been closed.
+func (st *Store) PutMutations(ctx ipc.Context, stream raw.StoreServicePutMutationsStream) error {
+	tr := st.newNilTransaction()
+	for {
+		mu, err := stream.Recv()
+		if err == io.EOF {
+			if ctx.IsClosed() {
+				tr.Abort()
+				return ErrRequestCancelled
+			}
+			break
+		}
+		if err != nil {
+			tr.Abort()
+			return err
+		}
+		if err := tr.snapshot.PutMutation(mu); err != nil {
+			tr.Abort()
+			return err
+		}
+	}
+	return tr.Commit()
+}
+
 // Glob returns an iterator that emits all values that match the given pattern.
 func (st *Store) Glob(clientID security.PublicID, tr storage.Transaction, pattern string) (service.GlobStream, error) {
 	sn, err := st.GetTransactionSnapshot(tr)
@@ -159,11 +193,3 @@
 func (st *Store) SetConflictResolver(ty string, r storage.ConflictResolver) {
 	panic("not implemented")
 }
-
-// PutMutations puts external mutations in the store, within a transaction.
-// TODO(tilaks): verify PutMutations requests.
-func (st *Store) PutMutations(mu []estore.Mutation) error {
-	tr := st.newNilTransaction()
-	tr.snapshot.PutMutations(mu)
-	return tr.Commit()
-}
diff --git a/services/store/memstore/store_test.go b/services/store/memstore/store_test.go
index 842a7b9..6024504 100644
--- a/services/store/memstore/store_test.go
+++ b/services/store/memstore/store_test.go
@@ -5,8 +5,8 @@
 	"os"
 	"testing"
 
-	"veyron/services/store/estore"
 	"veyron/services/store/memstore/state"
+	"veyron/services/store/raw"
 
 	"veyron2/storage"
 )
@@ -133,7 +133,7 @@
 	// Create the state.
 	st, err := New(rootPublicID, dbName)
 	if err != nil {
-		t.Fatalf("newState() failed: %v", err)
+		t.Fatalf("New() failed: %v", err)
 	}
 
 	// Add /, /a, /a/b
@@ -142,8 +142,8 @@
 	post1, post2, post3 := storage.NewVersion(), storage.NewVersion(), storage.NewVersion()
 	v1, v2, v3 := "v1", "v2", "v3"
 
-	mu := []estore.Mutation{
-		estore.Mutation{
+	putMutationsBatch(t, st, []raw.Mutation{
+		raw.Mutation{
 			ID:           id1,
 			PriorVersion: pre1,
 			Version:      post1,
@@ -151,7 +151,7 @@
 			Value:        v1,
 			Dir:          dir("a", id2),
 		},
-		estore.Mutation{
+		raw.Mutation{
 			ID:           id2,
 			PriorVersion: pre2,
 			Version:      post2,
@@ -159,7 +159,7 @@
 			Value:        v2,
 			Dir:          dir("b", id3),
 		},
-		estore.Mutation{
+		raw.Mutation{
 			ID:           id3,
 			PriorVersion: pre3,
 			Version:      post3,
@@ -167,8 +167,7 @@
 			Value:        v3,
 			Dir:          empty,
 		},
-	}
-	st.PutMutations(mu)
+	})
 
 	expectValue(t, st, nil, "/", v1)
 	expectValue(t, st, nil, "/a", v2)
@@ -178,17 +177,14 @@
 	pre1, pre2, pre3 = post1, post2, post3
 	post2 = storage.NewVersion()
 
-	mu = []estore.Mutation{
-		estore.Mutation{
-			ID:           id2,
-			PriorVersion: pre2,
-			Version:      post2,
-			IsRoot:       false,
-			Value:        v2,
-			Dir:          empty,
-		},
-	}
-	st.PutMutations(mu)
+	putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+		ID:           id2,
+		PriorVersion: pre2,
+		Version:      post2,
+		IsRoot:       false,
+		Value:        v2,
+		Dir:          empty,
+	}})
 
 	expectValue(t, st, nil, "/", v1)
 	expectValue(t, st, nil, "/a", v2)
@@ -197,15 +193,12 @@
 	// Garbage-collect /a/b
 	post3 = storage.NoVersion
 
-	mu = []estore.Mutation{
-		estore.Mutation{
-			ID:           id3,
-			PriorVersion: pre3,
-			Version:      post3,
-			IsRoot:       false,
-		},
-	}
-	st.PutMutations(mu)
+	putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+		ID:           id3,
+		PriorVersion: pre3,
+		Version:      post3,
+		IsRoot:       false,
+	}})
 
 	expectValue(t, st, nil, "/", v1)
 	expectValue(t, st, nil, "/a", v2)
@@ -215,15 +208,12 @@
 	pre1, pre2, pre3 = post1, post2, post3
 	post1 = storage.NoVersion
 
-	mu = []estore.Mutation{
-		estore.Mutation{
-			ID:           id1,
-			PriorVersion: pre1,
-			Version:      post1,
-			IsRoot:       true,
-		},
-	}
-	st.PutMutations(mu)
+	putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+		ID:           id1,
+		PriorVersion: pre1,
+		Version:      post1,
+		IsRoot:       true,
+	}})
 
 	expectNotExists(t, st, nil, "/")
 	expectNotExists(t, st, nil, "/a")
@@ -232,15 +222,12 @@
 	// Garbage-collect /a
 	post2 = storage.NoVersion
 
-	mu = []estore.Mutation{
-		estore.Mutation{
-			ID:           id2,
-			PriorVersion: pre2,
-			Version:      post2,
-			IsRoot:       false,
-		},
-	}
-	st.PutMutations(mu)
+	putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+		ID:           id2,
+		PriorVersion: pre2,
+		Version:      post2,
+		IsRoot:       false,
+	}})
 
 	expectNotExists(t, st, nil, "/")
 	expectNotExists(t, st, nil, "/a")
@@ -257,7 +244,7 @@
 	// Create the state.
 	st, err := New(rootPublicID, dbName)
 	if err != nil {
-		t.Fatalf("newState() failed: %v", err)
+		t.Fatalf("New() failed: %v", err)
 	}
 
 	// Add /, /a
@@ -266,8 +253,8 @@
 	post1, post2 := storage.NewVersion(), storage.NewVersion()
 	v1, v2 := "v1", "v2"
 
-	mu := []estore.Mutation{
-		estore.Mutation{
+	putMutationsBatch(t, st, []raw.Mutation{
+		raw.Mutation{
 			ID:           id1,
 			PriorVersion: pre1,
 			Version:      post1,
@@ -275,7 +262,7 @@
 			Value:        v1,
 			Dir:          dir("a", id2),
 		},
-		estore.Mutation{
+		raw.Mutation{
 			ID:           id2,
 			PriorVersion: pre2,
 			Version:      post2,
@@ -283,8 +270,7 @@
 			Value:        v2,
 			Dir:          empty,
 		},
-	}
-	st.PutMutations(mu)
+	})
 
 	expectValue(t, st, nil, "/", v1)
 	expectValue(t, st, nil, "/a", v2)
@@ -294,20 +280,87 @@
 	post2 = storage.NewVersion()
 	v2 = "v4"
 
-	mu = []estore.Mutation{estore.Mutation{
+	s := putMutations(st)
+	s.Send(raw.Mutation{
 		ID:           id2,
 		PriorVersion: pre2,
 		Version:      post2,
 		IsRoot:       true,
 		Value:        v2,
 		Dir:          empty,
-	}}
-	if err := st.PutMutations(mu); err != state.ErrPreconditionFailed {
-		t.Fatal("Expected precondition to fail")
+	})
+	if err := s.Finish(); err != state.ErrPreconditionFailed {
+		t.Fatalf("Expected precondition to fail")
 	}
 
 }
 
+func TestPutDuplicateMutations(t *testing.T) {
+	dbName, err := ioutil.TempDir(os.TempDir(), "vstore")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir() failed: %v", err)
+	}
+	defer os.RemoveAll(dbName)
+
+	// Create the state.
+	st, err := New(rootPublicID, dbName)
+	if err != nil {
+		t.Fatalf("New() failed: %v", err)
+	}
+
+	id := storage.NewID()
+	s := putMutations(st)
+	s.Send(raw.Mutation{
+		ID:           id,
+		PriorVersion: storage.NoVersion,
+		Version:      storage.NewVersion(),
+		IsRoot:       true,
+		Value:        "v1",
+		Dir:          empty,
+	})
+	s.Send(raw.Mutation{
+		ID:           id,
+		PriorVersion: storage.NoVersion,
+		Version:      storage.NewVersion(),
+		IsRoot:       true,
+		Value:        "v2",
+		Dir:          empty,
+	})
+	if err := s.Finish(); err != state.ErrDuplicatePutMutation {
+		t.Fatalf("Expected precondition to fail")
+	}
+}
+
+func TestCancelPutMutation(t *testing.T) {
+	dbName, err := ioutil.TempDir(os.TempDir(), "vstore")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir() failed: %v", err)
+	}
+	defer os.RemoveAll(dbName)
+
+	// Create the state.
+	st, err := New(rootPublicID, dbName)
+	if err != nil {
+		t.Fatalf("New() failed: %v", err)
+	}
+
+	s := putMutations(st)
+	s.Send(raw.Mutation{
+		ID:           storage.NewID(),
+		PriorVersion: storage.NoVersion,
+		Version:      storage.NewVersion(),
+		IsRoot:       true,
+		Value:        "v1",
+		Dir:          empty,
+	})
+	s.Cancel()
+	if err := s.Finish(); err != ErrRequestCancelled {
+		t.Fatalf("Expected request to be cancelled")
+	}
+
+	expectNotExists(t, st, nil, "/")
+}
+
 var (
 	empty = []storage.DEntry{}
 )
diff --git a/services/store/memstore/util_test.go b/services/store/memstore/util_test.go
index 70c7563..31e7174 100644
--- a/services/store/memstore/util_test.go
+++ b/services/store/memstore/util_test.go
@@ -1,12 +1,152 @@
 package memstore
 
 import (
+	"io"
 	"runtime"
 	"testing"
+	"time"
 
+	"veyron/services/store/raw"
+
+	"veyron2/ipc"
+	"veyron2/security"
 	"veyron2/storage"
 )
 
+// cancellableContext implements ipc.Context.
+type cancellableContext struct {
+	cancelled chan struct{}
+}
+
+func newCancellableContext() *cancellableContext {
+	return &cancellableContext{cancelled: make(chan struct{})}
+}
+
+func (*cancellableContext) Server() ipc.Server {
+	return nil
+}
+
+func (*cancellableContext) Method() string {
+	return ""
+}
+
+func (*cancellableContext) Name() string {
+	return ""
+}
+
+func (*cancellableContext) Suffix() string {
+	return ""
+}
+
+func (*cancellableContext) Label() (l security.Label) {
+	return
+}
+
+func (*cancellableContext) CaveatDischarges() security.CaveatDischargeMap {
+	return nil
+}
+
+func (*cancellableContext) LocalID() security.PublicID {
+	return rootPublicID
+}
+
+func (*cancellableContext) RemoteID() security.PublicID {
+	return rootPublicID
+}
+
+func (*cancellableContext) Deadline() (t time.Time) {
+	return
+}
+
+func (ctx *cancellableContext) IsClosed() bool {
+	select {
+	case <-ctx.cancelled:
+		return true
+	default:
+		return false
+	}
+}
+
+func (ctx *cancellableContext) Closed() <-chan struct{} {
+	return ctx.cancelled
+}
+
+// cancel synchronously closes the context. After cancel returns, calls to
+// IsClosed will return true and the stream returned by Closed will be closed.
+func (ctx *cancellableContext) cancel() {
+	close(ctx.cancelled)
+}
+
+// serverStream implements raw.StoreServicePutMutationsStream
+type serverStream struct {
+	mus <-chan raw.Mutation
+}
+
+func (s *serverStream) Recv() (raw.Mutation, error) {
+	mu, ok := <-s.mus
+	if !ok {
+		return mu, io.EOF
+	}
+	return mu, nil
+}
+
+// clientStream implements raw.StorePutMutationsStream
+type clientStream struct {
+	ctx    ipc.Context
+	closed bool
+	mus    chan<- raw.Mutation
+	err    <-chan error
+}
+
+func (s *clientStream) Send(mu raw.Mutation) error {
+	s.mus <- mu
+	return nil
+}
+
+func (s *clientStream) CloseSend() error {
+	if !s.closed {
+		s.closed = true
+		close(s.mus)
+	}
+	return nil
+}
+
+func (s *clientStream) Finish() error {
+	s.CloseSend()
+	return <-s.err
+}
+
+func (s *clientStream) Cancel() {
+	s.ctx.(*cancellableContext).cancel()
+	s.CloseSend()
+}
+
+func putMutations(st *Store) raw.StorePutMutationsStream {
+	ctx := newCancellableContext()
+	mus := make(chan raw.Mutation)
+	err := make(chan error)
+	go func() {
+		err <- st.PutMutations(ctx, &serverStream{mus})
+		close(err)
+	}()
+	return &clientStream{
+		ctx: ctx,
+		mus: mus,
+		err: err,
+	}
+}
+
+func putMutationsBatch(t *testing.T, st *Store, mus []raw.Mutation) {
+	clientStream := putMutations(st)
+	for _, mu := range mus {
+		clientStream.Send(mu)
+	}
+	if err := clientStream.Finish(); err != nil {
+		_, file, line, _ := runtime.Caller(1)
+		t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
+	}
+}
+
 func mkdir(t *testing.T, st *Store, tr storage.Transaction, path string) (storage.ID, interface{}) {
 	_, file, line, _ := runtime.Caller(1)
 	dir := &Dir{}
diff --git a/services/store/memstore/watch/sync_processor.go b/services/store/memstore/watch/sync_processor.go
index 847d9e8..15a123c 100644
--- a/services/store/memstore/watch/sync_processor.go
+++ b/services/store/memstore/watch/sync_processor.go
@@ -4,9 +4,9 @@
 	"errors"
 	"fmt"
 
-	"veyron/services/store/estore"
 	"veyron/services/store/memstore/refs"
 	"veyron/services/store/memstore/state"
+	"veyron/services/store/raw"
 
 	"veyron2/security"
 	"veyron2/services/watch"
@@ -79,7 +79,7 @@
 		if isRoot {
 			p.rootVersion = cell.Version
 		}
-		value := &estore.Mutation{
+		value := &raw.Mutation{
 			ID:           id,
 			PriorVersion: storage.NoVersion,
 			Version:      cell.Version,
@@ -123,7 +123,7 @@
 			p.rootID = mus.RootID
 		} else {
 			// The root was deleted, prepare a deletion change.
-			value := &estore.Mutation{
+			value := &raw.Mutation{
 				ID:           p.rootID,
 				PriorVersion: p.rootVersion,
 				Version:      storage.NoVersion,
@@ -153,7 +153,7 @@
 		if isRoot {
 			p.rootVersion = mu.Postcondition
 		}
-		value := &estore.Mutation{
+		value := &raw.Mutation{
 			ID:           id,
 			PriorVersion: mus.Preconditions[id],
 			Version:      mu.Postcondition,
@@ -180,7 +180,7 @@
 			delete(p.preparedDeletions, id)
 			continue
 		}
-		value := &estore.Mutation{
+		value := &raw.Mutation{
 			ID:           id,
 			PriorVersion: precondition,
 			Version:      storage.NoVersion,
diff --git a/services/store/memstore/watch/test_util.go b/services/store/memstore/watch/test_util.go
index ae20bd9..29ef2ee 100644
--- a/services/store/memstore/watch/test_util.go
+++ b/services/store/memstore/watch/test_util.go
@@ -7,8 +7,8 @@
 	"testing"
 	"time"
 
-	"veyron/services/store/estore"
 	"veyron/services/store/memstore"
+	"veyron/services/store/raw"
 	"veyron/services/store/service"
 
 	"veyron2/ipc"
@@ -252,7 +252,7 @@
 	if change.State != watch.Exists {
 		t.Fatalf("Expected id to exist: %v", id)
 	}
-	cv := change.Value.(*estore.Mutation)
+	cv := change.Value.(*raw.Mutation)
 	if cv.PriorVersion != pre {
 		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
 	}
@@ -273,7 +273,7 @@
 	if change.State != watch.DoesNotExist {
 		t.Fatalf("Expected id to not exist: %v", id)
 	}
-	cv := change.Value.(*estore.Mutation)
+	cv := change.Value.(*raw.Mutation)
 	if cv.PriorVersion != pre {
 		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
 	}
@@ -293,7 +293,7 @@
 
 func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
 	for _, change := range changes {
-		cv, ok := change.Value.(*estore.Mutation)
+		cv, ok := change.Value.(*raw.Mutation)
 		if !ok {
 			t.Fatal("Expected a Mutation")
 		}
diff --git a/services/store/raw/service.idl b/services/store/raw/service.idl
new file mode 100644
index 0000000..ec3f695
--- /dev/null
+++ b/services/store/raw/service.idl
@@ -0,0 +1,57 @@
+/*
+Package raw defines a raw interface for the Veyron store.
+The raw interface supports synchronizing with remote stores by transporting Mutations.
+*/
+package raw
+
+import (
+  "veyron2/storage"
+  "veyron2/services/watch"
+)
+
+const (
+  // The raw Store has Veyron name "<mount>/.store.raw", where <mount> is the
+  // Veyron name of the mount point.
+  RawStoreSuffix = ".store.raw"
+)
+
+// Mutation represents an update to an entry in the store, and contains enough
+// information for a privileged service to replicate the update elsewhere.
+type Mutation struct {
+  // ID is the key that identifies the entry.
+  ID storage.ID
+
+  // The version of the entry immediately before the update. For new entries,
+  // the PriorVersion is NoVersion.
+  PriorVersion storage.Version
+
+  // The version of the entry immediately after the update. For deleted entries,
+  // the Version is NoVersion.
+  Version storage.Version
+
+  // IsRoot is true if
+  // 1) The entry was the store root immediately before being deleted, or
+  // 2) The entry is the store root immediately after the update.
+  IsRoot bool
+
+  // Value is value stored at this entry.
+  Value anydata
+
+  // Tags specify permissions on this entry.
+  Tags storage.TagList
+
+  // Dir is the implicit directory of this entry, and may contain references
+  // to other entries in the store.
+  Dir []storage.DEntry
+}
+
+// Store defines a raw interface for the Veyron store. Mutations can be received
+// via the Watcher interface, and committed via PutMutation.
+type Store interface {
+  watch.Watcher
+
+  // PutMutations atomically commits a stream of Mutations when the stream is
+  // closed. Mutations are not committed if the request is cancelled before
+  // the stream has been closed.
+  PutMutations() stream<Mutation, _> error
+}
diff --git a/services/store/raw/service.idl.go b/services/store/raw/service.idl.go
new file mode 100644
index 0000000..99b932a
--- /dev/null
+++ b/services/store/raw/service.idl.go
@@ -0,0 +1,363 @@
+// This file was auto-generated by the veyron idl tool.
+// Source: service.idl
+
+/*
+Package raw defines a raw interface for the Veyron store.
+The raw interface supports synchronizing with remote stores by transporting Mutations.
+*/
+package raw
+
+import (
+	"veyron2/services/watch"
+
+	"veyron2/storage"
+
+	// The non-user imports are prefixed with "_gen_" to prevent collisions.
+	_gen_veyron2 "veyron2"
+	_gen_idl "veyron2/idl"
+	_gen_ipc "veyron2/ipc"
+	_gen_naming "veyron2/naming"
+	_gen_rt "veyron2/rt"
+	_gen_wiretype "veyron2/wiretype"
+)
+
+// Mutation represents an update to an entry in the store, and contains enough
+// information for a privileged service to replicate the update elsewhere.
+type Mutation struct {
+	// ID is the key that identifies the entry.
+	ID storage.ID
+	// The version of the entry immediately before the update. For new entries,
+	// the PriorVersion is NoVersion.
+	PriorVersion storage.Version
+	// The version of the entry immediately after the update. For deleted entries,
+	// the Version is NoVersion.
+	Version storage.Version
+	// IsRoot is true if
+	// 1) The entry was the store root immediately before being deleted, or
+	// 2) The entry is the store root immediately after the update.
+	IsRoot bool
+	// Value is value stored at this entry.
+	Value _gen_idl.AnyData
+	// Tags specify permissions on this entry.
+	Tags storage.TagList
+	// Dir is the implicit directory of this entry, and may contain references
+	// to other entries in the store.
+	Dir []storage.DEntry
+}
+
+const (
+	// The raw Store has Veyron name "<mount>/.store.raw", where <mount> is the
+	// Veyron name of the mount point.
+	RawStoreSuffix = ".store.raw"
+)
+
+// Store defines a raw interface for the Veyron store. Mutations can be received
+// via the Watcher interface, and committed via PutMutation.
+// Store is the interface the client binds and uses.
+// Store_InternalNoTagGetter is the interface without the TagGetter
+// and UnresolveStep methods (both framework-added, rathern than user-defined),
+// to enable embedding without method collisions.  Not to be used directly by
+// clients.
+type Store_InternalNoTagGetter interface {
+	watch.Watcher_InternalNoTagGetter
+
+	// PutMutations atomically commits a stream of Mutations when the stream is
+	// closed. Mutations are not committed if the request is cancelled before
+	// the stream has been closed.
+	PutMutations(opts ..._gen_ipc.ClientCallOpt) (reply StorePutMutationsStream, err error)
+}
+type Store interface {
+	_gen_idl.TagGetter
+	// UnresolveStep returns the names for the remote service, rooted at the
+	// service's immediate namespace ancestor.
+	UnresolveStep(opts ..._gen_ipc.ClientCallOpt) ([]string, error)
+	Store_InternalNoTagGetter
+}
+
+// StoreService is the interface the server implements.
+type StoreService interface {
+	watch.WatcherService
+
+	// PutMutations atomically commits a stream of Mutations when the stream is
+	// closed. Mutations are not committed if the request is cancelled before
+	// the stream has been closed.
+	PutMutations(context _gen_ipc.Context, stream StoreServicePutMutationsStream) (err error)
+}
+
+// StorePutMutationsStream is the interface for streaming responses of the method
+// PutMutations in the service interface Store.
+type StorePutMutationsStream interface {
+
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.
+	Send(item Mutation) error
+
+	// CloseSend indicates to the server that no more items will be sent; server
+	// Recv calls will receive io.EOF after all sent items.  Subsequent calls to
+	// Send on the client will fail.  This is an optional call - it's used by
+	// streaming clients that need the server to receive the io.EOF terminator.
+	CloseSend() error
+
+	// Finish closes the stream and returns the positional return values for
+	// call.
+	Finish() (err error)
+
+	// Cancel cancels the RPC, notifying the server to stop processing.
+	Cancel()
+}
+
+// Implementation of the StorePutMutationsStream interface that is not exported.
+type implStorePutMutationsStream struct {
+	clientCall _gen_ipc.ClientCall
+}
+
+func (c *implStorePutMutationsStream) Send(item Mutation) error {
+	return c.clientCall.Send(item)
+}
+
+func (c *implStorePutMutationsStream) CloseSend() error {
+	return c.clientCall.CloseSend()
+}
+
+func (c *implStorePutMutationsStream) Finish() (err error) {
+	if ierr := c.clientCall.Finish(&err); ierr != nil {
+		err = ierr
+	}
+	return
+}
+
+func (c *implStorePutMutationsStream) Cancel() {
+	c.clientCall.Cancel()
+}
+
+// StoreServicePutMutationsStream is the interface for streaming responses of the method
+// PutMutations in the service interface Store.
+type StoreServicePutMutationsStream interface {
+
+	// Recv fills itemptr with the next item in the input stream, blocking until
+	// an item is available.  Returns io.EOF to indicate graceful end of input.
+	Recv() (item Mutation, err error)
+}
+
+// Implementation of the StoreServicePutMutationsStream interface that is not exported.
+type implStoreServicePutMutationsStream struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implStoreServicePutMutationsStream) Recv() (item Mutation, err error) {
+	err = s.serverCall.Recv(&item)
+	return
+}
+
+// BindStore returns the client stub implementing the Store
+// interface.
+//
+// If no _gen_ipc.Client is specified, the default _gen_ipc.Client in the
+// global Runtime is used.
+func BindStore(name string, opts ..._gen_ipc.BindOpt) (Store, error) {
+	var client _gen_ipc.Client
+	switch len(opts) {
+	case 0:
+		client = _gen_rt.R().Client()
+	case 1:
+		switch o := opts[0].(type) {
+		case _gen_veyron2.Runtime:
+			client = o.Client()
+		case _gen_ipc.Client:
+			client = o
+		default:
+			return nil, _gen_idl.ErrUnrecognizedOption
+		}
+	default:
+		return nil, _gen_idl.ErrTooManyOptionsToBind
+	}
+	stub := &clientStubStore{client: client, name: name}
+	stub.Watcher_InternalNoTagGetter, _ = watch.BindWatcher(name, client)
+
+	return stub, nil
+}
+
+// NewServerStore creates a new server stub.
+//
+// It takes a regular server implementing the StoreService
+// interface, and returns a new server stub.
+func NewServerStore(server StoreService) interface{} {
+	return &ServerStubStore{
+		ServerStubWatcher: *watch.NewServerWatcher(server).(*watch.ServerStubWatcher),
+		service:           server,
+	}
+}
+
+// clientStubStore implements Store.
+type clientStubStore struct {
+	watch.Watcher_InternalNoTagGetter
+
+	client _gen_ipc.Client
+	name   string
+}
+
+func (c *clientStubStore) GetMethodTags(method string) []interface{} {
+	return GetStoreMethodTags(method)
+}
+
+func (__gen_c *clientStubStore) PutMutations(opts ..._gen_ipc.ClientCallOpt) (reply StorePutMutationsStream, err error) {
+	var call _gen_ipc.ClientCall
+	if call, err = __gen_c.client.StartCall(__gen_c.name, "PutMutations", nil, opts...); err != nil {
+		return
+	}
+	reply = &implStorePutMutationsStream{clientCall: call}
+	return
+}
+
+func (c *clientStubStore) UnresolveStep(opts ..._gen_ipc.ClientCallOpt) (reply []string, err error) {
+	var call _gen_ipc.ClientCall
+	if call, err = c.client.StartCall(c.name, "UnresolveStep", nil, opts...); err != nil {
+		return
+	}
+	if ierr := call.Finish(&reply, &err); ierr != nil {
+		err = ierr
+	}
+	return
+}
+
+// ServerStubStore wraps a server that implements
+// StoreService and provides an object that satisfies
+// the requirements of veyron2/ipc.ReflectInvoker.
+type ServerStubStore struct {
+	watch.ServerStubWatcher
+
+	service StoreService
+}
+
+func (s *ServerStubStore) GetMethodTags(method string) []interface{} {
+	return GetStoreMethodTags(method)
+}
+
+func (s *ServerStubStore) Signature(call _gen_ipc.ServerCall) (_gen_ipc.ServiceSignature, error) {
+	result := _gen_ipc.ServiceSignature{Methods: make(map[string]_gen_ipc.MethodSignature)}
+	result.Methods["PutMutations"] = _gen_ipc.MethodSignature{
+		InArgs: []_gen_ipc.MethodArgument{},
+		OutArgs: []_gen_ipc.MethodArgument{
+			{Name: "", Type: 65},
+		},
+		InStream: 76,
+	}
+
+	result.TypeDefs = []_gen_idl.AnyData{
+		_gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x42, Len: 0x10, Name: "storage.ID", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x35, Name: "storage.Version", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "anydata", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "TagOp", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x42, Len: 0x10, Name: "ID", Tags: []string(nil)}, _gen_wiretype.StructType{
+			[]_gen_wiretype.FieldType{
+				_gen_wiretype.FieldType{Type: 0x46, Name: "Op"},
+				_gen_wiretype.FieldType{Type: 0x47, Name: "ACL"},
+			},
+			"Tag", []string(nil)},
+		_gen_wiretype.SliceType{Elem: 0x48, Name: "storage.TagList", Tags: []string(nil)}, _gen_wiretype.StructType{
+			[]_gen_wiretype.FieldType{
+				_gen_wiretype.FieldType{Type: 0x3, Name: "Name"},
+				_gen_wiretype.FieldType{Type: 0x47, Name: "ID"},
+			},
+			"storage.DEntry", []string(nil)},
+		_gen_wiretype.SliceType{Elem: 0x4a, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
+			[]_gen_wiretype.FieldType{
+				_gen_wiretype.FieldType{Type: 0x43, Name: "ID"},
+				_gen_wiretype.FieldType{Type: 0x44, Name: "PriorVersion"},
+				_gen_wiretype.FieldType{Type: 0x44, Name: "Version"},
+				_gen_wiretype.FieldType{Type: 0x2, Name: "IsRoot"},
+				_gen_wiretype.FieldType{Type: 0x45, Name: "Value"},
+				_gen_wiretype.FieldType{Type: 0x49, Name: "Tags"},
+				_gen_wiretype.FieldType{Type: 0x4b, Name: "Dir"},
+			},
+			"Mutation", []string(nil)},
+	}
+	var ss _gen_ipc.ServiceSignature
+	var firstAdded int
+	ss, _ = s.ServerStubWatcher.Signature(call)
+	firstAdded = len(result.TypeDefs)
+	for k, v := range ss.Methods {
+		for i, _ := range v.InArgs {
+			if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+				v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+			}
+		}
+		for i, _ := range v.OutArgs {
+			if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+				v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+			}
+		}
+		if v.InStream >= _gen_wiretype.TypeIDFirst {
+			v.InStream += _gen_wiretype.TypeID(firstAdded)
+		}
+		if v.OutStream >= _gen_wiretype.TypeIDFirst {
+			v.OutStream += _gen_wiretype.TypeID(firstAdded)
+		}
+		result.Methods[k] = v
+	}
+	//TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+	for _, d := range ss.TypeDefs {
+		switch wt := d.(type) {
+		case _gen_wiretype.SliceType:
+			if wt.Elem >= _gen_wiretype.TypeIDFirst {
+				wt.Elem += _gen_wiretype.TypeID(firstAdded)
+			}
+			d = wt
+		case _gen_wiretype.ArrayType:
+			if wt.Elem >= _gen_wiretype.TypeIDFirst {
+				wt.Elem += _gen_wiretype.TypeID(firstAdded)
+			}
+			d = wt
+		case _gen_wiretype.MapType:
+			if wt.Key >= _gen_wiretype.TypeIDFirst {
+				wt.Key += _gen_wiretype.TypeID(firstAdded)
+			}
+			if wt.Elem >= _gen_wiretype.TypeIDFirst {
+				wt.Elem += _gen_wiretype.TypeID(firstAdded)
+			}
+			d = wt
+		case _gen_wiretype.StructType:
+			for _, fld := range wt.Fields {
+				if fld.Type >= _gen_wiretype.TypeIDFirst {
+					fld.Type += _gen_wiretype.TypeID(firstAdded)
+				}
+			}
+			d = wt
+		}
+		result.TypeDefs = append(result.TypeDefs, d)
+	}
+
+	return result, nil
+}
+
+func (s *ServerStubStore) UnresolveStep(call _gen_ipc.ServerCall) (reply []string, err error) {
+	if unresolver, ok := s.service.(_gen_ipc.Unresolver); ok {
+		return unresolver.UnresolveStep(call)
+	}
+	if call.Server() == nil {
+		return
+	}
+	var published []string
+	if published, err = call.Server().Published(); err != nil || published == nil {
+		return
+	}
+	reply = make([]string, len(published))
+	for i, p := range published {
+		reply[i] = _gen_naming.Join(p, call.Name())
+	}
+	return
+}
+
+func (__gen_s *ServerStubStore) PutMutations(call _gen_ipc.ServerCall) (err error) {
+	stream := &implStoreServicePutMutationsStream{serverCall: call}
+	err = __gen_s.service.PutMutations(call, stream)
+	return
+}
+
+func GetStoreMethodTags(method string) []interface{} {
+	if resp := watch.GetWatcherMethodTags(method); resp != nil {
+		return resp
+	}
+	switch method {
+	case "PutMutations":
+		return []interface{}{}
+	default:
+		return nil
+	}
+}
diff --git a/services/store/server/server.go b/services/store/server/server.go
index bc46671..c148638 100644
--- a/services/store/server/server.go
+++ b/services/store/server/server.go
@@ -6,9 +6,9 @@
 	"sync"
 	"time"
 
-	"veyron/services/store/estore"
 	"veyron/services/store/memstore"
 	memwatch "veyron/services/store/memstore/watch"
+	"veyron/services/store/raw"
 	"veyron/services/store/service"
 
 	"veyron2/idl"
@@ -203,9 +203,11 @@
 	return s.watcher.Watch(ctx, req, stream)
 }
 
-// PutMutations puts external mutations in the store, within a transaction.
-func (s *Server) PutMutations(ctx ipc.Context, mu []estore.Mutation) error {
-	return s.store.PutMutations(mu)
+// PutMutations atomically commits a stream of Mutations when the stream is
+// closed. Mutations are not committed if the request is cancelled before the
+// stream has been closed.
+func (s *Server) PutMutations(ctx ipc.Context, stream raw.StoreServicePutMutationsStream) error {
+	return s.store.PutMutations(ctx, stream)
 }
 
 // ReadConflicts returns the stream of conflicts to store values.  A
@@ -214,14 +216,20 @@
 	panic("not implemented")
 }
 
-// Store and object dispatchers.
-// Typically, the store dispatcher handles paths with ".store" prefix, and the
+// Store, raw store and object dispatchers.
+// Typically, the store dispatcher handles paths with ".store" prefix, the
+// raw store dispatcher handles paths with ".store.raw" prefix, and the
 // object dispatcher handles paths with "" prefix.
 // TODO(sadovsky): Revisit this scheme. Seems simpler to have one dispatcher?
 
-// NewStoreDispatcher returns an storeDispatcher.
+// NewStoreDispatcher returns a store dispatcher.
 func NewStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
-	return ipc.SoloDispatcher(estore.NewServerStore(s), auth)
+	return ipc.SoloDispatcher(store.NewServerStore(s), auth)
+}
+
+// NewRawStoreDispatcher returns a raw store dispatcher.
+func NewRawStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
+	return ipc.SoloDispatcher(raw.NewServerStore(s), auth)
 }
 
 type objectDispatcher struct {
@@ -229,7 +237,7 @@
 	auth security.Authorizer
 }
 
-// NewObjectDispatcher returns an objectDispatcher.
+// NewObjectDispatcher returns an object dispatcher.
 func NewObjectDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
 	return &objectDispatcher{s: s, auth: auth}
 }
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 8cd60d9..97c3426 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -7,7 +7,7 @@
 	"testing"
 	"time"
 
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/ipc"
 	"veyron2/security"
@@ -143,7 +143,7 @@
 	if change.State != watch.Exists {
 		t.Fatalf("Expected id to exist: %v", id)
 	}
-	cv := change.Value.(*estore.Mutation)
+	cv := change.Value.(*raw.Mutation)
 	if cv.Value != value {
 		t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
 	}
@@ -151,7 +151,7 @@
 
 func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
 	for _, change := range changes {
-		cv, ok := change.Value.(*estore.Mutation)
+		cv, ok := change.Value.(*raw.Mutation)
 		if !ok {
 			t.Fatal("Expected a Mutation")
 		}
diff --git a/services/store/service/service.go b/services/store/service/service.go
index 51826db..47249e6 100644
--- a/services/store/service/service.go
+++ b/services/store/service/service.go
@@ -1,7 +1,7 @@
 package service
 
 import (
-	"veyron/services/store/estore"
+	"veyron/services/store/raw"
 
 	"veyron2/ipc"
 	"veyron2/query"
@@ -42,11 +42,18 @@
 	Glob(clientID security.PublicID, t storage.Transaction, pattern string) (GlobStream, error)
 }
 
-// Store is the client interface to the storage system.
+// Store is the server-side interface to the storage system. It is expected to
+// sit directly behind the rpc handlers defined by
+// veyron2/services/store/service.idl.
 type Store interface {
 	// Bind returns the Object associated with a path.
 	Bind(path string) Object
 
+	// PutMutations atomically commits a stream of Mutations when the stream is
+	// closed. Mutations are not committed if the request is cancelled before
+	// the stream has been closed.
+	PutMutations(ctx ipc.Context, stream raw.StoreServicePutMutationsStream) error
+
 	// Glob returns a set of names that match the glob pattern.
 	Glob(clientID security.PublicID, t storage.Transaction, pattern string) (GlobStream, error)
 
@@ -54,9 +61,6 @@
 	// query.
 	Search(t storage.Transaction, q query.Query) storage.Iterator
 
-	// PutMutations puts external mutations in the store, within a transaction.
-	PutMutations(mu []estore.Mutation) error
-
 	// SetConflictResolver specifies a function to perform conflict resolution.
 	// The <ty> represents the IDL name for the type.
 	SetConflictResolver(ty string, r storage.ConflictResolver)
diff --git a/services/store/stored/main.go b/services/store/stored/main.go
index 7e5ba80..3e61c11 100644
--- a/services/store/stored/main.go
+++ b/services/store/stored/main.go
@@ -7,8 +7,9 @@
 //     - <name> is the Veyron mount point name, default /global/vstore/<hostname>/<username>.
 //     - <dbName> is the filename in which to store the data.
 //
-// The Store service has Veyron name, <name>/.store.  Individual values with
-// path <path> have name <name>/<path>.
+// The store service has Veyron name, <name>/.store.
+// The raw store service has Veyron name, <name>/.store.raw.
+// Individual values with path <path> have name <name>/<path>.
 package main
 
 import (
@@ -18,9 +19,11 @@
 	"os/user"
 
 	vflag "veyron/security/flag"
+	"veyron/services/store/raw"
 	"veyron/services/store/server"
 
 	"veyron2/rt"
+	"veyron2/services/store"
 )
 
 var (
@@ -65,10 +68,14 @@
 
 	// Register the services.
 	storeDisp := server.NewStoreDispatcher(storeService, auth)
+	rawStoreDisp := server.NewRawStoreDispatcher(storeService, auth)
 	objectDisp := server.NewObjectDispatcher(storeService, auth)
-	if err := s.Register(".store", storeDisp); err != nil {
+	if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
 		log.Fatal("s.Register(storeDisp) failed: ", err)
 	}
+	if err := s.Register(raw.RawStoreSuffix, rawStoreDisp); err != nil {
+		log.Fatal("s.Register(rawStoreDisp) failed: ", err)
+	}
 	if err := s.Register("", objectDisp); err != nil {
 		log.Fatal("s.Register(objectDisp) failed: ", err)
 	}
diff --git a/services/store/testutil/store.go b/services/store/testutil/store.go
index 8e47eb3..560cbdd 100644
--- a/services/store/testutil/store.go
+++ b/services/store/testutil/store.go
@@ -7,11 +7,12 @@
 	"os"
 	"testing"
 
-	store "veyron/services/store/server"
+	istore "veyron/services/store/server"
 
 	"veyron2/ipc"
 	"veyron2/naming"
 	"veyron2/security"
+	"veyron2/services/store"
 )
 
 // NewStore creates a new testing instance of the store server and returns
@@ -26,8 +27,8 @@
 	}
 
 	// Create a new StoreService.
-	config := store.ServerConfig{Admin: id, DBName: dbName}
-	storeService, err := store.New(config)
+	config := istore.ServerConfig{Admin: id, DBName: dbName}
+	storeService, err := istore.New(config)
 	if err != nil {
 		t.Fatalf("New(%v) failed: %v", config, err)
 	}
@@ -41,9 +42,9 @@
 	t.Logf("Storage server at %v", name)
 
 	// Register the services.
-	storeDispatcher := store.NewStoreDispatcher(storeService, nil)
-	objectDispatcher := store.NewObjectDispatcher(storeService, nil)
-	if err := server.Register(name+"/.store", storeDispatcher); err != nil {
+	storeDispatcher := istore.NewStoreDispatcher(storeService, nil)
+	objectDispatcher := istore.NewObjectDispatcher(storeService, nil)
+	if err := server.Register(naming.JoinAddressName(name, store.StoreSuffix), storeDispatcher); err != nil {
 		t.Fatalf("Register(%v) failed: %v", storeDispatcher, err)
 	}
 	if err := server.Register(name, objectDispatcher); err != nil {