Management Refactor: Introduce fs.Memstore to break Veyron store dependency
This change adds a fs.Memstore that provides a simple temporary alternative
to the Veyron store for the Application and Profile state.
Change-Id: I0fdfde481cc98ec323ee1d5b8b06b031cfa320e0
diff --git a/services/mgmt/application/impl/dispatcher.go b/services/mgmt/application/impl/dispatcher.go
index 61f5cad..dbf6b42 100644
--- a/services/mgmt/application/impl/dispatcher.go
+++ b/services/mgmt/application/impl/dispatcher.go
@@ -3,24 +3,31 @@
import (
"veyron/services/mgmt/repository"
+ "veyron/services/mgmt/lib/fs"
"veyron2/ipc"
"veyron2/security"
)
// dispatcher holds the state of the application repository dispatcher.
type dispatcher struct {
- storeRoot string
+ store *fs.Memstore
auth security.Authorizer
+ storeRoot string
}
// NewDispatcher is the dispatcher factory.
func NewDispatcher(name string, authorizer security.Authorizer) (*dispatcher, error) {
- return &dispatcher{storeRoot: name, auth: authorizer}, nil
+ // TODO(rjkroege@google.com): Use the config service.
+ store, err := fs.NewMemstore("")
+ if err != nil {
+ return nil, err
+ }
+ return &dispatcher{store: store, storeRoot: name, auth: authorizer}, nil
}
// DISPATCHER INTERFACE IMPLEMENTATION
func (d *dispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(repository.NewServerApplication(NewInvoker(d.storeRoot, suffix)))
+ invoker := ipc.ReflectInvoker(repository.NewServerApplication(NewInvoker(d.store, d.storeRoot, suffix)))
return invoker, d.auth, nil
}
diff --git a/services/mgmt/application/impl/invoker.go b/services/mgmt/application/impl/invoker.go
index 4a98a05..a0f3d2f 100644
--- a/services/mgmt/application/impl/invoker.go
+++ b/services/mgmt/application/impl/invoker.go
@@ -4,18 +4,21 @@
"errors"
"strings"
+ "veyron/services/mgmt/lib/fs"
_ "veyron/services/store/typeregistryhack"
-
"veyron2/ipc"
"veyron2/naming"
"veyron2/services/mgmt/application"
- "veyron2/storage"
- "veyron2/storage/vstore"
"veyron2/vlog"
)
// invoker holds the state of an application repository invocation.
type invoker struct {
+ // store is the storage server used for storing application
+ // metadata.
+ // All Invokers share a single dispatcher's Memstore.
+ store *fs.Memstore
+
// storeRoot is a name in the Store under which all data will be stored.
storeRoot string
// suffix is the suffix of the current invocation that is assumed to
@@ -30,8 +33,8 @@
)
// NewInvoker is the invoker factory.
-func NewInvoker(storeRoot, suffix string) *invoker {
- return &invoker{storeRoot: storeRoot, suffix: suffix}
+func NewInvoker(store *fs.Memstore, storeRoot, suffix string) *invoker {
+ return &invoker{store: store, storeRoot: storeRoot, suffix: suffix}
}
func parse(suffix string) (string, string, error) {
@@ -48,26 +51,6 @@
// APPLICATION INTERFACE IMPLEMENTATION
-// dir is used to organize directory contents in the store.
-type dir struct{}
-
-// makeParentNodes creates the parent nodes if they do not already exist.
-func makeParentNodes(context ipc.ServerContext, tx storage.Transaction, path string) error {
- pathComponents := storage.ParsePath(path)
- for i := 0; i < len(pathComponents); i++ {
- name := pathComponents[:i].String()
- object := tx.Bind(name)
- if exists, err := object.Exists(context); err != nil {
- return errOperationFailed
- } else if !exists {
- if _, err := object.Put(context, &dir{}); err != nil {
- return errOperationFailed
- }
- }
- }
- return nil
-}
-
func (i *invoker) Match(context ipc.ServerContext, profiles []string) (application.Envelope, error) {
vlog.VI(0).Infof("%v.Match(%v)", i.suffix, profiles)
empty := application.Envelope{}
@@ -78,10 +61,13 @@
if version == "" {
return empty, errInvalidSuffix
}
- root := vstore.New().Bind(i.storeRoot)
+
+ i.store.Lock()
+ defer i.store.Unlock()
+
for _, profile := range profiles {
path := naming.Join("/applications", name, profile, version)
- entry, err := root.Bind(path).Get(context)
+ entry, err := i.store.BindObject(path).Get(context)
if err != nil {
continue
}
@@ -103,25 +89,24 @@
if version == "" {
return errInvalidSuffix
}
- tx := vstore.New().NewTransaction(context, i.storeRoot)
- var entry storage.Stat
+ i.store.Lock()
+ defer i.store.Unlock()
+ // Transaction is rooted at "", so tname == tid.
+ tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
+ if err != nil {
+ return err
+ }
+
for _, profile := range profiles {
- path := naming.Join("/applications", name, profile, version)
- if err := makeParentNodes(context, tx, path); err != nil {
- return err
- }
- object := tx.Bind(path)
- if !entry.ID.IsValid() {
- if entry, err = object.Put(context, envelope); err != nil {
- return errOperationFailed
- }
- } else {
- if _, err := object.Put(context, entry.ID); err != nil {
- return errOperationFailed
- }
+ path := naming.Join(tname, "/applications", name, profile, version)
+
+ object := i.store.BindObject(path)
+ _, err := object.Put(context, envelope)
+ if err != nil {
+ return errOperationFailed
}
}
- if err := tx.Commit(context); err != nil {
+ if err := i.store.BindTransaction(tname).Commit(context); err != nil {
return errOperationFailed
}
return nil
@@ -133,9 +118,18 @@
if err != nil {
return err
}
- tx := vstore.New().NewTransaction(context, i.storeRoot)
- path := naming.Join("/applications", name, profile, version)
- object := tx.Bind(path)
+ i.store.Lock()
+ defer i.store.Unlock()
+ // Transaction is rooted at "", so tname == tid.
+ tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
+ if err != nil {
+ return err
+ }
+ path := naming.Join(tname, "/applications", name, profile)
+ if version != "" {
+ path += "/" + version
+ }
+ object := i.store.BindObject(path)
found, err := object.Exists(context)
if err != nil {
return errOperationFailed
@@ -146,7 +140,7 @@
if err := object.Remove(context); err != nil {
return errOperationFailed
}
- if err := tx.Commit(context); err != nil {
+ if err := i.store.BindTransaction(tname).Commit(context); err != nil {
return errOperationFailed
}
return nil
diff --git a/services/mgmt/lib/fs/simplestore.go b/services/mgmt/lib/fs/simplestore.go
new file mode 100644
index 0000000..beb2f40
--- /dev/null
+++ b/services/mgmt/lib/fs/simplestore.go
@@ -0,0 +1,339 @@
+// Implements a map-based store substitute that implements the legacy store API.
+package fs
+
+import (
+ "encoding/gob"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strings"
+ "sync"
+
+ "veyron/services/mgmt/profile"
+ "veyron2/services/mgmt/application"
+ "veyron2/verror"
+)
+
+// TODO(rjkroege@google.com) Switch Memstore to the mid-August 2014 style store API.
+
+// Memstore contains the state of the memstore. It supports a single
+// transaction at a time. The current state of a Memstore under a
+// transactional name binding is the contents of puts then the contents
+// of (data - removes). puts and removes will be empty at the beginning
+// of a transaction and after an Unlock operation.
+type Memstore struct {
+ sync.Mutex
+ persistedFile string
+ haveTransactionNameBinding bool
+ locked bool
+ data map[string]interface{}
+ puts map[string]interface{}
+ removes map[string]struct{}
+}
+
+const (
+ startingMemstoreSize = 10
+ transactionNamePrefix = "memstore-transaction"
+)
+
+var keyExists = struct{}{}
+
+// The implementation of set requires gob instead of json.
+func init() {
+ gob.Register(profile.Specification{})
+ gob.Register(application.Envelope{})
+}
+
+// NewMemstore persists the Memstore to /tmp if no file is configured.
+func NewMemstore(configuredPersistentFile string) (*Memstore, error) {
+ data := make(map[string]interface{}, startingMemstoreSize)
+ if configuredPersistentFile == "" {
+ f, err := ioutil.TempFile(os.TempDir(), "memstore-gob")
+ if err != nil {
+ return nil, fmt.Errorf("TempFile(%q, %q) failed: %v", os.TempDir(), "memstore-gob", err)
+ }
+ defer f.Close()
+ configuredPersistentFile = f.Name()
+ return &Memstore{
+ data: data,
+ persistedFile: configuredPersistentFile,
+ }, nil
+ }
+
+ file, err := os.Open(configuredPersistentFile)
+ if err != nil {
+ // The file doesn't exist. Attempt to create it instead.
+ file, cerr := os.Create(configuredPersistentFile)
+ if cerr != nil {
+ return nil, fmt.Errorf("File (%q) could neither be opened (%v) or created (%v)", configuredPersistentFile, err, cerr)
+ }
+ defer file.Close()
+ } else {
+ decoder := gob.NewDecoder(file)
+ if err := decoder.Decode(&data); err != nil {
+ return nil, fmt.Errorf("Decode() failed: %v", err)
+ }
+ }
+ return &Memstore{
+ data: data,
+ persistedFile: configuredPersistentFile,
+ }, nil
+}
+
+type MemstoreObject interface {
+ Remove(_ interface{}) error
+ Exists(_ interface{}) (bool, error)
+}
+
+type boundObject struct {
+ path string
+ ms *Memstore
+ Value interface{}
+}
+
+// BindObject sets the path string for subsequent operations.
+func (ms *Memstore) BindObject(path string) *boundObject {
+ pathParts := strings.SplitN(path, "/", 2)
+ if pathParts[0] == transactionNamePrefix {
+ ms.haveTransactionNameBinding = true
+ } else {
+ ms.haveTransactionNameBinding = false
+ }
+ return &boundObject{path: pathParts[1], ms: ms}
+}
+
+func (ms *Memstore) removeChildren(path string) bool {
+ deleted := false
+ for k, _ := range ms.data {
+ if strings.HasPrefix(k, path) {
+ deleted = true
+ ms.removes[k] = keyExists
+ }
+ }
+ for k, _ := range ms.puts {
+ if strings.HasPrefix(k, path) {
+ deleted = true
+ delete(ms.puts, k)
+ }
+ }
+ return deleted
+}
+
+type Transaction interface {
+ CreateTransaction(_ interface{}) (string, error)
+ Commit(_ interface{}) error
+}
+
+// BindTransactionRoot on a Memstore always operates over the
+// entire Memstore. As a result, the root parameter is ignored.
+func (ms *Memstore) BindTransactionRoot(_ string) Transaction {
+ return ms
+}
+
+// BindTransaction on a Memstore can only use the single Memstore
+// transaction.
+func (ms *Memstore) BindTransaction(_ string) Transaction {
+ return ms
+}
+
+func (ms *Memstore) newTransactionState() {
+ ms.puts = make(map[string]interface{}, startingMemstoreSize)
+ ms.removes = make(map[string]struct{}, startingMemstoreSize)
+}
+
+func (ms *Memstore) clearTransactionState() {
+ ms.puts = nil
+ ms.removes = nil
+}
+
+// Unlock abandons an in-progress transaction before releasing the lock.
+func (ms *Memstore) Unlock() {
+ ms.locked = false
+ ms.clearTransactionState()
+ ms.Mutex.Unlock()
+}
+
+// Lock acquires a lock and caches the state of the lock.
+func (ms *Memstore) Lock() {
+ ms.Mutex.Lock()
+ ms.clearTransactionState()
+ ms.locked = true
+}
+
+// CreateTransaction requires the caller to acquire a lock on the Memstore.
+func (ms *Memstore) CreateTransaction(_ interface{}) (string, error) {
+ if ms.puts != nil || ms.removes != nil {
+ return "", verror.BadProtocolf("recursive CreateTransaction() not permitted")
+ }
+ ms.newTransactionState()
+ return transactionNamePrefix, nil
+}
+
+// Commit updates the store and persists the result.
+func (ms *Memstore) Commit(_ interface{}) error {
+ if !ms.locked || ms.puts == nil || ms.removes == nil {
+ return verror.BadProtocolf("illegal attempt to commit previously committed or abandonned transaction")
+ }
+ for k, v := range ms.puts {
+ ms.data[k] = v
+ }
+ for k, _ := range ms.removes {
+ delete(ms.data, k)
+ }
+ return ms.persist()
+}
+
+func (ms *Memstore) Abort(_ interface{}) error {
+ if !ms.locked {
+ return verror.BadProtocolf("illegal attempt to abort non-existent transaction")
+ }
+ return nil
+}
+
+func (o *boundObject) Remove(_ interface{}) error {
+ if !o.ms.locked {
+ return verror.BadProtocolf("Remove() without a transaction.")
+ }
+
+ if _, pendingRemoval := o.ms.removes[o.path]; pendingRemoval {
+ return verror.NotFoundf("path %s not in Memstore", o.path)
+ }
+
+ _, found := o.ms.data[o.path]
+ if !found && !o.ms.removeChildren(o.path) {
+ return verror.NotFoundf("path %s not in Memstore", o.path)
+ }
+ delete(o.ms.puts, o.path)
+ o.ms.removes[o.path] = keyExists
+ return nil
+}
+
+// transactionExists implements Exists() for bound names that have the
+// transaction prefix.
+func (o *boundObject) transactionExists() bool {
+ // Determine if the bound name point to a real object.
+ _, inBase := o.ms.data[o.path]
+ _, inPuts := o.ms.puts[o.path]
+ _, inRemoves := o.ms.removes[o.path]
+
+ // not yet committed.
+ if inPuts || (inBase && !inRemoves) {
+ return true
+ }
+
+ // The bound names might be a prefix of the path for a real object. For
+ // example, BindObject("/test/a"). Put(o) creates a real object o at path
+ /// test/a so the code above will cause BindObject("/test/a").Exists() to
+ // return true. Testing this is however not sufficient because
+ // BindObject(any prefix of on object path).Exists() needs to also be
+ // true. For example, here BindObject("/test").Exists() is true.
+ //
+ // Consequently, transactionExists scans all object names in the Memstore
+ // to determine if any of their names have the bound name as a prefix.
+ // Puts take precedence over removes so we scan it first.
+
+ for k, _ := range o.ms.puts {
+ if strings.HasPrefix(k, o.path) {
+ return true
+ }
+ }
+
+ // Then we scan data for matches and verify that at least one of the
+ // object names with the bound prefix have not been removed.
+
+ for k, _ := range o.ms.data {
+ if _, inRemoves := o.ms.removes[k]; strings.HasPrefix(k, o.path) && !inRemoves {
+ return true
+ }
+ }
+ return false
+}
+
+func (o *boundObject) Exists(_ interface{}) (bool, error) {
+ if o.ms.haveTransactionNameBinding {
+ return o.transactionExists(), nil
+ } else {
+ _, inBase := o.ms.data[o.path]
+ if inBase {
+ return true, nil
+ }
+ for k, _ := range o.ms.data {
+ if strings.HasPrefix(k, o.path) {
+ return true, nil
+ }
+ }
+ }
+ return false, nil
+}
+
+// transactionBoundGet implements Get while the bound name has the
+// transaction prefix.
+func (o *boundObject) transactionBoundGet() (*boundObject, error) {
+ bv, inBase := o.ms.data[o.path]
+ _, inRemoves := o.ms.removes[o.path]
+ pv, inPuts := o.ms.puts[o.path]
+
+ found := inPuts || (inBase && !inRemoves)
+ if !found {
+ return nil, verror.NotFoundf("path %s not in Memstore", o.path)
+ }
+
+ if inPuts {
+ o.Value = pv
+ } else {
+ o.Value = bv
+ }
+ return o, nil
+}
+
+func (o *boundObject) bareGet() (*boundObject, error) {
+ bv, inBase := o.ms.data[o.path]
+
+ if !inBase {
+ return nil, verror.NotFoundf("path %s not in Memstore", o.path)
+ }
+
+ o.Value = bv
+ return o, nil
+}
+
+func (o *boundObject) Get(_ interface{}) (*boundObject, error) {
+ if o.ms.haveTransactionNameBinding {
+ return o.transactionBoundGet()
+ } else {
+ return o.bareGet()
+ }
+}
+
+func (o *boundObject) Put(_ interface{}, envelope interface{}) (*boundObject, error) {
+ if !o.ms.locked {
+ return nil, verror.BadProtocolf("Put() without a transaction.")
+ }
+ switch v := envelope.(type) {
+ case application.Envelope, profile.Specification:
+ o.ms.puts[o.path] = v
+ delete(o.ms.removes, o.path)
+ o.Value = o.path
+ return o, nil
+ default:
+ return o, verror.BadProtocolf("attempted Put to Memstore of unsupported type")
+ }
+}
+
+// persist() writes the state of the Memstore to persistent storage.
+func (ms *Memstore) persist() error {
+ file, err := os.Create(ms.persistedFile)
+ if err != nil {
+ return fmt.Errorf("Create(%v) failed: %v", ms.persistedFile, err)
+ }
+ defer file.Close()
+
+ // TODO(rjkroege): Switch to the vom encoder.
+ enc := gob.NewEncoder(file)
+ err = enc.Encode(ms.data)
+ if err := enc.Encode(ms.data); err != nil {
+ return fmt.Errorf("Encode() failed: %v", err)
+ }
+ ms.clearTransactionState()
+ return nil
+}
diff --git a/services/mgmt/lib/fs/simplestore_test.go b/services/mgmt/lib/fs/simplestore_test.go
new file mode 100644
index 0000000..9aee72d
--- /dev/null
+++ b/services/mgmt/lib/fs/simplestore_test.go
@@ -0,0 +1,437 @@
+package fs
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "reflect"
+ "testing"
+
+ _ "veyron/services/mgmt/profile"
+ "veyron2/naming"
+ "veyron2/services/mgmt/application"
+ "veyron2/verror"
+)
+
+func TestNewMemstore(t *testing.T) {
+ memstore, err := NewMemstore("")
+
+ if err != nil {
+ t.Fatalf("NewMemstore() failed: %v", err)
+ }
+
+ _, err = os.Stat(memstore.persistedFile)
+ if err != nil {
+ t.Fatalf("Stat(%v) failed: %v", memstore.persistedFile, err)
+ }
+}
+
+func TestNewNamedMemstore(t *testing.T) {
+ path := filepath.Join(os.TempDir(), "namedms")
+ memstore, err := NewMemstore(path)
+ if err != nil {
+ t.Fatalf("NewMemstore() failed: %v", err)
+ }
+ defer os.Remove(path)
+
+ _, err = os.Stat(memstore.persistedFile)
+ if err != nil {
+ t.Fatalf("Stat(%v) failed: %v", path, err)
+ }
+}
+
+// Verify that all of the listed paths Exists().
+// Caller is responsible for setting up any transaction state necessary.
+func allPathsExist(ts *Memstore, paths []string) error {
+ for _, p := range paths {
+ exists, err := ts.BindObject(p).Exists(nil)
+ if err != nil {
+ return fmt.Errorf("Exists(%s) expected to succeed but failed: %v", p, err)
+ }
+ if !exists {
+ return fmt.Errorf("Exists(%s) expected to be true but is false", p)
+ }
+ }
+ return nil
+}
+
+// Verify that all of the listed paths !Exists().
+// Caller is responsible for setting up any transaction state necessary.
+func allPathsDontExist(ts *Memstore, paths []string) error {
+ for _, p := range paths {
+ exists, err := ts.BindObject(p).Exists(nil)
+ if err != nil {
+ return fmt.Errorf("Exists(%s) expected to succeed but failed: %v", p, err)
+ }
+ if exists {
+ return fmt.Errorf("Exists(%s) expected to be false but is true", p)
+ }
+ }
+ return nil
+}
+
+type PathValue struct {
+ Path string
+ Expected interface{}
+}
+
+// getEquals tests that every provided path is equal to the specified value.
+func allPathsEqual(ts *Memstore, pvs []PathValue) error {
+ for _, p := range pvs {
+ v, err := ts.BindObject(p.Path).Get(nil)
+ if err != nil {
+ return fmt.Errorf("Get(%s) expected to succeed but failed", p, err)
+ }
+ if !reflect.DeepEqual(p.Expected, v.Value) {
+ return fmt.Errorf("Unexpected non-equality for %s: got %v, expected %v", p.Path, v.Value, p.Expected)
+ }
+ }
+ return nil
+}
+
+// tP is a convenience function. It prepends the transactionNamePrefix
+// to the given path.
+func tP(path string) string {
+ return naming.Join(transactionNamePrefix, path)
+}
+
+func TestSerializeDeserialize(t *testing.T) {
+ path := filepath.Join(os.TempDir(), "namedms")
+ memstoreOriginal, err := NewMemstore(path)
+ if err != nil {
+ t.Fatalf("NewMemstore() failed: %v", err)
+ }
+ defer os.Remove(path)
+
+ // Create example data.
+ envelope := application.Envelope{
+ Args: []string{"--help"},
+ Env: []string{"DEBUG=1"},
+ Binary: "/veyron/name/of/binary",
+ }
+ secondEnvelope := application.Envelope{
+ Args: []string{"--save"},
+ Env: []string{"VEYRON=42"},
+ Binary: "/veyron/name/of/binary/is/memstored",
+ }
+
+ // TRANSACTION BEGIN
+ // Insert a value into the Memstore at /test/a
+ memstoreOriginal.Lock()
+ tname, err := memstoreOriginal.BindTransactionRoot("ignored").CreateTransaction(nil)
+ if err != nil {
+ t.Fatalf("CreateTransaction() failed: %v", err)
+ }
+ if _, err := memstoreOriginal.BindObject(tP("/test/a")).Put(nil, envelope); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+
+ if err := allPathsExist(memstoreOriginal, []string{tP("/test/a"), tP("/test")}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{{tP("/test/a"), envelope}}); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ if err := memstoreOriginal.BindTransaction(tname).Commit(nil); err != nil {
+ t.Fatalf("Commit() failed: %v", err)
+ }
+ memstoreOriginal.Unlock()
+ // TRANSACTION END
+
+ // Validate persisted state.
+ if err := allPathsExist(memstoreOriginal, []string{"/test/a", "/test"}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{{"/test/a", envelope}}); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // TRANSACTION BEGIN Write a value to /test/b as well.
+ memstoreOriginal.Lock()
+ tname, err = memstoreOriginal.BindTransactionRoot("also ignored").CreateTransaction(nil)
+ bindingTnameTestB := memstoreOriginal.BindObject(tP("/test/b"))
+ if _, err := bindingTnameTestB.Put(nil, envelope); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+
+ // Validate persisted state during transaction
+ if err := allPathsExist(memstoreOriginal, []string{"/test/a", "/test"}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{{"/test/a", envelope}}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ // Validate pending state during transaction
+ if err := allPathsExist(memstoreOriginal, []string{tP("/test/a"), tP("/test"), tP("/test/b")}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{
+ {tP("/test/a"), envelope},
+ {tP("/test/b"), envelope}}); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // Commit the <tname>/test/b to /test/b
+ if err := memstoreOriginal.Commit(nil); err != nil {
+ t.Fatalf("Commit() failed: %v", err)
+ }
+ memstoreOriginal.Unlock()
+ // TODO(rjkroege): Consider ensuring that Get() on <tname>/test/b should now fail.
+ // TRANSACTION END
+
+ // Validate persisted state after transaction
+ if err := allPathsExist(memstoreOriginal, []string{"/test/a", "/test", "/test/b"}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{
+ {"/test/a", envelope},
+ {"/test/b", envelope}}); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // TRANSACTION BEGIN (to be abandonned)
+ memstoreOriginal.Lock()
+ tname, err = memstoreOriginal.BindTransactionRoot("").CreateTransaction(nil)
+
+ // Exists is true before doing anything.
+ if err := allPathsExist(memstoreOriginal, []string{tP("/test")}); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ if _, err := memstoreOriginal.BindObject(tP("/test/b")).Put(nil, secondEnvelope); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+
+ // Validate persisted state during transaction
+ if err := allPathsExist(memstoreOriginal, []string{
+ "/test/a",
+ "/test/b",
+ "/test",
+ tP("/test"),
+ tP("/test/a"),
+ tP("/test/b"),
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{
+ {"/test/a", envelope},
+ {"/test/b", envelope},
+ {tP("/test/b"), secondEnvelope},
+ {tP("/test/a"), envelope},
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // Pending Remove() of /test
+ if err := memstoreOriginal.BindObject(tP("/test")).Remove(nil); err != nil {
+ t.Fatalf("Remove() failed: %v", err)
+ }
+
+ // Verify that all paths are successfully removed from the in-progress transaction.
+ if err := allPathsDontExist(memstoreOriginal, []string{tP("/test/a"), tP("/test"), tP("/test/b")}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ // But all paths remain in the persisted version.
+ if err := allPathsExist(memstoreOriginal, []string{"/test/a", "/test", "/test/b"}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{
+ {"/test/a", envelope},
+ {"/test/b", envelope},
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // At which point, Get() on the transaction won't find anything.
+ if _, err := memstoreOriginal.BindObject(tP("/test/a")).Get(nil); !verror.Is(err, verror.NotFound) {
+ t.Fatalf("Get() should have failed: got %v, expected %v", err, verror.NotFoundf("path %s not in Memstore", tname+"/test/a"))
+ }
+
+ // Attempting to Remove() it over again will fail.
+ if err := memstoreOriginal.BindObject(tP("/test/a")).Remove(nil); !verror.Is(err, verror.NotFound) {
+ t.Fatalf("Remove() should have failed: got %v, expected %v", err, verror.NotFoundf("path %s not in Memstore", tname+"/test/a"))
+ }
+
+ // Attempting to Remove() a non-existing path will fail.
+ if err := memstoreOriginal.BindObject(tP("/foo")).Remove(nil); !verror.Is(err, verror.NotFound) {
+ t.Fatalf("Remove() should have failed: got %v, expected %v", err, verror.NotFoundf("path %s not in Memstore", tname+"/foo"))
+ }
+
+ // Exists() a non-existing path will fail.
+ if present, _ := memstoreOriginal.BindObject(tP("/foo")).Exists(nil); present {
+ t.Fatalf("Exists() should have failed for non-existing path %s", tname+"/foo")
+ }
+
+ // Abort the transaction without committing it.
+ memstoreOriginal.Abort(nil)
+ memstoreOriginal.Unlock()
+ // TRANSACTION END (ABORTED)
+
+ // Validate that persisted state after abandonned transaction has not changed.
+ if err := allPathsExist(memstoreOriginal, []string{"/test/a", "/test", "/test/b"}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreOriginal, []PathValue{
+ {"/test/a", envelope},
+ {"/test/b", envelope}}); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // Validate that Get will fail on a non-existent path.
+ if _, err := memstoreOriginal.BindObject("/test/c").Get(nil); !verror.Is(err, verror.NotFound) {
+ t.Fatalf("Get() should have failed: got %v, expected %v", err, verror.NotFoundf("path %s not in Memstore", tname+"/test/c"))
+ }
+
+ // Verify that the previous Commit() operations have persisted to
+ // disk by creating a new Memstore from the contents on disk.
+ memstoreCopy, err := NewMemstore(path)
+ if err != nil {
+ t.Fatalf("NewMemstore() failed: %v", err)
+ }
+ // Verify that memstoreCopy is an exact copy of memstoreOriginal.
+ if err := allPathsExist(memstoreCopy, []string{"/test/a", "/test", "/test/b"}); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreCopy, []PathValue{
+ {"/test/a", envelope},
+ {"/test/b", envelope}}); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // TRANSACTION BEGIN
+ memstoreCopy.Lock()
+ tname, err = memstoreCopy.BindTransactionRoot("also ignored").CreateTransaction(nil)
+
+ // Add a pending object c to test that pending objects are deleted.
+ if _, err := memstoreCopy.BindObject(tP("/test/c")).Put(nil, secondEnvelope); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+ if err := allPathsExist(memstoreCopy, []string{
+ tP("/test/a"),
+ "/test/a",
+ tP("/test"),
+ "/test",
+ tP("/test/b"),
+ "/test/b",
+ tP("/test/c"),
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsEqual(memstoreCopy, []PathValue{
+ {tP("/test/a"), envelope},
+ {tP("/test/b"), envelope},
+ {tP("/test/c"), secondEnvelope},
+ {"/test/a", envelope},
+ {"/test/b", envelope},
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+
+ // Remove /test/a /test/b /test/c /test
+ if err := memstoreCopy.BindObject(tP("/test")).Remove(nil); err != nil {
+ t.Fatalf("Remove() failed: %v", err)
+ }
+ // Verify that all paths are successfully removed from the in-progress transaction.
+ if err := allPathsDontExist(memstoreCopy, []string{
+ tP("/test/a"),
+ tP("/test"),
+ tP("/test/b"),
+ tP("/test/c"),
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+ if err := allPathsExist(memstoreCopy, []string{
+ "/test/a",
+ "/test",
+ "/test/b",
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+ // Commit the change.
+ if err = memstoreCopy.Commit(nil); err != nil {
+ t.Fatalf("Commit() failed: %v", err)
+ }
+ memstoreCopy.Unlock()
+ // TRANSACTION END
+
+ // Create a new Memstore from file to see if Remove operates are
+ // persisted.
+ memstoreRemovedCopy, err := NewMemstore(path)
+ if err != nil {
+ t.Fatalf("NewMemstore() failed for removed copy: %v", err)
+ }
+ if err := allPathsDontExist(memstoreRemovedCopy, []string{
+ "/test/a",
+ "/test",
+ "/test/b",
+ "/test/c",
+ }); err != nil {
+ t.Fatalf("%v", err)
+ }
+}
+
+func TestOperationsNeedValidBinding(t *testing.T) {
+ path := filepath.Join(os.TempDir(), "namedms")
+ memstoreOriginal, err := NewMemstore(path)
+ if err != nil {
+ t.Fatalf("NewMemstore() failed: %v", err)
+ }
+ defer os.Remove(path)
+
+ // Create example data.
+ envelope := application.Envelope{
+ Args: []string{"--help"},
+ Env: []string{"DEBUG=1"},
+ Binary: "/veyron/name/of/binary",
+ }
+
+ // TRANSACTION BEGIN
+ // Attempt inserting a value at /test/a.
+ memstoreOriginal.Lock()
+ tname, err := memstoreOriginal.BindTransactionRoot("").CreateTransaction(nil)
+ if err != nil {
+ t.Fatalf("CreateTransaction() failed: %v", err)
+ }
+
+ if err := memstoreOriginal.BindTransaction(tname).Commit(nil); err != nil {
+ t.Fatalf("Commit() failed: %v", err)
+ }
+ memstoreOriginal.Unlock()
+ // TRANSACTION END
+
+ // Put outside ot a transaction should fail.
+ bindingTnameTestA := memstoreOriginal.BindObject(naming.Join("fooey", "/test/a"))
+ if _, err := bindingTnameTestA.Put(nil, envelope); !verror.Is(err, verror.BadProtocol) {
+ t.Fatalf("Put() failed: got %v, expected %v", err, verror.BadProtocolf("Put() without a transactional binding"))
+ }
+
+ // Remove outside of a transaction should fail
+ if err := bindingTnameTestA.Remove(nil); !verror.Is(err, verror.BadProtocol) {
+ t.Fatalf("Put() failed: got %v, expected %v", err, verror.BadProtocolf("Remove() without a transactional binding"))
+ }
+
+ // Commit outside of a transaction should fail
+ if err := memstoreOriginal.BindTransaction(tname).Commit(nil); !verror.Is(err, verror.BadProtocol) {
+ t.Fatalf("Commit() failed: got %v, expected %v", err, verror.BadProtocolf("illegal attempt to commit previously committed or abandonned transaction"))
+ }
+
+ // Attempt inserting a value at /test/b
+ memstoreOriginal.Lock()
+ tname, err = memstoreOriginal.BindTransactionRoot("").CreateTransaction(nil)
+ if err != nil {
+ t.Fatalf("CreateTransaction() failed: %v", err)
+ }
+
+ bindingTnameTestB := memstoreOriginal.BindObject(tP("/test/b"))
+ if _, err := bindingTnameTestB.Put(nil, envelope); err != nil {
+ t.Fatalf("Put() failed: %v", err)
+ }
+ // Abandon transaction.
+ memstoreOriginal.Unlock()
+
+ // Remove should definitely fail on an abndonned transaction.
+ if err := bindingTnameTestB.Remove(nil); !verror.Is(err, verror.BadProtocol) {
+ t.Fatalf("Remove() failed: got %v, expected %v", err, verror.Internalf("Remove() without a transactional binding"))
+ }
+}
diff --git a/services/mgmt/profile/impl/dispatcher.go b/services/mgmt/profile/impl/dispatcher.go
index fec034c..c593614 100644
--- a/services/mgmt/profile/impl/dispatcher.go
+++ b/services/mgmt/profile/impl/dispatcher.go
@@ -3,24 +3,31 @@
import (
"veyron/services/mgmt/repository"
+ "veyron/services/mgmt/lib/fs"
"veyron2/ipc"
"veyron2/security"
)
// dispatcher holds the state of the profile repository dispatcher.
type dispatcher struct {
- storeRoot string
+ store *fs.Memstore
auth security.Authorizer
+ storeRoot string
}
// NewDispatcher is the dispatcher factory.
func NewDispatcher(name string, authorizer security.Authorizer) (*dispatcher, error) {
- return &dispatcher{storeRoot: name, auth: authorizer}, nil
+ // TODO(rjkroege@google.com): Use the config service.
+ store, err := fs.NewMemstore("")
+ if err != nil {
+ return nil, err
+ }
+ return &dispatcher{store: store, storeRoot: name, auth: authorizer}, nil
}
// DISPATCHER INTERFACE IMPLEMENTATION
func (d *dispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(repository.NewServerProfile(NewInvoker(d.storeRoot, suffix)))
+ invoker := ipc.ReflectInvoker(repository.NewServerProfile(NewInvoker(d.store, d.storeRoot, suffix)))
return invoker, d.auth, nil
}
diff --git a/services/mgmt/profile/impl/invoker.go b/services/mgmt/profile/impl/invoker.go
index 720b418..3e51159 100644
--- a/services/mgmt/profile/impl/invoker.go
+++ b/services/mgmt/profile/impl/invoker.go
@@ -3,18 +3,19 @@
import (
"errors"
+ "veyron/services/mgmt/lib/fs"
"veyron/services/mgmt/profile"
- _ "veyron/services/store/typeregistryhack"
+ // _ "veyron/services/store/typeregistryhack"
"veyron2/ipc"
"veyron2/naming"
- "veyron2/storage"
- "veyron2/storage/vstore"
"veyron2/vlog"
)
// invoker holds the profile repository invocation.
type invoker struct {
+ // store is the storage server used for storing profile data.
+ store *fs.Memstore
// storeRoot is a name in the Store under which all data will be stored.
storeRoot string
// suffix is the suffix of the current invocation that is assumed to
@@ -29,46 +30,27 @@
)
// NewInvoker is the invoker factory.
-func NewInvoker(storeRoot, suffix string) *invoker {
- return &invoker{storeRoot: storeRoot, suffix: suffix}
+func NewInvoker(store *fs.Memstore, storeRoot, suffix string) *invoker {
+ return &invoker{store: store, storeRoot: storeRoot, suffix: suffix}
}
// STORE MANAGEMENT INTERFACE IMPLEMENTATION
-// dir is used to organize directory contents in the store.
-type dir struct{}
-
-// makeParentNodes creates the parent nodes if they do not already exist.
-func makeParentNodes(context ipc.ServerContext, tx storage.Transaction, path string) error {
- pathComponents := storage.ParsePath(path)
- for i := 0; i < len(pathComponents); i++ {
- name := pathComponents[:i].String()
- object := tx.Bind(name)
- if exists, err := object.Exists(context); err != nil {
- return errOperationFailed
- } else if !exists {
- if _, err := object.Get(context); err != nil {
- if _, err := object.Put(context, &dir{}); err != nil {
- return errOperationFailed
- }
- }
- }
- }
- return nil
-}
-
func (i *invoker) Put(context ipc.ServerContext, profile profile.Specification) error {
vlog.VI(0).Infof("%v.Put(%v)", i.suffix, profile)
- tx := vstore.New().NewTransaction(context, i.storeRoot)
- path := naming.Join("/profiles", i.suffix)
- if err := makeParentNodes(context, tx, path); err != nil {
+ // Transaction is rooted at "", so tname == tid.
+ i.store.Lock()
+ defer i.store.Unlock()
+ tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
+ if err != nil {
return err
}
- object := tx.Bind(path)
+ path := naming.Join(tname, "/profiles", i.suffix)
+ object := i.store.BindObject(path)
if _, err := object.Put(context, profile); err != nil {
return errOperationFailed
}
- if err := tx.Commit(context); err != nil {
+ if err := i.store.BindTransaction(tname).Commit(context); err != nil {
return errOperationFailed
}
return nil
@@ -76,9 +58,15 @@
func (i *invoker) Remove(context ipc.ServerContext) error {
vlog.VI(0).Infof("%v.Remove()", i.suffix)
- tx := vstore.New().NewTransaction(context, i.storeRoot)
- path := naming.Join("/profiles", i.suffix)
- object := tx.Bind(path)
+ i.store.Lock()
+ defer i.store.Unlock()
+ // Transaction is rooted at "", so tname == tid.
+ tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
+ if err != nil {
+ return err
+ }
+ path := naming.Join(tname, "/profiles", i.suffix)
+ object := i.store.BindObject(path)
found, err := object.Exists(context)
if err != nil {
return errOperationFailed
@@ -89,7 +77,7 @@
if err := object.Remove(context); err != nil {
return errOperationFailed
}
- if err := tx.Commit(context); err != nil {
+ if err := i.store.BindTransaction(tname).Commit(context); err != nil {
return errOperationFailed
}
return nil
@@ -99,8 +87,12 @@
func (i *invoker) lookup(context ipc.ServerContext) (profile.Specification, error) {
empty := profile.Specification{}
- path := naming.Join(i.storeRoot, "/profiles", i.suffix)
- entry, err := vstore.New().Bind(path).Get(context)
+ path := naming.Join("/profiles", i.suffix)
+
+ i.store.Lock()
+ defer i.store.Unlock()
+
+ entry, err := i.store.BindObject(path).Get(context)
if err != nil {
return empty, errNotFound
}