veyron/services/store/server: server-side impl of new store API (directories and such)
Change-Id: I161aa2835f6c61fdc03a7228460c1451035b58ff
diff --git a/services/store/memstore/testing/util.go b/services/store/memstore/testing/util.go
index d93a4f4..37dece7 100644
--- a/services/store/memstore/testing/util.go
+++ b/services/store/memstore/testing/util.go
@@ -290,7 +290,7 @@
}
cv, ok := change.Value.(*storage.Entry)
if !ok {
- t.Fatal("Expected an Entry")
+ t.Fatalf("Expected an Entry")
}
if cv.Stat.ID != id {
t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
@@ -300,40 +300,24 @@
}
}
+func ExpectEntryExistsNameOnly(t *testing.T, changes []types.Change, name string) {
+ change := findEntry(t, changes, name)
+ if change.State != types.Exists {
+ t.Fatalf("Expected name to exist: %v", name)
+ }
+ _, ok := change.Value.(*storage.Entry)
+ if !ok {
+ t.Fatalf("Expected an Entry")
+ }
+}
+
func ExpectEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
change := findEntry(t, changes, name)
if change.State != types.DoesNotExist {
t.Fatalf("Expected name to not exist: %v", name)
}
if change.Value != nil {
- t.Fatal("Expected entry to be nil")
- }
-}
-
-func ExpectServiceEntryExists(t *testing.T, changes []types.Change, name string, id storage.ID, value string) {
- change := findEntry(t, changes, name)
- if change.State != types.Exists {
- t.Fatalf("Expected name to exist: %v", name)
- }
- cv, ok := change.Value.(*storage.Entry)
- if !ok {
- t.Fatal("Expected a service Entry")
- }
- if cv.Stat.ID != id {
- t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
- }
- if cv.Value != value {
- t.Fatalf("Expected Value to be %v, but was: %v", value, cv.Value)
- }
-}
-
-func ExpectServiceEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
- change := findEntry(t, changes, name)
- if change.State != types.DoesNotExist {
- t.Fatalf("Expected name to not exist: %v", name)
- }
- if change.Value != nil {
- t.Fatal("Expected entry to be nil")
+ t.Fatalf("Expected entry to be nil")
}
}
@@ -395,10 +379,10 @@
t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
}
if cv.Value != nil {
- t.Fatal("Expected Value to be nil")
+ t.Fatalf("Expected Value to be nil")
}
if cv.Dir != nil {
- t.Fatal("Expected Dir to be nil")
+ t.Fatalf("Expected Dir to be nil")
}
}
@@ -424,7 +408,7 @@
for _, change := range changes {
cv, ok := change.Value.(*raw.Mutation)
if !ok {
- t.Fatal("Expected a Mutation")
+ t.Fatalf("Expected a Mutation")
}
if cv.ID == id {
return change
diff --git a/services/store/server/object.go b/services/store/server/object.go
deleted file mode 100644
index 612e8f3..0000000
--- a/services/store/server/object.go
+++ /dev/null
@@ -1,194 +0,0 @@
-package server
-
-// This file defines object, which implements the server-side Object API from
-// veyron2/services/store/service.vdl.
-
-import (
- "veyron/services/store/memstore"
-
- "veyron2/ipc"
- "veyron2/query"
- "veyron2/services/mounttable"
- mttypes "veyron2/services/mounttable/types"
- "veyron2/services/store"
- "veyron2/services/watch"
- watchtypes "veyron2/services/watch/types"
- "veyron2/storage"
- "veyron2/vdl/vdlutil"
- "veyron2/verror"
-)
-
-type object struct {
- name string // will never contain a transaction id
- obj *memstore.Object
- tid transactionID // may be nullTransactionID
- server *Server
-}
-
-var (
- errNotAValue = verror.BadArgf("not a storage.Value")
- errNotAnAttribute = verror.BadArgf("not a storage.Attr")
-
- _ store.ObjectService = (*object)(nil)
-
- nullEntry storage.Entry
- nullStat storage.Stat
-)
-
-func (o *object) String() string {
- return o.name
-}
-
-func (o *object) Attributes(arg string) map[string]string {
- return map[string]string{
- "health": "ok",
- "servertype": o.String(),
- }
-}
-
-// CreateTransaction creates a transaction.
-func (o *object) CreateTransaction(ctx ipc.ServerContext, opts []vdlutil.Any) (string, error) {
- if o.tid != nullTransactionID {
- return "", errNestedTransaction
- }
- return o.server.createTransaction(ctx, o.name)
-}
-
-func (o *object) Commit(ctx ipc.ServerContext) error {
- return o.server.commitTransaction(ctx, o.tid)
-}
-
-func (o *object) Abort(ctx ipc.ServerContext) error {
- return o.server.abortTransaction(ctx, o.tid)
-}
-
-func (o *object) NewTransaction(ctx ipc.ServerContext, opts []vdlutil.Any) (string, error) {
- panic("Not implemented")
-}
-
-// Exists returns true iff the Entry has a value.
-func (o *object) Exists(ctx ipc.ServerContext) (bool, error) {
- t, err := o.server.findTransaction(ctx, o.tid)
- if err != nil {
- return false, err
- }
- return o.obj.Exists(ctx.RemoteID(), t)
-}
-
-// Get returns the value for the Object. The value returned is from the
-// most recent mutation of the entry in the Transaction, or from the
-// Transaction's snapshot if there is no mutation.
-func (o *object) Get(ctx ipc.ServerContext) (storage.Entry, error) {
- t, err := o.server.findTransaction(ctx, o.tid)
- if err != nil {
- return nullEntry, err
- }
- entry, err := o.obj.Get(ctx.RemoteID(), t)
- if err != nil {
- return nullEntry, err
- }
- return *entry, err
-}
-
-// Put modifies the value of the Object.
-func (o *object) Put(ctx ipc.ServerContext, val vdlutil.Any) (storage.Stat, error) {
- t, err := o.server.findTransaction(ctx, o.tid)
- if err != nil {
- return nullStat, err
- }
- s, err := o.obj.Put(ctx.RemoteID(), t, interface{}(val))
- if err != nil {
- return nullStat, err
- }
- return *s, err
-}
-
-// Remove removes the Object.
-func (o *object) Remove(ctx ipc.ServerContext) error {
- t, err := o.server.findTransaction(ctx, o.tid)
- if err != nil {
- return err
- }
- return o.obj.Remove(ctx.RemoteID(), t)
-}
-
-// Stat returns entry info.
-func (o *object) Stat(ctx ipc.ServerContext) (storage.Stat, error) {
- t, err := o.server.findTransaction(ctx, o.tid)
- if err != nil {
- return nullStat, err
- }
- s, err := o.obj.Stat(ctx.RemoteID(), t)
- if err != nil {
- return nullStat, err
- }
- return *s, err
-}
-
-// Query returns a sequence of objects that match the given query.
-func (o *object) Query(ctx ipc.ServerContext, q query.Query, stream store.ObjectServiceQueryStream) error {
- t, err := o.server.findTransaction(ctx, o.tid)
- if err != nil {
- return err
- }
- it, err := o.obj.Query(ctx.RemoteID(), t, q)
- if err != nil {
- return err
- }
- for it.Next() {
- if err := stream.SendStream().Send(*it.Get()); err != nil {
- it.Abort()
- return err
- }
- }
- return it.Err()
-}
-
-type globStreamSenderAdapter struct {
- stream interface {
- Send(entry mttypes.MountEntry) error
- }
-}
-
-func (a *globStreamSenderAdapter) Send(item string) error {
- return a.stream.Send(mttypes.MountEntry{Name: item})
-}
-
-type globStreamAdapter struct {
- stream mounttable.GlobbableServiceGlobStream
-}
-
-func (a *globStreamAdapter) SendStream() interface {
- Send(item string) error
-} {
- return &globStreamSenderAdapter{a.stream.SendStream()}
-}
-
-// Glob streams a series of names that match the given pattern.
-func (o *object) Glob(ctx ipc.ServerContext, pattern string, stream mounttable.GlobbableServiceGlobStream) error {
- t, err := o.server.findTransaction(ctx, o.tid)
- if err != nil {
- return err
- }
- it, err := o.obj.Glob(ctx.RemoteID(), t, pattern)
- if err != nil {
- return err
- }
- gsa := &globStreamAdapter{stream}
- for ; it.IsValid(); it.Next() {
- if err := gsa.SendStream().Send(it.Name()); err != nil {
- return err
- }
- }
- return nil
-}
-
-// WatchGlob returns a stream of changes that match a pattern.
-func (o *object) WatchGlob(ctx ipc.ServerContext, req watchtypes.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
- return o.server.watcher.WatchGlob(ctx, storage.ParsePath(o.name), req, stream)
-}
-
-// WatchQuery returns a stream of changes that satisfy a query.
-func (o *object) WatchQuery(ctx ipc.ServerContext, req watchtypes.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
- return o.server.watcher.WatchQuery(ctx, storage.ParsePath(o.name), req, stream)
-}
diff --git a/services/store/server/server.go b/services/store/server/server.go
index 2a4c279..d55e64a 100644
--- a/services/store/server/server.go
+++ b/services/store/server/server.go
@@ -20,7 +20,6 @@
"veyron2/ipc"
"veyron2/security"
- "veyron2/services/store"
"veyron2/verror"
)
@@ -34,6 +33,9 @@
var (
errNestedTransaction = verror.BadArgf("cannot create a nested Transaction")
+ // Triggers if client calls commit/abort on a name that's not part of a
+ // transaction.
+ errNoTransaction = verror.NotFoundf("no transaction")
// Note, this can happen e.g. due to expiration.
errTransactionDoesNotExist = verror.NotFoundf("transaction does not exist")
// Transaction exists, but may not be used by the caller.
@@ -116,6 +118,11 @@
}
s.pending.Add(1)
go s.gcLoop()
+ // Start with an empty directory at root.
+ rootDir := &thing{name: "", obj: s.store.Bind(""), tid: nullTransactionID, server: s}
+ if err := rootDir.makeInternal(config.Admin, nil); err != nil {
+ return nil, err
+ }
return s, nil
}
@@ -186,6 +193,9 @@
return oname[:begin] + oname[end:], transactionID(id), nil
}
+// NOTE(sadovsky): The transaction's scope should be limited to oname's subtree
+// and its parent, but for now we expand it to the entire store (and don't use
+// oname below).
func (s *Server) createTransaction(ctx transactionContext, oname string) (string, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
@@ -198,12 +208,12 @@
break
}
}
- info := &transaction{
+ tdata := &transaction{
trans: memstore.NewTransaction(),
expires: time.Now().Add(transactionMaxLifetime),
creatorCtx: ctx,
}
- s.transactions[id] = info
+ s.transactions[id] = tdata
return makeTransactionComponent(id), nil
}
@@ -218,16 +228,16 @@
if id == nullTransactionID {
return nil, nil
}
- info, ok := s.transactions[id]
+ tdata, ok := s.transactions[id]
if !ok {
return nil, errTransactionDoesNotExist
}
// A transaction may be used only by the session (and therefore client)
// that created it.
- if !info.matchesContext(ctx) {
+ if !tdata.matchesContext(ctx) {
return nil, errPermissionDenied
}
- return info.trans, nil
+ return tdata.trans, nil
}
// Commit commits the changes in the transaction to the store. The
@@ -241,7 +251,7 @@
return err
}
if t == nil {
- return errTransactionDoesNotExist
+ return errNoTransaction
}
err = t.Commit()
delete(s.transactions, id)
@@ -260,7 +270,7 @@
return err
}
if t == nil {
- return errTransactionDoesNotExist
+ return errNoTransaction
}
err = t.Abort()
delete(s.transactions, id)
@@ -300,9 +310,9 @@
s.mutex.Lock()
now := time.Now()
- for id, t := range s.transactions {
- if now.After(t.expires) {
- t.trans.Abort()
+ for id, tdata := range s.transactions {
+ if now.After(tdata.expires) {
+ tdata.trans.Abort()
delete(s.transactions, id)
}
}
@@ -341,27 +351,24 @@
}
func (d *storeDispatcher) lookupServer(suffix string) (interface{}, error) {
- // Strip leading "/" if present so that server internals can reliably use
- // naming.Join(suffix, "foo").
+ // Strip leading "/" if present so that server internals can assume a
+ // particular form.
suffix = strings.TrimPrefix(suffix, "/")
if strings.HasSuffix(suffix, raw.RawStoreSuffix) {
return raw.NewServerStore(d.s), nil
} else {
- // TODO(sadovsky): Create Object, Transaction, and TransactionRoot stubs,
- // merge them, and return the result. See TODO in
- // veyron2/services/store/service.vdl.
- o, err := d.s.lookupObject(suffix)
+ t, err := d.s.lookupThing(suffix)
if err != nil {
return nil, err
}
- return store.NewServerObject(o), nil
+ return NewServerstoreThing(t), nil
}
}
-func (s *Server) lookupObject(name string) (*object, error) {
- oname, tid, err := stripTransactionComponent(name)
+func (s *Server) lookupThing(name string) (*thing, error) {
+ name, tid, err := stripTransactionComponent(name)
if err != nil {
return nil, err
}
- return &object{name: oname, obj: s.store.Bind(oname), tid: tid, server: s}, nil
+ return &thing{name: name, obj: s.store.Bind(name), tid: tid, server: s}, nil
}
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index b933b66..a0f3ca7 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -10,14 +10,13 @@
"testing"
_ "veyron/lib/testutil" // initialize vlog
- watchtesting "veyron/services/store/memstore/testing"
+ storetest "veyron/services/store/memstore/testing"
"veyron/services/store/raw"
"veyron2/ipc"
"veyron2/naming"
"veyron2/rt"
"veyron2/security"
- "veyron2/services/store"
"veyron2/services/watch/types"
"veyron2/storage"
_ "veyron2/vlog"
@@ -65,19 +64,19 @@
return s, closer
}
-func lookupObjectOrDie(s *Server, name string) *object {
- o, err := s.lookupObject(name)
+func lookupThingOrDie(s *Server, name string) *thing {
+ t, err := s.lookupThing(name)
if err != nil {
panic(err)
}
- return o
+ return t
}
-// createTransaction creates a new transaction and returns its store-relative
-// name.
+// createTransaction creates a new transaction and returns its name relative to
+// the root of the store.
func createTransaction(t *testing.T, s *Server, ctx ipc.ServerContext, name string) string {
_, file, line, _ := runtime.Caller(1)
- tid, err := lookupObjectOrDie(s, name).CreateTransaction(ctx, nil)
+ tid, err := lookupThingOrDie(s, name).NewTransaction(ctx, nil)
if err != nil {
t.Fatalf("%s(%d): can't create transaction %s: %s", file, line, name, err)
}
@@ -88,9 +87,9 @@
s, c := newServer()
defer c()
- _, err := s.lookupObject("/$tid.bad/foo")
+ _, err := s.lookupThing("/$tid.bad/foo")
if err == nil {
- t.Errorf("lookupObject should've failed, but didn't")
+ t.Fatalf("lookupThing should've failed, but didn't")
}
}
@@ -99,451 +98,454 @@
s, c := newServer()
defer c()
- rootCtx := watchtesting.NewFakeServerContext(rootPublicID)
+ rootCtx := storetest.NewFakeServerContext(rootPublicID)
tname := createTransaction(t, s, rootCtx, "/")
- if _, err := lookupObjectOrDie(s, tname).CreateTransaction(rootCtx, nil); err == nil {
+ if _, err := lookupThingOrDie(s, tname).NewTransaction(rootCtx, nil); err == nil {
t.Fatalf("creating nested transaction at %s should've failed, but didn't", tname)
}
// Try again with a valid object in between the two $tid components;
// CreateTransaction should still fail.
- lookupObjectOrDie(s, tname).Put(rootCtx, newValue())
+ lookupThingOrDie(s, tname).Put(rootCtx, newValue())
foo := naming.Join(tname, "foo")
- if _, err := lookupObjectOrDie(s, foo).CreateTransaction(rootCtx, nil); err == nil {
+ if _, err := lookupThingOrDie(s, foo).NewTransaction(rootCtx, nil); err == nil {
t.Fatalf("creating nested transaction at %s should've failed, but didn't", foo)
}
}
-func TestPutGetRemoveRoot(t *testing.T) {
+func TestPutGetRemoveObject(t *testing.T) {
s, c := newServer()
defer c()
- testPutGetRemove(t, s, "/")
-}
-
-func TestPutGetRemoveChild(t *testing.T) {
- rt.Init()
-
- s, c := newServer()
- defer c()
-
- {
- rootCtx := watchtesting.NewFakeServerContext(rootPublicID)
-
- // Create a root.
- name := "/"
- value := newValue()
-
- tobj1 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
- if _, err := tobj1.Put(rootCtx, value); err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- if err := tobj1.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
-
- tobj2 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
- if ok, err := tobj2.Exists(rootCtx); !ok || err != nil {
- t.Errorf("Should exist: %s", err)
- }
- if _, err := tobj2.Get(rootCtx); err != nil {
- t.Errorf("Object should exist: %s", err)
- }
- if err := tobj2.Abort(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- }
-
- testPutGetRemove(t, s, "/Entries/a")
+ testPutGetRemove(t, s, "a")
}
func testPutGetRemove(t *testing.T, s *Server, name string) {
rt.Init()
- rootCtx := watchtesting.NewFakeServerContext(rootPublicID)
+ rootCtx := storetest.NewFakeServerContext(rootPublicID)
value := newValue()
{
// Check that the object does not exist.
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
if ok, err := tobj.Exists(rootCtx); ok || err != nil {
- t.Errorf("Should not exist: %s", err)
+ t.Fatalf("Should not exist: %s", err)
}
if v, err := tobj.Get(rootCtx); v.Stat.ID.IsValid() && err == nil {
- t.Errorf("Should not exist: %v, %s", v, err)
+ t.Fatalf("Should not exist: %v, %s", v, err)
}
}
{
// Add the object.
- tobj1 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj1 := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
if _, err := tobj1.Put(rootCtx, value); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
if ok, err := tobj1.Exists(rootCtx); !ok || err != nil {
- t.Errorf("Should exist: %s", err)
+ t.Fatalf("Should exist: %s", err)
}
if _, err := tobj1.Get(rootCtx); err != nil {
- t.Errorf("Object should exist: %s", err)
+ t.Fatalf("Object should exist: %s", err)
}
// Transactions are isolated.
- tobj2 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj2 := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
if ok, err := tobj2.Exists(rootCtx); ok || err != nil {
- t.Errorf("Should not exist: %s", err)
+ t.Fatalf("Should not exist: %s", err)
}
if v, err := tobj2.Get(rootCtx); v.Stat.ID.IsValid() && err == nil {
- t.Errorf("Should not exist: %v, %s", v, err)
+ t.Fatalf("Should not exist: %v, %s", v, err)
}
// Apply tobj1.
if err := tobj1.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
// tobj2 is still isolated.
if ok, err := tobj2.Exists(rootCtx); ok || err != nil {
- t.Errorf("Should not exist: %s", err)
+ t.Fatalf("Should not exist: %s", err)
}
if v, err := tobj2.Get(rootCtx); v.Stat.ID.IsValid() && err == nil {
- t.Errorf("Should not exist: %v, %s", v, err)
+ t.Fatalf("Should not exist: %v, %s", v, err)
}
// tobj3 observes the commit.
- tobj3 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj3 := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
if ok, err := tobj3.Exists(rootCtx); !ok || err != nil {
- t.Errorf("Should exist: %s", err)
+ t.Fatalf("Should exist: %s", err)
}
if _, err := tobj3.Get(rootCtx); err != nil {
- t.Errorf("Object should exist: %s", err)
+ t.Fatalf("Object should exist: %s", err)
}
}
{
// Remove the object.
- tobj1 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj1 := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
if err := tobj1.Remove(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
if ok, err := tobj1.Exists(rootCtx); ok || err != nil {
- t.Errorf("Should not exist: %s", err)
+ t.Fatalf("Should not exist: %s", err)
}
if v, err := tobj1.Get(rootCtx); v.Stat.ID.IsValid() || err == nil {
- t.Errorf("Object should not exist: %T, %v, %s", v, v, err)
+ t.Fatalf("Object should not exist: %T, %v, %s", v, v, err)
}
// The removal is isolated.
- tobj2 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj2 := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
if ok, err := tobj2.Exists(rootCtx); !ok || err != nil {
- t.Errorf("Should exist: %s", err)
+ t.Fatalf("Should exist: %s", err)
}
if _, err := tobj2.Get(rootCtx); err != nil {
- t.Errorf("Object should exist: %s", err)
+ t.Fatalf("Object should exist: %s", err)
}
// Apply tobj1.
if err := tobj1.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
// The removal is isolated.
if ok, err := tobj2.Exists(rootCtx); !ok || err != nil {
- t.Errorf("Should exist: %s", err)
+ t.Fatalf("Should exist: %s", err)
}
if _, err := tobj2.Get(rootCtx); err != nil {
- t.Errorf("Object should exist: %s", err)
+ t.Fatalf("Object should exist: %s", err)
}
}
{
// Check that the object does not exist.
- tobj1 := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj1 := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
if ok, err := tobj1.Exists(rootCtx); ok || err != nil {
- t.Errorf("Should not exist")
+ t.Fatalf("Should not exist")
}
if v, err := tobj1.Get(rootCtx); v.Stat.ID.IsValid() && err == nil {
- t.Errorf("Should not exist: %v, %s", v, err)
+ t.Fatalf("Should not exist: %v, %s", v, err)
}
}
}
-// TODO(sadovsky): Add test cases for committing and aborting an expired
-// transaction. The client should get back errTransactionDoesNotExist.
-
-func TestWatch(t *testing.T) {
- rt.Init()
- rootCtx := watchtesting.NewFakeServerContext(rootPublicID)
-
- s, c := newServer()
- defer c()
-
- name1 := "/"
- value1 := "v1"
- var id1 storage.ID
-
- // Before the watch request has been made, commit a transaction that puts /.
- {
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name1))
- st, err := tobj.Put(rootCtx, value1)
- if err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- id1 = st.ID
- if err := tobj.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- }
-
- // Start a watch request.
- req := raw.Request{}
- ws := watchtesting.WatchRaw(rootPublicID, s.Watch, req)
-
- rStream := ws.RecvStream()
- // Check that watch detects the changes in the first transaction.
- {
- changes := []types.Change{}
- if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
- }
- change := rStream.Value()
- changes = append(changes, change)
- if change.Continued {
- t.Error("Expected change to be the last in this transaction")
- }
- watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
- }
-
- name2 := "/a"
- value2 := "v2"
- var id2 storage.ID
-
- // Commit a second transaction that puts /a.
- {
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name2))
- st, err := tobj.Put(rootCtx, value2)
- if err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- id2 = st.ID
- if err := tobj.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- }
-
- // Check that watch detects the changes in the second transaction.
- {
- changes := []types.Change{}
- if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
- }
- change := rStream.Value()
- changes = append(changes, change)
- if !change.Continued {
- t.Error("Expected change to continue the transaction")
- }
- if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
- }
- change = rStream.Value()
- changes = append(changes, change)
- if change.Continued {
- t.Error("Expected change to be the last in this transaction")
- }
- watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
- watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
- }
-}
+// TODO(sadovsky): Add more test cases for Commit/Abort:
+// - expired transaction: server should return errTransactionDoesNotExist
+// - no transaction: server should return errNoTransaction
func TestWatchGlob(t *testing.T) {
rt.Init()
- rootCtx := watchtesting.NewFakeServerContext(rootPublicID)
+ rootCtx := storetest.NewFakeServerContext(rootPublicID)
s, c := newServer()
defer c()
- value1 := "v1"
- var id1 storage.ID
+ dirname, objname := "/a", "/a/b"
+ dir, obj := lookupThingOrDie(s, dirname), lookupThingOrDie(s, objname)
- name1, name2 := "/", "/a"
- o1, o2 := lookupObjectOrDie(s, name1), lookupObjectOrDie(s, name2)
-
- // Before the watch request has been made, commit a transaction that puts /.
+ // Before the watch request has been made, commit a transaction that makes
+ // directory /a.
{
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name1))
- st, err := tobj.Put(rootCtx, value1)
+ tdir := lookupThingOrDie(s, createTransaction(t, s, rootCtx, dirname))
+ err := tdir.Make(rootCtx)
if err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
- id1 = st.ID
- if err := tobj.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ if err := tdir.Commit(rootCtx); err != nil {
+ t.Fatalf("Unexpected error: %s", err)
}
}
- // Start watch requests on / and /a.
+ // Start watch requests on /a and /a/b.
req := types.GlobRequest{Pattern: "..."}
- ws1 := watchtesting.WatchGlob(rootPublicID, o1.WatchGlob, req)
- ws2 := watchtesting.WatchGlob(rootPublicID, o2.WatchGlob, req)
+ wdir := storetest.WatchGlob(rootPublicID, dir.WatchGlob, req)
+ wobj := storetest.WatchGlob(rootPublicID, obj.WatchGlob, req)
- rStream1 := ws1.RecvStream()
- rStream2 := ws2.RecvStream()
- // The watch on / should send a change on /.
- {
- changes := []types.Change{}
- if !rStream1.Advance() {
- t.Error("Advance() failed: %v", rStream1.Err())
- }
- change := rStream1.Value()
- changes = append(changes, change)
- if change.Continued {
- t.Error("Expected change to be the last in this transaction")
- }
- watchtesting.ExpectServiceEntryExists(t, changes, "", id1, value1)
- }
- // The watch on /a should send no change. The first change it sends is
- // verified below.
+ rStreamDir := wdir.RecvStream()
+ rStreamObj := wobj.RecvStream()
- value2 := "v2"
- var id2 storage.ID
-
- // Commit a second transaction that puts /a.
- {
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name2))
- st, err := tobj.Put(rootCtx, value2)
- if err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- id2 = st.ID
- if err := tobj.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
- }
- }
-
- // The watch on / should send changes on / and /a.
- {
- changes := []types.Change{}
- if !rStream1.Advance() {
- t.Error("Advance() failed: %v", rStream1.Err())
- }
- change := rStream1.Value()
- changes = append(changes, change)
- if !change.Continued {
- t.Error("Expected change to continue the transaction")
- }
- if !rStream1.Advance() {
- t.Error("Advance() failed: %v", rStream1.Err())
- }
- change = rStream1.Value()
- changes = append(changes, change)
- if change.Continued {
- t.Error("Expected change to be the last in this transaction")
- }
- watchtesting.ExpectServiceEntryExists(t, changes, "", id1, value1)
- watchtesting.ExpectServiceEntryExists(t, changes, "a", id2, value2)
- }
// The watch on /a should send a change on /a.
{
- changes := []types.Change{}
- if !rStream2.Advance() {
- t.Error("Advance() failed: %v", rStream2.Err())
+ if !rStreamDir.Advance() {
+ t.Fatalf("Advance() failed: %v", rStreamDir.Err())
}
- change := rStream2.Value()
+ change := rStreamDir.Value()
+ if change.Continued {
+ t.Fatalf("Expected change to be the last in this transaction")
+ }
+ }
+ // The watch on /a/b should send no change. The first change it sends is
+ // verified below.
+
+ value := "v"
+ var id storage.ID
+
+ // Commit a second transaction that puts /a/b.
+ {
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, objname))
+ st, err := tobj.Put(rootCtx, value)
+ if err != nil {
+ t.Fatalf("Unexpected error: %s", err)
+ }
+ id = st.ID
+ if err := tobj.Commit(rootCtx); err != nil {
+ t.Fatalf("Unexpected error: %s", err)
+ }
+ }
+
+ // The watch on /a should send changes on /a and /a/b.
+ {
+ changes := []types.Change{}
+ if !rStreamDir.Advance() {
+ t.Fatalf("Advance() failed: %v", rStreamDir.Err())
+ }
+ change := rStreamDir.Value()
+ changes = append(changes, change)
+ if !change.Continued {
+ t.Fatalf("Expected change to NOT be the last in this transaction")
+ }
+ if !rStreamDir.Advance() {
+ t.Fatalf("Advance() failed: %v", rStreamDir.Err())
+ }
+ change = rStreamDir.Value()
changes = append(changes, change)
if change.Continued {
- t.Error("Expected change to be the last in this transaction")
+ t.Fatalf("Expected change to be the last in this transaction")
}
- watchtesting.ExpectServiceEntryExists(t, changes, "a", id2, value2)
+ storetest.ExpectEntryExistsNameOnly(t, changes, "a")
+ storetest.ExpectEntryExists(t, changes, "a/b", id, value)
+ }
+ // The watch on /a/b should send a change on /a/b.
+ {
+ changes := []types.Change{}
+ if !rStreamObj.Advance() {
+ t.Fatalf("Advance() failed: %v", rStreamObj.Err())
+ }
+ change := rStreamObj.Value()
+ changes = append(changes, change)
+ if change.Continued {
+ t.Fatalf("Expected change to be the last in this transaction")
+ }
+ storetest.ExpectEntryExists(t, changes, "a/b", id, value)
}
}
-func TestGarbageCollectionOnCommit(t *testing.T) {
+func TestRawWatch(t *testing.T) {
rt.Init()
- rootCtx := watchtesting.NewFakeServerContext(rootPublicID)
+ rootCtx := storetest.NewFakeServerContext(rootPublicID)
s, c := newServer()
defer c()
- name1 := "/"
+ name1 := "/a"
value1 := "v1"
var id1 storage.ID
- // Before the watch request has been made, commit a transaction that puts /.
+ // Before the watch request has been made, commit a transaction that puts /a.
{
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name1))
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name1))
st, err := tobj.Put(rootCtx, value1)
if err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
id1 = st.ID
if err := tobj.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
}
// Start a watch request.
req := raw.Request{}
- ws := watchtesting.WatchRaw(rootPublicID, s.Watch, req)
+ ws := storetest.WatchRaw(rootPublicID, s.Watch, req)
rStream := ws.RecvStream()
// Check that watch detects the changes in the first transaction.
{
changes := []types.Change{}
+ // First change is making the root dir (in server.go), second is updating
+ // the root dir (adding a dir entry), third is putting /a.
if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
+ t.Fatalf("Advance() failed: %v", rStream.Err())
}
change := rStream.Value()
changes = append(changes, change)
if change.Continued {
- t.Error("Expected change to be the last in this transaction")
+ t.Fatalf("Expected change to be the last in this transaction")
}
- watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
+ if !rStream.Advance() {
+ t.Fatalf("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
+ if !change.Continued {
+ t.Fatalf("Expected change to NOT be the last in this transaction")
+ }
+ if !rStream.Advance() {
+ t.Fatalf("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
+ if change.Continued {
+ t.Fatalf("Expected change to be the last in this transaction")
+ }
+ storetest.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
}
- name2 := "/a"
+ name2 := "/b"
value2 := "v2"
var id2 storage.ID
- // Commit a second transaction that puts /a.
+ // Commit a second transaction that puts /b.
{
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name2))
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name2))
st, err := tobj.Put(rootCtx, value2)
if err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
id2 = st.ID
if err := tobj.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
}
// Check that watch detects the changes in the second transaction.
{
changes := []types.Change{}
+ // First change is updating the root dir (adding a dir entry), second is
+ // putting /b.
if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
+ t.Fatalf("Advance() failed: %v", rStream.Err())
}
change := rStream.Value()
changes = append(changes, change)
if !change.Continued {
- t.Error("Expected change to continue the transaction")
+ t.Fatalf("Expected change to NOT be the last in this transaction")
}
if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
+ t.Fatalf("Advance() failed: %v", rStream.Err())
}
change = rStream.Value()
changes = append(changes, change)
if change.Continued {
- t.Error("Expected change to be the last in this transaction")
+ t.Fatalf("Expected change to be the last in this transaction")
}
- watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
- watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
+ // Note, we don't know the ID of the root dir so we can't check that it
+ // exists in 'changes'.
+ storetest.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
+ }
+}
+
+// Note, this test is identical to TestRawWatch up until the removal of /b.
+func TestGarbageCollectionOnCommit(t *testing.T) {
+ rt.Init()
+ rootCtx := storetest.NewFakeServerContext(rootPublicID)
+
+ s, c := newServer()
+ defer c()
+
+ name1 := "/a"
+ value1 := "v1"
+ var id1 storage.ID
+
+ // Before the watch request has been made, commit a transaction that puts /a.
+ {
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name1))
+ st, err := tobj.Put(rootCtx, value1)
+ if err != nil {
+ t.Fatalf("Unexpected error: %s", err)
+ }
+ id1 = st.ID
+ if err := tobj.Commit(rootCtx); err != nil {
+ t.Fatalf("Unexpected error: %s", err)
+ }
}
- // Commit a third transaction that removes /a.
+ // Start a watch request.
+ req := raw.Request{}
+ ws := storetest.WatchRaw(rootPublicID, s.Watch, req)
+
+ rStream := ws.RecvStream()
+ // Check that watch detects the changes in the first transaction.
{
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, "/a"))
+ changes := []types.Change{}
+ // First change is making the root dir (in server.go), second is updating
+ // the root dir (adding a dir entry), third is putting /a.
+ if !rStream.Advance() {
+ t.Fatalf("Advance() failed: %v", rStream.Err())
+ }
+ change := rStream.Value()
+ changes = append(changes, change)
+ if change.Continued {
+ t.Fatalf("Expected change to be the last in this transaction")
+ }
+ if !rStream.Advance() {
+ t.Fatalf("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
+ if !change.Continued {
+ t.Fatalf("Expected change to NOT be the last in this transaction")
+ }
+ if !rStream.Advance() {
+ t.Fatalf("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
+ if change.Continued {
+ t.Fatalf("Expected change to be the last in this transaction")
+ }
+ storetest.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
+ }
+
+ name2 := "/b"
+ value2 := "v2"
+ var id2 storage.ID
+
+ // Commit a second transaction that puts /b.
+ {
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name2))
+ st, err := tobj.Put(rootCtx, value2)
+ if err != nil {
+ t.Fatalf("Unexpected error: %s", err)
+ }
+ id2 = st.ID
+ if err := tobj.Commit(rootCtx); err != nil {
+ t.Fatalf("Unexpected error: %s", err)
+ }
+ }
+
+ // Check that watch detects the changes in the second transaction.
+ {
+ changes := []types.Change{}
+ // First change is updating the root dir (adding a dir entry), second is
+ // putting /b.
+ if !rStream.Advance() {
+ t.Fatalf("Advance() failed: %v", rStream.Err())
+ }
+ change := rStream.Value()
+ changes = append(changes, change)
+ if !change.Continued {
+ t.Fatalf("Expected change to NOT be the last in this transaction")
+ }
+ if !rStream.Advance() {
+ t.Fatalf("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
+ if change.Continued {
+ t.Fatalf("Expected change to be the last in this transaction")
+ }
+ // Note, we don't know the ID of the root dir so we can't check that it
+ // exists in 'changes'.
+ storetest.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
+ }
+
+ // Commit a third transaction that removes /b.
+ {
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name2))
if err := tobj.Remove(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
if err := tobj.Commit(rootCtx); err != nil {
- t.Errorf("Unexpected error: %s", err)
+ t.Fatalf("Unexpected error: %s", err)
}
}
@@ -551,35 +553,36 @@
{
changes := []types.Change{}
if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
+ t.Fatalf("Advance() failed: %v", rStream.Err())
}
change := rStream.Value()
changes = append(changes, change)
if change.Continued {
- t.Error("Expected change to be the last in this transaction")
+ t.Fatalf("Expected change to be the last in this transaction")
}
- watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
+ // Note, we don't know the ID of the root dir so we can't check that it
+ // exists in 'changes'.
}
- // Check that watch detects the garbage collection of /a.
+ // Check that watch detects the garbage collection of /b.
{
changes := []types.Change{}
if !rStream.Advance() {
- t.Error("Advance() failed: %v", rStream.Err())
+ t.Fatalf("Advance() failed: %v", rStream.Err())
}
change := rStream.Value()
changes = append(changes, change)
if change.Continued {
- t.Error("Expected change to be the last in this transaction")
+ t.Fatalf("Expected change to be the last in this transaction")
}
- watchtesting.ExpectMutationDoesNotExistNoVersionCheck(t, changes, id2)
+ storetest.ExpectMutationDoesNotExistNoVersionCheck(t, changes, id2)
}
}
func TestTransactionSecurity(t *testing.T) {
rt.Init()
- rootCtx := watchtesting.NewFakeServerContext(rootPublicID)
- blessedCtx := watchtesting.NewFakeServerContext(blessedPublicId)
+ rootCtx := storetest.NewFakeServerContext(rootPublicID)
+ blessedCtx := storetest.NewFakeServerContext(blessedPublicId)
s, c := newServer()
defer c()
@@ -589,55 +592,55 @@
value := newValue()
// Create a transaction in the root's session.
- tobj := lookupObjectOrDie(s, createTransaction(t, s, rootCtx, name))
+ tobj := lookupThingOrDie(s, createTransaction(t, s, rootCtx, name))
// Check that the transaction cannot be accessed by the blessee.
if _, err := tobj.Exists(blessedCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if _, err := tobj.Get(blessedCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if _, err := tobj.Put(blessedCtx, value); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if err := tobj.Remove(blessedCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if err := tobj.Abort(blessedCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if err := tobj.Commit(blessedCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
// Create a transaction in the blessee's session.
- tobj = lookupObjectOrDie(s, createTransaction(t, s, blessedCtx, name))
+ tobj = lookupThingOrDie(s, createTransaction(t, s, blessedCtx, name))
// Check that the transaction cannot be accessed by the root.
if _, err := tobj.Exists(rootCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if _, err := tobj.Get(rootCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if _, err := tobj.Put(rootCtx, value); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if err := tobj.Remove(rootCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if err := tobj.Abort(rootCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
if err := tobj.Commit(rootCtx); err != errPermissionDenied {
- t.Errorf("Unexpected error: %v", err)
+ t.Fatalf("Unexpected error: %v", err)
}
}
func TestStoreDispatcher(t *testing.T) {
rawType := reflect.PtrTo(reflect.TypeOf(raw.ServerStubStore{}))
- objectType := reflect.PtrTo(reflect.TypeOf(store.ServerStubObject{}))
+ thingType := reflect.PtrTo(reflect.TypeOf(ServerStubstoreThing{}))
tests := []struct {
name string
@@ -646,9 +649,9 @@
{raw.RawStoreSuffix, rawType},
{"a/b/" + raw.RawStoreSuffix, rawType},
{"a/b/c" + raw.RawStoreSuffix, rawType},
- {"", objectType},
- {"a/b/", objectType},
- {"a/b/c", objectType},
+ {"", thingType},
+ {"a/b/", thingType},
+ {"a/b/c", thingType},
}
s, c := newServer()
@@ -660,10 +663,10 @@
for _, test := range tests {
serv, err := d.lookupServer(test.name)
if err != nil {
- t.Errorf("error looking up %s: %s", test.name, err)
+ t.Fatalf("error looking up %s: %s", test.name, err)
}
if reflect.TypeOf(serv) != test.t {
- t.Errorf("error looking up %s. got %T, expected %v", test.name, serv, test.t)
+ t.Fatalf("error looking up %s. got %T, expected %v", test.name, serv, test.t)
}
}
}
diff --git a/services/store/server/service.vdl b/services/store/server/service.vdl
new file mode 100644
index 0000000..f4648ee
--- /dev/null
+++ b/services/store/server/service.vdl
@@ -0,0 +1,14 @@
+package server
+
+import (
+ "veyron2/services/store"
+)
+
+// Named 'storeThing' instead of 'thing' so that the struct in thing.go can be
+// named 'thing'.
+type storeThing interface {
+ store.DirSpecific
+ store.ObjectSpecific
+ store.DirOrObject
+ store.Transaction
+}
diff --git a/services/store/server/service.vdl.go b/services/store/server/service.vdl.go
new file mode 100644
index 0000000..f596c17
--- /dev/null
+++ b/services/store/server/service.vdl.go
@@ -0,0 +1,408 @@
+// This file was auto-generated by the veyron vdl tool.
+// Source: service.vdl
+
+package server
+
+import (
+ "veyron2/services/store"
+
+ // The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_veyron2 "veyron2"
+ _gen_context "veyron2/context"
+ _gen_ipc "veyron2/ipc"
+ _gen_naming "veyron2/naming"
+ _gen_vdlutil "veyron2/vdl/vdlutil"
+ _gen_wiretype "veyron2/wiretype"
+)
+
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
+// Named 'storeThing' instead of 'thing' so that the struct in thing.go can be
+// named 'thing'.
+// storeThing is the interface the client binds and uses.
+// storeThing_ExcludingUniversal is the interface without internal framework-added methods
+// to enable embedding without method collisions. Not to be used directly by clients.
+type storeThing_ExcludingUniversal interface {
+ store.DirSpecific_ExcludingUniversal
+ store.ObjectSpecific_ExcludingUniversal
+ store.DirOrObject_ExcludingUniversal
+ store.Transaction_ExcludingUniversal
+}
+type storeThing interface {
+ _gen_ipc.UniversalServiceMethods
+ storeThing_ExcludingUniversal
+}
+
+// storeThingService is the interface the server implements.
+type storeThingService interface {
+ store.DirSpecificService
+ store.ObjectSpecificService
+ store.DirOrObjectService
+ store.TransactionService
+}
+
+// BindstoreThing returns the client stub implementing the storeThing
+// interface.
+//
+// If no _gen_ipc.Client is specified, the default _gen_ipc.Client in the
+// global Runtime is used.
+func BindstoreThing(name string, opts ..._gen_ipc.BindOpt) (storeThing, error) {
+ var client _gen_ipc.Client
+ switch len(opts) {
+ case 0:
+ // Do nothing.
+ case 1:
+ if clientOpt, ok := opts[0].(_gen_ipc.Client); opts[0] == nil || ok {
+ client = clientOpt
+ } else {
+ return nil, _gen_vdlutil.ErrUnrecognizedOption
+ }
+ default:
+ return nil, _gen_vdlutil.ErrTooManyOptionsToBind
+ }
+ stub := &clientStubstoreThing{defaultClient: client, name: name}
+ stub.DirSpecific_ExcludingUniversal, _ = store.BindDirSpecific(name, client)
+ stub.ObjectSpecific_ExcludingUniversal, _ = store.BindObjectSpecific(name, client)
+ stub.DirOrObject_ExcludingUniversal, _ = store.BindDirOrObject(name, client)
+ stub.Transaction_ExcludingUniversal, _ = store.BindTransaction(name, client)
+
+ return stub, nil
+}
+
+// NewServerstoreThing creates a new server stub.
+//
+// It takes a regular server implementing the storeThingService
+// interface, and returns a new server stub.
+func NewServerstoreThing(server storeThingService) interface{} {
+ return &ServerStubstoreThing{
+ ServerStubDirSpecific: *store.NewServerDirSpecific(server).(*store.ServerStubDirSpecific),
+ ServerStubObjectSpecific: *store.NewServerObjectSpecific(server).(*store.ServerStubObjectSpecific),
+ ServerStubDirOrObject: *store.NewServerDirOrObject(server).(*store.ServerStubDirOrObject),
+ ServerStubTransaction: *store.NewServerTransaction(server).(*store.ServerStubTransaction),
+ service: server,
+ }
+}
+
+// clientStubstoreThing implements storeThing.
+type clientStubstoreThing struct {
+ store.DirSpecific_ExcludingUniversal
+ store.ObjectSpecific_ExcludingUniversal
+ store.DirOrObject_ExcludingUniversal
+ store.Transaction_ExcludingUniversal
+
+ defaultClient _gen_ipc.Client
+ name string
+}
+
+func (__gen_c *clientStubstoreThing) client(ctx _gen_context.T) _gen_ipc.Client {
+ if __gen_c.defaultClient != nil {
+ return __gen_c.defaultClient
+ }
+ return _gen_veyron2.RuntimeFromContext(ctx).Client()
+}
+
+func (__gen_c *clientStubstoreThing) UnresolveStep(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply []string, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client(ctx).StartCall(ctx, __gen_c.name, "UnresolveStep", nil, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (__gen_c *clientStubstoreThing) Signature(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply _gen_ipc.ServiceSignature, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client(ctx).StartCall(ctx, __gen_c.name, "Signature", nil, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+func (__gen_c *clientStubstoreThing) GetMethodTags(ctx _gen_context.T, method string, opts ..._gen_ipc.CallOpt) (reply []interface{}, err error) {
+ var call _gen_ipc.Call
+ if call, err = __gen_c.client(ctx).StartCall(ctx, __gen_c.name, "GetMethodTags", []interface{}{method}, opts...); err != nil {
+ return
+ }
+ if ierr := call.Finish(&reply, &err); ierr != nil {
+ err = ierr
+ }
+ return
+}
+
+// ServerStubstoreThing wraps a server that implements
+// storeThingService and provides an object that satisfies
+// the requirements of veyron2/ipc.ReflectInvoker.
+type ServerStubstoreThing struct {
+ store.ServerStubDirSpecific
+ store.ServerStubObjectSpecific
+ store.ServerStubDirOrObject
+ store.ServerStubTransaction
+
+ service storeThingService
+}
+
+func (__gen_s *ServerStubstoreThing) GetMethodTags(call _gen_ipc.ServerCall, method string) ([]interface{}, error) {
+ // TODO(bprosnitz) GetMethodTags() will be replaces with Signature().
+ // Note: This exhibits some weird behavior like returning a nil error if the method isn't found.
+ // This will change when it is replaced with Signature().
+ if resp, err := __gen_s.ServerStubDirSpecific.GetMethodTags(call, method); resp != nil || err != nil {
+ return resp, err
+ }
+ if resp, err := __gen_s.ServerStubObjectSpecific.GetMethodTags(call, method); resp != nil || err != nil {
+ return resp, err
+ }
+ if resp, err := __gen_s.ServerStubDirOrObject.GetMethodTags(call, method); resp != nil || err != nil {
+ return resp, err
+ }
+ if resp, err := __gen_s.ServerStubTransaction.GetMethodTags(call, method); resp != nil || err != nil {
+ return resp, err
+ }
+ return nil, nil
+}
+
+func (__gen_s *ServerStubstoreThing) Signature(call _gen_ipc.ServerCall) (_gen_ipc.ServiceSignature, error) {
+ result := _gen_ipc.ServiceSignature{Methods: make(map[string]_gen_ipc.MethodSignature)}
+
+ result.TypeDefs = []_gen_vdlutil.Any{}
+ var ss _gen_ipc.ServiceSignature
+ var firstAdded int
+ ss, _ = __gen_s.ServerStubDirSpecific.Signature(call)
+ firstAdded = len(result.TypeDefs)
+ for k, v := range ss.Methods {
+ for i, _ := range v.InArgs {
+ if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ for i, _ := range v.OutArgs {
+ if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ if v.InStream >= _gen_wiretype.TypeIDFirst {
+ v.InStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ if v.OutStream >= _gen_wiretype.TypeIDFirst {
+ v.OutStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ result.Methods[k] = v
+ }
+ //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+ for _, d := range ss.TypeDefs {
+ switch wt := d.(type) {
+ case _gen_wiretype.SliceType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.ArrayType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.MapType:
+ if wt.Key >= _gen_wiretype.TypeIDFirst {
+ wt.Key += _gen_wiretype.TypeID(firstAdded)
+ }
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.StructType:
+ for i, fld := range wt.Fields {
+ if fld.Type >= _gen_wiretype.TypeIDFirst {
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
+ }
+ result.TypeDefs = append(result.TypeDefs, d)
+ }
+ ss, _ = __gen_s.ServerStubObjectSpecific.Signature(call)
+ firstAdded = len(result.TypeDefs)
+ for k, v := range ss.Methods {
+ for i, _ := range v.InArgs {
+ if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ for i, _ := range v.OutArgs {
+ if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ if v.InStream >= _gen_wiretype.TypeIDFirst {
+ v.InStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ if v.OutStream >= _gen_wiretype.TypeIDFirst {
+ v.OutStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ result.Methods[k] = v
+ }
+ //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+ for _, d := range ss.TypeDefs {
+ switch wt := d.(type) {
+ case _gen_wiretype.SliceType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.ArrayType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.MapType:
+ if wt.Key >= _gen_wiretype.TypeIDFirst {
+ wt.Key += _gen_wiretype.TypeID(firstAdded)
+ }
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.StructType:
+ for i, fld := range wt.Fields {
+ if fld.Type >= _gen_wiretype.TypeIDFirst {
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
+ }
+ result.TypeDefs = append(result.TypeDefs, d)
+ }
+ ss, _ = __gen_s.ServerStubDirOrObject.Signature(call)
+ firstAdded = len(result.TypeDefs)
+ for k, v := range ss.Methods {
+ for i, _ := range v.InArgs {
+ if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ for i, _ := range v.OutArgs {
+ if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ if v.InStream >= _gen_wiretype.TypeIDFirst {
+ v.InStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ if v.OutStream >= _gen_wiretype.TypeIDFirst {
+ v.OutStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ result.Methods[k] = v
+ }
+ //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+ for _, d := range ss.TypeDefs {
+ switch wt := d.(type) {
+ case _gen_wiretype.SliceType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.ArrayType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.MapType:
+ if wt.Key >= _gen_wiretype.TypeIDFirst {
+ wt.Key += _gen_wiretype.TypeID(firstAdded)
+ }
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.StructType:
+ for i, fld := range wt.Fields {
+ if fld.Type >= _gen_wiretype.TypeIDFirst {
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
+ }
+ result.TypeDefs = append(result.TypeDefs, d)
+ }
+ ss, _ = __gen_s.ServerStubTransaction.Signature(call)
+ firstAdded = len(result.TypeDefs)
+ for k, v := range ss.Methods {
+ for i, _ := range v.InArgs {
+ if v.InArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.InArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ for i, _ := range v.OutArgs {
+ if v.OutArgs[i].Type >= _gen_wiretype.TypeIDFirst {
+ v.OutArgs[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ if v.InStream >= _gen_wiretype.TypeIDFirst {
+ v.InStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ if v.OutStream >= _gen_wiretype.TypeIDFirst {
+ v.OutStream += _gen_wiretype.TypeID(firstAdded)
+ }
+ result.Methods[k] = v
+ }
+ //TODO(bprosnitz) combine type definitions from embeded interfaces in a way that doesn't cause duplication.
+ for _, d := range ss.TypeDefs {
+ switch wt := d.(type) {
+ case _gen_wiretype.SliceType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.ArrayType:
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.MapType:
+ if wt.Key >= _gen_wiretype.TypeIDFirst {
+ wt.Key += _gen_wiretype.TypeID(firstAdded)
+ }
+ if wt.Elem >= _gen_wiretype.TypeIDFirst {
+ wt.Elem += _gen_wiretype.TypeID(firstAdded)
+ }
+ d = wt
+ case _gen_wiretype.StructType:
+ for i, fld := range wt.Fields {
+ if fld.Type >= _gen_wiretype.TypeIDFirst {
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
+ }
+ }
+ d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
+ }
+ result.TypeDefs = append(result.TypeDefs, d)
+ }
+
+ return result, nil
+}
+
+func (__gen_s *ServerStubstoreThing) UnresolveStep(call _gen_ipc.ServerCall) (reply []string, err error) {
+ if unresolver, ok := __gen_s.service.(_gen_ipc.Unresolver); ok {
+ return unresolver.UnresolveStep(call)
+ }
+ if call.Server() == nil {
+ return
+ }
+ var published []string
+ if published, err = call.Server().Published(); err != nil || published == nil {
+ return
+ }
+ reply = make([]string, len(published))
+ for i, p := range published {
+ reply[i] = _gen_naming.Join(p, call.Name())
+ }
+ return
+}
diff --git a/services/store/server/thing.go b/services/store/server/thing.go
new file mode 100644
index 0000000..10bfe7c
--- /dev/null
+++ b/services/store/server/thing.go
@@ -0,0 +1,306 @@
+package server
+
+// This file defines thing, which implements the server-side Thing API from
+// veyron2/services/store/service.vdl.
+
+import (
+ "veyron/services/store/memstore"
+
+ "veyron2/ipc"
+ "veyron2/query"
+ "veyron2/security"
+ "veyron2/services/mounttable"
+ mttypes "veyron2/services/mounttable/types"
+ "veyron2/services/store"
+ "veyron2/services/watch"
+ watchtypes "veyron2/services/watch/types"
+ "veyron2/storage"
+ "veyron2/vdl/vdlutil"
+ "veyron2/verror"
+)
+
+const (
+ // Large random value, used to indicate that this memstore object represents
+ // a directory.
+ dirValue int64 = 9380751577234
+)
+
+type thing struct {
+ name string // will never contain a transaction id
+ obj *memstore.Object
+ tid transactionID // may be nullTransactionID
+ server *Server
+}
+
+var (
+ // TODO(sadovsky): Make sure all of these error cases are covered by tests.
+ // TODO(sadovsky): Revisit these error types.
+ errMakeDirObjectExists = verror.Existsf("Object exists in Dir.Make path")
+ errCalledObjectMethodOnDir = verror.BadArgf("called Object method on Dir")
+ errPutObjectInObject = verror.BadArgf("put Object in another Object")
+ errCannotRemoveRootDir = verror.BadArgf("cannot remove root Dir")
+
+ _ storeThingService = (*thing)(nil)
+
+ nullEntry storage.Entry
+ nullStat storage.Stat
+)
+
+func isDir(entry *storage.Entry) bool {
+ value, ok := entry.Value.(int64)
+ return ok && value == dirValue
+}
+
+func isObject(entry *storage.Entry) bool {
+ return !isDir(entry)
+}
+
+func (t *thing) String() string {
+ return t.name
+}
+
+func (t *thing) Attributes(arg string) map[string]string {
+ return map[string]string{
+ "health": "ok",
+ "servertype": t.String(),
+ }
+}
+
+////////////////////////////////////////
+// DirOrObject methods
+
+// Remove removes this thing.
+func (t *thing) Remove(ctx ipc.ServerContext) error {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return err
+ }
+ path := storage.ParsePath(t.name)
+ if len(path) == 0 {
+ return errCannotRemoveRootDir
+ }
+ return t.obj.Remove(ctx.RemoteID(), tx)
+}
+
+// Query returns a sequence of objects that match the given query.
+func (t *thing) Query(ctx ipc.ServerContext, q query.Query, stream store.DirOrObjectServiceQueryStream) error {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return err
+ }
+ it, err := t.obj.Query(ctx.RemoteID(), tx, q)
+ if err != nil {
+ return err
+ }
+ for it.Next() {
+ if err := stream.SendStream().Send(*it.Get()); err != nil {
+ it.Abort()
+ return err
+ }
+ }
+ return it.Err()
+}
+
+// Stat returns information about this thing.
+func (t *thing) Stat(ctx ipc.ServerContext) (storage.Stat, error) {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return nullStat, err
+ }
+ s, err := t.obj.Stat(ctx.RemoteID(), tx)
+ if err != nil {
+ return nullStat, err
+ }
+ // Determine the Kind.
+ entry, err := t.obj.Get(ctx.RemoteID(), tx)
+ if err != nil {
+ // TODO(sadovsky): Is this the right thing to return here? If obj.Get()
+ // returns state.errNotFound, it probably makes more sense to return
+ // nullStat and nil error. (Note, for now t.obj.Stat() is not implemented
+ // and always returns an error, so we never actually get here.)
+ return nullStat, err
+ }
+ if isDir(entry) {
+ s.Kind = storage.DirKind
+ } else {
+ s.Kind = storage.ObjectKind
+ }
+ return *s, err
+}
+
+// Exists returns true iff this thing is present in the store.
+func (t *thing) Exists(ctx ipc.ServerContext) (bool, error) {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return false, err
+ }
+ return t.obj.Exists(ctx.RemoteID(), tx)
+}
+
+// NewTransaction creates a transaction with the given options. It returns the
+// name of the transaction relative to this thing's name.
+func (t *thing) NewTransaction(ctx ipc.ServerContext, opts []vdlutil.Any) (string, error) {
+ if t.tid != nullTransactionID {
+ return "", errNestedTransaction
+ }
+ return t.server.createTransaction(ctx, t.name)
+}
+
+// Glob streams a series of names that match the given pattern.
+func (t *thing) Glob(ctx ipc.ServerContext, pattern string, stream mounttable.GlobbableServiceGlobStream) error {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return err
+ }
+ it, err := t.obj.Glob(ctx.RemoteID(), tx, pattern)
+ if err != nil {
+ return err
+ }
+ gsa := &globStreamAdapter{stream}
+ for ; it.IsValid(); it.Next() {
+ if err := gsa.SendStream().Send(it.Name()); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// WatchGlob returns a stream of changes that match a pattern.
+func (t *thing) WatchGlob(ctx ipc.ServerContext, req watchtypes.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
+ return t.server.watcher.WatchGlob(ctx, storage.ParsePath(t.name), req, stream)
+}
+
+// WatchQuery returns a stream of changes that satisfy a query.
+func (t *thing) WatchQuery(ctx ipc.ServerContext, req watchtypes.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
+ return t.server.watcher.WatchQuery(ctx, storage.ParsePath(t.name), req, stream)
+}
+
+////////////////////////////////////////
+// Dir-only methods
+
+// Called by Make(), and also called directly by server.go with a nil
+// transaction to create the root directory.
+func (t *thing) makeInternal(remoteID security.PublicID, tx *memstore.Transaction) error {
+ // Make dirs from the top down. Return error if we encounter an Object.
+ parts := storage.PathName{""}
+ parts = append(parts, storage.ParsePath(t.name)...)
+ // Set to true once we encounter a path component that doesn't already
+ // exist. Create new dirs from there on down.
+ newTree := false
+ for i, _ := range parts {
+ obj := t.server.store.Bind(parts[:i+1].String())
+ if !newTree {
+ // Note, obj.Get() returns state.errNotFound if the entry does not exist.
+ // TODO(sadovsky): Check for that specific error type and propagate all
+ // other errors.
+ entry, err := obj.Get(remoteID, tx)
+ if err != nil {
+ newTree = true
+ } else if isObject(entry) {
+ return errMakeDirObjectExists
+ }
+ }
+ if newTree {
+ obj.Put(remoteID, tx, dirValue)
+ }
+ }
+ return nil
+}
+
+func (t *thing) Make(ctx ipc.ServerContext) error {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return err
+ }
+ return t.makeInternal(ctx.RemoteID(), tx)
+}
+
+////////////////////////////////////////
+// Object-only methods
+
+// Get returns the value for the Object. The value returned is from the
+// most recent mutation of the entry in the Transaction, or from the
+// Transaction's snapshot if there is no mutation.
+func (t *thing) Get(ctx ipc.ServerContext) (storage.Entry, error) {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return nullEntry, err
+ }
+ // Note, obj.Get() returns state.errNotFound if the entry does not exist.
+ entry, err := t.obj.Get(ctx.RemoteID(), tx)
+ if err != nil {
+ return nullEntry, err
+ }
+ if isDir(entry) {
+ return nullEntry, errCalledObjectMethodOnDir
+ }
+ return *entry, err
+}
+
+// Put modifies the value of the Object.
+func (t *thing) Put(ctx ipc.ServerContext, val vdlutil.Any) (storage.Stat, error) {
+ tx, err := t.server.findTransaction(ctx, t.tid)
+ if err != nil {
+ return nullStat, err
+ }
+ // Verify that this entry either doesn't exist or exists and is an Object.
+ // Note, obj.Get() returns state.errNotFound if the entry does not exist.
+ // TODO(sadovsky): Check for that specific error type and propagate all
+ // other errors.
+ entry, err := t.obj.Get(ctx.RemoteID(), tx)
+ if err == nil && isDir(entry) {
+ return nullStat, errCalledObjectMethodOnDir
+ }
+ // Verify that the parent already exists and is a Dir.
+ // Note, at this point we know t.name isn't the root of the store, b/c if it
+ // were, the check above would've failed.
+ path := storage.ParsePath(t.name)
+ path = path[:len(path)-1] // path to parent
+ // Note, we don't return an error here if path doesn't exist -- we let the
+ // memstore Put() code handle that case.
+ entry, err = t.server.store.Bind(path.String()).Get(ctx.RemoteID(), tx)
+ if err == nil && isObject(entry) {
+ return nullStat, errPutObjectInObject
+ }
+ s, err := t.obj.Put(ctx.RemoteID(), tx, interface{}(val))
+ if err != nil {
+ return nullStat, err
+ }
+ // TODO(sadovsky): Add test for this.
+ s.Kind = storage.ObjectKind
+ return *s, err
+}
+
+////////////////////////////////////////
+// Transaction methods
+
+func (t *thing) Commit(ctx ipc.ServerContext) error {
+ return t.server.commitTransaction(ctx, t.tid)
+}
+
+func (t *thing) Abort(ctx ipc.ServerContext) error {
+ return t.server.abortTransaction(ctx, t.tid)
+}
+
+////////////////////////////////////////
+// Internals
+
+type globStreamSenderAdapter struct {
+ stream interface {
+ Send(entry mttypes.MountEntry) error
+ }
+}
+
+func (a *globStreamSenderAdapter) Send(item string) error {
+ return a.stream.Send(mttypes.MountEntry{Name: item})
+}
+
+type globStreamAdapter struct {
+ stream mounttable.GlobbableServiceGlobStream
+}
+
+func (a *globStreamAdapter) SendStream() interface {
+ Send(item string) error
+} {
+ return &globStreamSenderAdapter{a.stream.SendStream()}
+}