veyron/services/store: Streaming put mutation api.
Previously, PutMutations took an array of mutations and applied them
within a transaction. Unfortunately, this API was memory-bound.
PutMutations now takes a stream of mutations and applies them within a
transaction.
Specifics:
- Forbids duplicate mutations on the same id.
- Supports cancellation. If the request is cancelled in a timely
fashion, any mutations received will not be committed.
Additionally, the estore API, which extended veyron2/services/store.Store,
has been replaced with a raw access API.
Specifics:
- veyron/services/store/raw.Store supports Watch and PutMutations.
- The raw store is mounted at the extension ".store.raw". This prevents
non-sync clients from inadvertently discovering the raw API.
Change-Id: I6575a0ca43c220239ea90c84e6c10329c625dfbe
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index 8f8f826..feb92a8 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -42,7 +42,7 @@
"strconv"
"strings"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
)
@@ -419,7 +419,7 @@
}
// fillFakeWatchRecords fills fake log and dag state (testing only).
-// TODO(hpucha): remove and clean up estore import.
+// TODO(hpucha): remove and clean up raw import.
func (l *iLog) fillFakeWatchRecords() {
const num = 10
var parvers []storage.Version
@@ -427,7 +427,7 @@
for i := int(0); i < num; i++ {
// Create a local log record.
curvers := storage.Version(i)
- if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: estore.Mutation{Version: curvers}}); err != nil {
+ if err := l.processWatchRecord(id, curvers, parvers, &LogValue{Mutation: raw.Mutation{Version: curvers}}); err != nil {
return
}
parvers = []storage.Version{curvers}
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 462a83c..9b84904 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -8,7 +8,7 @@
"strings"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/naming"
"veyron2/storage"
@@ -67,7 +67,7 @@
// Next steps are to tie this up into mount table and auto-discover neighbors.
neighbors []string
neighborIDs []string
- estore estore.Store
+ store raw.Store
updObjects map[storage.ID]*objConflictState
}
@@ -100,7 +100,7 @@
// TODO(hpucha): Merge this with vstore handler in syncd.
if syncd.vstoreEndpoint != "" {
var err error
- i.estore, err = estore.BindStore(syncd.vstoreEndpoint + "/.store/sync")
+ i.store, err = raw.BindStore(naming.JoinAddressName(syncd.vstoreEndpoint, raw.RawStoreSuffix))
if err != nil {
vlog.Fatalf("newInitiator: cannot connect to Veyron store endpoint (%s): %s",
syncd.vstoreEndpoint, err)
@@ -420,7 +420,7 @@
}
// resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]estore.Mutation, error) {
+func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
switch conflictResolutionPolicy {
case useVersion:
if err := i.resolveConflictsByVersion(); err != nil {
@@ -430,7 +430,7 @@
return nil, fmt.Errorf("unknown conflict resolution policy")
}
- var m []estore.Mutation
+ var m []raw.Mutation
for _, st := range i.updObjects {
// Append to mutations.
st.resolvVal.Mutation.PriorVersion = st.oldHead
@@ -460,7 +460,7 @@
if err != nil {
return err
}
- var m estore.Mutation
+ var m raw.Mutation
if lrecs[0].Value.Mutation.Version > lrecs[1].Value.Mutation.Version {
m = lrecs[0].Value.Mutation
} else {
@@ -494,7 +494,7 @@
// updateStoreAndSync updates the store, and if that is successful,
// updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(m []estore.Mutation) error {
+func (i *syncInitiator) updateStoreAndSync(m []raw.Mutation) error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
@@ -504,8 +504,17 @@
// will also hold the lock across PutMutations rpc to prevent
// a race with watcher. The next iteration will clean up this
// coordination.
- if i.estore != nil {
- if err := i.estore.PutMutations(m); err != nil {
+ if i.store != nil {
+ stream, err := i.store.PutMutations()
+ if err != nil {
+ return err
+ }
+ for i := range m {
+ if err := stream.Send(m[i]); err != nil {
+ return err
+ }
+ }
+ if err := stream.Finish(); err != nil {
return err
}
}
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index a524d57..c73e0f1 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -7,7 +7,7 @@
"testing"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
)
@@ -36,7 +36,7 @@
LSN: 100,
ObjID: objID,
CurVers: 20,
- Value: LogValue{Mutation: estore.Mutation{Version: 20}},
+ Value: LogValue{Mutation: raw.Mutation{Version: 20}},
}
if _, err := s.hdlInitiator.getLogRec(objID, expRec.CurVers); err == nil {
t.Errorf("GetLogRec didn't fail")
@@ -91,7 +91,7 @@
LSN: LSN(100 + v),
ObjID: objID,
CurVers: v,
- Value: LogValue{Mutation: estore.Mutation{Version: v, PriorVersion: 500 + v}},
+ Value: LogValue{Mutation: raw.Mutation{Version: v, PriorVersion: 500 + v}},
}
logKey, err := s.log.putLogRec(expRec)
if err != nil {
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 2df3fc5..783d23f 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -8,7 +8,7 @@
"os"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
)
// getFileName generates a filename for a temporary (per unit test) kvdb file.
@@ -135,7 +135,7 @@
CurVers: cmd.version,
Parents: cmd.parents,
Value: LogValue{
- Mutation: estore.Mutation{Version: cmd.version},
+ Mutation: raw.Mutation{Version: cmd.version},
},
}
stream.add(rec)
diff --git a/runtimes/google/vsync/vsync.idl b/runtimes/google/vsync/vsync.idl
index d470a3a..19f206e 100644
--- a/runtimes/google/vsync/vsync.idl
+++ b/runtimes/google/vsync/vsync.idl
@@ -1,7 +1,7 @@
package vsync
import (
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
)
@@ -44,7 +44,7 @@
// LogValue represents an object mutation within a transaction.
type LogValue struct {
// Mutation is the store mutation representing the change in the object.
- Mutation estore.Mutation
+ Mutation raw.Mutation
// Delete indicates whether the mutation resulted in the object being
// deleted from the store.
Delete bool
diff --git a/runtimes/google/vsync/vsync.idl.go b/runtimes/google/vsync/vsync.idl.go
index e01c0e9..a1958a5 100644
--- a/runtimes/google/vsync/vsync.idl.go
+++ b/runtimes/google/vsync/vsync.idl.go
@@ -4,7 +4,7 @@
package vsync
import (
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/storage"
@@ -32,7 +32,7 @@
// LogValue represents an object mutation within a transaction.
type LogValue struct {
// Mutation is the store mutation representing the change in the object.
- Mutation estore.Mutation
+ Mutation raw.Mutation
// Delete indicates whether the mutation resulted in the object being
// deleted from the store.
Delete bool
@@ -266,7 +266,7 @@
_gen_wiretype.FieldType{Type: 0x4e, Name: "Tags"},
_gen_wiretype.FieldType{Type: 0x50, Name: "Dir"},
},
- "estore.Mutation", []string(nil)},
+ "raw.Mutation", []string(nil)},
_gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
_gen_wiretype.FieldType{Type: 0x51, Name: "Mutation"},
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 654fd10..375581b 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -10,7 +10,7 @@
import (
"sync"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/ipc"
"veyron2/storage"
@@ -112,7 +112,7 @@
// Register these Watch data types with VOM.
// TODO(tilaks): why aren't they auto-retrieved from the IDL?
- vom.Register(&estore.Mutation{})
+ vom.Register(&raw.Mutation{})
vom.Register(&storage.DEntry{})
// Channel to propagate close event to all threads.
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index ea48129..0617640 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -9,7 +9,7 @@
"fmt"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/query"
"veyron2/services/watch"
@@ -154,7 +154,7 @@
var lastResmark []byte
for i := range changes.Changes {
ch := &changes.Changes[i]
- mu, ok := ch.Value.(*estore.Mutation)
+ mu, ok := ch.Value.(*raw.Mutation)
if !ok {
return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index ab0aa8d..01ae25a 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -11,7 +11,7 @@
"testing"
"time"
- "veyron/services/store/estore"
+ "veyron/services/store/raw"
"veyron2/query"
"veyron2/services/watch"
@@ -146,7 +146,7 @@
watch.Change{
Name: "/uid/4c6db51aa740d8c62b90df874503e285",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x4c, 0x6d, 0xb5, 0x1a, 0xa7, 0x40, 0xd8, 0xc6,
0x2b, 0x90, 0xdf, 0x87, 0x45, 0x3, 0xe2, 0x85},
PriorVersion: 0x0,
@@ -167,7 +167,7 @@
watch.Change{
Name: "/uid/082bc42e15af4fcf611d7f19a8d7831f",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
PriorVersion: 0x0,
@@ -188,7 +188,7 @@
watch.Change{
Name: "/uid/6e4a327c297d76fb5142b1b1d95b2d07",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
PriorVersion: 0x0,
@@ -205,7 +205,7 @@
watch.Change{
Name: "/uid/082bc42e15af4fcf611d7f19a8d7831f",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf,
0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f},
PriorVersion: 0x57e9d1860d1d68d8,
@@ -231,7 +231,7 @@
watch.Change{
Name: "/uid/70ff65ec0f825f44b69f895eea759d71",
State: 0,
- Value: &estore.Mutation{
+ Value: &raw.Mutation{
ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44,
0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71},
PriorVersion: 0x0,