{go/src/veyron2/storage,go/src/veyron2/services/store,go/src/veyron/services/store}: new transaction API
Primary description of new API:
https://docs.google.com/a/google.com/document/d/1Zob450G5rQDHmWXmg6UvVy3IH-hSiS8eE5L9IY62sW0/edit
Change-Id: I7272d389414ec27db8d599f0d562938a55e43a97
diff --git a/services/store/server/server.go b/services/store/server/server.go
index 8eb4478..8b72fc4 100644
--- a/services/store/server/server.go
+++ b/services/store/server/server.go
@@ -1,8 +1,15 @@
// Package server implements a storage service.
+
package server
+// This file defines Server, which implements the server-side Store API from
+// veyron2/services/store/service.vdl.
+
import (
+ "fmt"
+ "math/rand"
"reflect"
+ "strconv"
"strings"
"sync"
"time"
@@ -15,12 +22,11 @@
"veyron2/ipc"
"veyron2/security"
"veyron2/services/store"
- "veyron2/vdl/vdlutil"
"veyron2/verror"
)
const (
- // transactionMAxLifetime is the maximum duration before a transaction will
+ // transactionMaxLifetime is the maximum duration before a transaction will
// be garbage collected.
//
// TODO(jyh): This should probably be a configuration parameter.
@@ -28,28 +34,28 @@
)
var (
- // Server implements the StoreService interface.
_ store.StoreService = (*Server)(nil)
- nullTransactionID store.TransactionID
-
- errTransactionAlreadyExists = verror.Existsf("transaction already exists")
- errTransactionDoesNotExist = verror.NotFoundf("transaction does not exist")
+ errNestedTransaction = verror.BadArgf("cannot create a nested Transaction")
+ // Note, this can happen e.g. due to expiration.
+ errTransactionDoesNotExist = verror.NotFoundf("transaction does not exist")
// Transaction exists, but may not be used by the caller.
errPermissionDenied = verror.NotAuthorizedf("permission denied")
+
+ rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+ nullTransactionID transactionID
)
-// Server stores the dictionary of all media items. It has a scanner.Scanner
-// for collecting files from the filesystem. For each file, a FileService is
-// registered to serve the file.
+// Server implements Store and uses memstore internally.
type Server struct {
mutex sync.RWMutex
// store is the actual store implementation.
store service.Store
- // transactions are the set of active transactions.
- transactions map[store.TransactionID]*transaction
+ // transactions is the set of active transactions.
+ transactions map[transactionID]*transaction
// Transaction garbage collection.
pending sync.WaitGroup
@@ -60,6 +66,13 @@
watcher service.Watcher
}
+// transactionID is an internal transaction identifier chosen by the server.
+//
+// TODO(jyh): Consider using a larger identifier space to reduce chance of
+// collisions. (Note, createTransaction handles collisions when generating
+// transactionIDs.)
+type transactionID uint64
+
// transactionContext defines the context in which a transaction is used. A
// transaction may be used only in the context that created it.
// transactionContext weakly identifies a session by the local and remote
@@ -67,9 +80,11 @@
// TODO(tilaks): Use the local and remote addresses to identify the session.
// Does a session with a mobile device break if the remote address changes?
type transactionContext interface {
- // LocalID returns the PublicID of the principal at the local end of the request.
+ // LocalID returns the PublicID of the principal at the local end of the
+ // request.
LocalID() security.PublicID
- // RemoteID returns the PublicID of the principal at the remote end of the request.
+ // RemoteID returns the PublicID of the principal at the remote end of the
+ // request.
RemoteID() security.PublicID
}
@@ -97,7 +112,7 @@
}
s := &Server{
store: mstore,
- transactions: make(map[store.TransactionID]*transaction),
+ transactions: make(map[transactionID]*transaction),
ticker: time.NewTicker(time.Second),
closed: make(chan struct{}),
watcher: mwatcher,
@@ -127,14 +142,82 @@
}
}
-// findTransaction returns the transaction for the TransactionID.
-func (s *Server) findTransaction(ctx transactionContext, id store.TransactionID) (service.Transaction, error) {
+// findTransactionComponent returns the (begin, end) offsets of the "$tid.*"
+// component in the given object name, or (-1, -1) if oname does not contain a
+// transaction component.
+func findTransactionComponent(oname string) (int, int) {
+ begin := 0
+ if !strings.HasPrefix(oname, "$tid") {
+ begin = strings.Index(oname, "/$tid")
+ }
+ if begin == -1 {
+ return -1, -1
+ }
+ end := strings.Index(oname[begin+1:], "/")
+ if end == -1 {
+ end = len(oname)
+ } else {
+ end += begin + 1
+ }
+ return begin, end
+}
+
+// TODO(sadovsky): One of the following:
+// - Reserve prefix string "$tid." for internal use.
+// - Reserve prefix char "$" for internal use.
+// - Require users to escape prefix char "$" when they are referring to their
+// own data, e.g. "\$foo".
+func makeTransactionComponent(id transactionID) string {
+ return fmt.Sprintf("$tid.%d", id)
+}
+
+// stripTransactionComponent returns the given object name with its "$tid.*"
+// component removed, and also returns the stripped transactionID.
+// Examples:
+// "/foo/$tid.123/bar" => {"/foo/bar", transactionID(123)}
+// "/foo/bar" => {"/foo/bar", nullTransactionID}
+func stripTransactionComponent(oname string) (string, transactionID, error) {
+ begin, end := findTransactionComponent(oname)
+ if begin == -1 {
+ return oname, nullTransactionID, nil
+ }
+ tc := oname[begin:end]
+ id, err := strconv.ParseInt(tc[strings.LastIndex(tc, ".")+1:], 10, 64)
+ if err != nil {
+ return "", nullTransactionID, fmt.Errorf("Failed to extract id from %q", tc)
+ }
+ return oname[:begin] + oname[end:], transactionID(id), nil
+}
+
+func (s *Server) createTransaction(ctx transactionContext, oname string) (string, error) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ var id transactionID
+ for {
+ id = transactionID(rng.Int63())
+ _, ok := s.transactions[id]
+ if !ok {
+ break
+ }
+ }
+ info := &transaction{
+ trans: memstore.NewTransaction(),
+ expires: time.Now().Add(transactionMaxLifetime),
+ creatorCtx: ctx,
+ }
+ s.transactions[id] = info
+ return makeTransactionComponent(id), nil
+}
+
+// findTransaction returns the transaction for the given transaction ID.
+func (s *Server) findTransaction(ctx transactionContext, id transactionID) (service.Transaction, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.findTransactionLocked(ctx, id)
}
-func (s *Server) findTransactionLocked(ctx transactionContext, id store.TransactionID) (service.Transaction, error) {
+func (s *Server) findTransactionLocked(ctx transactionContext, id transactionID) (service.Transaction, error) {
if id == nullTransactionID {
return nil, nil
}
@@ -150,6 +233,43 @@
return info.trans, nil
}
+// Commit commits the changes in the transaction to the store. The
+// operation is atomic, so all mutations are performed, or none. Returns an
+// error if the transaction aborted.
+func (s *Server) commitTransaction(ctx transactionContext, id transactionID) error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ t, err := s.findTransactionLocked(ctx, id)
+ if err != nil {
+ return err
+ }
+ if t == nil {
+ return errTransactionDoesNotExist
+ }
+ err = t.Commit()
+ delete(s.transactions, id)
+ return err
+}
+
+// Abort discards a transaction. This is an optimization; transactions
+// eventually time out and get discarded. However, live transactions
+// consume resources, so if you know that you won't be using a transaction
+// anymore, you should discard it explicitly.
+func (s *Server) abortTransaction(ctx transactionContext, id transactionID) error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ t, err := s.findTransactionLocked(ctx, id)
+ if err != nil {
+ return err
+ }
+ if t == nil {
+ return errTransactionDoesNotExist
+ }
+ err = t.Abort()
+ delete(s.transactions, id)
+ return err
+}
+
func (t *transaction) matchesContext(ctx transactionContext) bool {
creatorCtx := t.creatorCtx
return membersEqual(creatorCtx.LocalID().Names(), ctx.LocalID().Names()) &&
@@ -193,60 +313,6 @@
}
}
-// CreateTransaction creates a transaction.
-func (s *Server) CreateTransaction(ctx ipc.ServerContext, id store.TransactionID, opts []vdlutil.Any) error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
-
- info, ok := s.transactions[id]
- if ok {
- return errTransactionAlreadyExists
- }
- info = &transaction{
- trans: memstore.NewTransaction(),
- expires: time.Now().Add(transactionMaxLifetime),
- creatorCtx: ctx,
- }
- s.transactions[id] = info
- return nil
-}
-
-// Commit commits the changes in the transaction to the store. The
-// operation is atomic, so all mutations are performed, or none. Returns an
-// error if the transaction aborted.
-func (s *Server) Commit(ctx ipc.ServerContext, id store.TransactionID) error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
-
- t, err := s.findTransactionLocked(ctx, id)
- if err != nil {
- return err
- }
- if t == nil {
- return errTransactionDoesNotExist
- }
- err = t.Commit()
- delete(s.transactions, id)
- return err
-}
-
-// Abort discards a transaction.
-func (s *Server) Abort(ctx ipc.ServerContext, id store.TransactionID) error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
-
- t, err := s.findTransactionLocked(ctx, id)
- if err != nil {
- return err
- }
- if t == nil {
- return errTransactionDoesNotExist
- }
- err = t.Abort()
- delete(s.transactions, id)
- return err
-}
-
// Watch returns a stream of all changes.
func (s *Server) Watch(ctx ipc.ServerContext, req raw.Request, stream raw.StoreServiceWatchStream) error {
return s.watcher.WatchRaw(ctx, req, stream)
@@ -276,20 +342,37 @@
}
func (d *storeDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- return ipc.ReflectInvoker(d.lookupServer(suffix)), d.auth, nil
+ serv, err := d.lookupServer(suffix)
+ if err != nil {
+ return nil, nil, err
+ }
+ return ipc.ReflectInvoker(serv), d.auth, nil
}
-func (d *storeDispatcher) lookupServer(suffix string) interface{} {
+func (d *storeDispatcher) lookupServer(suffix string) (interface{}, error) {
+ // Strip leading "/" if present so that server internals can reliably use
+ // naming.Join(suffix, "foo").
+ suffix = strings.TrimPrefix(suffix, "/")
if strings.HasSuffix(suffix, store.StoreSuffix) {
- return store.NewServerStore(d.s)
+ return store.NewServerStore(d.s), nil
} else if strings.HasSuffix(suffix, raw.RawStoreSuffix) {
- return raw.NewServerStore(d.s)
+ return raw.NewServerStore(d.s), nil
} else {
- o := d.s.lookupObject(suffix)
- return store.NewServerObject(o)
+ // TODO(sadovsky): Create Object, Transaction, and TransactionRoot stubs,
+ // merge them, and return the result. See TODO in
+ // veyron2/services/store/service.vdl.
+ o, err := d.s.lookupObject(suffix)
+ if err != nil {
+ return nil, err
+ }
+ return store.NewServerObject(o), nil
}
}
-func (s *Server) lookupObject(name string) *object {
- return &object{name: name, obj: s.store.Bind(name), server: s}
+func (s *Server) lookupObject(name string) (*object, error) {
+ oname, tid, err := stripTransactionComponent(name)
+ if err != nil {
+ return nil, err
+ }
+ return &object{name: oname, obj: s.store.Bind(oname), tid: tid, server: s}, nil
}