veyron2/storage: Make the client library thinner.
Forked from Ken's original change (since he's on vacation):
https://veyron-review.googlesource.com/#/c/3527
This change also adapts to the new transaction model where objects
in the transaction are named relative to the transaction's name.
Without this change, clients needed to do a bunch of name joining:
st := vstore.New(storeName)
// rootName is relative to storeName.
tid := st.BindTransactionRoot(rootName).CreateTransaction(ctx)
tname := naming.Join(rootName, tid)
// obj1Name is relative to rootName.
obj1 := st.BindObject(naming.Join(tname, obj1Name)).Put(...)
st.BindTransaction(tname).Commit(ctx)
The new client code looks like:
st := vstore.New()
// rootName is a full name.
tx := st.NewTransaction(ctx, rootName)
// obj1Name is relative to rootName.
obj1 := tx.Bind(obj1Name).Put(...)
tx.Commit(ctx)
Change-Id: I34a40a9020c4716c7550da10c6c14d2aa090c648
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index b5f5a19..674fc4c 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -192,12 +192,10 @@
func (gs *goState) monitorStore() {
ctx := gs.runtime.NewContext()
- vst, err := vstore.New(gs.storeEndpoint)
- if err != nil {
- panic(fmt.Errorf("Failed to init veyron store:%v", err))
+ root := vstore.New().Bind(gs.storeEndpoint)
+ if _, err := root.Put(ctx, ""); err != nil {
+ panic(fmt.Errorf("Put for / failed:%v", err))
}
- rootName := "/"
- root := vst.BindObject(rootName)
// Watch for any box updates from the store
go func() {
@@ -223,34 +221,13 @@
err = io.EOF
}
panic(fmt.Errorf("Can't receive watch event: %s: %s", gs.storeEndpoint, err))
-
}()
// Send any box updates to the store
- tid, err := vst.BindTransactionRoot(rootName).CreateTransaction(ctx)
- if err != nil {
- panic(fmt.Errorf("CreateTransaction for %s failed:%v", rootName, err))
- }
- tname := naming.Join(rootName, tid)
- if _, err := vst.BindObject(tname).Put(ctx, ""); err != nil {
- panic(fmt.Errorf("Put for %s failed:%v", tname, err))
- }
- if err := vst.BindTransaction(tname).Commit(ctx); err != nil {
- panic(fmt.Errorf("Commit transaction failed:%v", err))
- }
for {
box := <-gs.boxList
- boxesName := "/" + box.BoxId
- tid, err = vst.BindTransactionRoot(boxesName).CreateTransaction(ctx)
- if err != nil {
- panic(fmt.Errorf("CreateTransaction for %s failed:%v", boxesName, err))
- }
- tname := naming.Join(boxesName, tid)
- if _, err := vst.BindObject(tname).Put(ctx, box); err != nil {
- panic(fmt.Errorf("Put for %s failed:%v", tname, err))
- }
- if err := vst.BindTransaction(tname).Commit(ctx); err != nil {
- panic(fmt.Errorf("Commit transaction failed:%v", err))
+ if _, err := root.Bind(box.BoxId).Put(ctx, box); err != nil {
+ panic(fmt.Errorf("Put for %s failed:%v", box.BoxId, err))
}
}
}
diff --git a/examples/mdb/mdb_init/main.go b/examples/mdb/mdb_init/main.go
index 022a362..b00013a 100644
--- a/examples/mdb/mdb_init/main.go
+++ b/examples/mdb/mdb_init/main.go
@@ -23,7 +23,6 @@
"time"
"veyron/examples/mdb/schema"
- "veyron2/naming"
"veyron2/rt"
"veyron2/storage"
"veyron2/storage/vstore"
@@ -82,9 +81,10 @@
// state is the initial store state.
type state struct {
- store storage.Store
- tname string // Current transaction name; empty if there's no transaction.
- idTable map[string]*value
+ store storage.Store
+ storeRoot string // The name of the root of the store.
+ tx storage.Transaction // Current transaction; nil if there's no transaction.
+ idTable map[string]*value
}
// value holds the ID and name of a stored value.
@@ -135,8 +135,8 @@
}
// newState returns a fresh state.
-func newState(st storage.Store) *state {
- return &state{store: st, idTable: make(map[string]*value)}
+func newState(st storage.Store, storeRoot string) *state {
+ return &state{store: st, storeRoot: storeRoot, idTable: make(map[string]*value)}
}
// find fetches a value from the store.
@@ -149,7 +149,7 @@
func (st *state) put(path string, v interface{}) {
vlog.Infof("Storing %q = %+v", path, v)
st.makeParentDirs(path)
- if _, err := st.store.BindObject(naming.Join(st.tname, path)).Put(rt.R().TODOContext(), v); err != nil {
+ if _, err := st.tx.Bind(path).Put(rt.R().TODOContext(), v); err != nil {
vlog.Infof("put failed: %s: %s", path, err)
return
}
@@ -160,7 +160,7 @@
func (st *state) putNamed(name, path string, v interface{}) {
vlog.Infof("Storing %s: %q = %+v", name, path, v)
st.makeParentDirs(path)
- s, err := st.store.BindObject(naming.Join(st.tname, path)).Put(rt.R().TODOContext(), v)
+ s, err := st.tx.Bind(path).Put(rt.R().TODOContext(), v)
if err != nil {
vlog.Infof("Put failed: %s: %s", path, err)
return
@@ -174,7 +174,7 @@
l := strings.Split(path, "/")
for i, _ := range l {
prefix := filepath.Join(l[:i]...)
- o := st.store.BindObject(naming.Join(st.tname, prefix))
+ o := st.tx.Bind(prefix)
if exist, err := o.Exists(rt.R().TODOContext()); err != nil {
vlog.Infof("Error checking existence at %q: %s", prefix, err)
} else if !exist {
@@ -186,21 +186,20 @@
}
// newTransaction starts a new transaction.
+// TODO(kash): Saving the transaction in st is not a good pattern to have in
+// examples. It is better to pass a transaction around than risk the race
+// condition of st being used from multiple threads.
func (st *state) newTransaction() {
- tid, err := st.store.BindTransactionRoot("").CreateTransaction(rt.R().TODOContext())
- if err != nil {
- vlog.Fatalf("Failed to create transaction: %s", err)
- }
- st.tname = tid // Transaction is rooted at "", so tname == tid.
+ st.tx = st.store.NewTransaction(rt.R().TODOContext(), st.storeRoot)
}
// commit commits the current transaction.
func (st *state) commit() {
- if st.tname == "" {
+ if st.tx == nil {
vlog.Fatalf("No transaction to commit")
}
- err := st.store.BindTransaction(st.tname).Commit(rt.R().TODOContext())
- st.tname = ""
+ err := st.tx.Commit(rt.R().TODOContext())
+ st.tx = nil
if err != nil {
vlog.Errorf("Failed to commit transaction: %s", err)
}
@@ -378,11 +377,7 @@
rt.Init()
vlog.Infof("Binding to store on %s", storeName)
- st, err := vstore.New(storeName)
- if err != nil {
- vlog.Fatalf("Can't connect to store: %s: %s", storeName, err)
- }
- state := newState(st)
+ state := newState(vstore.New(), storeName)
// Store all data and templates.
filepath.Walk(*templatesDir, func(path string, _ os.FileInfo, _ error) error {
diff --git a/examples/stfortune/stfortune/main.go b/examples/stfortune/stfortune/main.go
index 7cd1e0b..aff4ff4 100644
--- a/examples/stfortune/stfortune/main.go
+++ b/examples/stfortune/stfortune/main.go
@@ -43,7 +43,7 @@
// waitForStore waits for the local store to be ready by checking if
// the schema information is synchronized.
-func waitForStore(store storage.Store) {
+func waitForStore(storeAddress string) {
ctx := rt.R().NewContext()
// Register *storage.Entry for WatchGlob.
@@ -55,13 +55,14 @@
// List of paths to check in store.
paths := []string{appPath, fortunePath(""), userPath("")}
for _, path := range paths {
+ abspath := naming.Join(storeAddress, path)
req := iwatch.GlobRequest{Pattern: ""}
- stream, err := store.BindObject(path).WatchGlob(ctx, req)
+ stream, err := vstore.New().Bind(abspath).WatchGlob(ctx, req)
if err != nil {
- log.Fatalf("WatchGlob %s failed: %v", path, err)
+ log.Fatalf("WatchGlob %s failed: %v", abspath, err)
}
if !stream.RecvStream().Advance() {
- log.Fatalf("waitForStore, path: %s, Advance failed: %v", path, stream.RecvStream().Err())
+ log.Fatalf("waitForStore (abspath: %s) Advance failed: %v", abspath, stream.RecvStream().Err())
}
stream.Cancel()
}
@@ -72,7 +73,7 @@
// runAsWatcher monitors updates to the fortunes in the store and
// prints out that information. It does not return.
-func runAsWatcher(store storage.Store, user string) {
+func runAsWatcher(storeAddress string, user string) {
// TODO(tilaks): remove this when the storage.Entry is auto-registered by VOM.
vom.Register(&storage.Entry{})
ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute)
@@ -87,10 +88,11 @@
}
fmt.Printf("Running as a Watcher monitoring new fortunes under %s...\n", path)
+ abspath := naming.Join(storeAddress, path)
req := iwatch.GlobRequest{Pattern: "*"}
- stream, err := store.BindObject(path).WatchGlob(ctx, req)
+ stream, err := vstore.New().Bind(abspath).WatchGlob(ctx, req)
if err != nil {
- log.Fatalf("watcher WatchGlob %s failed: %v", path, err)
+ log.Fatalf("watcher WatchGlob %s failed: %v", abspath, err)
}
rStream := stream.RecvStream()
@@ -122,21 +124,13 @@
// pickFortuneGlob uses Glob to find all available fortunes under the input
// path and then it chooses one randomly.
-func pickFortuneGlob(store storage.Store, ctx context.T, path string) (string, error) {
- // Transaction is rooted at "", so tname == tid.
- tname, err := store.BindTransactionRoot("").CreateTransaction(ctx)
- if err != nil {
- return "", err
- }
+func pickFortuneGlob(storeAddress string, ctx context.T, path string) (string, error) {
+ tx := vstore.New().NewTransaction(ctx, storeAddress)
// This transaction is read-only, so we always abort it at the end.
- defer store.BindTransaction(tname).Abort(ctx)
+ defer tx.Abort(ctx)
- trPath := func(path string) string {
- return naming.Join(tname, path)
- }
-
- results := store.BindObject(trPath(path)).Glob(ctx, "*")
+ results := tx.Bind(path).Glob(ctx, "*")
var names []string
rStream := results.RecvStream()
for rStream.Advance() {
@@ -149,7 +143,7 @@
// Get a random fortune using the glob results.
random := rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
p := fortunePath(names[random.Intn(len(names))])
- f, err := store.BindObject(trPath(p)).Get(ctx)
+ f, err := tx.Bind(p).Get(ctx)
if err != nil {
return "", err
}
@@ -162,8 +156,9 @@
// pickFortuneQuery uses a query to find all available fortunes under the input
// path and choose one randomly.
-func pickFortuneQuery(store storage.Store, ctx context.T, path string) (string, error) {
- results := store.BindObject(path).Query(ctx,
+func pickFortuneQuery(storeAddress string, ctx context.T, path string) (string, error) {
+ abspath := naming.Join(storeAddress, path)
+ results := vstore.New().Bind(abspath).Query(ctx,
query.Query{
"* |" + // Inspect all children of path.
"type FortuneData |" + // Include only objects of type FortuneData.
@@ -187,56 +182,49 @@
// getFortune returns a random fortune corresponding to a UserName if
// specified. If not, it picks a random fortune.
-func getFortune(store storage.Store, userName string) (string, error) {
+func getFortune(storeAddress string, userName string) (string, error) {
ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute)
defer cancel()
- var p string
+ var path string
if userName != "" {
// Look for a random fortune belonging to UserName.
- p = userPath(userName)
+ path = userPath(userName)
} else {
// Look for a random fortune.
- p = fortunePath("")
+ path = fortunePath("")
}
switch *pickMethod {
case "glob":
- return pickFortuneGlob(store, ctx, p)
+ return pickFortuneGlob(storeAddress, ctx, path)
case "query":
- return pickFortuneQuery(store, ctx, p)
+ return pickFortuneQuery(storeAddress, ctx, path)
default:
return "", fmt.Errorf("unsupported value for --pick_method. use 'glob' or 'query'")
}
}
// addFortune adds a new fortune to the store and links it to the specified
-// UserName. In this process, if the UserName doesn't exist, a new
-// user is created.
-func addFortune(store storage.Store, fortune string, userName string) error {
+// UserName. In this process, if the UserName doesn't exist, a new user is
+// created.
+func addFortune(storeAddress string, fortune string, userName string) error {
ctx, cancel := rt.R().NewContext().WithTimeout(time.Minute)
defer cancel()
- // Transaction is rooted at "", so tname == tid.
- tname, err := store.BindTransactionRoot("").CreateTransaction(ctx)
- if err != nil {
- return err
- }
+ st := vstore.New()
+ tx := st.NewTransaction(ctx, storeAddress)
committed := false
defer func() {
if !committed {
- store.BindTransaction(tname).Abort(ctx)
+ tx.Abort(ctx)
}
}()
- trPath := func(path string) string {
- return naming.Join(tname, path)
- }
-
// Check if this fortune already exists. If yes, return.
hash := getMD5Hash(naming.Join(fortune, userName))
- exists, err := store.BindObject(trPath(fortunePath(hash))).Exists(ctx)
+ exists, err := tx.Bind(fortunePath(hash)).Exists(ctx)
if err != nil {
return err
}
@@ -244,8 +232,9 @@
return nil
}
- // Check if the UserName exists. If yes, get its OID. If not, create a new user.
- o := store.BindObject(trPath(userPath(userName)))
+ // Check if the UserName exists. If yes, get its OID. If not, create a new
+ // user.
+ o := tx.Bind(userPath(userName))
exists, err = o.Exists(ctx)
if err != nil {
return err
@@ -268,14 +257,14 @@
// Create a new fortune entry.
f := schema.FortuneData{Fortune: fortune, UserName: userid}
- s, err := store.BindObject(trPath(fortunePath(hash))).Put(ctx, f)
+ s, err := tx.Bind(fortunePath(hash)).Put(ctx, f)
if err != nil {
return err
}
// Link the new fortune to UserName.
p := userPath(naming.Join(userName, hash))
- if _, err = store.BindObject(trPath(p)).Put(ctx, s.ID); err != nil {
+ if _, err = tx.Bind(p).Put(ctx, s.ID); err != nil {
return err
}
@@ -285,7 +274,7 @@
// locking. When the error for this scenario is
// exposed via the Commit API, one could retry the
// transaction.
- if err := store.BindTransaction(tname).Commit(ctx); err != nil {
+ if err := tx.Commit(ctx); err != nil {
return err
}
committed = true
@@ -307,17 +296,11 @@
log.Fatal("--store needs to be specified")
}
- // Create a handle to the backend store.
- store, err := vstore.New(*storeAddress)
- if err != nil {
- log.Fatalf("Can't connect to store: %s: %v", *storeAddress, err)
- }
-
// Wait for the store to be ready before proceeding.
- waitForStore(store)
+ waitForStore(*storeAddress)
// Get a fortune from the store.
- fortune, err := getFortune(store, *user)
+ fortune, err := getFortune(*storeAddress, *user)
if err != nil {
log.Fatal("error getting fortune: ", err)
}
@@ -329,14 +312,14 @@
*user = "anonymous"
}
*user = strings.ToLower(*user)
- if err := addFortune(store, *newFortune, *user); err != nil {
+ if err := addFortune(*storeAddress, *newFortune, *user); err != nil {
log.Fatal("error adding fortune: ", err)
}
}
// Run as a watcher if --watch is set.
if *watch {
- runAsWatcher(store, *user)
+ runAsWatcher(*storeAddress, *user)
os.Exit(0)
}
}
diff --git a/examples/todos/todos_init/main.go b/examples/todos/todos_init/main.go
index 90b5179..0532ba8 100644
--- a/examples/todos/todos_init/main.go
+++ b/examples/todos/todos_init/main.go
@@ -11,7 +11,6 @@
"strings"
"veyron/examples/todos/schema"
- "veyron2/naming"
"veyron2/rt"
"veyron2/storage"
"veyron2/storage/vstore"
@@ -49,13 +48,14 @@
// state is the initial store state.
type state struct {
- store storage.Store
- tname string // Current transaction name; empty if there's no transaction.
+ store storage.Store
+ storeRoot string // The name of the root of the store.
+ tx storage.Transaction // Current transaction; nil if there's no transaction.
}
// newState returns a fresh state.
-func newState(st storage.Store) *state {
- return &state{store: st}
+func newState(st storage.Store, storeRoot string) *state {
+ return &state{store: st, storeRoot: storeRoot}
}
// put adds a value to the store, creating the path to the value if it doesn't
@@ -63,7 +63,7 @@
func (st *state) put(path string, v interface{}) {
vlog.Infof("Storing %q = %+v", path, v)
st.makeParentDirs(path)
- if _, err := st.store.BindObject(naming.Join(st.tname, path)).Put(rt.R().TODOContext(), v); err != nil {
+ if _, err := st.tx.Bind(path).Put(rt.R().TODOContext(), v); err != nil {
vlog.Errorf("put failed: %s: %s", path, err)
return
}
@@ -75,7 +75,7 @@
l := strings.Split(path, "/")
for i, _ := range l {
prefix := filepath.Join(l[:i]...)
- o := st.store.BindObject(naming.Join(st.tname, prefix))
+ o := st.tx.Bind(prefix)
if exist, err := o.Exists(rt.R().TODOContext()); err != nil {
vlog.Infof("Error checking existence at %q: %s", prefix, err)
} else if !exist {
@@ -87,21 +87,20 @@
}
// newTransaction starts a new transaction.
+// TODO(kash): Saving the transaction in st is not a good pattern to have in
+// examples. It is better to pass a transaction around than risk the race
+// condition of st being used from multiple threads.
func (st *state) newTransaction() {
- tid, err := st.store.BindTransactionRoot("").CreateTransaction(rt.R().TODOContext())
- if err != nil {
- vlog.Fatalf("Failed to create transaction: %s", err)
- }
- st.tname = tid // Transaction is rooted at "", so tname == tid.
+ st.tx = st.store.NewTransaction(rt.R().TODOContext(), st.storeRoot)
}
// commit commits the current transaction.
func (st *state) commit() {
- if st.tname == "" {
+ if st.tx == nil {
vlog.Fatalf("No transaction to commit")
}
- err := st.store.BindTransaction(st.tname).Commit(rt.R().TODOContext())
- st.tname = ""
+ err := st.tx.Commit(rt.R().TODOContext())
+ st.tx = nil
if err != nil {
vlog.Errorf("Failed to commit transaction: %s", err)
}
@@ -164,11 +163,7 @@
rt.Init()
vlog.Infof("Binding to store on %s", storeName)
- st, err := vstore.New(storeName)
- if err != nil {
- vlog.Fatalf("Can't connect to store: %s: %s", storeName, err)
- }
- state := newState(st)
+ state := newState(vstore.New(), storeName)
if err := state.processJSONFile(*dataPath); err != nil {
vlog.Errorf("Failed to write data: %s", err)
diff --git a/services/mgmt/application/impl/dispatcher.go b/services/mgmt/application/impl/dispatcher.go
index 66faf1a..61f5cad 100644
--- a/services/mgmt/application/impl/dispatcher.go
+++ b/services/mgmt/application/impl/dispatcher.go
@@ -5,28 +5,22 @@
"veyron2/ipc"
"veyron2/security"
- "veyron2/storage"
- "veyron2/storage/vstore"
)
// dispatcher holds the state of the application repository dispatcher.
type dispatcher struct {
- store storage.Store
- auth security.Authorizer
+ storeRoot string
+ auth security.Authorizer
}
// NewDispatcher is the dispatcher factory.
func NewDispatcher(name string, authorizer security.Authorizer) (*dispatcher, error) {
- store, err := vstore.New(name)
- if err != nil {
- return nil, err
- }
- return &dispatcher{store: store, auth: authorizer}, nil
+ return &dispatcher{storeRoot: name, auth: authorizer}, nil
}
// DISPATCHER INTERFACE IMPLEMENTATION
func (d *dispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(repository.NewServerApplication(NewInvoker(d.store, suffix)))
+ invoker := ipc.ReflectInvoker(repository.NewServerApplication(NewInvoker(d.storeRoot, suffix)))
return invoker, d.auth, nil
}
diff --git a/services/mgmt/application/impl/invoker.go b/services/mgmt/application/impl/invoker.go
index 7026821..4a98a05 100644
--- a/services/mgmt/application/impl/invoker.go
+++ b/services/mgmt/application/impl/invoker.go
@@ -10,14 +10,14 @@
"veyron2/naming"
"veyron2/services/mgmt/application"
"veyron2/storage"
+ "veyron2/storage/vstore"
"veyron2/vlog"
)
// invoker holds the state of an application repository invocation.
type invoker struct {
- // store is the storage server used for storing application
- // metadata.
- store storage.Store
+ // storeRoot is a name in the Store under which all data will be stored.
+ storeRoot string
// suffix is the suffix of the current invocation that is assumed to
// be used as a relative object name to identify an application.
suffix string
@@ -30,8 +30,8 @@
)
// NewInvoker is the invoker factory.
-func NewInvoker(store storage.Store, suffix string) *invoker {
- return &invoker{store: store, suffix: suffix}
+func NewInvoker(storeRoot, suffix string) *invoker {
+ return &invoker{storeRoot: storeRoot, suffix: suffix}
}
func parse(suffix string) (string, string, error) {
@@ -52,11 +52,11 @@
type dir struct{}
// makeParentNodes creates the parent nodes if they do not already exist.
-func makeParentNodes(context ipc.ServerContext, store storage.Store, path string) error {
+func makeParentNodes(context ipc.ServerContext, tx storage.Transaction, path string) error {
pathComponents := storage.ParsePath(path)
for i := 0; i < len(pathComponents); i++ {
name := pathComponents[:i].String()
- object := store.BindObject(name)
+ object := tx.Bind(name)
if exists, err := object.Exists(context); err != nil {
return errOperationFailed
} else if !exists {
@@ -78,9 +78,10 @@
if version == "" {
return empty, errInvalidSuffix
}
+ root := vstore.New().Bind(i.storeRoot)
for _, profile := range profiles {
path := naming.Join("/applications", name, profile, version)
- entry, err := i.store.BindObject(path).Get(context)
+ entry, err := root.Bind(path).Get(context)
if err != nil {
continue
}
@@ -102,18 +103,14 @@
if version == "" {
return errInvalidSuffix
}
- // Transaction is rooted at "", so tname == tid.
- tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
- if err != nil {
- return err
- }
+ tx := vstore.New().NewTransaction(context, i.storeRoot)
var entry storage.Stat
for _, profile := range profiles {
- path := naming.Join(tname, "/applications", name, profile, version)
- if err := makeParentNodes(context, i.store, path); err != nil {
+ path := naming.Join("/applications", name, profile, version)
+ if err := makeParentNodes(context, tx, path); err != nil {
return err
}
- object := i.store.BindObject(path)
+ object := tx.Bind(path)
if !entry.ID.IsValid() {
if entry, err = object.Put(context, envelope); err != nil {
return errOperationFailed
@@ -124,7 +121,7 @@
}
}
}
- if err := i.store.BindTransaction(tname).Commit(context); err != nil {
+ if err := tx.Commit(context); err != nil {
return errOperationFailed
}
return nil
@@ -136,16 +133,9 @@
if err != nil {
return err
}
- // Transaction is rooted at "", so tname == tid.
- tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
- if err != nil {
- return err
- }
- path := naming.Join(tname, "/applications", name, profile)
- if version != "" {
- path += "/" + version
- }
- object := i.store.BindObject(path)
+ tx := vstore.New().NewTransaction(context, i.storeRoot)
+ path := naming.Join("/applications", name, profile, version)
+ object := tx.Bind(path)
found, err := object.Exists(context)
if err != nil {
return errOperationFailed
@@ -156,7 +146,7 @@
if err := object.Remove(context); err != nil {
return errOperationFailed
}
- if err := i.store.BindTransaction(tname).Commit(context); err != nil {
+ if err := tx.Commit(context); err != nil {
return errOperationFailed
}
return nil
diff --git a/services/mgmt/profile/impl/dispatcher.go b/services/mgmt/profile/impl/dispatcher.go
index a8a7c83..fec034c 100644
--- a/services/mgmt/profile/impl/dispatcher.go
+++ b/services/mgmt/profile/impl/dispatcher.go
@@ -5,28 +5,22 @@
"veyron2/ipc"
"veyron2/security"
- "veyron2/storage"
- "veyron2/storage/vstore"
)
// dispatcher holds the state of the profile repository dispatcher.
type dispatcher struct {
- store storage.Store
- auth security.Authorizer
+ storeRoot string
+ auth security.Authorizer
}
// NewDispatcher is the dispatcher factory.
func NewDispatcher(name string, authorizer security.Authorizer) (*dispatcher, error) {
- store, err := vstore.New(name)
- if err != nil {
- return nil, err
- }
- return &dispatcher{store: store, auth: authorizer}, nil
+ return &dispatcher{storeRoot: name, auth: authorizer}, nil
}
// DISPATCHER INTERFACE IMPLEMENTATION
func (d *dispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(repository.NewServerProfile(NewInvoker(d.store, suffix)))
+ invoker := ipc.ReflectInvoker(repository.NewServerProfile(NewInvoker(d.storeRoot, suffix)))
return invoker, d.auth, nil
}
diff --git a/services/mgmt/profile/impl/invoker.go b/services/mgmt/profile/impl/invoker.go
index 38241e3..f077cf1 100644
--- a/services/mgmt/profile/impl/invoker.go
+++ b/services/mgmt/profile/impl/invoker.go
@@ -8,13 +8,14 @@
"veyron2/ipc"
"veyron2/naming"
"veyron2/storage"
+ "veyron2/storage/vstore"
"veyron2/vlog"
)
// invoker holds the profile repository invocation.
type invoker struct {
- // store is the storage server used for storing profile data.
- store storage.Store
+ // storeRoot is a name in the Store under which all data will be stored.
+ storeRoot string
// suffix is the suffix of the current invocation that is assumed to
// be used as a relative object name to identify a profile
// specification.
@@ -27,8 +28,8 @@
)
// NewInvoker is the invoker factory.
-func NewInvoker(store storage.Store, suffix string) *invoker {
- return &invoker{store: store, suffix: suffix}
+func NewInvoker(storeRoot, suffix string) *invoker {
+ return &invoker{storeRoot: storeRoot, suffix: suffix}
}
// STORE MANAGEMENT INTERFACE IMPLEMENTATION
@@ -37,11 +38,11 @@
type dir struct{}
// makeParentNodes creates the parent nodes if they do not already exist.
-func makeParentNodes(context ipc.ServerContext, store storage.Store, path string) error {
+func makeParentNodes(context ipc.ServerContext, tx storage.Transaction, path string) error {
pathComponents := storage.ParsePath(path)
for i := 0; i < len(pathComponents); i++ {
name := pathComponents[:i].String()
- object := store.BindObject(name)
+ object := tx.Bind(name)
if _, err := object.Get(context); err != nil {
if _, err := object.Put(context, &dir{}); err != nil {
return errOperationFailed
@@ -53,20 +54,16 @@
func (i *invoker) Put(context ipc.ServerContext, profile profile.Specification) error {
vlog.VI(0).Infof("%v.Put(%v)", i.suffix, profile)
- // Transaction is rooted at "", so tname == tid.
- tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
- if err != nil {
+ tx := vstore.New().NewTransaction(context, i.storeRoot)
+ path := naming.Join("/profiles", i.suffix)
+ if err := makeParentNodes(context, tx, path); err != nil {
return err
}
- path := naming.Join(tname, "/profiles", i.suffix)
- if err := makeParentNodes(context, i.store, path); err != nil {
- return err
- }
- object := i.store.BindObject(path)
+ object := tx.Bind(path)
if _, err := object.Put(context, profile); err != nil {
return errOperationFailed
}
- if err := i.store.BindTransaction(tname).Commit(context); err != nil {
+ if err := tx.Commit(context); err != nil {
return errOperationFailed
}
return nil
@@ -74,13 +71,9 @@
func (i *invoker) Remove(context ipc.ServerContext) error {
vlog.VI(0).Infof("%v.Remove()", i.suffix)
- // Transaction is rooted at "", so tname == tid.
- tname, err := i.store.BindTransactionRoot("").CreateTransaction(context)
- if err != nil {
- return err
- }
- path := naming.Join(tname, "/profiles", i.suffix)
- object := i.store.BindObject(path)
+ tx := vstore.New().NewTransaction(context, i.storeRoot)
+ path := naming.Join("/profiles", i.suffix)
+ object := tx.Bind(path)
found, err := object.Exists(context)
if err != nil {
return errOperationFailed
@@ -91,7 +84,7 @@
if err := object.Remove(context); err != nil {
return errOperationFailed
}
- if err := i.store.BindTransaction(tname).Commit(context); err != nil {
+ if err := tx.Commit(context); err != nil {
return errOperationFailed
}
return nil
@@ -101,8 +94,8 @@
func (i *invoker) lookup(context ipc.ServerContext) (profile.Specification, error) {
empty := profile.Specification{}
- path := naming.Join("/profiles", i.suffix)
- entry, err := i.store.BindObject(path).Get(context)
+ path := naming.Join(i.storeRoot, "/profiles", i.suffix)
+ entry, err := vstore.New().Bind(path).Get(context)
if err != nil {
return empty, errNotFound
}
diff --git a/services/store/stored/main.go b/services/store/stored/main.go
index 75e250c..c4d09b9 100644
--- a/services/store/stored/main.go
+++ b/services/store/stored/main.go
@@ -92,11 +92,7 @@
// Run viewer if requested.
if *viewerPort > 0 {
- vst, err := vstore.New(mountName)
- if err != nil {
- log.Fatalf("Failed to start viewer: %s", err)
- }
- go viewer.ListenAndServe(fmt.Sprintf(":%d", *viewerPort), vst)
+ go viewer.ListenAndServe(fmt.Sprintf(":%d", *viewerPort), mountName, vstore.New())
}
// Wait forever.
diff --git a/services/store/viewer/value.go b/services/store/viewer/value.go
index b82f09e..b2ab106 100644
--- a/services/store/viewer/value.go
+++ b/services/store/viewer/value.go
@@ -21,7 +21,7 @@
// glob performs a glob expansion of the pattern. The results are sorted.
func glob(st storage.Store, path, pattern string) ([]string, error) {
- results := st.BindObject(path).Glob(rt.R().TODOContext(), pattern)
+ results := st.Bind(path).Glob(rt.R().TODOContext(), pattern)
names := []string{}
rStream := results.RecvStream()
for rStream.Advance() {
@@ -67,7 +67,7 @@
// exist.
func (v *Value) Get(path string) interface{} {
path = v.fullpath(path)
- e, err := v.store.BindObject(path).Get(rt.R().TODOContext())
+ e, err := v.store.Bind(path).Get(rt.R().TODOContext())
if err != nil {
return nil
}
diff --git a/services/store/viewer/viewer.go b/services/store/viewer/viewer.go
index 0c6dc8c..372062b 100644
--- a/services/store/viewer/viewer.go
+++ b/services/store/viewer/viewer.go
@@ -26,6 +26,7 @@
"net/http"
"path/filepath"
+ "veyron2/naming"
"veyron2/rt"
"veyron2/storage"
"veyron2/vlog"
@@ -33,7 +34,8 @@
// server is the HTTP server handler.
type server struct {
- store storage.Store
+ storeRoot string
+ store storage.Store
}
var _ http.Handler = (*server)(nil)
@@ -74,8 +76,8 @@
// loadTemplate fetches the template for the value from the store. The template
// is based on the type of the value, under /template/<pkgPath>/<typeName>.
func (s *server) loadTemplate(v interface{}) *template.Template {
- path := templatePath(v)
- e, err := s.store.BindObject(path).Get(rt.R().TODOContext())
+ path := naming.Join(s.storeRoot, templatePath(v))
+ e, err := s.store.Bind(path).Get(rt.R().TODOContext())
if err != nil {
return nil
}
@@ -126,8 +128,8 @@
// ServeHTTP is the main HTTP handler.
func (s *server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- path := req.URL.Path
- e, err := s.store.BindObject(path).Get(rt.R().TODOContext())
+ path := naming.Join(s.storeRoot, req.URL.Path)
+ e, err := s.store.Bind(path).Get(rt.R().TODOContext())
if err != nil {
msg := fmt.Sprintf("<html><body><h1>%s</h1><h2>Error: %s</h2></body></html>",
html.EscapeString(path),
@@ -152,8 +154,8 @@
// ListenAndServe is the main entry point. It serves store at the specified
// network address.
-func ListenAndServe(addr string, st storage.Store) error {
- s := &server{store: st}
+func ListenAndServe(addr string, storeRoot string, st storage.Store) error {
+ s := &server{storeRoot: storeRoot, store: st}
vlog.Infof("Viewer running at http://localhost%s", addr)
return http.ListenAndServe(addr, s)
}
diff --git a/tools/qsh/impl/impl.go b/tools/qsh/impl/impl.go
index f85b6f9..3494882 100644
--- a/tools/qsh/impl/impl.go
+++ b/tools/qsh/impl/impl.go
@@ -11,7 +11,8 @@
"veyron2/storage"
"veyron2/storage/vstore"
- // TODO(rjkroege@google.com): Replace with the appropriate vom2 functionality when available.
+ // TODO(rjkroege@google.com): Replace with the appropriate vom2 functionality
+ // when available.
_ "veyron/services/store/typeregistryhack"
)
@@ -23,7 +24,8 @@
// Prints a single QueryResult to the provided io.Writer.
func printResult(qr storage.QueryResult, w io.Writer, indent int) {
- // TODO(rjkroege@google.com): Consider permitting the user to provide a Go template to format output.
+ // TODO(rjkroege@google.com): Consider permitting the user to provide a Go
+ // template to format output.
if v := qr.Value(); v != nil {
indenter(w, indent)
fmt.Fprintf(w, "%s: %#v\n", qr.Name(), v)
@@ -67,13 +69,7 @@
return nil
}
-func Runquery(storeName, queryString string) error {
+func RunQuery(queryRoot, queryString string) error {
ctx := rt.R().TODOContext()
-
- store, err := vstore.New(storeName)
- if err != nil {
- return err
- }
- defer store.Close()
- return printStream(store.BindObject("").Query(ctx, query.Query{queryString}), os.Stdout, 0)
+ return printStream(vstore.New().Bind(queryRoot).Query(ctx, query.Query{queryString}), os.Stdout, 0)
}
diff --git a/tools/qsh/main.go b/tools/qsh/main.go
index feb52c3..80e3885 100644
--- a/tools/qsh/main.go
+++ b/tools/qsh/main.go
@@ -1,23 +1,21 @@
package main
import (
- "veyron/tools/qsh/impl"
-
"flag"
"log"
- "os"
+
+ "veyron/tools/qsh/impl"
"veyron2/rt"
)
-var flagStoreName = flag.String("targetstore", "", "Store object name")
+var flagQueryRoot = flag.String("queryroot", "",
+ "An object name in the store to serve as the root of the query.")
const usage = `
-Synopsis: qsh [--targetstore=<store in veyron namespace>] query...
+Synopsis: qsh --queryroot=<object in the store> query...
-Runs each given query against the specified Veyron store instance. If
-no target store is specified on the command line, qsh expects the
-environment variable VEYRON_STORE to specify the store to query.
+Runs a given query starting at the given root.
`
func main() {
@@ -26,18 +24,16 @@
// TODO(rjkroege@google.com): Handle ^c nicely.
flag.Parse()
queryStringArgs := flag.Args()
-
- // Command line overrides.
- storeName := *flagStoreName
- if storeName == "" {
- storeName = os.ExpandEnv("${VEYRON_STORE}")
+ if len(queryStringArgs) != 1 {
+ log.Fatalf("qsh: Expected only one query arg\n" + usage)
}
- if storeName == "" {
- log.Fatalf("qsh: No store specified\n" + usage)
+ queryRoot := *flagQueryRoot
+ if queryRoot == "" {
+ log.Fatalf("qsh: No queryroot specified\n" + usage)
}
- err := impl.Runquery(storeName, queryStringArgs[0])
+ err := impl.RunQuery(queryRoot, queryStringArgs[0])
if err != nil {
log.Printf("qsh: When attempting query: \"%s\" experienced an error: ", queryStringArgs[0], err.Error())
}