veyron/services/store: Streaming put mutation api.
Previously, PutMutations took an array of mutations and applied them
within a transaction. Unfortunately, this api was memory-bound.
PutMutations now takes a stream of mutations, and applies them within a
transaction.
Specifics:
- Forbids duplicate mutations on the same id.
- Supports cancellation. If the request is cancelled in a timely
fashion, any mutations received will not be committed.
Additionally, the estore api which extends veyron2/services/store.Store
has been replaced with a raw access api.
Specifics:
- veyron/services/store/raw.Store supports Watch and PutMutations.
- The raw store is mounted at extension (".store.raw"). This prevents
non-sync clients from inadvertently discovering the raw api.
Change-Id: I6575a0ca43c220239ea90c84e6c10329c625dfbe
diff --git a/examples/storage/mdb/mdb_stored/main.go b/examples/storage/mdb/mdb_stored/main.go
index dbbd76f..3cf0403 100644
--- a/examples/storage/mdb/mdb_stored/main.go
+++ b/examples/storage/mdb/mdb_stored/main.go
@@ -27,6 +27,7 @@
"veyron2/rt"
"veyron2/security"
+ "veyron2/services/store"
)
var (
@@ -75,7 +76,7 @@
// Register the services.
storeDisp := server.NewStoreDispatcher(storeService, auth)
objectDisp := server.NewObjectDispatcher(storeService, auth)
- if err := s.Register(".store", storeDisp); err != nil {
+ if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
log.Fatal("s.Register(storeDisp) failed: ", err)
}
if err := s.Register("", objectDisp); err != nil {
diff --git a/examples/todos/test/util.go b/examples/todos/test/util.go
index ba1225a..0bc3930 100644
--- a/examples/todos/test/util.go
+++ b/examples/todos/test/util.go
@@ -14,6 +14,7 @@
"veyron2/naming"
"veyron2/rt"
"veyron2/security"
+ "veyron2/services/store"
"veyron2/storage"
"veyron2/storage/vstore"
)
@@ -56,7 +57,7 @@
// Register the services.
storeDisp := server.NewStoreDispatcher(storeService, nil)
objectDisp := server.NewObjectDispatcher(storeService, nil)
- if err := s.Register(".store", storeDisp); err != nil {
+ if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
log.Fatal("s.Register(storeDisp) failed: ", err)
}
if err := s.Register("", objectDisp); err != nil {
diff --git a/examples/todos/todos_stored/main.go b/examples/todos/todos_stored/main.go
index 6083edf..12f52b1 100644
--- a/examples/todos/todos_stored/main.go
+++ b/examples/todos/todos_stored/main.go
@@ -30,6 +30,7 @@
"veyron2/rt"
"veyron2/security"
+ "veyron2/services/store"
)
var (
@@ -77,7 +78,7 @@
// Register the services.
storeDisp := server.NewStoreDispatcher(storeService, auth)
objectDisp := server.NewObjectDispatcher(storeService, auth)
- if err := s.Register(".store", storeDisp); err != nil {
+ if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
log.Fatal("s.Register(storeDisp) failed: ", err)
}
if err := s.Register("", objectDisp); err != nil {
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index 8f8f826..feb92a8 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -42,7 +42,7 @@
"strconv"
"strings"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
)
@@ -419,7 +419,7 @@
}
// fillFakeWatchRecords fills fake log and dag state (testing only).
-// TODO(hpucha): remove and clean up estore import.
+// TODO(hpucha): remove and clean up raw import.
func (l *iLog) fillFakeWatchRecords() {
const num = 10
var parvers []storage.Version
@@ -427,7 +427,7 @@
for i := int(0); i < num; i++ {
// Create a local log record.
curvers := storage.Version(i)
- if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: estore.Mutation{Version: curvers}}); err != nil {
+ if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: raw.Mutation{Version: curvers}}); err != nil {
return
}
parvers = []storage.Version{curvers}
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 462a83c..9b84904 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -8,7 +8,7 @@
"strings"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/naming"
"veyron2/storage"
@@ -67,7 +67,7 @@
// Next steps are to tie this up into mount table and auto-discover neighbors.
neighbors []string
neighborIDs []string
- estore estore.Store
+ store raw.Store
updObjects map[storage.ID]*objConflictState
}
@@ -100,7 +100,7 @@
// TODO(hpucha): Merge this with vstore handler in syncd.
if syncd.vstoreEndpoint != "" {
var err error
- i.estore, err = estore.BindStore(syncd.vstoreEndpoint + "/.store/sync")
+ i.store, err = raw.BindStore(naming.JoinAddressName(syncd.vstoreEndpoint, raw.RawStoreSuffix))
if err != nil {
vlog.Fatalf("newInitiator: cannot connect to Veyron store endpoint (%s): %s",
syncd.vstoreEndpoint, err)
@@ -420,7 +420,7 @@
}
// resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]estore.Mutation, error) {
+func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
switch conflictResolutionPolicy {
case useVersion:
if err := i.resolveConflictsByVersion(); err != nil {
@@ -430,7 +430,7 @@
return nil, fmt.Errorf("unknown conflict resolution policy")
}
- var m []estore.Mutation
+ var m []raw.Mutation
for _, st := range i.updObjects {
// Append to mutations.
st.resolvVal.Mutation.PriorVersion = st.oldHead
@@ -460,7 +460,7 @@
if err != nil {
return err
}
- var m estore.Mutation
+ var m raw.Mutation
if lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version {
m = lrecs[0].Value.Mutation
} else {
@@ -494,7 +494,7 @@
// updateStoreAndSync updates the store, and if that is successful,
// updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(m []estore.Mutation) error {
+func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation) error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
@@ -504,8 +504,17 @@
// will also hold the lock across PutMutations rpc to prevent
// a race with watcher. The next iteration will clean up this
// coordination.
- if i.estore != nil {
- if err := i.estore.PutMutations(m); err != nil {
+ if i.store != nil {
+ stream, err := i.store.PutMutations()
+ if err != nil {
+ return err
+ }
+ for i := range m {
+ if err := stream.Send(m[i]); err != nil {
+ return err
+ }
+ }
+ if err := stream.Finish(); err != nil {
return err
}
}
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index a524d57..c73e0f1 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -7,7 +7,7 @@
"testing"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
)
@@ -36,7 +36,7 @@
LSN: 100,
ObjID: objID,
CurVers: 20,
- Value: LogValue{Mutation: estore.Mutation{Version: 20}},
+ Value: LogValue{Mutation: raw.Mutation{Version: 20}},
}
if _, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers); err == nil {
t.Errorf("GetLogRec didn't fail")
@@ -91,7 +91,7 @@
LSN: LSN(100 + v),
ObjID: objID,
CurVers: v,
- Value: LogValue{Mutation: estore.Mutation{Version: v, PriorVersion: 500 + v}},
+ Value: LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}},
}
logKey, err := s.log.putLogRec(expRec)
if err != nil {
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 2df3fc5..783d23f 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -8,7 +8,7 @@
"os"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
)
// getFileName generates a filename for a temporary (per unit test) kvdb file.
@@ -135,7 +135,7 @@
CurVers: cmd.version,
Parents: cmd.parents,
Value: LogValue{
- Mutation: estore.Mutation{Version: cmd.version},
+ Mutation: raw.Mutation{Version: cmd.version},
},
}
stream.add(rec)
diff --git a/runtimes/google/vsync/vsync.idl b/runtimes/google/vsync/vsync.idl
index d470a3a..19f206e 100644
--- a/runtimes/google/vsync/vsync.idl
+++ b/runtimes/google/vsync/vsync.idl
@@ -1,7 +1,7 @@
package vsync
import (
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
)
@@ -44,7 +44,7 @@
// LogValue represents an object mutation within a transaction.
type LogValue struct {
// Mutation is the store mutation representing the change in the object.
- Mutation estore.Mutation
+ Mutation raw.Mutation
// Delete indicates whether the mutation resulted in the object being
// deleted from the store.
Delete bool
diff --git a/runtimes/google/vsync/vsync.idl.go b/runtimes/google/vsync/vsync.idl.go
index e01c0e9..a1958a5 100644
--- a/runtimes/google/vsync/vsync.idl.go
+++ b/runtimes/google/vsync/vsync.idl.go
@@ -4,7 +4,7 @@
package vsync
import (
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
@@ -32,7 +32,7 @@
// LogValue represents an object mutation within a transaction.
type LogValue struct {
// Mutation is the store mutation representing the change in the object.
- Mutation estore.Mutation
+ Mutation raw.Mutation
// Delete indicates whether the mutation resulted in the object being
// deleted from the store.
Delete bool
@@ -266,7 +266,7 @@
_gen_wiretype.FieldType{Type: 0x4e, Name: "Tags"},
_gen_wiretype.FieldType{Type: 0x50, Name: "Dir"},
},
- "estore.Mutation", []string(nil)},
+ "raw.Mutation", []string(nil)},
_gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
_gen_wiretype.FieldType{Type: 0x51, Name: "Mutation"},
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 654fd10..375581b 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -10,7 +10,7 @@
import (
"sync"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/ipc"
"veyron2/storage"
@@ -112,7 +112,7 @@
// Register these Watch data types with VOM.
// TODO(tilaks): why aren't they auto-retrieved from the IDL?
- vom.Register(&estore.Mutation{})
+ vom.Register(&raw.Mutation{})
vom.Register(&storage.DEntry{})
// Channel to propagate close event to all threads.
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index ea48129..0617640 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -9,7 +9,7 @@
"fmt"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/query"
"veyron2/services/watch"
@@ -154,7 +154,7 @@
var lastResmark []byte
for i := range changes.Changes {
ch := &changes.Changes[i]
- mu, ok := ch.Value.(*estore.Mutation)
+ mu, ok := ch.Value.(*raw.Mutation)
if !ok {
return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index ab0aa8d..01ae25a 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -11,7 +11,7 @@
"testing"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/query"
"veyron2/services/watch"
@@ -146,7 +146,7 @@
watch.Change{
Name: "/uid/4c6db51aa740d8c62b90df874503e285",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x4c, 0x6d, 0xb5, 0x1a, 0xa7, 0x40, 0xd8, 0xc6,
0x2b, 0x90, 0xdf, 0x87, 0x45, 0x3, 0xe2, 0x85},
PriorVersion: 0x0,
@@ -167,7 +167,7 @@
watch.Change{
Name: "/uid/082bc42e15af4fcf611d7f19a8d7831f",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
PriorVersion: 0x0,
@@ -188,7 +188,7 @@
watch.Change{
Name: "/uid/6e4a327c297d76fb5142b1b1d95b2d07",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
PriorVersion: 0x0,
@@ -205,7 +205,7 @@
watch.Change{
Name: "/uid/082bc42e15af4fcf611d7f19a8d7831f",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
PriorVersion: 0x57e9d1860d1d68d8,
@@ -231,7 +231,7 @@
watch.Change{
Name: "/uid/70ff65ec0f825f44b69f895eea759d71",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44,
0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71},
PriorVersion: 0x0,
diff --git a/services/store/estore/service.idl b/services/store/estore/service.idl
deleted file mode 100644
index 4db0f1b..0000000
--- a/services/store/estore/service.idl
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
-Package estore defines an extended interface for the Veyron store.
-
-The extended interface supports synchronizing with remote stores by transporting Mutations.
-*/
-package estore
-
-import (
- "veyron2/storage"
- "veyron2/services/store"
-)
-
-// Mutation represents an update to an entry in the store, and contains enough
-// information for a privileged service to replicate the update elsewhere.
-type Mutation struct {
- // ID is the key that identifies the entry.
- ID storage.ID
-
- // The version of the entry immediately before the update. For new entries,
- // the PriorVersion is NoVersion.
- PriorVersion storage.Version
-
- // The version of the entry immediately after the update. For deleted entries,
- // the Version is NoVersion.
- Version storage.Version
-
- // IsRoot is true if
- // 1) The entry was the store root immediately before being deleted, or
- // 2) The entry is the store root immediately after the update.
- IsRoot bool
-
- // Value is value stored at this entry.
- Value anydata
-
- // Tags specify permissions on this entry.
- Tags storage.TagList
-
- // Dir is the implicit directory of this entry, and may contain references
- // to other entries in the store.
- Dir []storage.DEntry
-}
-
-// Store defines an extended interface for the Veyron store. Mutations can be
-// received via the Watcher interface, and committed via PutMutations().
-type Store interface {
- store.Store
-
- // PutMutations puts external mutations in the store, within a transaction.
- PutMutations(Mutations []Mutation) (Err error)
-}
diff --git a/services/store/estore/service.idl.go b/services/store/estore/service.idl.go
deleted file mode 100644
index 65339b0..0000000
--- a/services/store/estore/service.idl.go
+++ /dev/null
@@ -1,291 +0,0 @@
-// This file was auto-generated by the veyron idl tool.
-// Source: service.idl
-
-/*
-Package estore defines an extended interface for the Veyron store.
-
-The extended interface supports synchronizing with remote stores by transporting Mutations.
-*/
-package estore
-
-import (
- "veyron2/services/store"
-
- "veyron2/storage"
-
- // The non-user imports are prefixed with "_gen_" to prevent collisions.
- _gen_veyron2 "veyron2"
- _gen_idl "veyron2/idl"
- _gen_ipc "veyron2/ipc"
- _gen_naming "veyron2/naming"
- _gen_rt "veyron2/rt"
- _gen_wiretype "veyron2/wiretype"
-)
-
-// Mutation represents an update to an entry in the store, and contains enough
-// information for a privileged service to replicate the update elsewhere.
-type Mutation struct {
- // ID is the key that identifies the entry.
- ID storage.ID
- // The version of the entry immediately before the update. For new entries,
- // the PriorVersion is NoVersion.
- PriorVersion storage.Version
- // The version of the entry immediately after the update. For deleted entries,
- // the Version is NoVersion.
- Version storage.Version
- // IsRoot is true if
- // 1) The entry was the store root immediately before being deleted, or
- // 2) The entry is the store root immediately after the update.
- IsRoot bool
- // Value is value stored at this entry.
- Value _gen_idl.AnyData
- // Tags specify permissions on this entry.
- Tags storage.TagList
- // Dir is the implicit directory of this entry, and may contain references
- // to other entries in the store.
- Dir []storage.DEntry
-}
-
-// Store defines an extended interface for the Veyron store. Mutations can be
-// received via the Watcher interface, and committed via PutMutations().
-// Store is the interface the client binds and uses.
-// Store_InternalNoTagGetter is the interface without the TagGetter
-// and UnresolveStep methods (both framework-added, rathern than user-defined),
-// to enable embedding without method collisions. Not to be used directly by
-// clients.
-type Store_InternalNoTagGetter interface {
- store.Store_InternalNoTagGetter
-
- // PutMutations puts external mutations in the store, within a transaction.
- PutMutations(Mutations []Mutation, opts ..._gen_ipc.ClientCallOpt) (err error)
-}
-type Store interface {
- _gen_idl.TagGetter
- // UnresolveStep returns the names for the remote service, rooted at the
- // service's immediate namespace ancestor.
- UnresolveStep(opts ..._gen_ipc.ClientCallOpt) ([]string, error)
- Store_InternalNoTagGetter
-}
-
-// StoreService is the interface the server implements.
-type StoreService interface {
- store.StoreService
-
- // PutMutations puts external mutations in the store, within a transaction.
- PutMutations(context _gen_ipc.Context, Mutations []Mutation) (err error)
-}
-
-// BindStore returns the client stub implementing the Store
-// interface.
-//
-// If no _gen_ipc.Client is specified, the default _gen_ipc.Client in the
-// global Runtime is used.
-func BindStore(name string, opts ..._gen_ipc.BindOpt) (Store, error) {
- var client _gen_ipc.Client
- switch len(opts) {
- case 0:
- client = _gen_rt.R().Client()
- case 1:
- switch o := opts[0].(type) {
- case _gen_veyron2.Runtime:
- client = o.Client()
- case _gen_ipc.Client:
- client = o
- default:
- return nil, _gen_idl.ErrUnrecognizedOption
- }
- default:
- return nil, _gen_idl.ErrTooManyOptionsToBind
- }
- stub := &clientStubStore{client: client, name: name}
- stub.Store_InternalNoTagGetter, _ = store.BindStore(name, client)
-
- return stub, nil
-}
-
-// NewServerStore creates a new server stub.
-//
-// It takes a regular server implementing the StoreService
-// interface, and returns a new server stub.
-func NewServerStore(server StoreService) interface{} {
- return &ServerStubStore{
- ServerStubStore: *store.NewServerStore(server).(*store.ServerStubStore),
- service: server,
- }
-}
-
-// clientStubStore implements Store.
-type clientStubStore struct {
- store.Store_InternalNoTagGetter
-
- client _gen_ipc.Client
- name string
-}
-
-func (c *clientStubStore) GetMethodTags(method string) []interface{} {
- return GetStoreMethodTags(method)
-}
-
-func (__gen_c *clientStubStore) PutMutations(Mutations []Mutation, opts ..._gen_ipc.ClientCallOpt) (err error) {
- var call _gen_ipc.ClientCall
- if call, err = __gen_c.client.StartCall(__gen_c.name, "PutMutations", []interface{}{Mutations}, opts...); err != nil {
- return
- }
- if ierr := call.Finish(&err); ierr != nil {
- err = ierr
- }
- return
-}
-
-func (c *clientStubStore) UnresolveStep(opts ..._gen_ipc.ClientCallOpt) (reply []string, err error) {
- var call _gen_ipc.ClientCall
- if call, err = c.client.StartCall(c.name, "UnresolveStep", nil, opts...); err != nil {
- return
- }
- if ierr := call.Finish(&reply, &err); ierr != nil {
- err = ierr
- }
- return
-}
-
-// ServerStubStore wraps a server that implements
-// StoreService and provides an object that satisfies
-// the requirements of veyron2/ipc.ReflectInvoker.
-type ServerStubStore struct {
- store.ServerStubStore
-
- service StoreService
-}
-
-func (s *ServerStubStore) GetMethodTags(method string) []interface{} {
- return GetStoreMethodTags(method)
-}
-
-func (s *ServerStubStore) Signature(call _gen_ipc.ServerCall) (_gen_ipc.ServiceSignature, error) {
- result := _gen_ipc.ServiceSignature{Methods: make(map[string]_gen_ipc.MethodSignature)}
- result.Methods["PutMutations"] = _gen_ipc.MethodSignature{
- InArgs: []_gen_ipc.MethodArgument{
- {Name: "Mutations", Type: 76},
- },
- OutArgs: []_gen_ipc.MethodArgument{
- {Name: "Err", Type: 77},
- },
- }
-
- result.TypeDefs = []_gen_idl.AnyData{
- _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x41, Len: 0x10, Name: "storage.ID", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x35, Name: "storage.Version", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "anydata", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "TagOp", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x41, Len: 0x10, Name: "ID", Tags: []string(nil)}, _gen_wiretype.StructType{
- []_gen_wiretype.FieldType{
- _gen_wiretype.FieldType{Type: 0x45, Name: "Op"},
- _gen_wiretype.FieldType{Type: 0x46, Name: "ACL"},
- },
- "Tag", []string(nil)},
- _gen_wiretype.SliceType{Elem: 0x47, Name: "storage.TagList", Tags: []string(nil)}, _gen_wiretype.StructType{
- []_gen_wiretype.FieldType{
- _gen_wiretype.FieldType{Type: 0x3, Name: "Name"},
- _gen_wiretype.FieldType{Type: 0x46, Name: "ID"},
- },
- "storage.DEntry", []string(nil)},
- _gen_wiretype.SliceType{Elem: 0x49, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
- []_gen_wiretype.FieldType{
- _gen_wiretype.FieldType{Type: 0x42, Name: "ID"},
- _gen_wiretype.FieldType{Type: 0x43, Name: "PriorVersion"},
- _gen_wiretype.FieldType{Type: 0x43, Name: "Version"},
- _gen_wiretype.FieldType{Type: 0x2, Name: "IsRoot"},
- _gen_wiretype.FieldType{Type: 0x44, Name: "Value"},
- _gen_wiretype.FieldType{Type: 0x48, Name: "Tags"},
- _gen_wiretype.FieldType{Type: 0x4a, Name: "Dir"},
- },
- "Mutation", []string(nil)},
- _gen_wiretype.SliceType{Elem: 0x4b, Name: "", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}}
- var ss _gen_ipc.ServiceSignature
- var firstAdded int
- ss, _ = s.ServerStubStore.Signature(call)
- firstAdded = len(result.TypeDefs)
- for k, v := range ss.Methods {
- for i, _ := range v.InArgs {
- if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
- v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
- }
- }
- for i, _ := range v.OutArgs {
- if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
- v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
- }
- }
- if v.InStream >= _gen_wiretype.TypeIDFirst {
- v.InStream += _gen_wiretype.TypeID(firstAdded)
- }
- if v.OutStream >= _gen_wiretype.TypeIDFirst {
- v.OutStream += _gen_wiretype.TypeID(firstAdded)
- }
- result.Methods[k] = v
- }
- //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
- for _, d := range ss.TypeDefs {
- switch wt := d.(type) {
- case _gen_wiretype.SliceType:
- if wt.Elem >= _gen_wiretype.TypeIDFirst {
- wt.Elem += _gen_wiretype.TypeID(firstAdded)
- }
- d = wt
- case _gen_wiretype.ArrayType:
- if wt.Elem >= _gen_wiretype.TypeIDFirst {
- wt.Elem += _gen_wiretype.TypeID(firstAdded)
- }
- d = wt
- case _gen_wiretype.MapType:
- if wt.Key >= _gen_wiretype.TypeIDFirst {
- wt.Key += _gen_wiretype.TypeID(firstAdded)
- }
- if wt.Elem >= _gen_wiretype.TypeIDFirst {
- wt.Elem += _gen_wiretype.TypeID(firstAdded)
- }
- d = wt
- case _gen_wiretype.StructType:
- for _, fld := range wt.Fields {
- if fld.Type >= _gen_wiretype.TypeIDFirst {
- fld.Type += _gen_wiretype.TypeID(firstAdded)
- }
- }
- d = wt
- }
- result.TypeDefs = append(result.TypeDefs, d)
- }
-
- return result, nil
-}
-
-func (s *ServerStubStore) UnresolveStep(call _gen_ipc.ServerCall) (reply []string, err error) {
- if unresolver, ok := s.service.(_gen_ipc.Unresolver); ok {
- return unresolver.UnresolveStep(call)
- }
- if call.Server() == nil {
- return
- }
- var published []string
- if published, err = call.Server().Published(); err != nil || published == nil {
- return
- }
- reply = make([]string, len(published))
- for i, p := range published {
- reply[i] = _gen_naming.Join(p, call.Name())
- }
- return
-}
-
-func (__gen_s *ServerStubStore) PutMutations(call _gen_ipc.ServerCall, Mutations []Mutation) (err error) {
- err = __gen_s.service.PutMutations(call, Mutations)
- return
-}
-
-func GetStoreMethodTags(method string) []interface{} {
- if resp := store.GetStoreMethodTags(method); resp != nil {
- return resp
- }
- switch method {
- case "PutMutations":
- return []interface{}{}
- default:
- return nil
- }
-}
diff --git a/services/store/memstore/blackbox/sync_integration_test.go b/services/store/memstore/blackbox/sync_integration_test.go
index 6002bd0..906f701 100644
--- a/services/store/memstore/blackbox/sync_integration_test.go
+++ b/services/store/memstore/blackbox/sync_integration_test.go
@@ -50,7 +50,7 @@
t.Fatalf("Recv() failed: %v", err)
}
// Update target
- target.PutMutations(Mutations(cb.Changes))
+ PutMutations(t, target, Mutations(cb.Changes))
GC(t, target)
// Expect that the target contains id1 and id2
@@ -83,7 +83,7 @@
t.Fatalf("Recv() failed: %v", err)
}
// Update target
- target.PutMutations(Mutations(cb.Changes))
+ PutMutations(t, target, Mutations(cb.Changes))
GC(t, target)
// Expect that the target contains id1, id2, id3
@@ -101,7 +101,7 @@
t.Fatalf("Recv() failed: %v", err)
}
// Update target
- target.PutMutations(Mutations(cb.Changes))
+ PutMutations(t, target, Mutations(cb.Changes))
GC(t, target)
// Expect that the target contains id1, id2, but not id3
diff --git a/services/store/memstore/blackbox/util.go b/services/store/memstore/blackbox/util.go
index 3ae6f13..d5e5636 100644
--- a/services/store/memstore/blackbox/util.go
+++ b/services/store/memstore/blackbox/util.go
@@ -10,9 +10,9 @@
"testing"
"time"
- "veyron/services/store/estore"
"veyron/services/store/memstore"
memwatch "veyron/services/store/memstore/watch"
+ "veyron/services/store/raw"
"veyron/services/store/service"
"veyron2/ipc"
@@ -24,6 +24,7 @@
var (
rootPublicID security.PublicID = security.FakePublicID("root")
rootCtx ipc.Context = rootContext{}
+ nullMutation = raw.Mutation{}
)
type rootContext struct{}
@@ -226,10 +227,39 @@
return &testWatcherWatchStream{c, done}
}
-func Mutations(changes []watch.Change) []estore.Mutation {
- mutations := make([]estore.Mutation, len(changes))
+// putMutationsStream implements raw.StoreServicePutMutationsStream.
+type putMutationsStream struct {
+ mus []raw.Mutation
+ index int
+}
+
+func newPutMutationsStream(mus []raw.Mutation) raw.StoreServicePutMutationsStream {
+ return &putMutationsStream{
+ mus: mus,
+ }
+}
+
+func (s *putMutationsStream) Recv() (raw.Mutation, error) {
+ if s.index < len(s.mus) {
+ index := s.index
+ s.index++
+ return s.mus[index], nil
+ }
+ return nullMutation, io.EOF
+}
+
+func PutMutations(t *testing.T, st *memstore.Store, mus []raw.Mutation) {
+ stream := newPutMutationsStream(mus)
+ if err := st.PutMutations(rootCtx, stream); err != nil {
+ _, file, line, _ := runtime.Caller(1)
+ t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
+ }
+}
+
+func Mutations(changes []watch.Change) []raw.Mutation {
+ mutations := make([]raw.Mutation, len(changes))
for i, change := range changes {
- mutations[i] = *(change.Value.(*estore.Mutation))
+ mutations[i] = *(change.Value.(*raw.Mutation))
}
return mutations
}
diff --git a/services/store/memstore/state/mutable_snapshot.go b/services/store/memstore/state/mutable_snapshot.go
index eb361ef..c95d9cd 100644
--- a/services/store/memstore/state/mutable_snapshot.go
+++ b/services/store/memstore/state/mutable_snapshot.go
@@ -4,10 +4,10 @@
"errors"
"fmt"
- "veyron/services/store/estore"
"veyron/services/store/memstore/acl"
"veyron/services/store/memstore/field"
"veyron/services/store/memstore/refs"
+ "veyron/services/store/raw"
"veyron/runtimes/google/lib/functional"
"veyron2/security"
@@ -104,15 +104,16 @@
var (
// TODO(tilaks): don't expose errors, use verror instead.
- ErrBadPath = errors.New("malformed path")
- ErrTypeMismatch = errors.New("type mismatch")
- ErrNotFound = errors.New("not found")
- ErrBadRef = errors.New("value has dangling references")
- ErrCantUnlinkByID = errors.New("can't unlink entries by ID")
- ErrPreconditionFailed = errors.New("precondition failed")
- ErrIDsDoNotMatch = errors.New("IDs do not match")
- ErrPermissionDenied = errors.New("permission denied") // TODO(tilaks): can permission denied leak store structure?
- ErrNotTagList = errors.New("not a TagList")
+ ErrBadPath = errors.New("malformed path")
+ ErrTypeMismatch = errors.New("type mismatch")
+ ErrNotFound = errors.New("not found")
+ ErrBadRef = errors.New("value has dangling references")
+ ErrCantUnlinkByID = errors.New("can't unlink entries by ID")
+ ErrPreconditionFailed = errors.New("precondition failed")
+ ErrIDsDoNotMatch = errors.New("IDs do not match")
+ ErrPermissionDenied = errors.New("permission denied") // TODO(tilaks): can permission denied leak store structure?
+ ErrNotTagList = errors.New("not a TagList")
+ ErrDuplicatePutMutation = errors.New("duplicate calls to PutMutation for the same ID")
nullID storage.ID
)
@@ -587,34 +588,37 @@
return err
}
-// PutMutations puts some externally constructed mutations. Does not update
-// cells or refs, so regular Puts, Gets and Removes may be inconsistent.
-func (sn *MutableSnapshot) PutMutations(extmu []estore.Mutation) {
- mu := sn.mutations
- for _, extm := range extmu {
- id := extm.ID
- // If the object has no version, it was deleted.
- if extm.Version == storage.NoVersion {
- mu.Deletions[id] = extm.PriorVersion
- if extm.IsRoot {
- mu.SetRootID = true
- mu.RootID = nullID
- }
- continue
- }
- if extm.IsRoot {
- mu.SetRootID = true
- mu.RootID = id
- }
- mu.Preconditions[id] = extm.PriorVersion
- m := &Mutation{
- Postcondition: extm.Version,
- Value: extm.Value,
- Dir: unflattenDir(extm.Dir),
- }
- m.UpdateRefs()
- mu.Delta[id] = m
+// PutMutation puts an externally constructed mutation. Does not update cells
+// or refs, so regular Puts, Gets and Removes may be inconsistent.
+func (sn *MutableSnapshot) PutMutation(extmu raw.Mutation) error {
+ mus := sn.mutations
+ id := extmu.ID
+ // Check that a mutation has not already been put for this id.
+ if _, ok := mus.Delta[id]; ok {
+ return ErrDuplicatePutMutation
}
+ // If the object has no version, it was deleted.
+ if extmu.Version == storage.NoVersion {
+ mus.Deletions[id] = extmu.PriorVersion
+ if extmu.IsRoot {
+ mus.SetRootID = true
+ mus.RootID = nullID
+ }
+ return nil
+ }
+ if extmu.IsRoot {
+ mus.SetRootID = true
+ mus.RootID = id
+ }
+ mus.Preconditions[id] = extmu.PriorVersion
+ mu := &Mutation{
+ Postcondition: extmu.Version,
+ Value: extmu.Value,
+ Dir: unflattenDir(extmu.Dir),
+ }
+ mu.UpdateRefs()
+ mus.Delta[id] = mu
+ return nil
}
// TODO(tilaks): revisit when vsync.Mutation.Dir is of type []*storage.DEntry
diff --git a/services/store/memstore/store.go b/services/store/memstore/store.go
index 5730236..432ef44 100644
--- a/services/store/memstore/store.go
+++ b/services/store/memstore/store.go
@@ -1,13 +1,17 @@
package memstore
import (
+ "errors"
+ "io"
+
"veyron/runtimes/google/lib/sync"
- "veyron/services/store/estore"
iquery "veyron/services/store/memstore/query"
"veyron/services/store/memstore/state"
+ "veyron/services/store/raw"
"veyron/services/store/service"
+ "veyron2/ipc"
"veyron2/query"
"veyron2/security"
"veyron2/storage"
@@ -28,6 +32,10 @@
// Store implements the service.Store interface.
var _ service.Store = (*Store)(nil)
+var (
+ ErrRequestCancelled = errors.New("request cancelled")
+)
+
// New creates a new store. admin is the public ID of the administrator, dbName
// is the path of the database directory, to which logs are written.
func New(admin security.PublicID, dbName string) (*Store, error) {
@@ -141,6 +149,32 @@
return nil
}
+// PutMutations atomically commits a stream of Mutations when the stream is
+// closed. Mutations are not committed if the request is cancelled before the
+// stream has been closed.
+func (st *Store) PutMutations(ctx ipc.Context, stream raw.StoreServicePutMutationsStream) error {
+ tr := st.newNilTransaction()
+ for {
+ mu, err := stream.Recv()
+ if err == io.EOF {
+ if ctx.IsClosed() {
+ tr.Abort()
+ return ErrRequestCancelled
+ }
+ break
+ }
+ if err != nil {
+ tr.Abort()
+ return err
+ }
+ if err := tr.snapshot.PutMutation(mu); err != nil {
+ tr.Abort()
+ return err
+ }
+ }
+ return tr.Commit()
+}
+
// Glob returns an iterator that emits all values that match the given pattern.
func (st *Store) Glob(clientID security.PublicID, tr storage.Transaction, pattern string) (service.GlobStream, error) {
sn, err := st.GetTransactionSnapshot(tr)
@@ -159,11 +193,3 @@
func (st *Store) SetConflictResolver(ty string, r storage.ConflictResolver) {
panic("not implemented")
}
-
-// PutMutations puts external mutations in the store, within a transaction.
-// TODO(tilaks): verify PutMutations requests.
-func (st *Store) PutMutations(mu []estore.Mutation) error {
- tr := st.newNilTransaction()
- tr.snapshot.PutMutations(mu)
- return tr.Commit()
-}
diff --git a/services/store/memstore/store_test.go b/services/store/memstore/store_test.go
index 842a7b9..6024504 100644
--- a/services/store/memstore/store_test.go
+++ b/services/store/memstore/store_test.go
@@ -5,8 +5,8 @@
"os"
"testing"
- "veyron/services/store/estore"
"veyron/services/store/memstore/state"
+ "veyron/services/store/raw"
"veyron2/storage"
)
@@ -133,7 +133,7 @@
// Create the state.
st, err := New(rootPublicID, dbName)
if err != nil {
- t.Fatalf("newState() failed: %v", err)
+ t.Fatalf("New() failed: %v", err)
}
// Add /, /a, /a/b
@@ -142,8 +142,8 @@
post1, post2, post3 := storage.NewVersion(), storage.NewVersion(), storage.NewVersion()
v1, v2, v3 := "v1", "v2", "v3"
- mu := []estore.Mutation{
- estore.Mutation{
+ putMutationsBatch(t, st, []raw.Mutation{
+ raw.Mutation{
ID: id1,
PriorVersion: pre1,
Version: post1,
@@ -151,7 +151,7 @@
Value: v1,
Dir: dir("a", id2),
},
- estore.Mutation{
+ raw.Mutation{
ID: id2,
PriorVersion: pre2,
Version: post2,
@@ -159,7 +159,7 @@
Value: v2,
Dir: dir("b", id3),
},
- estore.Mutation{
+ raw.Mutation{
ID: id3,
PriorVersion: pre3,
Version: post3,
@@ -167,8 +167,7 @@
Value: v3,
Dir: empty,
},
- }
- st.PutMutations(mu)
+ })
expectValue(t, st, nil, "/", v1)
expectValue(t, st, nil, "/a", v2)
@@ -178,17 +177,14 @@
pre1, pre2, pre3 = post1, post2, post3
post2 = storage.NewVersion()
- mu = []estore.Mutation{
- estore.Mutation{
- ID: id2,
- PriorVersion: pre2,
- Version: post2,
- IsRoot: false,
- Value: v2,
- Dir: empty,
- },
- }
- st.PutMutations(mu)
+ putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+ ID: id2,
+ PriorVersion: pre2,
+ Version: post2,
+ IsRoot: false,
+ Value: v2,
+ Dir: empty,
+ }})
expectValue(t, st, nil, "/", v1)
expectValue(t, st, nil, "/a", v2)
@@ -197,15 +193,12 @@
// Garbage-collect /a/b
post3 = storage.NoVersion
- mu = []estore.Mutation{
- estore.Mutation{
- ID: id3,
- PriorVersion: pre3,
- Version: post3,
- IsRoot: false,
- },
- }
- st.PutMutations(mu)
+ putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+ ID: id3,
+ PriorVersion: pre3,
+ Version: post3,
+ IsRoot: false,
+ }})
expectValue(t, st, nil, "/", v1)
expectValue(t, st, nil, "/a", v2)
@@ -215,15 +208,12 @@
pre1, pre2, pre3 = post1, post2, post3
post1 = storage.NoVersion
- mu = []estore.Mutation{
- estore.Mutation{
- ID: id1,
- PriorVersion: pre1,
- Version: post1,
- IsRoot: true,
- },
- }
- st.PutMutations(mu)
+ putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+ ID: id1,
+ PriorVersion: pre1,
+ Version: post1,
+ IsRoot: true,
+ }})
expectNotExists(t, st, nil, "/")
expectNotExists(t, st, nil, "/a")
@@ -232,15 +222,12 @@
// Garbage-collect /a
post2 = storage.NoVersion
- mu = []estore.Mutation{
- estore.Mutation{
- ID: id2,
- PriorVersion: pre2,
- Version: post2,
- IsRoot: false,
- },
- }
- st.PutMutations(mu)
+ putMutationsBatch(t, st, []raw.Mutation{raw.Mutation{
+ ID: id2,
+ PriorVersion: pre2,
+ Version: post2,
+ IsRoot: false,
+ }})
expectNotExists(t, st, nil, "/")
expectNotExists(t, st, nil, "/a")
@@ -257,7 +244,7 @@
// Create the state.
st, err := New(rootPublicID, dbName)
if err != nil {
- t.Fatalf("newState() failed: %v", err)
+ t.Fatalf("New() failed: %v", err)
}
// Add /, /a
@@ -266,8 +253,8 @@
post1, post2 := storage.NewVersion(), storage.NewVersion()
v1, v2 := "v1", "v2"
- mu := []estore.Mutation{
- estore.Mutation{
+ putMutationsBatch(t, st, []raw.Mutation{
+ raw.Mutation{
ID: id1,
PriorVersion: pre1,
Version: post1,
@@ -275,7 +262,7 @@
Value: v1,
Dir: dir("a", id2),
},
- estore.Mutation{
+ raw.Mutation{
ID: id2,
PriorVersion: pre2,
Version: post2,
@@ -283,8 +270,7 @@
Value: v2,
Dir: empty,
},
- }
- st.PutMutations(mu)
+ })
expectValue(t, st, nil, "/", v1)
expectValue(t, st, nil, "/a", v2)
@@ -294,20 +280,87 @@
post2 = storage.NewVersion()
v2 = "v4"
- mu = []estore.Mutation{estore.Mutation{
+ s := putMutations(st)
+ s.Send(raw.Mutation{
ID: id2,
PriorVersion: pre2,
Version: post2,
IsRoot: true,
Value: v2,
Dir: empty,
- }}
- if err := st.PutMutations(mu); err != state.ErrPreconditionFailed {
- t.Fatal("Expected precondition to fail")
+ })
+ if err := s.Finish(); err != state.ErrPreconditionFailed {
+ t.Fatalf("Expected precondition to fail")
}
}
+func TestPutDuplicateMutations(t *testing.T) {
+ dbName, err := ioutil.TempDir(os.TempDir(), "vstore")
+ if err != nil {
+ t.Fatalf("ioutil.TempDir() failed: %v", err)
+ }
+ defer os.RemoveAll(dbName)
+
+ // Create the state.
+ st, err := New(rootPublicID, dbName)
+ if err != nil {
+ t.Fatalf("New() failed: %v", err)
+ }
+
+ id := storage.NewID()
+ s := putMutations(st)
+ s.Send(raw.Mutation{
+ ID: id,
+ PriorVersion: storage.NoVersion,
+ Version: storage.NewVersion(),
+ IsRoot: true,
+ Value: "v1",
+ Dir: empty,
+ })
+ s.Send(raw.Mutation{
+ ID: id,
+ PriorVersion: storage.NoVersion,
+ Version: storage.NewVersion(),
+ IsRoot: true,
+ Value: "v2",
+ Dir: empty,
+ })
+ if err := s.Finish(); err != state.ErrDuplicatePutMutation {
+ t.Fatalf("Expected precondition to fail")
+ }
+}
+
+func TestCancelPutMutation(t *testing.T) {
+ dbName, err := ioutil.TempDir(os.TempDir(), "vstore")
+ if err != nil {
+ t.Fatalf("ioutil.TempDir() failed: %v", err)
+ }
+ defer os.RemoveAll(dbName)
+
+ // Create the state.
+ st, err := New(rootPublicID, dbName)
+ if err != nil {
+ t.Fatalf("New() failed: %v", err)
+ }
+
+ s := putMutations(st)
+ s.Send(raw.Mutation{
+ ID: storage.NewID(),
+ PriorVersion: storage.NoVersion,
+ Version: storage.NewVersion(),
+ IsRoot: true,
+ Value: "v1",
+ Dir: empty,
+ })
+ s.Cancel()
+ if err := s.Finish(); err != ErrRequestCancelled {
+ t.Fatalf("Expected request to be cancelled")
+ }
+
+ expectNotExists(t, st, nil, "/")
+}
+
var (
empty = []storage.DEntry{}
)
diff --git a/services/store/memstore/util_test.go b/services/store/memstore/util_test.go
index 70c7563..31e7174 100644
--- a/services/store/memstore/util_test.go
+++ b/services/store/memstore/util_test.go
@@ -1,12 +1,152 @@
package memstore
import (
+ "io"
"runtime"
"testing"
+ "time"
+ "veyron/services/store/raw"
+
+ "veyron2/ipc"
+ "veyron2/security"
"veyron2/storage"
)
+// cancellableContext implements ipc.Context.
+type cancellableContext struct {
+ cancelled chan struct{}
+}
+
+func newCancellableContext() *cancellableContext {
+ return &cancellableContext{cancelled: make(chan struct{})}
+}
+
+func (*cancellableContext) Server() ipc.Server {
+ return nil
+}
+
+func (*cancellableContext) Method() string {
+ return ""
+}
+
+func (*cancellableContext) Name() string {
+ return ""
+}
+
+func (*cancellableContext) Suffix() string {
+ return ""
+}
+
+func (*cancellableContext) Label() (l security.Label) {
+ return
+}
+
+func (*cancellableContext) CaveatDischarges() security.CaveatDischargeMap {
+ return nil
+}
+
+func (*cancellableContext) LocalID() security.PublicID {
+ return rootPublicID
+}
+
+func (*cancellableContext) RemoteID() security.PublicID {
+ return rootPublicID
+}
+
+func (*cancellableContext) Deadline() (t time.Time) {
+ return
+}
+
+func (ctx *cancellableContext) IsClosed() bool {
+ select {
+ case <-ctx.cancelled:
+ return true
+ default:
+ return false
+ }
+}
+
+func (ctx *cancellableContext) Closed() <-chan struct{} {
+ return ctx.cancelled
+}
+
+// cancel synchronously closes the context. After cancel returns, calls to
+// IsClosed will return true and the stream returned by Closed will be closed.
+func (ctx *cancellableContext) cancel() {
+ close(ctx.cancelled)
+}
+
+// serverStream implements raw.StoreServicePutMutationsStream
+type serverStream struct {
+ mus <-chan raw.Mutation
+}
+
+func (s *serverStream) Recv() (raw.Mutation, error) {
+ mu, ok := <-s.mus
+ if !ok {
+ return mu, io.EOF
+ }
+ return mu, nil
+}
+
+// clientStream implements raw.StorePutMutationsStream
+type clientStream struct {
+ ctx ipc.Context
+ closed bool
+ mus chan<- raw.Mutation
+ err <-chan error
+}
+
+func (s *clientStream) Send(mu raw.Mutation) error {
+ s.mus <- mu
+ return nil
+}
+
+func (s *clientStream) CloseSend() error {
+ if !s.closed {
+ s.closed = true
+ close(s.mus)
+ }
+ return nil
+}
+
+func (s *clientStream) Finish() error {
+ s.CloseSend()
+ return <-s.err
+}
+
+func (s *clientStream) Cancel() {
+ s.ctx.(*cancellableContext).cancel()
+ s.CloseSend()
+}
+
+func putMutations(st *Store) raw.StorePutMutationsStream {
+ ctx := newCancellableContext()
+ mus := make(chan raw.Mutation)
+ err := make(chan error)
+ go func() {
+ err <- st.PutMutations(ctx, &serverStream{mus})
+ close(err)
+ }()
+ return &clientStream{
+ ctx: ctx,
+ mus: mus,
+ err: err,
+ }
+}
+
+func putMutationsBatch(t *testing.T, st *Store, mus []raw.Mutation) {
+ clientStream := putMutations(st)
+ for _, mu := range mus {
+ clientStream.Send(mu)
+ }
+ if err := clientStream.Finish(); err != nil {
+ _, file, line, _ := runtime.Caller(1)
+ t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
+ }
+}
+
func mkdir(t *testing.T, st *Store, tr storage.Transaction, path string) (storage.ID, interface{}) {
_, file, line, _ := runtime.Caller(1)
dir := &Dir{}
diff --git a/services/store/memstore/watch/sync_processor.go b/services/store/memstore/watch/sync_processor.go
index 847d9e8..15a123c 100644
--- a/services/store/memstore/watch/sync_processor.go
+++ b/services/store/memstore/watch/sync_processor.go
@@ -4,9 +4,9 @@
"errors"
"fmt"
- "veyron/services/store/estore"
"veyron/services/store/memstore/refs"
"veyron/services/store/memstore/state"
+ "veyron/services/store/raw"
"veyron2/security"
"veyron2/services/watch"
@@ -79,7 +79,7 @@
if isRoot {
p.rootVersion = cell.Version
}
- value := &estore.Mutation{
+ value := &raw.Mutation{
ID: id,
PriorVersion: storage.NoVersion,
Version: cell.Version,
@@ -123,7 +123,7 @@
p.rootID = mus.RootID
} else {
// The root was deleted, prepare a deletion change.
- value := &estore.Mutation{
+ value := &raw.Mutation{
ID: p.rootID,
PriorVersion: p.rootVersion,
Version: storage.NoVersion,
@@ -153,7 +153,7 @@
if isRoot {
p.rootVersion = mu.Postcondition
}
- value := &estore.Mutation{
+ value := &raw.Mutation{
ID: id,
PriorVersion: mus.Preconditions[id],
Version: mu.Postcondition,
@@ -180,7 +180,7 @@
delete(p.preparedDeletions, id)
continue
}
- value := &estore.Mutation{
+ value := &raw.Mutation{
ID: id,
PriorVersion: precondition,
Version: storage.NoVersion,
diff --git a/services/store/memstore/watch/test_util.go b/services/store/memstore/watch/test_util.go
index ae20bd9..29ef2ee 100644
--- a/services/store/memstore/watch/test_util.go
+++ b/services/store/memstore/watch/test_util.go
@@ -7,8 +7,8 @@
"testing"
"time"
- "veyron/services/store/estore"
"veyron/services/store/memstore"
+ "veyron/services/store/raw"
"veyron/services/store/service"
"veyron2/ipc"
@@ -252,7 +252,7 @@
if change.State != watch.Exists {
t.Fatalf("Expected id to exist: %v", id)
}
- cv := change.Value.(*estore.Mutation)
+ cv := change.Value.(*raw.Mutation)
if cv.PriorVersion != pre {
t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
}
@@ -273,7 +273,7 @@
if change.State != watch.DoesNotExist {
t.Fatalf("Expected id to not exist: %v", id)
}
- cv := change.Value.(*estore.Mutation)
+ cv := change.Value.(*raw.Mutation)
if cv.PriorVersion != pre {
t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
}
@@ -293,7 +293,7 @@
func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
for _, change := range changes {
- cv, ok := change.Value.(*estore.Mutation)
+ cv, ok := change.Value.(*raw.Mutation)
if !ok {
t.Fatal("Expected a Mutation")
}
diff --git a/services/store/raw/service.idl b/services/store/raw/service.idl
new file mode 100644
index 0000000..ec3f695
--- /dev/null
+++ b/services/store/raw/service.idl
@@ -0,0 +1,57 @@
+/*
+Package raw defines a raw interface for the Veyron store.
+The raw interface supports synchronizing with remote stores by transporting Mutations.
+*/
+package raw
+
+import (
+ "veyron2/storage"
+ "veyron2/services/watch"
+)
+
+const (
+ // The raw Store has Veyron name "<mount>/.store.raw", where <mount> is the
+ // Veyron name of the mount point.
+ RawStoreSuffix = ".store.raw"
+)
+
+// Mutation represents an update to an entry in the store, and contains enough
+// information for a privileged service to replicate the update elsewhere.
+type Mutation struct {
+ // ID is the key that identifies the entry.
+ ID storage.ID
+
+ // The version of the entry immediately before the update. For new entries,
+ // the PriorVersion is NoVersion.
+ PriorVersion storage.Version
+
+ // The version of the entry immediately after the update. For deleted entries,
+ // the Version is NoVersion.
+ Version storage.Version
+
+ // IsRoot is true if
+ // 1) The entry was the store root immediately before being deleted, or
+ // 2) The entry is the store root immediately after the update.
+ IsRoot bool
+
+ // Value is value stored at this entry.
+ Value anydata
+
+ // Tags specify permissions on this entry.
+ Tags storage.TagList
+
+ // Dir is the implicit directory of this entry, and may contain references
+ // to other entries in the store.
+ Dir []storage.DEntry
+}
+
+// Store defines a raw interface for the Veyron store. Mutations can be received
+// via the Watcher interface, and committed via PutMutation.
+type Store interface {
+ watch.Watcher
+
+ // PutMutations atomically commits a stream of Mutations when the stream is
+ // closed. Mutations are not committed if the request is cancelled before
+ // the stream has been closed.
+ PutMutations() stream<Mutation, _> error
+}
diff --git a/services/store/raw/service.idl.go b/services/store/raw/service.idl.go
new file mode 100644
index 0000000..99b932a
--- /dev/null
+++ b/services/store/raw/service.idl.go
@@ -0,0 +1,363 @@
+// This file was auto-generated by the veyron idl tool.
+// Source: service.idl
+
+/*
+Package raw defines a raw interface for the Veyron store.
+The raw interface supports synchronizing with remote stores by transporting Mutations.
+*/
+package raw
+
+import (
+ "veyron2/services/watch"
+
+ "veyron2/storage"
+
+ // The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_veyron2 "veyron2"
+ _gen_idl "veyron2/idl"
+ _gen_ipc "veyron2/ipc"
+ _gen_naming "veyron2/naming"
+ _gen_rt "veyron2/rt"
+ _gen_wiretype "veyron2/wiretype"
+)
+
+// Mutation represents an update to an entry in the store, and contains enough
+// information for a privileged service to replicate the update elsewhere.
+type Mutation struct {
+ // ID is the key that identifies the entry.
+ ID storage.ID
+ // The version of the entry immediately before the update. For new entries,
+ // the PriorVersion is NoVersion.
+ PriorVersion storage.Version
+ // The version of the entry immediately after the update. For deleted entries,
+ // the Version is NoVersion.
+ Version storage.Version
+ // IsRoot is true if
+ // 1) The entry was the store root immediately before being deleted, or
+ // 2) The entry is the store root immediately after the update.
+ IsRoot bool
+ // Value is value stored at this entry.
+ Value _gen_idl.AnyData
+ // Tags specify permissions on this entry.
+ Tags storage.TagList
+ // Dir is the implicit directory of this entry, and may contain references
+ // to other entries in the store.
+ Dir []storage.DEntry
+}
+
+const (
+ // The raw Store has Veyron name "<mount>/.store.raw", where <mount> is the
+ // Veyron name of the mount point.
+ RawStoreSuffix = ".store.raw"
+)
+
+// Store defines a raw interface for the Veyron store. Mutations can be received
+// via the Watcher interface, and committed via PutMutation.
+// Store is the interface the client binds and uses.
+// Store_InternalNoTagGetter is the interface without the TagGetter
+// and UnresolveStep methods (both framework-added, rathern than user-defined),
+// to enable embedding without method collisions. Not to be used directly by
+// clients.
+type Store_InternalNoTagGetter interface {
+ watch.Watcher_InternalNoTagGetter
+
+ // PutMutations atomically commits a stream of Mutations when the stream is
+ // closed. Mutations are not committed if the request is cancelled before
+ // the stream has been closed.
+ PutMutations(opts ..._gen_ipc.ClientCallOpt) (reply StorePutMutationsStream, err error)
+}
+type Store interface {
+ _gen_idl.TagGetter
+ // UnresolveStep returns the names for the remote service, rooted at the
+ // service's immediate namespace ancestor.
+ UnresolveStep(opts ..._gen_ipc.ClientCallOpt) ([]string, error)
+ Store_InternalNoTagGetter
+}
+
+// StoreService is the interface the server implements.
+type StoreService interface {
+ watch.WatcherService
+
+ // PutMutations atomically commits a stream of Mutations when the stream is
+ // closed. Mutations are not committed if the request is cancelled before
+ // the stream has been closed.
+ PutMutations(context _gen_ipc.Context, stream StoreServicePutMutationsStream) (err error)
+}
+
+// StorePutMutationsStream is the interface for streaming responses of the method
+// PutMutations in the service interface Store.
+type StorePutMutationsStream interface {
+
+ // Send places the item onto the output stream, blocking if there is no buffer
+ // space available.
+ Send(item Mutation) error
+
+ // CloseSend indicates to the server that no more items will be sent; server
+ // Recv calls will receive io.EOF after all sent items. Subsequent calls to
+ // Send on the client will fail. This is an optional call - it's used by
+ // streaming clients that need the server to receive the io.EOF terminator.
+ CloseSend() error
+
+ // Finish closes the stream and returns the positional return values for
+ // call.
+ Finish() (err error)
+
+ // Cancel cancels the RPC, notifying the server to stop processing.
+ Cancel()
+}
+
+// Implementation of the StorePutMutationsStream interface that is not exported.
+type implStorePutMutationsStream struct {
+ clientCall _gen_ipc.ClientCall
+}
+
+func (c *implStorePutMutationsStream) Send(item Mutation) error {
+ return c.clientCall.Send(item)
+}
+
+func (c *implStorePutMutationsStream) CloseSend() error {
+ return c.clientCall.CloseSend()
+}
+
+func (c *implStorePutMutationsStream) Finish() (err error) {
+ if ierr := c.clientCall.Finish(&err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (c *implStorePutMutationsStream) Cancel() {
+ c.clientCall.Cancel()
+}
+
+// StoreServicePutMutationsStream is the interface for streaming responses of the method
+// PutMutations in the service interface Store.
+type StoreServicePutMutationsStream interface {
+
+ // Recv fills itemptr with the next item in the input stream, blocking until
+ // an item is available. Returns io.EOF to indicate graceful end of input.
+ Recv() (item Mutation, err error)
+}
+
+// Implementation of the StoreServicePutMutationsStream interface that is not exported.
+type implStoreServicePutMutationsStream struct {
+ serverCall _gen_ipc.ServerCall
+}
+
+func (s *implStoreServicePutMutationsStream) Recv() (item Mutation, err error) {
+ err = s.serverCall.Recv(&item)
+ return
+}
+
+// BindStore returns the client stub implementing the Store
+// interface.
+//
+// If no _gen_ipc.Client is specified, the default _gen_ipc.Client in the
+// global Runtime is used.
+func BindStore(name string, opts ..._gen_ipc.BindOpt) (Store, error) {
+ var client _gen_ipc.Client
+ switch len(opts) {
+ case 0:
+ client = _gen_rt.R().Client()
+ case 1:
+ switch o := opts[0].(type) {
+ case _gen_veyron2.Runtime:
+ client = o.Client()
+ case _gen_ipc.Client:
+ client = o
+ default:
+ return nil, _gen_idl.ErrUnrecognizedOption
+ }
+ default:
+ return nil, _gen_idl.ErrTooManyOptionsToBind
+ }
+ stub := &clientStubStore{client: client, name: name}
+ stub.Watcher_InternalNoTagGetter, _ = watch.BindWatcher(name, client)
+
+ return stub, nil
+}
+
+// NewServerStore creates a new server stub.
+//
+// It takes a regular server implementing the StoreService
+// interface, and returns a new server stub.
+func NewServerStore(server StoreService) interface{} {
+ return &ServerStubStore{
+ ServerStubWatcher: *watch.NewServerWatcher(server).(*watch.ServerStubWatcher),
+ service: server,
+ }
+}
+
+// clientStubStore implements Store.
+type clientStubStore struct {
+ watch.Watcher_InternalNoTagGetter
+
+ client _gen_ipc.Client
+ name string
+}
+
+func (c *clientStubStore) GetMethodTags(method string) []interface{} {
+ return GetStoreMethodTags(method)
+}
+
+func (__gen_c *clientStubStore) PutMutations(opts ..._gen_ipc.ClientCallOpt) (reply StorePutMutationsStream, err error) {
+ var call _gen_ipc.ClientCall
+ if call, err = __gen_c.client.StartCall(__gen_c.name, "PutMutations", nil, opts...); err != nil {
+ return
+ }
+ reply = &implStorePutMutationsStream{clientCall: call}
+ return
+}
+
+func (c *clientStubStore) UnresolveStep(opts ..._gen_ipc.ClientCallOpt) (reply []string, err error) {
+ var call _gen_ipc.ClientCall
+ if call, err = c.client.StartCall(c.name, "UnresolveStep", nil, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+// ServerStubStore wraps a server that implements
+// StoreService and provides an object that satisfies
+// the requirements of veyron2/ipc.ReflectInvoker.
+type ServerStubStore struct {
+ watch.ServerStubWatcher
+
+ service StoreService
+}
+
+func (s *ServerStubStore) GetMethodTags(method string) []interface{} {
+ return GetStoreMethodTags(method)
+}
+
+func (s *ServerStubStore) Signature(call _gen_ipc.ServerCall) (_gen_ipc.ServiceSignature, error) {
+ result := _gen_ipc.ServiceSignature{Methods: make(map[string]_gen_ipc.MethodSignature)}
+ result.Methods["PutMutations"] = _gen_ipc.MethodSignature{
+ InArgs: []_gen_ipc.MethodArgument{},
+ OutArgs: []_gen_ipc.MethodArgument{
+ {Name: "", Type: 65},
+ },
+ InStream: 76,
+ }
+
+ result.TypeDefs = []_gen_idl.AnyData{
+ _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x42, Len: 0x10, Name: "storage.ID", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x35, Name: "storage.Version", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "anydata", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "TagOp", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x42, Len: 0x10, Name: "ID", Tags: []string(nil)}, _gen_wiretype.StructType{
+ []_gen_wiretype.FieldType{
+ _gen_wiretype.FieldType{Type: 0x46, Name: "Op"},
+ _gen_wiretype.FieldType{Type: 0x47, Name: "ACL"},
+ },
+ "Tag", []string(nil)},
+ _gen_wiretype.SliceType{Elem: 0x48, Name: "storage.TagList", Tags: []string(nil)}, _gen_wiretype.StructType{
+ []_gen_wiretype.FieldType{
+ _gen_wiretype.FieldType{Type: 0x3, Name: "Name"},
+ _gen_wiretype.FieldType{Type: 0x47, Name: "ID"},
+ },
+ "storage.DEntry", []string(nil)},
+ _gen_wiretype.SliceType{Elem: 0x4a, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
+ []_gen_wiretype.FieldType{
+ _gen_wiretype.FieldType{Type: 0x43, Name: "ID"},
+ _gen_wiretype.FieldType{Type: 0x44, Name: "PriorVersion"},
+ _gen_wiretype.FieldType{Type: 0x44, Name: "Version"},
+ _gen_wiretype.FieldType{Type: 0x2, Name: "IsRoot"},
+ _gen_wiretype.FieldType{Type: 0x45, Name: "Value"},
+ _gen_wiretype.FieldType{Type: 0x49, Name: "Tags"},
+ _gen_wiretype.FieldType{Type: 0x4b, Name: "Dir"},
+ },
+ "Mutation", []string(nil)},
+ }
+ var ss _gen_ipc.ServiceSignature
+ var firstAdded int
+ ss, _ = s.ServerStubWatcher.Signature(call)
+ firstAdded = len(result.TypeDefs)
+ for k, v := range ss.Methods {
+ for i, _ := range v.InArgs {
+ if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ for i, _ := range v.OutArgs {
+ if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ if v.InStream >= _gen_wiretype.TypeIDFirst {
+ v.InStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ if v.OutStream >= _gen_wiretype.TypeIDFirst {
+ v.OutStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ result.Methods[k] = v
+ }
+ //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+ for _, d := range ss.TypeDefs {
+ switch wt := d.(type) {
+ case _gen_wiretype.SliceType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.ArrayType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.MapType:
+ if wt.Key >= _gen_wiretype.TypeIDFirst {
+ wt.Key += _gen_wiretype.TypeID(firstAdded)
+ }
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.StructType:
+ for _, fld := range wt.Fields {
+ if fld.Type >= _gen_wiretype.TypeIDFirst {
+ fld.Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ d = wt
+ }
+ result.TypeDefs = append(result.TypeDefs, d)
+ }
+
+ return result, nil
+}
+
+func (s *ServerStubStore) UnresolveStep(call _gen_ipc.ServerCall) (reply []string, err error) {
+ if unresolver, ok := s.service.(_gen_ipc.Unresolver); ok {
+ return unresolver.UnresolveStep(call)
+ }
+ if call.Server() == nil {
+ return
+ }
+ var published []string
+ if published, err = call.Server().Published(); err != nil || published == nil {
+ return
+ }
+ reply = make([]string, len(published))
+ for i, p := range published {
+ reply[i] = _gen_naming.Join(p, call.Name())
+ }
+ return
+}
+
+func (__gen_s *ServerStubStore) PutMutations(call _gen_ipc.ServerCall) (err error) {
+ stream := &implStoreServicePutMutationsStream{serverCall: call}
+ err = __gen_s.service.PutMutations(call, stream)
+ return
+}
+
+func GetStoreMethodTags(method string) []interface{} {
+ if resp := watch.GetWatcherMethodTags(method); resp != nil {
+ return resp
+ }
+ switch method {
+ case "PutMutations":
+ return []interface{}{}
+ default:
+ return nil
+ }
+}
diff --git a/services/store/server/server.go b/services/store/server/server.go
index bc46671..c148638 100644
--- a/services/store/server/server.go
+++ b/services/store/server/server.go
@@ -6,9 +6,9 @@
"sync"
"time"
- "veyron/services/store/estore"
"veyron/services/store/memstore"
memwatch "veyron/services/store/memstore/watch"
+ "veyron/services/store/raw"
"veyron/services/store/service"
"veyron2/idl"
@@ -203,9 +203,11 @@
return s.watcher.Watch(ctx, req, stream)
}
-// PutMutations puts external mutations in the store, within a transaction.
-func (s *Server) PutMutations(ctx ipc.Context, mu []estore.Mutation) error {
- return s.store.PutMutations(mu)
+// PutMutations atomically commits a stream of Mutations when the stream is
+// closed. Mutations are not committed if the request is cancelled before the
+// stream has been closed.
+func (s *Server) PutMutations(ctx ipc.Context, stream raw.StoreServicePutMutationsStream) error {
+ return s.store.PutMutations(ctx, stream)
}
// ReadConflicts returns the stream of conflicts to store values. A
@@ -214,14 +216,20 @@
panic("not implemented")
}
-// Store and object dispatchers.
-// Typically, the store dispatcher handles paths with ".store" prefix, and the
+// Store, raw store and object dispatchers.
+// Typically, the store dispatcher handles paths with ".store" prefix, the
+// raw store dispatcher handles paths with ".store.raw" prefix, and the
// object dispatcher handles paths with "" prefix.
// TODO(sadovsky): Revisit this scheme. Seems simpler to have one dispatcher?
-// NewStoreDispatcher returns an storeDispatcher.
+// NewStoreDispatcher returns a store dispatcher.
func NewStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
- return ipc.SoloDispatcher(estore.NewServerStore(s), auth)
+ return ipc.SoloDispatcher(store.NewServerStore(s), auth)
+}
+
+// NewRawStoreDispatcher returns a raw store dispatcher.
+func NewRawStoreDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
+ return ipc.SoloDispatcher(raw.NewServerStore(s), auth)
}
type objectDispatcher struct {
@@ -229,7 +237,7 @@
auth security.Authorizer
}
-// NewObjectDispatcher returns an objectDispatcher.
+// NewObjectDispatcher returns an object dispatcher.
func NewObjectDispatcher(s *Server, auth security.Authorizer) ipc.Dispatcher {
return &objectDispatcher{s: s, auth: auth}
}
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 8cd60d9..97c3426 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -7,7 +7,7 @@
"testing"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/ipc"
"veyron2/security"
@@ -143,7 +143,7 @@
if change.State != watch.Exists {
t.Fatalf("Expected id to exist: %v", id)
}
- cv := change.Value.(*estore.Mutation)
+ cv := change.Value.(*raw.Mutation)
if cv.Value != value {
t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
}
@@ -151,7 +151,7 @@
func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
for _, change := range changes {
- cv, ok := change.Value.(*estore.Mutation)
+ cv, ok := change.Value.(*raw.Mutation)
if !ok {
t.Fatal("Expected a Mutation")
}
diff --git a/services/store/service/service.go b/services/store/service/service.go
index 51826db..47249e6 100644
--- a/services/store/service/service.go
+++ b/services/store/service/service.go
@@ -1,7 +1,7 @@
package service
import (
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/ipc"
"veyron2/query"
@@ -42,11 +42,18 @@
Glob(clientID security.PublicID, t storage.Transaction, pattern string) (GlobStream, error)
}
-// Store is the client interface to the storage system.
+// Store is the server-side interface to the storage system. It is expected to
+// sit directly behind the rpc handlers defined by
+// veyron2/services/store/service.idl.
type Store interface {
// Bind returns the Object associated with a path.
Bind(path string) Object
+ // PutMutations atomically commits a stream of Mutations when the stream is
+ // closed. Mutations are not committed if the request is cancelled before
+ // the stream has been closed.
+ PutMutations(ctx ipc.Context, stream raw.StoreServicePutMutationsStream) error
+
// Glob returns a set of names that match the glob pattern.
Glob(clientID security.PublicID, t storage.Transaction, pattern string) (GlobStream, error)
@@ -54,9 +61,6 @@
// query.
Search(t storage.Transaction, q query.Query) storage.Iterator
- // PutMutations puts external mutations in the store, within a transaction.
- PutMutations(mu []estore.Mutation) error
-
// SetConflictResolver specifies a function to perform conflict resolution.
// The <ty> represents the IDL name for the type.
SetConflictResolver(ty string, r storage.ConflictResolver)
diff --git a/services/store/stored/main.go b/services/store/stored/main.go
index 7e5ba80..3e61c11 100644
--- a/services/store/stored/main.go
+++ b/services/store/stored/main.go
@@ -7,8 +7,9 @@
// - <name> is the Veyron mount point name, default /global/vstore/<hostname>/<username>.
// - <dbName> is the filename in which to store the data.
//
-// The Store service has Veyron name, <name>/.store. Individual values with
-// path <path> have name <name>/<path>.
+// The store service has Veyron name, <name>/.store.
+// The raw store service has Veyron name, <name>/.store.raw.
+// Individual values with path <path> have name <name>/<path>.
package main
import (
@@ -18,9 +19,11 @@
"os/user"
vflag "veyron/security/flag"
+ "veyron/services/store/raw"
"veyron/services/store/server"
"veyron2/rt"
+ "veyron2/services/store"
)
var (
@@ -65,10 +68,14 @@
// Register the services.
storeDisp := server.NewStoreDispatcher(storeService, auth)
+ rawStoreDisp := server.NewRawStoreDispatcher(storeService, auth)
objectDisp := server.NewObjectDispatcher(storeService, auth)
- if err := s.Register(".store", storeDisp); err != nil {
+ if err := s.Register(store.StoreSuffix, storeDisp); err != nil {
log.Fatal("s.Register(storeDisp) failed: ", err)
}
+ if err := s.Register(raw.RawStoreSuffix, rawStoreDisp); err != nil {
+ log.Fatal("s.Register(rawStoreDisp) failed: ", err)
+ }
if err := s.Register("", objectDisp); err != nil {
log.Fatal("s.Register(objectDisp) failed: ", err)
}
diff --git a/services/store/testutil/store.go b/services/store/testutil/store.go
index 8e47eb3..560cbdd 100644
--- a/services/store/testutil/store.go
+++ b/services/store/testutil/store.go
@@ -7,11 +7,12 @@
"os"
"testing"
- store "veyron/services/store/server"
+ istore "veyron/services/store/server"
"veyron2/ipc"
"veyron2/naming"
"veyron2/security"
+ "veyron2/services/store"
)
// NewStore creates a new testing instance of the store server and returns
@@ -26,8 +27,8 @@
}
// Create a new StoreService.
- config := store.ServerConfig{Admin: id, DBName: dbName}
- storeService, err := store.New(config)
+ config := istore.ServerConfig{Admin: id, DBName: dbName}
+ storeService, err := istore.New(config)
if err != nil {
t.Fatalf("New(%v) failed: %v", config, err)
}
@@ -41,9 +42,9 @@
t.Logf("Storage server at %v", name)
// Register the services.
- storeDispatcher := store.NewStoreDispatcher(storeService, nil)
- objectDispatcher := store.NewObjectDispatcher(storeService, nil)
- if err := server.Register(name+"/.store", storeDispatcher); err != nil {
+ storeDispatcher := istore.NewStoreDispatcher(storeService, nil)
+ objectDispatcher := istore.NewObjectDispatcher(storeService, nil)
+ if err := server.Register(naming.JoinAddressName(name, store.StoreSuffix), storeDispatcher); err != nil {
t.Fatalf("Register(%v) failed: %v", storeDispatcher, err)
}
if err := server.Register(name, objectDispatcher); err != nil {